[GitHub] [kafka] kkonstantine commented on pull request #11309: MINOR: Remove unsupported rsync and ssh commands from release.py
kkonstantine commented on pull request #11309: URL: https://github.com/apache/kafka/pull/11309#issuecomment-915783471 Created https://issues.apache.org/jira/browse/KAFKA-13284 to track. Merged to trunk, 3.0, 2.8, 2.7 and 2.6 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13284) Use sftp protocol in release.py to upload release candidate artifacts
Konstantine Karantasis created KAFKA-13284: -- Summary: Use sftp protocol in release.py to upload release candidate artifacts Key: KAFKA-13284 URL: https://issues.apache.org/jira/browse/KAFKA-13284 Project: Kafka Issue Type: Improvement Reporter: Konstantine Karantasis Fix For: 3.1.0 {{home.apache.org}} has recently restricted access to {{sftp}} only. This prevents {{release.py}} from uploading a single archive with the artifacts of a release candidate using {{rsync}} and then unpacking the archive with {{ssh}}. The script could be changed to mirror the contents and upload / delete files individually using the {{sftp}} protocol. -- This message was sent by Atlassian Jira (v8.3.4#803005)
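For readers unfamiliar with sftp batch mode, a rough sketch of the per-file mirroring idea follows. release.py itself is Python and the eventual patch may look quite different; this Java sketch only illustrates driving `sftp -b -` (batch commands read from stdin) with generated `put` commands, and the helper name and flat-directory assumption are hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class SftpMirrorSketch {
    // Generates one "put" command per file and feeds them to `sftp -b -`,
    // which reads batch commands from stdin. Assumes a flat artifact directory.
    public static void uploadAll(Path localDir, String remoteDir, String userAtHost)
            throws IOException, InterruptedException {
        List<String> batch = new ArrayList<>();
        batch.add("-mkdir " + remoteDir); // leading '-' tells sftp to ignore "already exists" errors
        try (Stream<Path> files = Files.walk(localDir)) {
            files.filter(Files::isRegularFile)
                 .forEach(p -> batch.add("put " + p + " " + remoteDir + "/" + localDir.relativize(p)));
        }
        Process sftp = new ProcessBuilder("sftp", "-b", "-", userAtHost)
                .redirectErrorStream(true)
                .start();
        try (OutputStream stdin = sftp.getOutputStream()) {
            stdin.write(String.join("\n", batch).getBytes(StandardCharsets.UTF_8));
        }
        if (sftp.waitFor() != 0) {
            throw new IOException("sftp upload failed");
        }
    }
}
```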
[GitHub] [kafka] satishd commented on a change in pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.
satishd commented on a change in pull request #11033: URL: https://github.com/apache/kafka/pull/11033#discussion_r704956559 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java ## @@ -129,37 +132,44 @@ public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmen } // Publish the message to the topic. - doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(), segmentMetadataUpdate); +return doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(), segmentMetadataUpdate); } finally { lock.readLock().unlock(); } } @Override -public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) +public CompletableFuture putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) throws RemoteStorageException { Objects.requireNonNull(remotePartitionDeleteMetadata, "remotePartitionDeleteMetadata can not be null"); lock.readLock().lock(); try { ensureInitializedAndNotClosed(); - doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), remotePartitionDeleteMetadata); +return doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), remotePartitionDeleteMetadata); } finally { lock.readLock().unlock(); } } -private void doPublishMetadata(TopicIdPartition topicIdPartition, RemoteLogMetadata remoteLogMetadata) +private CompletableFuture doPublishMetadata(TopicIdPartition topicIdPartition, + RemoteLogMetadata remoteLogMetadata) throws RemoteStorageException { log.debug("Publishing metadata for partition: [{}] with context: [{}]", topicIdPartition, remoteLogMetadata); try { // Publish the message to the topic. -RecordMetadata recordMetadata = producerManager.publishMessage(remoteLogMetadata); -// Wait until the consumer catches up with this offset. This will ensure read-after-write consistency -// semantics. -consumerManager.waitTillConsumptionCatchesUp(recordMetadata); +CompletableFuture produceFuture = new CompletableFuture<>(); +producerManager.publishMessage(remoteLogMetadata, produceFuture); +return produceFuture.thenApplyAsync((Function) recordMetadata -> { +try { + consumerManager.waitTillConsumptionCatchesUp(recordMetadata); Review comment: `produceFuture` is completed once the produced record's `RecordMetadata` is available. `doPublishMetadata` then composes it with `.thenApplyAsync()` and returns a `CompletableFuture` that completes only after `consumerManager.waitTillConsumptionCatchesUp(recordMetadata)` returns. So the `CompletableFuture` returned from `doPublishMetadata` completes only after the consumer has caught up to the produced record's offset. I will document this in the code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
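For readers skimming the thread, a tiny self-contained sketch of the future composition being described follows. The `publishMessage` and `waitTillConsumptionCatchesUp` helpers below are stand-ins for the PR's ProducerManager/ConsumerManager calls, not the actual classes.

```java
import java.util.concurrent.CompletableFuture;

public class PublishThenCatchUpSketch {

    // Stand-in for ProducerManager.publishMessage: the producer callback would complete
    // the future once the broker acknowledges the record; here we complete it with a fake offset.
    static void publishMessage(String payload, CompletableFuture<Long> produceFuture) {
        produceFuture.complete(42L);
    }

    // Stand-in for ConsumerManager.waitTillConsumptionCatchesUp: in the real code this blocks
    // until the metadata consumer has read past the given offset.
    static void waitTillConsumptionCatchesUp(long offset) {
    }

    static CompletableFuture<Void> doPublishMetadata(String payload) {
        CompletableFuture<Long> produceFuture = new CompletableFuture<>();
        publishMessage(payload, produceFuture);
        // The future handed back to the caller completes only after the consumer has caught up
        // to the produced offset, which is what provides read-after-write semantics.
        return produceFuture.thenApplyAsync(offset -> {
            waitTillConsumptionCatchesUp(offset);
            return null;
        });
    }

    public static void main(String[] args) {
        doPublishMetadata("segment-metadata").join();
    }
}
```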
[GitHub] [kafka] satishd commented on a change in pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.
satishd commented on a change in pull request #11033: URL: https://github.com/apache/kafka/pull/11033#discussion_r704953596 ## File path: storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java ## @@ -62,16 +63,17 @@ * @param remoteLogSegmentMetadata metadata about the remote log segment. * @throws RemoteStorageException if there are any storage related errors occurred. * @throws IllegalArgumentException if the given metadata instance does not have the state as {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED} + * @return a CompletableFuture which will complete once this operation is finished. */ -void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException; +CompletableFuture addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException; /** - * This method is used to update the {@link RemoteLogSegmentMetadata}. Currently, it allows to update with the new + * This method is used to update the {@link RemoteLogSegmentMetadata} asynchronously. Currently, it allows to update with the new * state based on the life cycle of the segment. It can go through the below state transitions. * * * +-++--+ - * |COPY_SEGMENT_STARTED |---|COPY_SEGMENT_FINISHED | + * |COPY_SEGMENT_STARTED |--->|COPY_SEGMENT_FINISHED | Review comment: I verified that the correct HTML is generated in the javadoc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides
ableegoldman commented on a change in pull request #11272: URL: https://github.com/apache/kafka/pull/11272#discussion_r704916241 ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java ## @@ -611,6 +611,7 @@ public synchronized Topology build() { */ public synchronized Topology build(final Properties props) { internalStreamsBuilder.buildAndOptimizeTopology(props); +internalTopologyBuilder.setTopologyProperties(props); Review comment: To clarify, these are the props that the user passes in when building the topology, eg needed for `TOPOLOGY_OPTIMIZATION` which is the only actual topology-level override today. So the overrides are only set the same as the global props if the user decides to pass the same set of configs in. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
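For context, this is the user-facing pattern the comment refers to: passing build-time properties that act as topology-level overrides. A minimal sketch with illustrative topic names, using the existing public `StreamsBuilder#build(Properties)` API:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class BuildTimePropsExample {
    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic"); // illustrative topics

        // Props passed to build() are scoped to this topology; today TOPOLOGY_OPTIMIZATION_CONFIG
        // is the only config actually read here, and per this PR these props become the
        // topology-level overrides.
        final Properties buildProps = new Properties();
        buildProps.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        final Topology topology = builder.build(buildProps);
    }
}
```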
[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides
ableegoldman commented on a change in pull request #11272: URL: https://github.com/apache/kafka/pull/11272#discussion_r704909650 ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java ## @@ -611,6 +611,7 @@ public synchronized Topology build() { */ public synchronized Topology build(final Properties props) { internalStreamsBuilder.buildAndOptimizeTopology(props); +internalTopologyBuilder.setTopologyProperties(props); Review comment: Like this https://github.com/apache/kafka/pull/11272/files#diff-0e5e608831150c058e2ad1b45d38ad941739562588ec0fdb97cc9f742919fb1fR139 ? Or were you referring to something else -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides
ableegoldman commented on a change in pull request #11272: URL: https://github.com/apache/kafka/pull/11272#discussion_r704909244 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.namedtopology; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.streams.processor.internals.StreamThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Properties; +import java.util.function.Supplier; + +import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC; +import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC; +import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_DOC; + +/** + * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the + * {@link org.apache.kafka.streams.KafkaStreams} or {@link KafkaStreamsNamedTopologyWrapper} constructors will + * determine the defaults, which can then be overridden for specific topologies by passing them in when creating the + * topology via the {@link org.apache.kafka.streams.StreamsBuilder#build(Properties)} or + * {@link NamedTopologyStreamsBuilder#buildNamedTopology(Properties)} methods. + */ +public class TopologyConfig extends AbstractConfig { Review comment: I guess, but honestly for now it's going to be on us/me to keep track of any new configs that could be topology overrides. It's not a problem if someone introduces a config that could be scoped to a single topology and isn't, it's just a feature we can expand later. 
We need to work out a clean API before taking this to the KIP stage anyways -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides
ableegoldman commented on a change in pull request #11272: URL: https://github.com/apache/kafka/pull/11272#discussion_r704907727 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java ## @@ -262,7 +262,7 @@ private StreamTask createActiveTask(final TaskId taskId, inputPartitions, topology, consumer, -config, +topologyMetadata.getTaskConfigFor(taskId), Review comment: Can you clarify? Not sure if this is what you meant, but this should return the actual task configs, with any overrides already applied. I tried to keep all the override & config resolution logic inside the TopologyConfig class -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides
ableegoldman commented on a change in pull request #11272: URL: https://github.com/apache/kafka/pull/11272#discussion_r704907727 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java ## @@ -262,7 +262,7 @@ private StreamTask createActiveTask(final TaskId taskId, inputPartitions, topology, consumer, -config, +topologyMetadata.getTaskConfigFor(taskId), Review comment: Sorry can you clarify? This should return the actual task configs, with any overrides already applied. I tried to keep all the override & config resolution logic inside the TopologyConfig class -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon edited a comment on pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order
showuon edited a comment on pull request #11292: URL: https://github.com/apache/kafka/pull/11292#issuecomment-915712946 @guozhangwang , those are good questions. Let me answer them below:
> 1. Do you know why the original test cases in AbstractWindowBytesStoreTest, like `shouldGetBackwardAll` and `testBackwardFetchRange` did not capture this bug? This test class is leveraged by the in-memory stores as well.

That's right, those tests also cover the in-memory stores, but they didn't test the case of multiple records in the same window. Currently, in the window store, we store records as [segments -> [records]]. For example, with window size = 500 and input records:
key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500] window
key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500] window
key: "c", value: "cc", timestamp: 510 ==> will be in [500, 1000] window
Internally, "a" and "b" will be in the same segment, and "c" in another segment. segments: [0 /* window start */, records], [500, records]. The records for window start 0 will be "a" and "b"; the records for window start 500 will be "c". Before this change, we did have a reverse iterator for segments, but not within "records". So, when doing `backwardFetchAll`, the records were returned in the order "c", "a", "b", when it obviously should be "c", "b", "a". So, back to the question: why did the original test cases not catch this issue? It's because the test inputs all have different window start timestamps, so they land in different segments:
```
private void putFirstBatch(final WindowStore store, @SuppressWarnings("SameParameterValue") final long startTime, final InternalMockProcessorContext context) {
    context.setRecordContext(createRecordContext(startTime));
    store.put(0, "zero", startTime);
    store.put(1, "one", startTime + 1L);
    store.put(2, "two", startTime + 2L);
    store.put(3, "three", startTime + 2L); // <-- this is the new record I added, to test multiple records in the same segment case
    store.put(4, "four", startTime + 4L);
    store.put(5, "five", startTime + 5L);
}
```
> 2. Related to 1), what additional coverage does the new `WindowStoreFetchTest` provides in addition to the above two test cases?

I think I've covered that above: I added an additional record to the `AbstractWindowBytesStoreTest` test. In `WindowStoreFetchTest`, we produce records at timestamps 0, 1, 500, 501, 502, which will be put into windows [0, 500] * 2 and [500, 1000] * 3. Then we fetch them forward/backward to see if the results are as expected, i.e. in reverse order it should be 502, 501, 500, 1, 0. The behavior works as expected in `RocksDBWindowStore`. Hope that's clear. I also updated the PR description. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
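To make the iterator point concrete, here is a small self-contained toy (not the store's actual classes) showing why reversing only the segment iterator yields "c", "a", "b" while also reversing the per-segment records yields the expected "c", "b", "a":

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class BackwardFetchSketch {
    public static void main(String[] args) {
        // segments keyed by window start, each holding (timestamp -> value) records
        TreeMap<Long, TreeMap<Long, String>> segments = new TreeMap<>();
        segments.computeIfAbsent(0L, k -> new TreeMap<>()).put(0L, "a");
        segments.computeIfAbsent(0L, k -> new TreeMap<>()).put(10L, "b");
        segments.computeIfAbsent(500L, k -> new TreeMap<>()).put(510L, "c");

        // buggy backward fetch: only the segment order is reversed -> [c, a, b]
        List<String> buggy = new ArrayList<>();
        segments.descendingMap().values().forEach(records -> buggy.addAll(records.values()));

        // fixed backward fetch: records within each segment are reversed too -> [c, b, a]
        List<String> fixed = new ArrayList<>();
        segments.descendingMap().values()
                .forEach(records -> fixed.addAll(records.descendingMap().values()));

        System.out.println("buggy: " + buggy + ", fixed: " + fixed);
    }
}
```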
[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides
ableegoldman commented on a change in pull request #11272: URL: https://github.com/apache/kafka/pull/11272#discussion_r704906629 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.namedtopology; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.streams.processor.internals.StreamThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Properties; +import java.util.function.Supplier; + +import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC; +import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC; +import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_DOC; + +/** + * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the + * {@link org.apache.kafka.streams.KafkaStreams} or {@link KafkaStreamsNamedTopologyWrapper} constructors will + * determine the defaults, which can then be overridden for specific topologies by passing them in when creating the + * topology via the {@link org.apache.kafka.streams.StreamsBuilder#build(Properties)} or + * {@link NamedTopologyStreamsBuilder#buildNamedTopology(Properties)} methods. 
+ */ +public class TopologyConfig extends AbstractConfig { +private static final ConfigDef CONFIG; +static { +CONFIG = new ConfigDef() + .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG, + Type.INT, + null, + Importance.LOW, + BUFFERED_RECORDS_PER_PARTITION_DOC) +.define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, +Type.CLASS, +null, +Importance.MEDIUM, +DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC) + .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, + Type.CLASS, + null, + Importance.MEDIUM, + DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC) + .define(MAX_TASK_IDLE_MS_CONFIG, + Type.LONG, + null, + Importance.MEDIUM, + MAX_TASK_IDLE_MS_DOC) + .define(TASK_TIMEOUT_MS_CONFIG, + Type.LONG, + null, + Importance.MEDIUM, + TASK_TIMEOUT_MS_DOC); +} +private final Logger log = LoggerFactory.getLogger(TopologyConfig.class); + +public final String topologyName; +public final boolean eosEnabled; + +final long maxTaskIdleMs; +final long taskTimeoutMs; +final int maxBufferedSize; +final Supplier timestampExtractorSupplier; +final Supplier deserializationExceptionHandlerSupplier; + +public TopologyConfig(final String topologyName, final StreamsConfig globalAppConfigs, final Properties topologyOverrides) { +super(CONFIG, topologyOverrides, false); + +
[jira] [Created] (KAFKA-13283) Migrate experimental feature to public API
A. Sophie Blee-Goldman created KAFKA-13283: -- Summary: Migrate experimental feature to public API Key: KAFKA-13283 URL: https://issues.apache.org/jira/browse/KAFKA-13283 Project: Kafka Issue Type: Sub-task Components: streams Reporter: A. Sophie Blee-Goldman -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13282) Draft final NamedTopology API and publish a KIP
A. Sophie Blee-Goldman created KAFKA-13282: -- Summary: Draft final NamedTopology API and publish a KIP Key: KAFKA-13282 URL: https://issues.apache.org/jira/browse/KAFKA-13282 Project: Kafka Issue Type: Sub-task Components: streams Reporter: A. Sophie Blee-Goldman The pre-KIP experimental phase has left quite a few open questions around the API of this new feature; we need to hash that out and then write it up into a KIP before introducing this in the public interface -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12648) Experiment with resilient isomorphic topologies
[ https://issues.apache.org/jira/browse/KAFKA-12648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-12648: --- Parent: KAFKA-13281 Issue Type: Sub-task (was: New Feature) > Experiment with resilient isomorphic topologies > --- > > Key: KAFKA-12648 > URL: https://issues.apache.org/jira/browse/KAFKA-12648 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Major > > We're not ready to make this a public feature yet, but I want to start > experimenting with some ways to make Streams applications more resilient in > the face of isomorphic topological changes (eg adding/removing/reordering > subtopologies). > If this turns out to be stable and useful, we can circle back on doing a KIP > to bring this feature into the public API -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13281) Support upgrades with dynamic addition/removal of disjoint "named" topologies
A. Sophie Blee-Goldman created KAFKA-13281: -- Summary: Support upgrades with dynamic addition/removal of disjoint "named" topologies Key: KAFKA-13281 URL: https://issues.apache.org/jira/browse/KAFKA-13281 Project: Kafka Issue Type: New Feature Components: streams Reporter: A. Sophie Blee-Goldman -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides
ableegoldman commented on a change in pull request #11272: URL: https://github.com/apache/kafka/pull/11272#discussion_r704890418 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.namedtopology; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.streams.processor.internals.StreamThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Properties; +import java.util.function.Supplier; + +import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC; +import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC; +import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_DOC; + +/** + * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the + * {@link org.apache.kafka.streams.KafkaStreams} or {@link KafkaStreamsNamedTopologyWrapper} constructors will + * determine the defaults, which can then be overridden for specific topologies by passing them in when creating the + * topology via the {@link org.apache.kafka.streams.StreamsBuilder#build(Properties)} or + * {@link NamedTopologyStreamsBuilder#buildNamedTopology(Properties)} methods. 
+ */ +public class TopologyConfig extends AbstractConfig { +private static final ConfigDef CONFIG; +static { +CONFIG = new ConfigDef() + .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG, + Type.INT, + null, + Importance.LOW, + BUFFERED_RECORDS_PER_PARTITION_DOC) +.define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, +Type.CLASS, +null, +Importance.MEDIUM, +DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC) + .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, + Type.CLASS, + null, + Importance.MEDIUM, + DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC) + .define(MAX_TASK_IDLE_MS_CONFIG, + Type.LONG, + null, + Importance.MEDIUM, + MAX_TASK_IDLE_MS_DOC) + .define(TASK_TIMEOUT_MS_CONFIG, + Type.LONG, + null, + Importance.MEDIUM, + TASK_TIMEOUT_MS_DOC); +} +private final Logger log = LoggerFactory.getLogger(TopologyConfig.class); + +public final String topologyName; +public final boolean eosEnabled; + +final long maxTaskIdleMs; +final long taskTimeoutMs; +final int maxBufferedSize; +final Supplier timestampExtractorSupplier; +final Supplier deserializationExceptionHandlerSupplier; + +public TopologyConfig(final String topologyName, final StreamsConfig globalAppConfigs, final Properties topologyOverrides) { +super(CONFIG, topologyOverrides, false); + +
[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides
ableegoldman commented on a change in pull request #11272: URL: https://github.com/apache/kafka/pull/11272#discussion_r704888258 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java ## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.namedtopology; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.streams.processor.internals.StreamThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Properties; +import java.util.function.Supplier; + +import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC; +import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC; +import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_DOC; + +/** + * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the + * {@link org.apache.kafka.streams.KafkaStreams} or {@link KafkaStreamsNamedTopologyWrapper} constructors will + * determine the defaults, which can then be overridden for specific topologies by passing them in when creating the + * topology via the {@link org.apache.kafka.streams.StreamsBuilder#build(Properties)} or + * {@link NamedTopologyStreamsBuilder#buildNamedTopology(Properties)} methods. 
+ */ +public class TopologyConfig extends AbstractConfig { +private static final ConfigDef CONFIG; +static { +CONFIG = new ConfigDef() + .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG, + Type.INT, + null, + Importance.LOW, + BUFFERED_RECORDS_PER_PARTITION_DOC) +.define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, Review comment: where's checkstyle when you need it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides
ableegoldman commented on a change in pull request #11272: URL: https://github.com/apache/kafka/pull/11272#discussion_r704887980 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -359,15 +362,24 @@ public final InternalTopologyBuilder setApplicationId(final String applicationId return this; } -public synchronized final InternalTopologyBuilder setStreamsConfig(final StreamsConfig config) { -Objects.requireNonNull(config, "config can't be null"); -this.config = config; +public synchronized final void setTopologyProperties(final Properties props) { +this.topologyProperties = props; +} -return this; +public synchronized final void setStreamsConfig(final StreamsConfig config) { +Objects.requireNonNull(config, "config can't be null"); +this.applicationConfig = config; +topologyConfigs = new TopologyConfig( +topologyName, +applicationConfig, +topologyProperties == null ? Review comment: Technically Optional is not supposed to be used for fields, only return values...so they say. I'm definitely a little concerned about this pattern of null fields but it's already used throughout `InternalTopologyBuilder` and often unavoidable due to the particular order of things in Streams. So yet another thing to clean up at a later time (think guozhang filed a ticket for this already) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides
ableegoldman commented on a change in pull request #11272: URL: https://github.com/apache/kafka/pull/11272#discussion_r704886569 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ## @@ -67,7 +68,7 @@ stateDirectory, stateMgr, inputPartitions, -config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG), Review comment: Do you mean like a separate `TaskConfig` public class for users to configure the same way we do for `StreamsConfig`? I've been going back and forth but ultimately I think we should keep all configs under StreamsConfig so that all configs for each part of the stack are handled in one place (eg it would be confusing if within the Consumer client, there were multiple sets of configs)...still I think we can clean this up eventually, when we do the KIP. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13269) Kafka Streams Aggregation data loss between instance restarts and rebalances
[ https://issues.apache.org/jira/browse/KAFKA-13269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17412261#comment-17412261 ] Rohit Bobade commented on KAFKA-13269: -- Thanks [~ableegoldman], will check this and also debug further. Just to add details - I noticed the same behavior (data loss between instance restarts and rebalances) even without EOS > Kafka Streams Aggregation data loss between instance restarts and rebalances > > > Key: KAFKA-13269 > URL: https://issues.apache.org/jira/browse/KAFKA-13269 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.2 >Reporter: Rohit Bobade >Priority: Major > > Using Kafka Streams 2.6.2 and doing count based aggregation of messages. Also > setting Processing Guarantee - EXACTLY_ONCE_BETA and > NUM_STANDBY_REPLICAS_CONFIG = 1. Sending some messages and restarting > instances in middle while processing to test fault tolerance. The output > count is incorrect because of data loss while restoring state. > It looks like the streams task becomes active and starts processing even when > the state is not fully restored but is within the acceptable recovery lag > (default is 1) This results in data loss > {quote}A stateful active task is assigned to an instance only when its state > is within the configured acceptable.recovery.lag, if one exists > {quote} > [https://docs.confluent.io/platform/current/streams/developer-guide/running-app.html?_ga=2.33073014.912824567.1630441414-1598368976.1615841473#state-restoration-during-workload-rebalance] > [https://docs.confluent.io/platform/current/installation/configuration/streams-configs.html#streamsconfigs_acceptable.recovery.lag] > Setting acceptable.recovery.lag to 0 and re-running the chaos tests gives the > correct result. > Related KIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP441:SmoothScalingOutforKafkaStreams-Computingthemost-caught-upinstances] > Just want to get some thoughts on this use case from the Kafka team or if > anyone has encountered similar issue -- This message was sent by Atlassian Jira (v8.3.4#803005)
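For anyone trying to reproduce this, a sketch of the configuration combination described in the report, including the workaround of forcing tasks to be fully restored before becoming active by setting the acceptable recovery lag to 0 (the application id and bootstrap servers are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class RecoveryLagWorkaroundSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-aggregation-app");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        // Workaround from the report: a task only becomes active once its state is fully caught up.
        props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0L);
    }
}
```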
[GitHub] [kafka] guozhangwang commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file
guozhangwang commented on pull request #11283: URL: https://github.com/apache/kafka/pull/11283#issuecomment-915657410 > but there was a relevant failure in the build: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11283/5/testReport/org.apache.kafka.streams.integration/EosIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldWriteLatestOffsetsToCheckpointOnShutdown_at_least_once_/ > Guessing it's just some flakiness in the test, can you check that out before I merge? @hutchiko I looked at the test code, and it seems to me there's indeed a timing-related flakiness. Could you try to fix it before we merge (you can first try to reproduce it, e.g. on IDE with repeated runs and see how often it could fail; and after you fix it usually we would try to verify that after say 1000 runs, there's no more failure). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13272) KStream offset stuck after brokers outage
[ https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17412243#comment-17412243 ] Guozhang Wang commented on KAFKA-13272: --- cc [~hachikuji], have you observed this before? > KStream offset stuck after brokers outage > - > > Key: KAFKA-13272 > URL: https://issues.apache.org/jira/browse/KAFKA-13272 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 > Environment: Kafka running on Kubernetes > centos >Reporter: F Méthot >Priority: Major > > Our KStream app offset stay stuck on 1 partition after outage possibly when > exactly_once is enabled. > Running with KStream 2.8, kafka broker 2.8, > 3 brokers. > commands topic is 10 partitions (replication 2, min-insync 2) > command-expiry-store-changelog topic is 10 partitions (replication 2, > min-insync 2) > events topic is 10 partitions (replication 2, min-insync 2) > with this topology > Topologies: > > {code:java} > Sub-topology: 0 > Source: KSTREAM-SOURCE-00 (topics: [commands]) > --> KSTREAM-TRANSFORM-01 > Processor: KSTREAM-TRANSFORM-01 (stores: []) > --> KSTREAM-TRANSFORM-02 > <-- KSTREAM-SOURCE-00 > Processor: KSTREAM-TRANSFORM-02 (stores: [command-expiry-store]) > --> KSTREAM-SINK-03 > <-- KSTREAM-TRANSFORM-01 > Sink: KSTREAM-SINK-03 (topic: events) > <-- KSTREAM-TRANSFORM-02 > {code} > h3. > h3. Attempt 1 at reproducing this issue > > Our stream app runs with processing.guarantee *exactly_once* > After a Kafka test outage where all 3 brokers pod were deleted at the same > time, > Brokers restarted and initialized succesfuly. > When restarting the topology above, one of the tasks would never initialize > fully, the restore phase would keep outputting this messages every few > minutes: > > {code:java} > 2021-08-16 14:20:33,421 INFO stream-thread > [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] > Restoration in progress for 1 partitions. > {commands-processor-expiry-store-changelog-8: position=11775908, > end=11775911, totalRestored=2002076} > [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] > (org.apache.kafka.streams.processor.internals.StoreChangelogReader) > {code} > Task for partition 8 would never initialize, no more data would be read from > the source commands topic for that partition. > > In an attempt to recover, we restarted the stream app with stream > processing.guarantee back to at_least_once, than it proceed with reading the > changelog and restoring partition 8 fully. > But we noticed afterward, for the next hour until we rebuilt the system, that > partition 8 from command-expiry-store-changelog would not be > cleaned/compacted by the log cleaner/compacter compared to other partitions. > (could be unrelated, because we have seen that before) > So we resorted to delete/recreate our command-expiry-store-changelog topic > and events topic and regenerate it from the commands, reading from beginning. > Things went back to normal > h3. Attempt 2 at reproducing this issue > kstream runs with *exactly-once* > We force-deleted all 3 pod running kafka. > After that, one of the partition can’t be restored. 
(like reported in > previous attempt) > For that partition, we noticed these logs on the broker > {code:java} > [2021-08-27 17:45:32,799] INFO [Transaction Marker Channel Manager 1002]: > Couldn’t find leader endpoint for partitions Set(__consumer_offsets-11, > command-expiry-store-changelog-9) while trying to send transaction markers > for commands-processor-0_9, these partitions are likely deleted already and > hence can be skipped > (kafka.coordinator.transaction.TransactionMarkerChannelManager){code} > Then > - we stop the kstream app, > - restarted kafka brokers cleanly > - Restarting the Kstream app, > Those logs messages showed up on the kstream app log: > > {code:java} > 2021-08-27 18:34:42,413 INFO [Consumer > clientId=commands-processor-76602c87-f682-4648-859b-8fa9b6b937f3-StreamThread-1-consumer, > groupId=commands-processor] The following partitions still have unstable > offsets which are not cleared on the broker side: [commands-9], this could be > either transactional offsets waiting for completion, or normal offsets > waiting for replication after appending to local log > [commands-processor-76602c87-f682-4648-859b-8fa9b6b937f3-StreamThread-1] > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) > > {code} > This would cause our processor to not consume from that specific source > topic-partition. > Deleting downstream topic and replaying data would NOT fix the issue > (EXACTLY_ONCE or AT_LEAST_ONCE) > Workaround found: > Deleted the group
[jira] [Commented] (KAFKA-13272) KStream offset stuck after brokers outage
[ https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17412242#comment-17412242 ] Guozhang Wang commented on KAFKA-13272: --- Hello [~fmethot], I looked at the ticket and I suspect it is caused by the same issue as reported in https://issues.apache.org/jira/browse/KAFKA-13174. In a quick sum, this seems to be a broker-side issue that after an unclean shutdown the producer's dangling txn was not yet aborted, and hence causing the consumer to not be able to consume forward. Note that by default, on Streams the transaction timeout defaults to 10 secs (please also double check that you did not override this config, "transaction.timeout.ms"), i.e. after there's no activity on that dangling transaction the broker should abort proactively that transaction so that consumers can see the txn marker and proceed. However, after the broker's unclean shutdown, that broker seems not be able to abort that txn after the timeout has elapsed. > KStream offset stuck after brokers outage > - > > Key: KAFKA-13272 > URL: https://issues.apache.org/jira/browse/KAFKA-13272 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 > Environment: Kafka running on Kubernetes > centos >Reporter: F Méthot >Priority: Major > > Our KStream app offset stay stuck on 1 partition after outage possibly when > exactly_once is enabled. > Running with KStream 2.8, kafka broker 2.8, > 3 brokers. > commands topic is 10 partitions (replication 2, min-insync 2) > command-expiry-store-changelog topic is 10 partitions (replication 2, > min-insync 2) > events topic is 10 partitions (replication 2, min-insync 2) > with this topology > Topologies: > > {code:java} > Sub-topology: 0 > Source: KSTREAM-SOURCE-00 (topics: [commands]) > --> KSTREAM-TRANSFORM-01 > Processor: KSTREAM-TRANSFORM-01 (stores: []) > --> KSTREAM-TRANSFORM-02 > <-- KSTREAM-SOURCE-00 > Processor: KSTREAM-TRANSFORM-02 (stores: [command-expiry-store]) > --> KSTREAM-SINK-03 > <-- KSTREAM-TRANSFORM-01 > Sink: KSTREAM-SINK-03 (topic: events) > <-- KSTREAM-TRANSFORM-02 > {code} > h3. > h3. Attempt 1 at reproducing this issue > > Our stream app runs with processing.guarantee *exactly_once* > After a Kafka test outage where all 3 brokers pod were deleted at the same > time, > Brokers restarted and initialized succesfuly. > When restarting the topology above, one of the tasks would never initialize > fully, the restore phase would keep outputting this messages every few > minutes: > > {code:java} > 2021-08-16 14:20:33,421 INFO stream-thread > [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] > Restoration in progress for 1 partitions. > {commands-processor-expiry-store-changelog-8: position=11775908, > end=11775911, totalRestored=2002076} > [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] > (org.apache.kafka.streams.processor.internals.StoreChangelogReader) > {code} > Task for partition 8 would never initialize, no more data would be read from > the source commands topic for that partition. > > In an attempt to recover, we restarted the stream app with stream > processing.guarantee back to at_least_once, than it proceed with reading the > changelog and restoring partition 8 fully. > But we noticed afterward, for the next hour until we rebuilt the system, that > partition 8 from command-expiry-store-changelog would not be > cleaned/compacted by the log cleaner/compacter compared to other partitions. 
> (could be unrelated, because we have seen that before) > So we resorted to delete/recreate our command-expiry-store-changelog topic > and events topic and regenerate it from the commands, reading from beginning. > Things went back to normal > h3. Attempt 2 at reproducing this issue > kstream runs with *exactly-once* > We force-deleted all 3 pod running kafka. > After that, one of the partition can’t be restored. (like reported in > previous attempt) > For that partition, we noticed these logs on the broker > {code:java} > [2021-08-27 17:45:32,799] INFO [Transaction Marker Channel Manager 1002]: > Couldn’t find leader endpoint for partitions Set(__consumer_offsets-11, > command-expiry-store-changelog-9) while trying to send transaction markers > for commands-processor-0_9, these partitions are likely deleted already and > hence can be skipped > (kafka.coordinator.transaction.TransactionMarkerChannelManager){code} > Then > - we stop the kstream app, > - restarted kafka brokers cleanly > - Restarting the Kstream app, > Those logs messages showed up on the kstream app log: > > {code:java} > 2021-08-27 18:34:42,413 INFO [Consumer >
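As a reference for the `transaction.timeout.ms` point above: under EOS, Kafka Streams defaults the embedded producer's transaction timeout to 10 seconds, and an application can raise or lower it via the producer prefix. A minimal sketch of what such an override looks like (and therefore what to double-check has not been set too high); the application id is a placeholder:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class TxnTimeoutOverrideSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "commands-processor");   // placeholder
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        // Explicitly setting the Streams default of 10s; any value configured here overrides it.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 10_000);
    }
}
```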
[GitHub] [kafka] hachikuji commented on a change in pull request #11313: MINOR: GroupMetadataManager#shutdown should remove metrics
hachikuji commented on a change in pull request #11313: URL: https://github.com/apache/kafka/pull/11313#discussion_r704847225 ## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -977,6 +977,9 @@ class GroupMetadataManager(brokerId: Int, shuttingDown.set(true) if (scheduler.isStarted) scheduler.shutdown() +metrics.removeSensor(GroupMetadataManager.LoadTimeSensor) +metrics.removeSensor("OffsetCommits") Review comment: nit: maybe we could add a constant for these two similar to `GroupMetadataManager.LoadTimeSensor`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
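A rough Java illustration of the reviewer's suggestion (the real code is Scala and lives in GroupMetadataManager; the load-time constant's value is an assumption here):

```java
import org.apache.kafka.common.metrics.Metrics;

public class GroupMetadataSensorsSketch {
    // Mirrors the sensor names from the diff; LOAD_TIME_SENSOR's value is assumed,
    // the real constant is GroupMetadataManager.LoadTimeSensor.
    public static final String LOAD_TIME_SENSOR = "GroupPartitionLoadTime"; // assumed value
    public static final String OFFSET_COMMITS_SENSOR = "OffsetCommits";

    public static void removeOnShutdown(final Metrics metrics) {
        // Removing by the same constants used at registration avoids stray string literals.
        metrics.removeSensor(LOAD_TIME_SENSOR);
        metrics.removeSensor(OFFSET_COMMITS_SENSOR);
    }
}
```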
[GitHub] [kafka] guozhangwang commented on a change in pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file
guozhangwang commented on a change in pull request #11283: URL: https://github.com/apache/kafka/pull/11283#discussion_r704825002 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -565,7 +565,7 @@ public void closeCleanAndRecycleState() { protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) { // commitNeeded indicates we may have processed some records since last commit // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not -if (commitNeeded) { +if (commitNeeded || enforceCheckpoint) { Review comment: The reason I added this check is that `checkpointableOffsets()` can potentially be expensive. I think the fix of `commitNeeded || enforceCheckpoint` is actually elegant, as it introduces little extra overhead since `enforceCheckpoint` is only true when closing the task. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #11256: KAFKA-13224: Expose consistent broker.id and node.id in config values/originals maps
cmccabe commented on pull request #11256: URL: https://github.com/apache/kafka/pull/11256#issuecomment-915620653 I found some issues with the current PR (for example, it doesn't cover stuff like originalsWithPrefix, etc.) so I opened https://github.com/apache/kafka/pull/11312 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe opened a new pull request #11312: KAFKA-13224: Ensure that broker.id is set in KafkaConfig#originals
cmccabe opened a new pull request #11312: URL: https://github.com/apache/kafka/pull/11312 Some plugins make use of KafkaConfig#originals rather than the KafkaConfig object. We should ensure that these plugins see the correct value for broker.id if the broker is running in KRaft mode and node.id has been configured, but not broker.id. This PR does this by ensuring that both node.id and broker.id are set in the originals map if either one is set. We also check that they are set to the same value in KafkaConfig#validateValues. Co-author: Ron Dagostino -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
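A simplified Java sketch of the normalization idea described in the PR (the real change is in the Scala KafkaConfig; this is not the actual implementation, just the copy-and-validate logic it describes):

```java
import java.util.HashMap;
import java.util.Map;

public class NodeIdNormalizationSketch {
    // If only one of broker.id / node.id is present in the originals map, copy it to the other;
    // if both are present but disagree, reject the config.
    static Map<String, Object> withNormalizedIds(Map<String, Object> originals) {
        final Map<String, Object> copy = new HashMap<>(originals);
        final Object brokerId = copy.get("broker.id");
        final Object nodeId = copy.get("node.id");
        if (brokerId != null && nodeId == null) {
            copy.put("node.id", brokerId);
        } else if (nodeId != null && brokerId == null) {
            copy.put("broker.id", nodeId);
        } else if (brokerId != null && !String.valueOf(brokerId).equals(String.valueOf(nodeId))) {
            throw new IllegalArgumentException(
                "broker.id and node.id must match: " + brokerId + " vs " + nodeId);
        }
        return copy;
    }
}
```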
[GitHub] [kafka] guozhangwang commented on a change in pull request #11278: KAFKA-12648: Enforce size limits for each task's cache
guozhangwang commented on a change in pull request #11278: URL: https://github.com/apache/kafka/pull/11278#discussion_r704796291 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ## @@ -233,6 +234,14 @@ public boolean hasNamedTopologies() { return !builders.containsKey(UNNAMED_TOPOLOGY); } +/** + * @return true iff the app is using named topologies, or was started up with no topology at all + * and the max buffer was set for the named topologies Review comment: This may be related to @ableegoldman 's meta question: do we set `maxBufferSize = true` in the future if one of the named topology has it overridden, or only when all topologies inside the `TopologyMetadata` has this config overridden? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -151,9 +152,26 @@ void handleRebalanceComplete() { releaseLockedUnassignedTaskDirectories(); +if (topologyMetadata.hasNamedTopologies()) { +for (final Task task : tasks.allTasks()) { +tasksTotalMaxBuffer.put( +task.id().topologyName(), +task.maxBuffer() + tasksTotalMaxBuffer.getOrDefault(task.id().topologyName(), 0L) Review comment: Not sure I understand this logic: why we add these two values to update the `tasksTotalMaxBuffer`? How would `task.maxBuffer()` be inferred in the future? Since now they are only 0 I cannot tell how would this impact the update logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal
junrao commented on a change in pull request #10914: URL: https://github.com/apache/kafka/pull/10914#discussion_r704755926 ## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ## @@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], val cleanableLogs = dirtyLogs.filter { ltc => (ltc.needCompactionNow && ltc.cleanableBytes > 0) || ltc.cleanableRatio > ltc.log.config.minCleanableRatio } + if(cleanableLogs.isEmpty) { -None +val logsWithTombstonesExpired = dirtyLogs.filter { + case ltc => +// in this case, we are probably in a low throughput situation +// therefore, we should take advantage of this fact and remove tombstones if we can +// under the condition that the log's latest delete horizon is less than the current time +// tracked +ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && ltc.log.latestDeleteHorizon <= time.milliseconds() Review comment: We could store some additional stats related to tombstone in the logcleaner checkpoint file. It seems that to support downgrade, we can't change the version number since the existing code expects the version in the file to match that in the code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe opened a new pull request #11311: KAFKA-13280: Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds
cmccabe opened a new pull request #11311: URL: https://github.com/apache/kafka/pull/11311 Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds and KRaftMetadataCache#topicIdsToNames by returning a map subclass that exposes the TopicsImage data structures without copying them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
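The general technique the PR describes can be sketched as a read-only Map view that translates values lazily instead of copying all entries on every call (a hypothetical generic example; the actual change wraps the TopicsImage structures inside KRaftMetadataCache):

{code:java}
import java.util.AbstractMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

// Illustrative sketch of the trick: expose an existing Map<K, V1> as a Map<K, V2>
// without materializing a new map of size N up front.
final class TranslatedMapView<K, V1, V2> extends AbstractMap<K, V2> {
    private final Map<K, V1> underlying;
    private final Function<V1, V2> translator;

    TranslatedMapView(Map<K, V1> underlying, Function<V1, V2> translator) {
        this.underlying = underlying;
        this.translator = translator;
    }

    @Override
    public V2 get(Object key) {
        V1 value = underlying.get(key);
        return value == null ? null : translator.apply(value);
    }

    @Override
    public int size() {
        return underlying.size();
    }

    @Override
    public Set<Entry<K, V2>> entrySet() {
        // Iteration still touches every entry, but callers that only do lookups never pay O(N).
        return underlying.entrySet().stream()
                .map(e -> Map.entry(e.getKey(), translator.apply(e.getValue())))
                .collect(Collectors.toSet());
    }
}
{code}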
[jira] [Created] (KAFKA-13280) Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds
Colin McCabe created KAFKA-13280: Summary: Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds Key: KAFKA-13280 URL: https://issues.apache.org/jira/browse/KAFKA-13280 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe Assignee: Colin McCabe Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds and KRaftMetadataCache#topicIdsToNames -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] kkonstantine merged pull request #11309: MINOR: Remove unsupported rsync and ssh commands from release.py
kkonstantine merged pull request #11309: URL: https://github.com/apache/kafka/pull/11309 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kkonstantine commented on pull request #11309: MINOR: Remove unsupported rsync and ssh commands from release.py
kkonstantine commented on pull request #11309: URL: https://github.com/apache/kafka/pull/11309#issuecomment-915522116 @dajac I'd rather not include a dependency on a specific client right now. The standard client does not support recursive upload/download. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma merged pull request #11301: KAFKA-13276: Prefer KafkaFuture in admin Result constructors
ijuma merged pull request #11301: URL: https://github.com/apache/kafka/pull/11301 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #11301: KAFKA-13276: Prefer KafkaFuture in admin Result constructors
ijuma commented on pull request #11301: URL: https://github.com/apache/kafka/pull/11301#issuecomment-915500735 LGTM, thanks. Merging to 3.0 and trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #11301: KAFKA-13276: Prefer KafkaFuture in admin Result constructors
ijuma commented on a change in pull request #11301: URL: https://github.com/apache/kafka/pull/11301#discussion_r704701395 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -3178,7 +3178,8 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection
[GitHub] [kafka] junrao commented on a change in pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.
junrao commented on a change in pull request #11033: URL: https://github.com/apache/kafka/pull/11033#discussion_r704689875 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java ## @@ -129,37 +132,44 @@ public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmen } // Publish the message to the topic. - doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(), segmentMetadataUpdate); +return doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(), segmentMetadataUpdate); } finally { lock.readLock().unlock(); } } @Override -public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) +public CompletableFuture putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) throws RemoteStorageException { Objects.requireNonNull(remotePartitionDeleteMetadata, "remotePartitionDeleteMetadata can not be null"); lock.readLock().lock(); try { ensureInitializedAndNotClosed(); - doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), remotePartitionDeleteMetadata); +return doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), remotePartitionDeleteMetadata); } finally { lock.readLock().unlock(); } } -private void doPublishMetadata(TopicIdPartition topicIdPartition, RemoteLogMetadata remoteLogMetadata) +private CompletableFuture doPublishMetadata(TopicIdPartition topicIdPartition, + RemoteLogMetadata remoteLogMetadata) throws RemoteStorageException { log.debug("Publishing metadata for partition: [{}] with context: [{}]", topicIdPartition, remoteLogMetadata); try { // Publish the message to the topic. -RecordMetadata recordMetadata = producerManager.publishMessage(remoteLogMetadata); -// Wait until the consumer catches up with this offset. This will ensure read-after-write consistency -// semantics. -consumerManager.waitTillConsumptionCatchesUp(recordMetadata); +CompletableFuture produceFuture = new CompletableFuture<>(); +producerManager.publishMessage(remoteLogMetadata, produceFuture); +return produceFuture.thenApplyAsync((Function) recordMetadata -> { +try { + consumerManager.waitTillConsumptionCatchesUp(recordMetadata); Review comment: Thanks for the explanation. Semantically, for the CompletableFuture returned from doPublishMetadata(), when do we expect it to complete? The implementation completes it after the metadata is acked in the producer. However, I thought it should be completed after the consumer has caught up on the offset? It would be useful to document this in the API clearly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
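For reference, a minimal sketch of the completion ordering being discussed (the helper below stands in for ConsumerManager#waitTillConsumptionCatchesUp and is not the PR's code): the future returned by the async composition completes only after the consumer-side wait returns, not when the producer ack alone arrives.

{code:java}
import java.util.concurrent.CompletableFuture;

// Hedged sketch of the read-after-write ordering; names are stand-ins, not the actual classes.
class ReadAfterWriteSketch {
    CompletableFuture<Void> publish(CompletableFuture<Long> produceFuture) {
        // The future returned here completes only after waitTillConsumptionCatchesUp returns,
        // not merely when the producer ack (produceFuture) completes.
        return produceFuture.thenAcceptAsync(offset -> waitTillConsumptionCatchesUp(offset));
    }

    private void waitTillConsumptionCatchesUp(long offset) {
        // Assumed helper: block until the metadata consumer has read past the given offset.
    }
}
{code}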
[GitHub] [kafka] wycccccc commented on pull request #10881: KAFKA-12947 Replace EasyMock and PowerMock with Mockito for Streams…
wycccccc commented on pull request #10881: URL: https://github.com/apache/kafka/pull/10881#issuecomment-915451252 @ijuma Sure, I have resolved the problem. If there are other problems, I will solve them immediately. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wycccccc commented on a change in pull request #10881: KAFKA-12947 Replace EasyMock and PowerMock with Mockito for Streams…
wycc commented on a change in pull request #10881: URL: https://github.com/apache/kafka/pull/10881#discussion_r704650439 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java ## @@ -42,77 +39,65 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; -import static org.easymock.EasyMock.createStrictControl; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; -import static org.powermock.api.easymock.PowerMock.mockStatic; -import static org.powermock.api.easymock.PowerMock.replayAll; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.doThrow; -@RunWith(PowerMockRunner.class) -@PrepareForTest(Utils.class) public class StateManagerUtilTest { -@Mock(type = MockType.NICE) +@Mock private ProcessorStateManager stateManager; -@Mock(type = MockType.NICE) +@Mock private StateDirectory stateDirectory; -@Mock(type = MockType.NICE) +@Mock private ProcessorTopology topology; -@Mock(type = MockType.NICE) +@Mock private InternalProcessorContext processorContext; -private IMocksControl ctrl; private Logger logger = new LogContext("test").logger(AbstractTask.class); private final TaskId taskId = new TaskId(0, 0); @Before public void setup() { -ctrl = createStrictControl(); -topology = ctrl.createMock(ProcessorTopology.class); -processorContext = ctrl.createMock(InternalProcessorContext.class); +topology = mock(ProcessorTopology.class); +processorContext = mock(InternalProcessorContext.class); -stateManager = ctrl.createMock(ProcessorStateManager.class); -stateDirectory = ctrl.createMock(StateDirectory.class); +stateManager = mock(ProcessorStateManager.class); +stateDirectory = mock(StateDirectory.class); } @Test public void testRegisterStateStoreWhenTopologyEmpty() { -expect(topology.stateStores()).andReturn(emptyList()); - -ctrl.checkOrder(true); -ctrl.replay(); +when(topology.stateStores()).thenReturn(emptyList()); +inOrder(topology); StateManagerUtil.registerStateStores(logger, "logPrefix:", topology, stateManager, stateDirectory, processorContext); - -ctrl.verify(); } @Test public void testRegisterStateStoreFailToLockStateDirectory() { -expect(topology.stateStores()).andReturn(singletonList(new MockKeyValueStore("store", false))); +when(topology.stateStores()).thenReturn(singletonList(new MockKeyValueStore("store", false))); + +when(stateManager.taskId()).thenReturn(taskId); -expect(stateManager.taskId()).andReturn(taskId); +when(stateDirectory.lock(taskId)).thenReturn(false); -expect(stateDirectory.lock(taskId)).andReturn(false); - -ctrl.checkOrder(true); -ctrl.replay(); +inOrder(topology, stateManager, stateDirectory); Review comment: I have gotten the method how to use inorder and the problem has been solved. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
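For context, the usual Mockito pattern looks roughly like the sketch below (a generic example, not the StateManagerUtilTest code): the InOrder handle returned by inOrder(...) has to be used for the verify calls, otherwise the ordering is not actually checked.

{code:java}
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;

import java.util.List;
import org.mockito.InOrder;

class InOrderExample {
    void verifiesOrdering() {
        List first = mock(List.class);
        List second = mock(List.class);

        first.add("a");
        second.add("b");

        // The InOrder handle is what enforces the ordering of the verifications.
        InOrder inOrder = inOrder(first, second);
        inOrder.verify(first).add("a");   // fails if the calls happened in the other order
        inOrder.verify(second).add("b");
    }
}
{code}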
[GitHub] [kafka] dajac commented on pull request #11309: MINOR: Remove unsupported rsync and ssh commands from release.py
dajac commented on pull request #11309: URL: https://github.com/apache/kafka/pull/11309#issuecomment-915448147 @kkonstantine Could we use sftp in the script directly? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13279) Implement CreateTopicsPolicy for KRaft
Colin McCabe created KAFKA-13279: Summary: Implement CreateTopicsPolicy for KRaft Key: KAFKA-13279 URL: https://issues.apache.org/jira/browse/KAFKA-13279 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe Assignee: Colin McCabe Implement CreateTopicsPolicy for KRaft -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-8406) kafka-topics throws wrong error on invalid configuration with bootstrap-server and alter config
[ https://issues.apache.org/jira/browse/KAFKA-8406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski resolved KAFKA-8406. Fix Version/s: 2.4.0 Resolution: Fixed > kafka-topics throws wrong error on invalid configuration with > bootstrap-server and alter config > --- > > Key: KAFKA-8406 > URL: https://issues.apache.org/jira/browse/KAFKA-8406 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Minor > Fix For: 2.4.0 > > > Running > {code:java} > ./kafka-topics --bootstrap-server --alter --config > retention.ms=360 --topic topic{code} > Results in > {code:java} > Missing required argument "[partitions]"{code} > Running > {code:java} > ./kafka-topics --bootstrap-server --alter --config > retention.ms=360 --topic topic --partitions 25{code} > Results in > {code:java} > Option combination "[bootstrap-server],[config]" can't be used with option > "[alter]"{code} > For better clarity, we should just throw the last error outright. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8406) kafka-topics throws wrong error on invalid configuration with bootstrap-server and alter config
[ https://issues.apache.org/jira/browse/KAFKA-8406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17412103#comment-17412103 ] Stanislav Kozlovski commented on KAFKA-8406: Fixed in 2.4 - https://github.com/apache/kafka/pull/6786/files > kafka-topics throws wrong error on invalid configuration with > bootstrap-server and alter config > --- > > Key: KAFKA-8406 > URL: https://issues.apache.org/jira/browse/KAFKA-8406 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Minor > > Running > {code:java} > ./kafka-topics --bootstrap-server --alter --config > retention.ms=360 --topic topic{code} > Results in > {code:java} > Missing required argument "[partitions]"{code} > Running > {code:java} > ./kafka-topics --bootstrap-server --alter --config > retention.ms=360 --topic topic --partitions 25{code} > Results in > {code:java} > Option combination "[bootstrap-server],[config]" can't be used with option > "[alter]"{code} > For better clarity, we should just throw the last error outright. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Issue Comment Deleted] (KAFKA-8406) kafka-topics throws wrong error on invalid configuration with bootstrap-server and alter config
[ https://issues.apache.org/jira/browse/KAFKA-8406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-8406: --- Comment: was deleted (was: [~savulchik] are you sure? Can you try the exact same commands I listed in the description? I just tested this in 2.5 and it is still an issue) > kafka-topics throws wrong error on invalid configuration with > bootstrap-server and alter config > --- > > Key: KAFKA-8406 > URL: https://issues.apache.org/jira/browse/KAFKA-8406 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Minor > > Running > {code:java} > ./kafka-topics --bootstrap-server --alter --config > retention.ms=360 --topic topic{code} > Results in > {code:java} > Missing required argument "[partitions]"{code} > Running > {code:java} > ./kafka-topics --bootstrap-server --alter --config > retention.ms=360 --topic topic --partitions 25{code} > Results in > {code:java} > Option combination "[bootstrap-server],[config]" can't be used with option > "[alter]"{code} > For better clarity, we should just throw the last error outright. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] kkonstantine opened a new pull request #11309: MINOR: Remove unsupported rsync and ssh commands from release.py
kkonstantine opened a new pull request #11309: URL: https://github.com/apache/kafka/pull/11309 ssh and rsync access has been removed from home.apache.org. This removes those commands from release.py and replaces them with a note to upload the release candidate artifacts manually with an sftp client instead. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on pull request #11302: KAFKA-13243: KIP-773 Differentiate metric latency measured in ms and ns
jlprat commented on pull request #11302: URL: https://github.com/apache/kafka/pull/11302#issuecomment-915413829 Thanks for the reviews @tombentley and @guozhangwang -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley merged pull request #11302: KAFKA-13243: KIP-773 Differentiate metric latency measured in ms and ns
tombentley merged pull request #11302: URL: https://github.com/apache/kafka/pull/11302 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on pull request #11302: KAFKA-13243: KIP-773 Differentiate metric latency measured in ms and ns
tombentley commented on pull request #11302: URL: https://github.com/apache/kafka/pull/11302#issuecomment-915409290 Test failures are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on pull request #11301: KAFKA-13276: Prefer KafkaFuture in admin Result constructors
tombentley commented on pull request #11301: URL: https://github.com/apache/kafka/pull/11301#issuecomment-915405715 Any more comments @ijuma @dajac @showuon ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13237) Add ActiveBrokerCount and FencedBrokerCount metrics to the ZK controller (KIP-748)
[ https://issues.apache.org/jira/browse/KAFKA-13237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-13237. - Fix Version/s: 3.1.0 Reviewer: Jason Gustafson Resolution: Fixed > Add ActiveBrokerCount and FencedBrokerCount metrics to the ZK controller > (KIP-748) > -- > > Key: KAFKA-13237 > URL: https://issues.apache.org/jira/browse/KAFKA-13237 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > Fix For: 3.1.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dajac merged pull request #11273: KAFKA-13237; Add ActiveBrokerCount and FencedBrokerCount metrics to the ZK controller (KIP-748)
dajac merged pull request #11273: URL: https://github.com/apache/kafka/pull/11273 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed
rhauch commented on pull request #10112: URL: https://github.com/apache/kafka/pull/10112#issuecomment-915395143 @C0urante wrote: > @rhauch If this is all agreeable I think we're ready to start implementing. Since you've provided a lot of the code yourself I'm happy to let you take on that work if you'd like; otherwise, I'll get started and see if I can have a new PR with these changes out by early next week. Sounds good to me! I'm looking forward to your new PR; please link here and ping me. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #11294: KAFKA-13266; `InitialFetchState` should be created after partition is removed from the fetchers
dajac commented on pull request #11294: URL: https://github.com/apache/kafka/pull/11294#issuecomment-915393112 Merged to master and to 3.0. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac merged pull request #11294: KAFKA-13266; `InitialFetchState` should be created after partition is removed from the fetchers
dajac merged pull request #11294: URL: https://github.com/apache/kafka/pull/11294 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a change in pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.
satishd commented on a change in pull request #11033: URL: https://github.com/apache/kafka/pull/11033#discussion_r704559745 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java ## @@ -50,37 +51,38 @@ public ProducerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig, topicPartitioner = rlmmTopicPartitioner; } -public RecordMetadata publishMessage(RemoteLogMetadata remoteLogMetadata) throws KafkaException { +public CompletableFuture publishMessage(RemoteLogMetadata remoteLogMetadata) { +CompletableFuture future = new CompletableFuture<>(); + TopicIdPartition topicIdPartition = remoteLogMetadata.topicIdPartition(); int metadataPartitionNum = topicPartitioner.metadataPartition(topicIdPartition); log.debug("Publishing metadata message of partition:[{}] into metadata topic partition:[{}] with payload: [{}]", -topicIdPartition, metadataPartitionNum, remoteLogMetadata); + topicIdPartition, metadataPartitionNum, remoteLogMetadata); if (metadataPartitionNum >= rlmmConfig.metadataTopicPartitionsCount()) { // This should never occur as long as metadata partitions always remain the same. throw new KafkaException("Chosen partition no " + metadataPartitionNum + " must be less than the partition count: " + rlmmConfig.metadataTopicPartitionsCount()); } -ProducerCallback callback = new ProducerCallback(); try { +Callback callback = new Callback() { +@Override +public void onCompletion(RecordMetadata metadata, + Exception exception) { +if (exception != null) { +future.completeExceptionally(exception); +} else { +future.complete(metadata); +} +} +}; producer.send(new ProducerRecord<>(rlmmConfig.remoteLogMetadataTopicName(), metadataPartitionNum, null, -serde.serialize(remoteLogMetadata)), callback).get(); -} catch (KafkaException e) { -throw e; -} catch (Exception e) { -throw new KafkaException("Exception occurred while publishing message for topicIdPartition: " + topicIdPartition, e); + serde.serialize(remoteLogMetadata)), callback); +} catch (Exception ex) { +future.completeExceptionally(ex); } Review comment: Throwing exception is removed as you can see in the diff. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dongjinleekr commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
dongjinleekr commented on pull request #7898: URL: https://github.com/apache/kafka/pull/7898#issuecomment-915363359 @ashishpatil09 Many thanks for your interest in this feature. I think it will be released with AK 3.1, but I can't be certain yet. Please refer [here](http://home.apache.org/~dongjin/post/apache-kafka-log4j2-support/) if you need a preview or a custom patch for [2.6.1, 2.8.0]. cc/ @emveee @svudutala @priyavj08 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.
satishd commented on pull request #11033: URL: https://github.com/apache/kafka/pull/11033#issuecomment-915357755 Thanks @junrao for the review comments, addressed them with the latest commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ccding commented on a change in pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.
ccding commented on a change in pull request #11033: URL: https://github.com/apache/kafka/pull/11033#discussion_r704546511 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java ## @@ -50,37 +51,38 @@ public ProducerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig, topicPartitioner = rlmmTopicPartitioner; } -public RecordMetadata publishMessage(RemoteLogMetadata remoteLogMetadata) throws KafkaException { +public CompletableFuture publishMessage(RemoteLogMetadata remoteLogMetadata) { +CompletableFuture future = new CompletableFuture<>(); + TopicIdPartition topicIdPartition = remoteLogMetadata.topicIdPartition(); int metadataPartitionNum = topicPartitioner.metadataPartition(topicIdPartition); log.debug("Publishing metadata message of partition:[{}] into metadata topic partition:[{}] with payload: [{}]", -topicIdPartition, metadataPartitionNum, remoteLogMetadata); + topicIdPartition, metadataPartitionNum, remoteLogMetadata); if (metadataPartitionNum >= rlmmConfig.metadataTopicPartitionsCount()) { // This should never occur as long as metadata partitions always remain the same. throw new KafkaException("Chosen partition no " + metadataPartitionNum + " must be less than the partition count: " + rlmmConfig.metadataTopicPartitionsCount()); } -ProducerCallback callback = new ProducerCallback(); try { +Callback callback = new Callback() { +@Override +public void onCompletion(RecordMetadata metadata, + Exception exception) { +if (exception != null) { +future.completeExceptionally(exception); +} else { +future.complete(metadata); +} +} +}; producer.send(new ProducerRecord<>(rlmmConfig.remoteLogMetadataTopicName(), metadataPartitionNum, null, -serde.serialize(remoteLogMetadata)), callback).get(); -} catch (KafkaException e) { -throw e; -} catch (Exception e) { -throw new KafkaException("Exception occurred while publishing message for topicIdPartition: " + topicIdPartition, e); + serde.serialize(remoteLogMetadata)), callback); +} catch (Exception ex) { +future.completeExceptionally(ex); } Review comment: do we want to remove printing the topic id in the exception? ## File path: storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java ## @@ -62,16 +63,17 @@ * @param remoteLogSegmentMetadata metadata about the remote log segment. * @throws RemoteStorageException if there are any storage related errors occurred. * @throws IllegalArgumentException if the given metadata instance does not have the state as {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED} + * @return a CompletableFuture which will complete once this operation is finished. */ -void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException; +CompletableFuture addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException; /** - * This method is used to update the {@link RemoteLogSegmentMetadata}. Currently, it allows to update with the new + * This method is used to update the {@link RemoteLogSegmentMetadata} asynchronously. Currently, it allows to update with the new * state based on the life cycle of the segment. It can go through the below state transitions. * * * +-++--+ - * |COPY_SEGMENT_STARTED |---|COPY_SEGMENT_FINISHED | + * |COPY_SEGMENT_STARTED |--->|COPY_SEGMENT_FINISHED | Review comment: Can you verify if this could result in a correct HTML doc? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
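One possible way to keep the partition context in the failure path that the first comment asks about is sketched below; the names mirror the PR, but this is only an illustration under stated assumptions, not the actual ProducerManager change.

{code:java}
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;

// Hedged sketch: wrap the producer failure so the future still reports which partition failed.
class PublishCallbackSketch {
    Callback callbackFor(Object topicIdPartition, CompletableFuture<RecordMetadata> future) {
        return (metadata, exception) -> {
            if (exception != null) {
                future.completeExceptionally(new KafkaException(
                        "Exception occurred while publishing message for topicIdPartition: " + topicIdPartition,
                        exception));
            } else {
                future.complete(metadata);
            }
        };
    }
}
{code}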
[GitHub] [kafka] wycccccc commented on a change in pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream
wycc commented on a change in pull request #11017: URL: https://github.com/apache/kafka/pull/11017#discussion_r704545691 ## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -111,9 +104,24 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.atMostOnce; +import static org.mockito.Mockito.isA; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.withSettings; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.MockitoAnnotations.openMocks; -@RunWith(PowerMockRunner.class) Review comment: I remember that I have modified all the streams and I have updated the module build dependencies. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed
C0urante commented on pull request #10112: URL: https://github.com/apache/kafka/pull/10112#issuecomment-915350922 > I also think that the behavior with the suggested approach and your option 3 is still a lot better than the current situation. Agreed > IIUC the `offset.flush.timeout.ms` would actually not be used anymore, as there actually are no timeouts as the offset commit thread doesn't block anymore. That's mostly correct--we wouldn't be waiting on a blocking operation while iterating through the dequeue(s), although we might still choose to block on the actual write to the offset topic in the [same way that we currently](https://github.com/apache/kafka/blob/fb77da941ac2a34513cf2cd5d11137ba9b275575/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L565-L586) do just for the sake of metrics and allowing users to monitor the health of the connection between the Connect worker and the offsets topic. Not a huge deal though, and the point that we wouldn't be blocking on the task's producer is still valid. I think the issue is less that we'd end up timing out and more that we'd end up violating the guarantee that's provided right now by the framework that each task gets to take up only `offset.flush.timeout.ms` milliseconds per offset commit attempt before aborting the attempt and yielding control to the next task. A dequeue-based approach may actually be worse than the current behavior in that regard if there's no check in place to ensure that iterating over the dequeue doesn't exceed the offset flush timeout. Probably worth the tradeoff, but we can probably satisfy both objectives with your suggestion: > another option might be to incur the iteration on the worker source task thread. I think this'd be great, especially with the snapshotting logic you mention, which should basically eliminate any blocking between the two threads except to prevent race conditions while simple operations like clearing a hash map or assigning a new value to an instance variable take place. One thing that gave me pause initially was the realization that we'd be double-iterating over every source record at this point: once to transform, convert, and dispatch the record to the producer, and then once to verify that it had been acknowledged while iterating over the dequeue it's in. But I can't imagine it'd make a serious difference with CPU utilization given that transformation, conversion, and dispatching to a producer are likely to be at least an order of magnitude more expensive than just checking a boolean flag and possibly inserting the record's offset into a hash map. And memory utilization should be very close to the existing approach, which already tracks every single unacknowledged record in the `outstandingMessages` and `outstandingMessagesBacklog` fields. I think this buys us enough that my earlier-mentioned option 2 (multiple threads for offset commits) isn't called for, since the only blocking operation that would be performed during offset commit at this point is a write to the offsets topic. If the offsets topic is unavailable, it's likely that the impact would be the same across all tasks (unless the task is using a separate offsets topic, which will become possible once the changes for KIP-618 are merged), and even if not, things wouldn't be made any worse than they already are: the offset flush timeout would expire, and the next task in line would get its chance to commit offsets. @rhauch If this is all agreeable I think we're ready to start implementing. 
Since you've provided a lot of the code yourself I'm happy to let you take on that work if you'd like; otherwise, I'll get started and see if I can have a new PR with these changes out by early next week. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
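A rough sketch of the deque-based bookkeeping discussed above, with made-up names and without the per-partition and snapshotting details; it only illustrates how a commit path can find the latest fully acknowledged offset without blocking on the producer.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

// Hedged sketch only; the real change would live in WorkerSourceTask and handle
// per-partition offsets, snapshotting, and offset flush timeouts.
class AckedOffsetTracker {
    static final class Submitted {
        final Map<String, Object> offset;
        volatile boolean acked;
        Submitted(Map<String, Object> offset) { this.offset = offset; }
    }

    private final Deque<Submitted> submitted = new ArrayDeque<>();

    synchronized Submitted submit(Map<String, Object> offset) {
        Submitted record = new Submitted(offset);
        submitted.addLast(record);
        return record; // the producer callback later sets record.acked = true
    }

    // Pop the longest acknowledged prefix and return the last committable offset,
    // so committing never waits on in-flight records behind it.
    synchronized Map<String, Object> committableOffset() {
        Map<String, Object> committable = null;
        while (!submitted.isEmpty() && submitted.peekFirst().acked) {
            committable = submitted.pollFirst().offset;
        }
        return committable;
    }
}
{code}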
[jira] [Comment Edited] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason
[ https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411926#comment-17411926 ] John Gray edited comment on KAFKA-10643 at 9/8/21, 2:53 PM: We were having this same issue with our new static consumers once their changelog topics got large enough. The group would never stabilize because of these looping metadata updates. We ended up stabilizing our groups by increasing max.poll.interval.ms and metadata.max.age.ms in our streams apps to longer than however long we expected our restore consumer to take restoring our large stores. 30 minutes ended up working for us. I am not sure if it is expected that a metadata update should trigger a rebalance for a static consumer group with lots of restoring threads, but it certainly sent our groups with large state into a frenzy. It has been a while so you may have moved on from this, but I would be curious to see if these configs help your group, [~maatdeamon]. was (Author: gray.john): We were having this same issue with our new static consumers once their changelog topics got large enough. The group would never stabilize because of these looping metadata updates. We ended up stabilizing our groups by increasing max.poll.record.ms and metadata.max.age.ms in our streams apps to longer than however long we expected our restore consumer to take restoring our large stores. 30 minutes ended up working for us. I am not sure if it is expected that a metadata update should trigger a rebalance for a static consumer group with lots of restoring threads, but it certainly sent our groups with large state into a frenzy. It has been a while so you may have moved on from this, but I would be curious to see if these configs help your group, [~maatdeamon]. > Static membership - repetitive PreparingRebalance with updating metadata for > member reason > -- > > Key: KAFKA-10643 > URL: https://issues.apache.org/jira/browse/KAFKA-10643 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Eran Levy >Priority: Major > Attachments: broker-4-11.csv, client-4-11.csv, > client-d-9-11-11-2020.csv > > > Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka > streams app is healthy. > Configured with static membership. > Every 10 minutes (I assume cause of topic.metadata.refresh.interval.ms), I > see the following group coordinator log for different stream consumers: > INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in > state PreparingRebalance with old generation 12244 (__consumer_offsets-45) > (reason: Updating metadata for member > -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) > (kafka.coordinator.group.GroupCoordinator) > and right after that the following log: > INFO [GroupCoordinator 2]: Assignment received from leader for group > **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator) > > Looked a bit on the kafka code and Im not sure that I get why such a thing > happening - is this line described the situation that happens here re the > "reason:"?[https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311] > I also dont see it happening too often in other kafka streams applications > that we have. 
> The only thing suspicious that I see around every hour that different pods of > that kafka streams application throw this exception: > {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer > > clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer, > groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) > to node > 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException: > null\n"} > I came across this strange behaviour after stated to investigate a strange > stuck rebalancing state after one of the members left the group and caused > the rebalance to stuck - the only thing that I found is that maybe because > that too often preparing to rebalance states, the app might affected of this > bug - KAFKA-9752 ? > I dont understand why it happens, it wasn't before I applied static > membership to that kafka streams application (since around 2 weeks ago). > Will be happy if you can help me > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason
[ https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411982#comment-17411982 ] John Gray commented on KAFKA-10643: --- [~maatdeamon] we did set both max.poll.interval.ms and metadata.max.age.ms equal at 30minutes, but I think the key is that it be larger than however long your state stores take to restore. Assuming you have state stores to restore. We could certainly be running into similar but different problems. > Static membership - repetitive PreparingRebalance with updating metadata for > member reason > -- > > Key: KAFKA-10643 > URL: https://issues.apache.org/jira/browse/KAFKA-10643 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Eran Levy >Priority: Major > Attachments: broker-4-11.csv, client-4-11.csv, > client-d-9-11-11-2020.csv > > > Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka > streams app is healthy. > Configured with static membership. > Every 10 minutes (I assume cause of topic.metadata.refresh.interval.ms), I > see the following group coordinator log for different stream consumers: > INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in > state PreparingRebalance with old generation 12244 (__consumer_offsets-45) > (reason: Updating metadata for member > -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) > (kafka.coordinator.group.GroupCoordinator) > and right after that the following log: > INFO [GroupCoordinator 2]: Assignment received from leader for group > **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator) > > Looked a bit on the kafka code and Im not sure that I get why such a thing > happening - is this line described the situation that happens here re the > "reason:"?[https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311] > I also dont see it happening too often in other kafka streams applications > that we have. > The only thing suspicious that I see around every hour that different pods of > that kafka streams application throw this exception: > {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer > > clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer, > groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) > to node > 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException: > null\n"} > I came across this strange behaviour after stated to investigate a strange > stuck rebalancing state after one of the members left the group and caused > the rebalance to stuck - the only thing that I found is that maybe because > that too often preparing to rebalance states, the app might affected of this > bug - KAFKA-9752 ? > I dont understand why it happens, it wasn't before I applied static > membership to that kafka streams application (since around 2 weeks ago). > Will be happy if you can help me > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
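For a Kafka Streams application, the workaround described in these comments amounts to something like the configuration sketch below; 30 minutes matches the value mentioned, but the right number depends on how long state restoration actually takes, and the application id is a placeholder.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

class RebalanceWorkaroundConfig {
    static Properties props() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app"); // hypothetical id
        // 30 minutes, per the comments above; pick something longer than the expected restore time.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 60 * 1000);
        props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 30 * 60 * 1000);
        return props;
    }
}
{code}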
[jira] [Created] (KAFKA-13278) Deserialization behavior of the Fetcher class does not match up with API contract of the Deserializer interface
Julian Reichinger created KAFKA-13278: - Summary: Deserialization behavior of the Fetcher class does not match up with API contract of the Deserializer interface Key: KAFKA-13278 URL: https://issues.apache.org/jira/browse/KAFKA-13278 Project: Kafka Issue Type: Bug Components: clients, documentation Affects Versions: 2.6.0 Reporter: Julian Reichinger The documentation of the {noformat} org.apache.kafka.common.serialization.Deserializer{noformat} interface states that implementations have to expect null byte-arrays and should handle them in a meaningful way. However, at least in the Kafka client it seems to be impossible to actually get a null value into a deserializer because the class {noformat} org.apache.kafka.clients.consumer.internals.Fetcher{noformat} does not call the registered deserializer in case of a null value. {code:java} private ConsumerRecord<K, V> parseRecord(TopicPartition partition, RecordBatch batch, Record record) { try { long offset = record.offset(); long timestamp = record.timestamp(); Optional<Integer> leaderEpoch = maybeLeaderEpoch(batch.partitionLeaderEpoch()); TimestampType timestampType = batch.timestampType(); Headers headers = new RecordHeaders(record.headers()); ByteBuffer keyBytes = record.key(); byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes); K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray); ByteBuffer valueBytes = record.value(); byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes); V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray); return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, timestamp, timestampType, record.checksumOrNull(), keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length, valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length, key, value, headers, leaderEpoch); } catch (RuntimeException e) { throw new SerializationException("Error deserializing key/value for partition " + partition + " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e); } } {code} I implemented an ErrorHandlingDeserializer which I use to wrap the actual deserializers and which records the result (value or exception) in a container object. {code:java} /** * Handles exceptions during de-serialization thrown by a delegate {@link Deserializer}. * * @param <T> type of the deserialized object */ final class ErrorHandlingDeserializer<T> implements Deserializer<ReadResult<T>> { private final Deserializer<T> delegate; private ErrorHandlingDeserializer(Deserializer<T> delegate) { this.delegate = requireNonNull(delegate); } static <T> ErrorHandlingDeserializer<T> wrap(Deserializer<T> delegate) { return new ErrorHandlingDeserializer<>(delegate); } @Override public ReadResult<T> deserialize(String topic, @Nullable byte[] data) { try { return ReadResult.successful(delegate.deserialize(topic, data)); } catch (Exception e) { return ReadResult.failed(e); } } } {code} This deserializer cannot produce a null value. However, because of the Fetcher behavior I still have to check for null values in the consumer records at every usage, and additionally I also have to check for a null value inside the ReadResult container class, because the Deserializer API says so and I have no guarantee that the Fetcher behavior will never change.
In my opinion this behavior is a bug, because everyone implementing a Deserializer would expect to actually receive null values (for example in case of deletions). There should either be a guarantee on the client side that Deserializers always receive null values or that they never receive null values. -- This message was sent by Atlassian Jira (v8.3.4#803005)
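To illustrate what the reporter expects to be possible under the documented Deserializer contract, here is a hedged sketch of a deserializer that maps a null payload (for example a tombstone) to a sentinel value; under the Fetcher behavior described above it would simply never be invoked for null keys or values.

{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Deserializer;

// Illustrative only; the sentinel value is a hypothetical choice for the example.
class NullAwareStringDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        if (data == null) {
            return "<deleted>"; // domain-level default for a null payload
        }
        return new String(data, StandardCharsets.UTF_8);
    }
}
{code}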
[jira] [Comment Edited] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason
[ https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411932#comment-17411932 ] Maatari edited comment on KAFKA-10643 at 9/8/21, 1:36 PM: -- [~gray.john] Thank you so much. this is still relevant to my team. We have not solved the problems. Quick question, to how long did you set metadata.max.age.ms ? We already increase the max.poll.records.ms, so i am curious how the metadata.max.age.ms actually impact the situation, given that by default it is already 5 minutes, should it be proprtional to max.poll.records.ms somehow ? was (Author: maatdeamon): [~gray.john] Thank you so much. this is still relevant to my team. We have not solved the problems. Quick question, to how long did you set metadata.max.age.ms ? > Static membership - repetitive PreparingRebalance with updating metadata for > member reason > -- > > Key: KAFKA-10643 > URL: https://issues.apache.org/jira/browse/KAFKA-10643 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Eran Levy >Priority: Major > Attachments: broker-4-11.csv, client-4-11.csv, > client-d-9-11-11-2020.csv > > > Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka > streams app is healthy. > Configured with static membership. > Every 10 minutes (I assume cause of topic.metadata.refresh.interval.ms), I > see the following group coordinator log for different stream consumers: > INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in > state PreparingRebalance with old generation 12244 (__consumer_offsets-45) > (reason: Updating metadata for member > -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) > (kafka.coordinator.group.GroupCoordinator) > and right after that the following log: > INFO [GroupCoordinator 2]: Assignment received from leader for group > **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator) > > Looked a bit on the kafka code and Im not sure that I get why such a thing > happening - is this line described the situation that happens here re the > "reason:"?[https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311] > I also dont see it happening too often in other kafka streams applications > that we have. > The only thing suspicious that I see around every hour that different pods of > that kafka streams application throw this exception: > {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer > > clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer, > groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) > to node > 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException: > null\n"} > I came across this strange behaviour after stated to investigate a strange > stuck rebalancing state after one of the members left the group and caused > the rebalance to stuck - the only thing that I found is that maybe because > that too often preparing to rebalance states, the app might affected of this > bug - KAFKA-9752 ? > I dont understand why it happens, it wasn't before I applied static > membership to that kafka streams application (since around 2 weeks ago). > Will be happy if you can help me > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12857) Using Connect Sink with CooperativeStickyAssignor results in commit offsets failure
[ https://issues.apache.org/jira/browse/KAFKA-12857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-12857. --- Assignee: (was: dgd_contributor) Resolution: Duplicate > Using Connect Sink with CooperativeStickyAssignor results in commit offsets > failure > --- > > Key: KAFKA-12857 > URL: https://issues.apache.org/jira/browse/KAFKA-12857 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.7.1 > Environment: Linux >Reporter: Oliver Hsu >Priority: Major > > We are attempting to use a Kafka Connect Sink Connector with > {{CooperativeStickyAssignor}} assignment strategy. When we use > {{CooperativeStickyAssignor}} offset commits sometimes fail with > {{[2021-05-26 22:03:36,435] WARN WorkerSinkTask\{id=sink-connector-7} > Ignoring invalid task provided offset > mytopic-0/OffsetAndMetadata\{offset=16305575, leaderEpoch=null, metadata=''} > – partition not assigned, assignment=[mytopic-0] > (org.apache.kafka.connect.runtime.WorkerSinkTask:434)}} > Note that the invalid partition in the warning message matches the partition > assignment. > *Config changes* > {{consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor}} > *Cooperative vs Eager Assignment Strategy background* > > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-ConsumerRebalanceListenerandConsumerPartitionAssignorSemantics] > With eager assignment: > {quote}Listener#onPartitionsAssigned: called on the full set of assigned > partitions (may have overlap with the partitions passed to > #onPartitionsRevoked > {quote} > With cooperative assignment: > {quote}Listener#onPartitionsAssigned: called on the subset of assigned > partitions that were not previously owned before this rebalance. There should > be no overlap with the revoked partitions (if any). This will always be > called, even if there are no new partitions being assigned to a given member. > {quote} > This means with cooperative assignment, `onPartitionsAssigned` may be called > with a partial assignment or an empty collection. > However, the > [WorkerSinkTask.HandleRebalance|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L669-L680] > class makes the assumption that `onPartitionsAssigned` is called with the > full set of assigned partitions which is true for eager but not coooperative. > {code:java|title=WorkerSinkTask.HandleRebalance.java|borderStyle=solid} > public void onPartitionsAssigned(Collection > partitions) { > log.debug("{} Partitions assigned {}", WorkerSinkTask.this, > partitions); > lastCommittedOffsets = new HashMap<>(); > currentOffsets = new HashMap<>(); > for (TopicPartition tp : partitions) { > long pos = consumer.position(tp); > lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos)); > currentOffsets.put(tp, new OffsetAndMetadata(pos)); > log.debug("{} Assigned topic partition {} with offset {}", > WorkerSinkTask.this, tp, pos); > } > {code} > The {{onPartitionsAssigned}} creates a new empty {{HashMap}} and puts the > offsets of the {{partitions}} in that {{HashMap}}. 
> In the logs we see > {{[2021-05-26 22:02:09,785] DEBUG WorkerSinkTask\{id=sink-connector-7} > Partitions assigned [myTopic-0] > (org.apache.kafka.connect.runtime.WorkerSinkTask:677)}} > {{[2021-05-26 22:02:13,063] DEBUG WorkerSinkTask\{id=sink-connector-7} > Partitions assigned [] (org.apache.kafka.connect.runtime.WorkerSinkTask:677)}} > {{[2021-05-26 22:02:16,074] DEBUG WorkerSinkTask\{id=sink-connector-7} }} > Partitions assigned [] (org.apache.kafka.connect.runtime.WorkerSinkTask:677)}} > These logs show that the {{CooperativeStickyAssignor}} calls > {{onPartitionsAssigned}} first with the partition assigned to it followed by > additional calls with an empty {{partitions}} collection. > When {{HandleRebalance.onPartitionsAssigned}} is called first with the > assigned partition followed by empty collections, the result will be > {{lastCommittedOffsets}} initialized to an empty {{HashMap}}. > Inside > [WorkerSinkTask.commitOffsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L415-L419], > the current {{committableOffsets}} are based on the > {{lastCommittedOffsets}}, which is an empty {{HashMap}}: > {code:java|title=WorkerSinkTask.java|borderStyle=solid} > private void commitOffsets(long now, boolean closing) { > ... >
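The failure mode described in this ticket comes down to how the rebalance listener rebuilds its offset maps. As a minimal illustration (not the Connect runtime's actual code or fix), the sketch below shows a listener written to be safe under both eager and cooperative assignors: it merges newly assigned partitions into the existing maps and removes only revoked ones, so an empty `onPartitionsAssigned` call no longer wipes previously owned state. Class and field names are hypothetical.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * Illustrative rebalance listener that stays correct under both eager and
 * cooperative assignors: it merges newly assigned partitions into the existing
 * offset maps instead of rebuilding them from scratch, and prunes revoked
 * partitions explicitly. This is NOT the Connect runtime's implementation.
 */
class CooperativeSafeRebalanceListener implements ConsumerRebalanceListener {
    private final Consumer<byte[], byte[]> consumer;
    private final Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets = new HashMap<>();
    private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    CooperativeSafeRebalanceListener(final Consumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
        // With the cooperative protocol this may be a partial (or empty) set,
        // so only add the new partitions; previously owned ones keep their state.
        for (TopicPartition tp : partitions) {
            long pos = consumer.position(tp);
            lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
            currentOffsets.put(tp, new OffsetAndMetadata(pos));
        }
    }

    @Override
    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
        // Remove only what was actually revoked.
        for (TopicPartition tp : partitions) {
            lastCommittedOffsets.remove(tp);
            currentOffsets.remove(tp);
        }
    }
}
```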
[jira] [Commented] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason
[ https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411932#comment-17411932 ] Maatari commented on KAFKA-10643: - [~gray.john] Thank you so much. this is still relevant to my team. We have not solved the problems. Quick question, to how long did you set metadata.max.age.ms ? > Static membership - repetitive PreparingRebalance with updating metadata for > member reason > -- > > Key: KAFKA-10643 > URL: https://issues.apache.org/jira/browse/KAFKA-10643 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Eran Levy >Priority: Major > Attachments: broker-4-11.csv, client-4-11.csv, > client-d-9-11-11-2020.csv > > > Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka > streams app is healthy. > Configured with static membership. > Every 10 minutes (I assume cause of topic.metadata.refresh.interval.ms), I > see the following group coordinator log for different stream consumers: > INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in > state PreparingRebalance with old generation 12244 (__consumer_offsets-45) > (reason: Updating metadata for member > -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) > (kafka.coordinator.group.GroupCoordinator) > and right after that the following log: > INFO [GroupCoordinator 2]: Assignment received from leader for group > **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator) > > Looked a bit on the kafka code and Im not sure that I get why such a thing > happening - is this line described the situation that happens here re the > "reason:"?[https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311] > I also dont see it happening too often in other kafka streams applications > that we have. > The only thing suspicious that I see around every hour that different pods of > that kafka streams application throw this exception: > {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer > > clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer, > groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) > to node > 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException: > null\n"} > I came across this strange behaviour after stated to investigate a strange > stuck rebalancing state after one of the members left the group and caused > the rebalance to stuck - the only thing that I found is that maybe because > that too often preparing to rebalance states, the app might affected of this > bug - KAFKA-9752 ? > I dont understand why it happens, it wasn't before I applied static > membership to that kafka streams application (since around 2 weeks ago). > Will be happy if you can help me > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason
[ https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411926#comment-17411926 ] John Gray edited comment on KAFKA-10643 at 9/8/21, 1:23 PM: We were having this same issue with our new static consumers once their changelog topics got large enough. The group would never stabilize because of these looping metadata updates. We ended up stabilizing our groups by increasing max.poll.record.ms and metadata.max.age.ms in our streams apps to longer than however long we expected our restore consumer to take restoring our large stores. 30 minutes ended up working for us. I am not sure if it is expected that a metadata update should trigger a rebalance for a static consumer group with lots of restoring threads, but it certainly sent our groups with large state into a frenzy. It has been a while so you may have moved on from this, but I would be curious to see if these configs help your group, [~maatdeamon]. was (Author: gray.john): We were having this same issue with our new static consumers once their changelog topics got large enough. The group would never stabilize because of these looping metadata updates. We ended up stabilizing our groups by increasing max.poll.record.ms and metadata.max.age.ms in our streams apps to longer than however long we expected our restore consumer to take restoring our large stores. 30 minutes ended up working for us. I am not sure if this is expected that a metadata update should trigger a rebalance for a static consumer group with lots of restoring threads, but it certainly sent our groups with large state into a frenzy. It has been a while so you may have moved on from this, but I would be curious to see if these configs help your group, [~maatdeamon]. > Static membership - repetitive PreparingRebalance with updating metadata for > member reason > -- > > Key: KAFKA-10643 > URL: https://issues.apache.org/jira/browse/KAFKA-10643 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Eran Levy >Priority: Major > Attachments: broker-4-11.csv, client-4-11.csv, > client-d-9-11-11-2020.csv > > > Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka > streams app is healthy. > Configured with static membership. > Every 10 minutes (I assume cause of topic.metadata.refresh.interval.ms), I > see the following group coordinator log for different stream consumers: > INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in > state PreparingRebalance with old generation 12244 (__consumer_offsets-45) > (reason: Updating metadata for member > -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) > (kafka.coordinator.group.GroupCoordinator) > and right after that the following log: > INFO [GroupCoordinator 2]: Assignment received from leader for group > **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator) > > Looked a bit on the kafka code and Im not sure that I get why such a thing > happening - is this line described the situation that happens here re the > "reason:"?[https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311] > I also dont see it happening too often in other kafka streams applications > that we have. 
> The only thing suspicious that I see around every hour that different pods of > that kafka streams application throw this exception: > {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer > > clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer, > groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) > to node > 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException: > null\n"} > I came across this strange behaviour after stated to investigate a strange > stuck rebalancing state after one of the members left the group and caused > the rebalance to stuck - the only thing that I found is that maybe because > that too often preparing to rebalance states, the app might affected of this > bug - KAFKA-9752 ? > I dont understand why it happens, it wasn't before I applied static > membership to that kafka streams application (since around 2 weeks ago). > Will be happy if you can help me > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason
[ https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411926#comment-17411926 ] John Gray commented on KAFKA-10643: --- We were having this same issue with our new static consumers once their changelog topics got large enough. The group would never stabilize because of these looping metadata updates. We ended up stabilizing our groups by increasing max.poll.record.ms and metadata.max.age.ms in our streams apps to longer than however long we expected our restore consumer to take restoring our large stores. 30 minutes ended up working for us. I am not sure if this is expected that a metadata update should trigger a rebalance for a static consumer group with lots of restoring threads, but it certainly sent our groups with large state into a frenzy. It has been a while so you may have moved on from this, but I would be curious to see if these configs help your group, [~maatdeamon]. > Static membership - repetitive PreparingRebalance with updating metadata for > member reason > -- > > Key: KAFKA-10643 > URL: https://issues.apache.org/jira/browse/KAFKA-10643 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Eran Levy >Priority: Major > Attachments: broker-4-11.csv, client-4-11.csv, > client-d-9-11-11-2020.csv > > > Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka > streams app is healthy. > Configured with static membership. > Every 10 minutes (I assume cause of topic.metadata.refresh.interval.ms), I > see the following group coordinator log for different stream consumers: > INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in > state PreparingRebalance with old generation 12244 (__consumer_offsets-45) > (reason: Updating metadata for member > -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) > (kafka.coordinator.group.GroupCoordinator) > and right after that the following log: > INFO [GroupCoordinator 2]: Assignment received from leader for group > **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator) > > Looked a bit on the kafka code and Im not sure that I get why such a thing > happening - is this line described the situation that happens here re the > "reason:"?[https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311] > I also dont see it happening too often in other kafka streams applications > that we have. > The only thing suspicious that I see around every hour that different pods of > that kafka streams application throw this exception: > {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer > > clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer, > groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) > to node > 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException: > null\n"} > I came across this strange behaviour after stated to investigate a strange > stuck rebalancing state after one of the members left the group and caused > the rebalance to stuck - the only thing that I found is that maybe because > that too often preparing to rebalance states, the app might affected of this > bug - KAFKA-9752 ? 
> I dont understand why it happens, it wasn't before I applied static > membership to that kafka streams application (since around 2 weeks ago). > Will be happy if you can help me > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
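As a rough sketch of the workaround described in the comments above (not an official recommendation), the snippet below raises the Streams consumer's metadata refresh age and poll interval to the 30 minutes the commenter reports. Note the assumption that the comment's `max.poll.record.ms` refers to `max.poll.interval.ms`; the application id and bootstrap servers are placeholders.

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class RestoreFriendlyStreamsConfig {

    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // hypothetical id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // hypothetical broker
        // Raise the consumer's metadata refresh age beyond the expected restore time
        // (30 minutes is the value reported in the comment above; tune to your stores).
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.METADATA_MAX_AGE_CONFIG), 30 * 60 * 1000);
        // Assuming the comment's "max.poll.record.ms" means max.poll.interval.ms.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 30 * 60 * 1000);
        return props;
    }
}
```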
[jira] [Commented] (KAFKA-13257) KafkaStreams Support For Latest RocksDB Version
[ https://issues.apache.org/jira/browse/KAFKA-13257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411910#comment-17411910 ] Bruno Cadonna commented on KAFKA-13257: --- If you want to know the progress of the AK 3.0 release you can follow the dev mailing list to which you can subscribe here. The release is in its last phase, but there is no specific date. Can you not use a different OS image that provides glibc to move forward? > KafkaStreams Support For Latest RocksDB Version > --- > > Key: KAFKA-13257 > URL: https://issues.apache.org/jira/browse/KAFKA-13257 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Alagukannan >Priority: Major > Attachments: hs_err_pid6.log > > > Hi, > Can you please let us know if there is any plan for adding the latest > versions of rocksDB in kafka streams. If your planning it what's the timeline > we are looking at. If not planning to upgrade what's the reason behind it. Is > there any significant impact on upgrading like backward combability etc.. > Just to remind this general query to know about the rocksdb upgrade and its > impact on streams application. > The main pain point behind asking this upgrade is, We tried to build an > application with kafka streams 2.8.0 on an alpine based OS and the docker > base image is as follows > azul/zulu-openjdk-alpine:11.0.12-11.50.19-jre-headless. The streams > application worked fine until it had an interaction with state > store(rocksdb). The jvm crashed with the following error: > # > # A fatal error has been detected by the Java Runtime Environment: > # > # SIGSEGV (0xb) at pc=0x7f9551951b27, pid=6, tid=207 > # > # JRE version: OpenJDK Runtime Environment Zulu11.45+27-CA (11.0.10+9) > (build 11.0.10+9-LTS) > # Java VM: OpenJDK 64-Bit Server VM Zulu11.45+27-CA (11.0.10+9-LTS, mixed > mode, tiered, compressed oops, g1 gc, linux-amd64) > # Problematic frame: > # C [librocksdbjni15322693993163550519.so+0x271b27] > std::_Rb_tree, > std::less, std::allocator > >::_M_erase(std::_Rb_tree_node*)+0x27 > Then we found out rocksdb works well on glibc and not musl lib, where as > alpine supports musl lib alone for native dependencies. Further looking into > rocksdb for a solution we found that they have started supporting both glib > and musl native libs from 6.5.x versions. > But latest kafka streams(2.8.0) is having rocksdb(5.18.x) version. This is > the main reason behind asking for the rocksDB upgrade in kafka streams as > well. > Have attached the PID log where JVM failures are happening. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jlprat commented on pull request #11302: KAFKA-13243: KIP-773 Differentiate metric latency measured in ms and ns
jlprat commented on pull request #11302: URL: https://github.com/apache/kafka/pull/11302#issuecomment-915206008 Failure was https://issues.apache.org/jira/browse/KAFKA-13128. The issue was already closed, but now the test failed for a different reason. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Reopened] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
[ https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josep Prat reopened KAFKA-13128: Sorry to reopen this issue, it just occurred in this PR [https://github.com/apache/kafka/pull/11302] It's a different error though: {{}} {code:java} java.lang.AssertionError: Unexpected exception thrown while getting the value from store. Expected: is (a string containing "Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING" or a string containing "The state store, source-table, may have migrated to another instance" or a string containing "Cannot get state store source-table because the stream thread is STARTING, not RUNNING") but: was "Cannot get state store source-table because the stream thread is PARTITIONS_REVOKED, not RUNNING"{code} [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11302/3/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldQueryStoresAfterAddingAndRemovingStreamThread/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed] Let me know if I should have opened a new issue instead of reopening this one. {{}} > Flaky Test > StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread > > > Key: KAFKA-13128 > URL: https://issues.apache.org/jira/browse/KAFKA-13128 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.0.0, 2.8.1 >Reporter: A. Sophie Blee-Goldman >Assignee: Walker Carlson >Priority: Blocker > Labels: flaky-test > Fix For: 3.1.0 > > > h3. Stacktrace > java.lang.AssertionError: Expected: is not null but: was null > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) > at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461) > at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506) > at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455) > > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
vamossagar12 commented on a change in pull request #10798: URL: https://github.com/apache/kafka/pull/10798#discussion_r704376366 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ## @@ -505,6 +506,14 @@ private void closeOpenIterators() { } } +private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) { +ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length); Review comment: @patrickstuedi , I resorted to creating only 1 instance of DirectByteBuffer(one for putAll keys and values and one for range. I have checked in that code for reference. However, I don't see much of a difference in terms of throughput numbers. I ran it on the same setup(pushing 100 messages on a single partition topic and for each message, push 1M keys using putAll and read through range() call). I did 3 runs per approach (original v/s ByteBuffer) Here are the numbers: ``` ByteBuffer Operator: putAll, Real: 87.206 CPU: 371.460 GC: 11.048 GCCount: 30 avg throughput: 1206503.194 op/s p95 throughput: 1423370.427 op/s p99 throughput: 1437098.797 op/s Operator: range, Real: 115.121 CPU: 328.090 GC: 7.925 GCCount: 20 avg throughput: 923332.052 op/s p95 throughput: 1261332.193 op/s p99 throughput: 1308671.917 op/s Operator: putAll, Real: 87.274 CPU: 382.920 GC: 11.477 GCCount: 29 avg throughput: 1211901.123 op/s p95 throughput: 1432304.571 op/s p99 throughput: 1447671.672 op/s Operator: range, Real: 116.886 CPU: 335.610 GC: 8.230 GCCount: 21 avg throughput: 911159.182 op/s p95 throughput: 1234183.861 op/s p99 throughput: 1255574.938 op/s putAll, Real: 84.438 CPU: 366.390 GC: 10.992 GCCount: 29 avg throughput: 1254820.856 op/s p95 throughput: 1481654.517 op/s p99 throughput: 1508043.762 op/s Operator: range, Real: 114.877 CPU: 338.090 GC: 8.465 GCCount: 21 avg throughput: 935155.065 op/s p95 throughput: 1254738.341 op/s p99 throughput: 1280340.688 op/s Original Operator: putAll, Real: 95.037 CPU: 378.850 GC: 11.078 GCCount: 29 avg throughput: 1100406.576 op/s p95 throughput: 1292393.480 op/s p99 throughput: 1297963.396 op/s Operator: range, Real: 111.177 CPU: 328.180 GC: 8.299 GCCount: 21 avg throughput: 967757.168 op/s p95 throughput: 1281079.326 op/s p99 throughput: 1296172.722 op/s Operator: putAll, Real: 95.186 CPU: 356.040 GC: 10.132 GCCount: 28 avg throughput: 1092794.645 op/s p95 throughput: 1257861.812 op/s p99 throughput: 1286016.834 op/s Operator: range, Real: 112.568 CPU: 347.350 GC: 9.163 GCCount: 25 avg throughput: 952179.810 op/s p95 throughput: 1298717.792 op/s p99 throughput: 1323234.359 op/s Operator: putAll, Real: 97.332 CPU: 400.690 GC: 12.000 GCCount: 30 avg throughput: 1079682.386 op/s p95 throughput: 1283925.766 op/s p99 throughput: 1290499.661 op/s Operator: range, Real: 109.653 CPU: 319.020 GC: 7.995 GCCount: 20 avg throughput: 980557.905 op/s p95 throughput: 1298695.144 op/s p99 throughput: 1313645.941 op/s ``` One thing that I am noticing is that range is better with the original implementation. This is contrary to the initial number s I had posted [here](https://github.com/apache/kafka/pull/10798#issuecomment-872398600) which was wrong due to a bug in my implementation back then. Also, the discussion we had about the capacity allocation rocksdb places a limit of 8MB on keys and 3 GB for values. Either ways, it doesn't recommend storing large keys/values. Wanted to know if there are any configs for keys/values sizes for stores? Don't think I could find it. 
Lastly, with the pre-allocation approach for interactive queries, which allows multi-threaded access, do we want to restrict by N as you specified above? What do you and (@guozhangwang / @cadonna) think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
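For reference, the "single direct buffer" idea being benchmarked above can be sketched as follows. This is only an illustration of the reuse pattern, not the PR's actual implementation; the class and method names are made up.

```java
import java.nio.ByteBuffer;

/**
 * Minimal sketch of reusing one direct ByteBuffer across many puts: the buffer
 * is grown on demand and refilled for every key (a second instance would be
 * kept for values) instead of allocating a new direct buffer per record.
 */
final class ReusableDirectBuffer {
    private ByteBuffer buffer = ByteBuffer.allocateDirect(64);

    /** Copies {@code bytes} into the shared direct buffer, growing it if needed. */
    ByteBuffer fill(final byte[] bytes) {
        if (buffer.capacity() < bytes.length) {
            // Grow geometrically so repeated putAll batches rarely reallocate.
            final int newCapacity = Math.max(bytes.length, buffer.capacity() * 2);
            buffer = ByteBuffer.allocateDirect(newCapacity);
        }
        buffer.clear();
        buffer.put(bytes);
        buffer.flip();
        return buffer;
    }
}
```

A `putAll` loop would then fill one such instance for keys and another for values and hand the filled buffers to RocksDB, assuming the ByteBuffer-based put overloads available in newer RocksDB Java releases are present in the version in use.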
[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
vamossagar12 commented on pull request #10798: URL: https://github.com/apache/kafka/pull/10798#issuecomment-915194995 > > Okay, let me re-trigger the tests. > > Thanks.. This time there's a compilation error due to benchmarks. I will remove that class. @guozhangwang , I removed that class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna merged pull request #11250: Kafka 12766 - Disabling WAL-related Options in RocksDB
cadonna merged pull request #11250: URL: https://github.com/apache/kafka/pull/11250 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #11250: Kafka 12766 - Disabling WAL-related Options in RocksDB
cadonna commented on pull request #11250: URL: https://github.com/apache/kafka/pull/11250#issuecomment-915167984 Build failures are unrelated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a change in pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…
vamossagar12 commented on a change in pull request #11211: URL: https://github.com/apache/kafka/pull/11211#discussion_r704335821 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java ## @@ -292,13 +408,46 @@ public V fetch(final K key, time); } +private long getActualWindowStartTime(final long timeFrom) { +return Math.max(timeFrom, ((PersistentWindowStore) wrapped()).getObservedStreamTime() - retentionPeriod + 1); +} + +private KeyValueIterator, V> filterExpiredRecords(final boolean forward) { +final KeyValueIterator, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll(); + +final long observedStreamTime = ((PersistentWindowStore) wrapped()).getObservedStreamTime(); +if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP) +return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time); + +final long windowStartBoundary = observedStreamTime - retentionPeriod + 1; +final List, byte[]>> windowedKeyValuesInBoundary = new ArrayList<>(); + +while (allWindowedKeyValueIterator.hasNext()) { +final KeyValue, byte[]> next = allWindowedKeyValueIterator.next(); +if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) { +continue; +} +windowedKeyValuesInBoundary.add(next); +} +return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(windowedKeyValuesInBoundary.iterator()), fetchSensor, streamsMetrics, serdes, time); +} Review comment: hey.. let me know what you guys think about the above comment of mine. Based upon that, I will proceed with fixing the issue with the test case(which doesn't seem possible with the way I have implemented). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon edited a comment on pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order
showuon edited a comment on pull request #11292: URL: https://github.com/apache/kafka/pull/11292#issuecomment-915106066 @ableegoldman @guozhangwang , thanks for your comments. I agree that it doesn't need to be integration test. I've moved it out from integration test. Thank you. Failed tests are unrelated. ``` Build / JDK 8 and Scala 2.12 / kafka.server.KRaftClusterTest.testCreateClusterAndPerformReassignment() Build / JDK 8 and Scala 2.12 / kafka.server.KRaftClusterTest.testCreateClusterAndCreateListDeleteTopic() Build / JDK 11 and Scala 2.13 / kafka.server.DelayedOperationTest.testDelayedFuture() -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order
showuon commented on pull request #11292: URL: https://github.com/apache/kafka/pull/11292#issuecomment-915106066 @ableegoldman @guozhangwang , thanks for your comments. I agree that it doesn't need to be integration test. I've moved it out from integration test. Thank you. Failed tests are unrelated. ``` Build / JDK 8 and Scala 2.12 / kafka.server.KRaftClusterTest.testCreateClusterAndPerformReassignment() Build / JDK 8 and Scala 2.12 / kafka.server.KRaftClusterTest.testCreateClusterAndCreateListDeleteTopic() Build / JDK 11 and Scala 2.13 / kafka.server.DelayedOperationTest.testDelayedFuture() 111 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-13260) FindCoordinator errorCounts does not handle v4
[ https://issues.apache.org/jira/browse/KAFKA-13260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411824#comment-17411824 ] Mickael Maison edited comment on KAFKA-13260 at 9/8/21, 9:25 AM: - As things stood last week, it was affecting 3.0.0 RC0 and it wasn't clear if the fix would be merged into 3.0. This is why I tagged this JIRA this way. Thanks for updating it! was (Author: mimaison): As things stood last week, it was affecting 3.0.0 RC0 and it wasn't clear if the fix would be merged into 3.0. This is why I tagged this JIRA this way. > FindCoordinator errorCounts does not handle v4 > -- > > Key: KAFKA-13260 > URL: https://issues.apache.org/jira/browse/KAFKA-13260 > Project: Kafka > Issue Type: Improvement >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Blocker > Fix For: 3.0.0 > > > When using batch find coordinator (>=v4), errorCounts() does not correctly > compute the error count. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13260) FindCoordinator errorCounts does not handle v4
[ https://issues.apache.org/jira/browse/KAFKA-13260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411824#comment-17411824 ] Mickael Maison commented on KAFKA-13260: As things stood last week, it was affecting 3.0.0 RC0 and it wasn't clear if the fix would be merged into 3.0. This is why I tagged this JIRA this way. > FindCoordinator errorCounts does not handle v4 > -- > > Key: KAFKA-13260 > URL: https://issues.apache.org/jira/browse/KAFKA-13260 > Project: Kafka > Issue Type: Improvement >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Blocker > Fix For: 3.0.0 > > > When using batch find coordinator (>=v4), errorCounts() does not correctly > compute the error count. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mimaison merged pull request #11288: KAFKA-13258/13259/13260: Fix error response generation
mimaison merged pull request #11288: URL: https://github.com/apache/kafka/pull/11288 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13256) Possible NPE in ConfigDef when rendering (enriched) RST or HTML when documentation is not set/NULL
[ https://issues.apache.org/jira/browse/KAFKA-13256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411820#comment-17411820 ] René Kerner commented on KAFKA-13256: - [~mjsax] Currently there is nothing that prevents users of ConfigDef from leaving the documentation unset. I don't know who's consuming `org.apache.kafka.common.config.ConfigDef#toRst`, `org.apache.kafka.common.config.ConfigDef#toEnrichedRst` and `org.apache.kafka.common.config.ConfigDef#toHtml`, but calling these methods in ConfigDef will fail with an NPE when there are ConfigKeys that don't have `documentation` set. An alternative would be to add null-checks to all `org.apache.kafka.common.config.ConfigDef#define` methods, but that would be a drastic, API-breaking change. > Possible NPE in ConfigDef when rendering (enriched) RST or HTML when > documentation is not set/NULL > -- > > Key: KAFKA-13256 > URL: https://issues.apache.org/jira/browse/KAFKA-13256 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.8.0 >Reporter: René Kerner >Priority: Major > Original Estimate: 0.5h > Remaining Estimate: 0.5h > > While working on Debezium I discovered the following issue: > When Kafka's ConfigDef renders the HTML or RST documentation representation > of the config definition, it requires `ConfigKey.documentation` member > variable to be a java.lang.String instance that's set to an actual value > different than NULL, else NPE happens: > {code:java} > b.append(key.documentation.replaceAll("\n", "")); > {code} > {code:java} > for (String docLine : key.documentation.split("\n")) { > {code} > > When `documentation` is not set/NULL I suggest to either set a valid String > like "No documentation available" or skip that config key. > > I could provide a PR to fix this soon. -- This message was sent by Atlassian Jira (v8.3.4#803005)
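A minimal sketch of the null-safe fallback suggested in this comment; the helper name and placeholder text are illustrative, not Kafka's eventual fix.

```java
/**
 * Illustrative helper: substitute a placeholder when a ConfigKey carries no
 * documentation, so the RST/HTML render paths never dereference null.
 */
final class ConfigDocSketch {

    private ConfigDocSketch() { }

    static String safeDocumentation(final String documentation) {
        return documentation == null || documentation.isEmpty()
            ? "No documentation available"
            : documentation;
    }
}
```

The render methods would then call, for example, `safeDocumentation(key.documentation).replaceAll(...)` and `safeDocumentation(key.documentation).split(...)` instead of touching `key.documentation` directly.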
[GitHub] [kafka] mimaison merged pull request #11300: KAFKA-13258/13259/13260: Fix error response generation
mimaison merged pull request #11300: URL: https://github.com/apache/kafka/pull/11300 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison merged pull request #11299: KAFKA-13258/KAFKA-13259: Fix error response generation
mimaison merged pull request #11299: URL: https://github.com/apache/kafka/pull/11299 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13277) Serialization of long tagged string in request/response throws BufferOverflowException
[ https://issues.apache.org/jira/browse/KAFKA-13277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-13277: --- Fix Version/s: 2.7.2 > Serialization of long tagged string in request/response throws > BufferOverflowException > -- > > Key: KAFKA-13277 > URL: https://issues.apache.org/jira/browse/KAFKA-13277 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Blocker > Fix For: 3.0.0, 2.7.2, 2.8.1 > > > Size computation for tagged strings in the message generator is incorrect and > hence it works only for small strings (126 bytes or so) where the length > happens to be correct. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13277) Serialization of long tagged string in request/response throws BufferOverflowException
[ https://issues.apache.org/jira/browse/KAFKA-13277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-13277: --- Affects Version/s: 2.4.1 2.5.1 2.8.0 2.7.1 2.6.2 > Serialization of long tagged string in request/response throws > BufferOverflowException > -- > > Key: KAFKA-13277 > URL: https://issues.apache.org/jira/browse/KAFKA-13277 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.4.1, 2.5.1, 2.8.0, 2.7.1, 2.6.2 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Blocker > Fix For: 3.0.0, 2.7.2, 2.8.1 > > > Size computation for tagged strings in the message generator is incorrect and > hence it works only for small strings (126 bytes or so) where the length > happens to be correct. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13264) backwardFetch in InMemoryWindowStore doesn't return in reverse order
[ https://issues.apache.org/jira/browse/KAFKA-13264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-13264: -- Description: When working on another PR, I found currently, the backwardFetch in InMemoryWindowStore doesn't return in reverse order when there are records in the same window. ex: window size = 500, input records: key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500] window key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500] window So, internally, the "a" and "b" will be in the same segment. when fetch in forward order: "a" -> "b", which is expected when fetch in backward order: "a" -> "b", which is NOT expected (because we didn't make the segment iterator as descendingMap) was: When working on another PR, I found currently, the backwardFetch in InMemoryWindowStore doesn't return in reverse order when there are records in the same window. ex: window size = 500, input records: key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500\] window key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500\] window when fetch in forward order: "a" -> "b", which is expected when fetch in backward order: "a" -> "b", which is NOT expected > backwardFetch in InMemoryWindowStore doesn't return in reverse order > > > Key: KAFKA-13264 > URL: https://issues.apache.org/jira/browse/KAFKA-13264 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > When working on another PR, I found currently, the backwardFetch in > InMemoryWindowStore doesn't return in reverse order when there are records in > the same window. > ex: window size = 500, > input records: > key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500] window > key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500] window > > So, internally, the "a" and "b" will be in the same segment. > when fetch in forward order: > "a" -> "b", which is expected > when fetch in backward order: > "a" -> "b", which is NOT expected (because we didn't make the segment > iterator as descendingMap) -- This message was sent by Atlassian Jira (v8.3.4#803005)
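A small self-contained sketch of the ordering problem described above, using a plain `TreeMap` as a stand-in for one in-memory window segment (this is not the store's actual code): iterating the segment forward yields "a" then "b", and only an explicitly descending view flips records that share the same window.

```java
import java.util.TreeMap;

public class BackwardOrderSketch {

    public static void main(String[] args) {
        // Stand-in for one window segment of the in-memory store: keys whose
        // records fall into the same [0, 500] window, kept in a sorted map.
        TreeMap<String, String> segment = new TreeMap<>();
        segment.put("a", "aa");   // timestamp 0
        segment.put("b", "bb");   // timestamp 10, same window

        // Forward fetch: a -> b (expected).
        segment.forEach((k, v) -> System.out.println("forward  " + k));

        // Backward fetch must flip the per-segment iteration too, e.g. via
        // descendingMap(); otherwise records in the same window still come
        // back as a -> b instead of b -> a.
        segment.descendingMap().forEach((k, v) -> System.out.println("backward " + k));
    }
}
```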
[GitHub] [kafka] showuon commented on a change in pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order
showuon commented on a change in pull request #11292: URL: https://github.com/apache/kafka/pull/11292#discussion_r704188256 ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java ## @@ -80,6 +80,15 @@ static final long SEGMENT_INTERVAL = 60_000L; static final long RETENTION_PERIOD = 2 * SEGMENT_INTERVAL; +final long defaultStartTime = SEGMENT_INTERVAL - 4L; + +final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", defaultStartTime); +final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", defaultStartTime + 1); +final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", defaultStartTime + 2); +final KeyValue<Windowed<Integer>, String> three = windowedPair(3, "three", defaultStartTime + 2); Review comment: Thanks for your comment, but here I set it to `defaultStartTime + 2` on purpose, to test that when the window start times are the same, the forward/backward query APIs still return results in the expected order. I updated the PR description to make this clear. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13277) Serialization of long tagged string in request/response throws BufferOverflowException
[ https://issues.apache.org/jira/browse/KAFKA-13277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-13277: --- Fix Version/s: 2.8.1 > Serialization of long tagged string in request/response throws > BufferOverflowException > -- > > Key: KAFKA-13277 > URL: https://issues.apache.org/jira/browse/KAFKA-13277 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Blocker > Fix For: 3.0.0, 2.8.1 > > > Size computation for tagged strings in the message generator is incorrect and > hence it works only for small strings (126 bytes or so) where the length > happens to be correct. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] comptooladmin commented on a change in pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order
comptooladmin commented on a change in pull request #11292: URL: https://github.com/apache/kafka/pull/11292#discussion_r704180358 ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java ## @@ -80,6 +80,15 @@ static final long SEGMENT_INTERVAL = 60_000L; static final long RETENTION_PERIOD = 2 * SEGMENT_INTERVAL; +final long defaultStartTime = SEGMENT_INTERVAL - 4L; + +final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", defaultStartTime); +final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", defaultStartTime + 1); +final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", defaultStartTime + 2); +final KeyValue<Windowed<Integer>, String> three = windowedPair(3, "three", defaultStartTime + 2); Review comment: Is this supposed to be defaultStartTime + **_3_**? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
vamossagar12 commented on pull request #10798: URL: https://github.com/apache/kafka/pull/10798#issuecomment-915026729 > Okay, let me re-trigger the tests. Thanks.. This time there's a compilation error due to benchmarks. I will remove that class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on pull request #11308: KAFKA-13277; Fix size calculation for tagged string fields in message generator
rajinisivaram commented on pull request #11308: URL: https://github.com/apache/kafka/pull/11308#issuecomment-915008347 @cmccabe Thanks for reviewing and merging to master. @ijuma @dajac Yes, will cherry-pick to 3.0 and 2.8, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #11294: KAFKA-13266; `InitialFetchState` should be created after partition is removed from the fetchers
dajac commented on pull request #11294: URL: https://github.com/apache/kafka/pull/11294#issuecomment-915005946 @hachikuji Thanks for your review. I have addressed your comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11294: KAFKA-13266; `InitialFetchState` should be created after partition is removed from the fetchers
dajac commented on a change in pull request #11294: URL: https://github.com/apache/kafka/pull/11294#discussion_r704137955 ## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ## @@ -3242,6 +3260,92 @@ class ReplicaManagerTest { } } + @Test + def testDeltaFollowerStopFetcherBeforeCreatingInitialFetchOffset(): Unit = { +val localId = 1 +val otherId = localId + 1 +val topicPartition = new TopicPartition("foo", 0) + +val mockReplicaFetcherManager = Mockito.mock(classOf[ReplicaFetcherManager]) +val replicaManager = setupReplicaManagerWithMockedPurgatories( + timer = new MockTimer(time), + brokerId = localId, + mockReplicaFetcherManager = Some(mockReplicaFetcherManager) +) + +try { + // The first call to removeFetcherForPartitions should be ignored. + Mockito.when(mockReplicaFetcherManager.removeFetcherForPartitions( +Set(topicPartition)) + ).thenReturn(Map.empty[TopicPartition, PartitionFetchState]) + + // Make the local replica the follower + var followerTopicsDelta = topicsCreateDelta(localId, false) + var followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) + replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta) + + // Check the state of that partition + val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) + assertFalse(followerPartition.isLeader) + assertEquals(0, followerPartition.getLeaderEpoch) + assertEquals(0, followerPartition.localLogOrException.logEndOffset) + + // Verify that addFetcherForPartitions was called with the correct + // init offset. + Mockito.verify(mockReplicaFetcherManager, Mockito.times(1)) +.addFetcherForPartitions( + Map(topicPartition -> InitialFetchState( +leader = BrokerEndPoint(otherId, "localhost", 9093), +currentLeaderEpoch = 0, +initOffset = 0 + )) +) + + // The second call to removeFetcherForPartitions simulate the case + // where the fetcher write to the log before being shutdown. + Mockito.when(mockReplicaFetcherManager.removeFetcherForPartitions( +Set(topicPartition)) + ).thenAnswer { _ => +replicaManager.getPartition(topicPartition) match { + case HostedPartition.Online(partition) => +partition.appendRecordsToFollowerOrFutureReplica( + records = MemoryRecords.withRecords(CompressionType.NONE, 0, +new SimpleRecord("first message".getBytes)), + isFuture = false +) + + case _ => +} + +Map.empty[TopicPartition, PartitionFetchState] + } + + // Apply changes that bumps the leader epoch. + followerTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), localId, false) + followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) + replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta) + + assertFalse(followerPartition.isLeader) + assertEquals(1, followerPartition.getLeaderEpoch) + assertEquals(1, followerPartition.localLogOrException.logEndOffset) + + // Verify that addFetcherForPartitions was called with the correct + // init offset. + Mockito.verify(mockReplicaFetcherManager, Mockito.times(1)) +.addFetcherForPartitions( + Map(topicPartition -> InitialFetchState( +leader = BrokerEndPoint(otherId, "localhost", 9093), +currentLeaderEpoch = 1, +initOffset = 1 + )) +) +} finally { + replicaManager.shutdown() +} + +TestUtils.assertNoNonDaemonThreads(this.getClass.getName) Review comment: I was also thinking about this as most of the tests have it but wanted to test/refactor this separately if you don't mind. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on a change in pull request #11302: KAFKA-13243: KIP-773 Differentiate metric latency measured in ms and ns
jlprat commented on a change in pull request #11302: URL: https://github.com/apache/kafka/pull/11302#discussion_r704133811 ## File path: docs/upgrade.html ## @@ -22,9 +22,12 @@ Notable changes in 3.1.0 Apache Kafka supports Java 17. +The following metrics have been deprecated: bufferpool-wait-time-total, io-waittime-total, Review comment: Added the link to the KIP! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #11308: KAFKA-13277; Fix size calculation for tagged string fields in message generator
dajac commented on pull request #11308: URL: https://github.com/apache/kafka/pull/11308#issuecomment-914988398 We could also include it in 2.8.1. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org