[GitHub] [kafka] kkonstantine commented on pull request #11309: MINOR: Remove unsupported rsync and ssh commands from release.py

2021-09-08 Thread GitBox


kkonstantine commented on pull request #11309:
URL: https://github.com/apache/kafka/pull/11309#issuecomment-915783471


   Created https://issues.apache.org/jira/browse/KAFKA-13284 to track this.
   Merged to trunk, 3.0, 2.8, 2.7, and 2.6.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13284) Use sftp protocol in release.py to upload release candidate artifacts

2021-09-08 Thread Konstantine Karantasis (Jira)
Konstantine Karantasis created KAFKA-13284:
--

 Summary: Use sftp protocol in release.py to upload release 
candidate artifacts 
 Key: KAFKA-13284
 URL: https://issues.apache.org/jira/browse/KAFKA-13284
 Project: Kafka
  Issue Type: Improvement
Reporter: Konstantine Karantasis
 Fix For: 3.1.0


{{home.apache.org}} has recently restricted access to {{sftp}} only.

This prevents {{release.py}} from uploading a single archive with the release
candidate artifacts using {{rsync}} and then unpacking that archive with
{{ssh}}.



The script could be changed to mirror the contents and upload/delete files
individually using the {{sftp}} protocol.
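
As a rough illustration only ({{release.py}} is a Python script, so the real change
would not look like this, and the target directory and file names below are
hypothetical), the per-file upload over {{sftp}} could follow the shape of this
JSch sketch:

{code:java}
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session;

// Sketch: upload each release-candidate artifact individually over sftp,
// instead of rsync-ing one archive and unpacking it with ssh.
public class SftpUploadSketch {
    public static void main(String[] args) throws Exception {
        final String apacheId = args[0];                      // the release manager's ASF id
        final JSch jsch = new JSch();
        jsch.addIdentity(System.getProperty("user.home") + "/.ssh/id_rsa");

        final Session session = jsch.getSession(apacheId, "home.apache.org", 22);
        session.setConfig("StrictHostKeyChecking", "no");
        session.connect();

        final ChannelSftp sftp = (ChannelSftp) session.openChannel("sftp");
        sftp.connect();
        sftp.mkdir("public_html/kafka-rc");                   // hypothetical target directory
        sftp.put("kafka_2.13-3.0.0.tgz",                      // one artifact at a time
                 "public_html/kafka-rc/kafka_2.13-3.0.0.tgz");
        sftp.disconnect();
        session.disconnect();
    }
}
{code}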



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] satishd commented on a change in pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.

2021-09-08 Thread GitBox


satishd commented on a change in pull request #11033:
URL: https://github.com/apache/kafka/pull/11033#discussion_r704956559



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
##
@@ -129,37 +132,44 @@ public void 
updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmen
 }
 
 // Publish the message to the topic.
-
doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(),
 segmentMetadataUpdate);
+return 
doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(),
 segmentMetadataUpdate);
 } finally {
 lock.readLock().unlock();
 }
 }
 
 @Override
-public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata 
remotePartitionDeleteMetadata)
+public CompletableFuture 
putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata 
remotePartitionDeleteMetadata)
 throws RemoteStorageException {
 Objects.requireNonNull(remotePartitionDeleteMetadata, 
"remotePartitionDeleteMetadata can not be null");
 
 lock.readLock().lock();
 try {
 ensureInitializedAndNotClosed();
 
-
doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), 
remotePartitionDeleteMetadata);
+return 
doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), 
remotePartitionDeleteMetadata);
 } finally {
 lock.readLock().unlock();
 }
 }
 
-private void doPublishMetadata(TopicIdPartition topicIdPartition, 
RemoteLogMetadata remoteLogMetadata)
+private CompletableFuture doPublishMetadata(TopicIdPartition 
topicIdPartition,
+  RemoteLogMetadata 
remoteLogMetadata)
 throws RemoteStorageException {
 log.debug("Publishing metadata for partition: [{}] with context: 
[{}]", topicIdPartition, remoteLogMetadata);
 
 try {
 // Publish the message to the topic.
-RecordMetadata recordMetadata = 
producerManager.publishMessage(remoteLogMetadata);
-// Wait until the consumer catches up with this offset. This will 
ensure read-after-write consistency
-// semantics.
-consumerManager.waitTillConsumptionCatchesUp(recordMetadata);
+CompletableFuture produceFuture = new 
CompletableFuture<>();
+producerManager.publishMessage(remoteLogMetadata, produceFuture);
+return produceFuture.thenApplyAsync((Function) recordMetadata -> {
+try {
+
consumerManager.waitTillConsumptionCatchesUp(recordMetadata);

Review comment:
   `produceFuture` is completed once the producer acknowledges the record (i.e. when
its `RecordMetadata` is available). `doPublishMetadata` then takes `produceFuture`,
composes it with `.thenApplyAsync()`, and returns a `CompletableFuture` that completes
only after `consumerManager.waitTillConsumptionCatchesUp(recordMetadata)` returns.
   So the `CompletableFuture` returned from `doPublishMetadata` is completed only
after the consumer has caught up to the produced record's offset. I will document
this in the code.
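
   To spell out the ordering, here is a minimal self-contained sketch of that
composition (the class and method names are simplified stand-ins, not the PR's
actual types):
   ```
import java.util.concurrent.CompletableFuture;

// Simplified stand-ins, only to illustrate the composition described above.
public class PublishSketch {

    // In the real code a producer callback completes this future with the RecordMetadata.
    static CompletableFuture<Long> publish(String metadata) {
        CompletableFuture<Long> produceFuture = new CompletableFuture<>();
        produceFuture.complete(42L); // pretend this is the produced record's offset
        return produceFuture;
    }

    // In the real code this blocks until the consumer has caught up to the given offset.
    static void waitTillConsumptionCatchesUp(long offset) { }

    // The returned future completes only after the catch-up step, so callers of the
    // async API still get read-after-write consistency.
    static CompletableFuture<Void> doPublishMetadata(String metadata) {
        return publish(metadata).thenApplyAsync(offset -> {
            waitTillConsumptionCatchesUp(offset);
            return null;
        });
    }

    public static void main(String[] args) {
        doPublishMetadata("segment-metadata").join(); // returns only after catch-up
    }
}
   ```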




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.

2021-09-08 Thread GitBox


satishd commented on a change in pull request #11033:
URL: https://github.com/apache/kafka/pull/11033#discussion_r704953596



##
File path: 
storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
##
@@ -62,16 +63,17 @@
  * @param remoteLogSegmentMetadata metadata about the remote log segment.
  * @throws RemoteStorageException   if there are any storage related 
errors occurred.
  * @throws IllegalArgumentException if the given metadata instance does 
not have the state as {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}
+ * @return a CompletableFuture which will complete once this operation is 
finished.
  */
-void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) throws RemoteStorageException;
+CompletableFuture 
addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) 
throws RemoteStorageException;
 
 /**
- * This method is used to update the {@link RemoteLogSegmentMetadata}. 
Currently, it allows to update with the new
+ * This method is used to update the {@link RemoteLogSegmentMetadata} 
asynchronously. Currently, it allows to update with the new
  * state based on the life cycle of the segment. It can go through the 
below state transitions.
  * 
  * 
 * +---------------------+            +----------------------+
- * |COPY_SEGMENT_STARTED |------------|COPY_SEGMENT_FINISHED |
+ * |COPY_SEGMENT_STARTED |----------->|COPY_SEGMENT_FINISHED |

Review comment:
   I verified that the correct HTML is generated in the javadoc. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides

2021-09-08 Thread GitBox


ableegoldman commented on a change in pull request #11272:
URL: https://github.com/apache/kafka/pull/11272#discussion_r704916241



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##
@@ -611,6 +611,7 @@ public synchronized Topology build() {
  */
 public synchronized Topology build(final Properties props) {
 internalStreamsBuilder.buildAndOptimizeTopology(props);
+internalTopologyBuilder.setTopologyProperties(props);

Review comment:
   To clarify, these are the props that the user passes in when building
the topology, e.g. as needed for `TOPOLOGY_OPTIMIZATION`, which is the only actual
topology-level override today. So the overrides end up the same as the global props
only if the user decides to pass the same set of configs in.
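
   For illustration, a minimal sketch of how those per-topology properties are
passed at build time (a made-up one-line topology; the constant used here is
`TOPOLOGY_OPTIMIZATION_CONFIG`, the non-deprecated name of the config mentioned
above):
   ```
import java.util.Properties;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class TopologyPropsExample {
    public static Topology buildOptimized() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input").to("output"); // made-up topology

        // These props are the per-topology overrides discussed above; they are
        // separate from the global configs passed to the KafkaStreams constructor.
        final Properties topologyProps = new Properties();
        topologyProps.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
        return builder.build(topologyProps);
    }
}
   ```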




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides

2021-09-08 Thread GitBox


ableegoldman commented on a change in pull request #11272:
URL: https://github.com/apache/kafka/pull/11272#discussion_r704909650



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##
@@ -611,6 +611,7 @@ public synchronized Topology build() {
  */
 public synchronized Topology build(final Properties props) {
 internalStreamsBuilder.buildAndOptimizeTopology(props);
+internalTopologyBuilder.setTopologyProperties(props);

Review comment:
   Like this 
https://github.com/apache/kafka/pull/11272/files#diff-0e5e608831150c058e2ad1b45d38ad941739562588ec0fdb97cc9f742919fb1fR139
 ? Or were you referring to something else?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides

2021-09-08 Thread GitBox


ableegoldman commented on a change in pull request #11272:
URL: https://github.com/apache/kafka/pull/11272#discussion_r704909244



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java
##
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.namedtopology;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static 
org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG;
+import static 
org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC;
+import static 
org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
+import static 
org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC;
+import static 
org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC;
+import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC;
+import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_DOC;
+
+/**
+ * Streams configs that apply at the topology level. The values in the {@link 
StreamsConfig} parameter of the
+ * {@link org.apache.kafka.streams.KafkaStreams} or {@link 
KafkaStreamsNamedTopologyWrapper} constructors will
+ * determine the defaults, which can then be overridden for specific 
topologies by passing them in when creating the
+ * topology via the {@link 
org.apache.kafka.streams.StreamsBuilder#build(Properties)} or
+ * {@link NamedTopologyStreamsBuilder#buildNamedTopology(Properties)} methods.
+ */
+public class TopologyConfig extends AbstractConfig {

Review comment:
   I guess, but honestly for now it's going to be on us/me to keep track of
any new configs that could be topology overrides. It's not a problem if someone
introduces a config that could be scoped to a single topology and isn't; it's
just a feature we can expand later. We need to work out a clean API before
taking this to the KIP stage anyway.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides

2021-09-08 Thread GitBox


ableegoldman commented on a change in pull request #11272:
URL: https://github.com/apache/kafka/pull/11272#discussion_r704907727



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##
@@ -262,7 +262,7 @@ private StreamTask createActiveTask(final TaskId taskId,
 inputPartitions,
 topology,
 consumer,
-config,
+topologyMetadata.getTaskConfigFor(taskId),

Review comment:
   Can you clarify? I'm not sure if this is what you meant, but this should
return the actual task configs, with any overrides already applied. I tried to
keep all the override and config resolution logic inside the `TopologyConfig` class.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides

2021-09-08 Thread GitBox


ableegoldman commented on a change in pull request #11272:
URL: https://github.com/apache/kafka/pull/11272#discussion_r704907727



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##
@@ -262,7 +262,7 @@ private StreamTask createActiveTask(final TaskId taskId,
 inputPartitions,
 topology,
 consumer,
-config,
+topologyMetadata.getTaskConfigFor(taskId),

Review comment:
   Sorry, can you clarify? This should return the actual task configs, with
any overrides already applied. I tried to keep all the override and config
resolution logic inside the `TopologyConfig` class.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon edited a comment on pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order

2021-09-08 Thread GitBox


showuon edited a comment on pull request #11292:
URL: https://github.com/apache/kafka/pull/11292#issuecomment-915712946


   @guozhangwang , those are good questions. Let me answer them below:
   
   > 1. Do you know why the original test cases in 
AbstractWindowBytesStoreTest, like `shouldGetBackwardAll` and 
`testBackwardFetchRange` did not capture this bug? This test class is leveraged 
by the in-memory stores as well.
   
   That's right, those tests also cover the in-memory stores, but they didn't test
the case of multiple records in the same window. Currently, in the window store, we
store records as [segments -> [records]].
   
   For example:
   window size = 500,
   input records:
   
   key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500] window
   key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500] window
   key: "c", value: "cc", timestamp: 510 ==> will be in [500, 1000] window
   
   So, internally, the "a" and "b" will be in the same segment, and "c" in 
another segments.
   segments: [0 /* window start */, records], [500, records].
   And the records for window start 0 will be "a" and "b".
   the records for window start 500 will be "c".
   
   Before this change, we did have a reverse iterator for segments, but not in 
"records". So, when doing `backwardFetchAll`, we'll have the records returned 
in order: "c", "a", "b", which should be "c", "b", "a" obviously. 
   
   So, back to the question: why did the original test cases not catch this 
issue?
   It's because the test inputs all have different window start timestamps,
   which end up in different segments:
   ```
private void putFirstBatch(final WindowStore store,
  @SuppressWarnings("SameParameterValue") final 
long startTime,
  final InternalMockProcessorContext context) {
   context.setRecordContext(createRecordContext(startTime));
   store.put(0, "zero", startTime);
   store.put(1, "one", startTime + 1L);
   store.put(2, "two", startTime + 2L);
   store.put(3, "three", startTime + 2L);  // <-- this is the new 
record I added, to test multiple records in the same segment case
   
   store.put(4, "four", startTime + 4L);
   store.put(5, "five", startTime + 5L);
   }
   ```
   
   > 2. Related to 1), what additional coverage does the new 
`WindowStoreFetchTest` provides in addition to the above two test cases?
   
   I think I've covered that above: I added an additional record to the
`AbstractWindowBytesStoreTest` test. In `WindowStoreFetchTest`, we produce
records at timestamps 0, 1, 500, 501, 502, which are put into the windows [0,
500] (2 records) and [500, 1000] (3 records). We then fetch them forward/backward to
check that the results are as expected, i.e. the reverse order should be 502, 501,
500, 1, 0.
   
   The behavior works as expected in `RocksDBWindowStore`. 
   
   
   Hope that's clear. 
   I also updated the PR description.
   Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides

2021-09-08 Thread GitBox


ableegoldman commented on a change in pull request #11272:
URL: https://github.com/apache/kafka/pull/11272#discussion_r704906629



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java
##
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.namedtopology;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static 
org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG;
+import static 
org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC;
+import static 
org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
+import static 
org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC;
+import static 
org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC;
+import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC;
+import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_DOC;
+
+/**
+ * Streams configs that apply at the topology level. The values in the {@link 
StreamsConfig} parameter of the
+ * {@link org.apache.kafka.streams.KafkaStreams} or {@link 
KafkaStreamsNamedTopologyWrapper} constructors will
+ * determine the defaults, which can then be overridden for specific 
topologies by passing them in when creating the
+ * topology via the {@link 
org.apache.kafka.streams.StreamsBuilder#build(Properties)} or
+ * {@link NamedTopologyStreamsBuilder#buildNamedTopology(Properties)} methods.
+ */
+public class TopologyConfig extends AbstractConfig {
+private static final ConfigDef CONFIG;
+static {
+CONFIG = new ConfigDef()
+ .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
+ Type.INT,
+ null,
+ Importance.LOW,
+ BUFFERED_RECORDS_PER_PARTITION_DOC)
+.define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
+Type.CLASS,
+null,
+Importance.MEDIUM,
+DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
+ .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+ Type.CLASS,
+ null,
+ Importance.MEDIUM,
+ DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC)
+ .define(MAX_TASK_IDLE_MS_CONFIG,
+ Type.LONG,
+ null,
+ Importance.MEDIUM,
+ MAX_TASK_IDLE_MS_DOC)
+ .define(TASK_TIMEOUT_MS_CONFIG,
+ Type.LONG,
+ null,
+ Importance.MEDIUM,
+ TASK_TIMEOUT_MS_DOC);
+}
+private final Logger log = LoggerFactory.getLogger(TopologyConfig.class);
+
+public final String topologyName;
+public final boolean eosEnabled;
+
+final long maxTaskIdleMs;
+final long taskTimeoutMs;
+final int maxBufferedSize;
+final Supplier timestampExtractorSupplier;
+final Supplier 
deserializationExceptionHandlerSupplier;
+
+public TopologyConfig(final String topologyName, final StreamsConfig 
globalAppConfigs, final Properties topologyOverrides) {
+super(CONFIG, topologyOverrides, false);
+
+

[jira] [Created] (KAFKA-13283) Migrate experimental feature to public API

2021-09-08 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13283:
--

 Summary: Migrate experimental feature to public API
 Key: KAFKA-13283
 URL: https://issues.apache.org/jira/browse/KAFKA-13283
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order

2021-09-08 Thread GitBox


showuon commented on pull request #11292:
URL: https://github.com/apache/kafka/pull/11292#issuecomment-915712946


   @guozhangwang , those are good questions. Let me answer them below:
   
   > 1. Do you know why the original test cases in 
AbstractWindowBytesStoreTest, like `shouldGetBackwardAll` and 
`testBackwardFetchRange` did not capture this bug? This test class is leveraged 
by the in-memory stores as well.
   
   That's right, those tests also cover the in-memory stores, but they didn't test
the case of multiple records in the same window. Currently, in the window store, we
store records as [segments -> [records]].
   
   For example:
   window size = 500,
   input records:
   
   key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500] window
   key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500] window
   key: "c", value: "cc", timestamp: 510 ==> will be in [500, 1000] window
   
   So, internally, the "a" and "b" will be in the same segment, and "c" in 
another segments.
   segments: [0 /* window start */, records], [500, records].
   And the records for window start 0 will be "a" and "b".
   the records for window start 500 will be "c".
   
   Before this change, we did have a reverse iterator for segments, but not in 
"records". So, when doing `backwardFetchAll`, we'll have the records returned 
in order: "c", "a", "b", which should be "c", "b", "a" obviously. 
   
   So, back to the question: why did the original test cases not catch this 
issue?
   It's because the test inputs all have different window start timestamps,
which end up in different segments:
   ```
private void putFirstBatch(final WindowStore store,
  @SuppressWarnings("SameParameterValue") final 
long startTime,
  final InternalMockProcessorContext context) {
   context.setRecordContext(createRecordContext(startTime));
   store.put(0, "zero", startTime);
   store.put(1, "one", startTime + 1L);
   store.put(2, "two", startTime + 2L);
   store.put(3, "three", startTime + 2L);  // <-- this is the new 
record I added, to test multiple records in the same segment case
   
   store.put(4, "four", startTime + 4L);
   store.put(5, "five", startTime + 5L);
   }
   ```
   
   > 2. Related to 1), what additional coverage does the new 
`WindowStoreFetchTest` provides in addition to the above two test cases?
   
   I think I've covered that above: I added an additional record to the
`AbstractWindowBytesStoreTest` test. In `WindowStoreFetchTest`, we produce
records at timestamps 0, 1, 500, 501, 502, which are put into the windows [0,
500] (2 records) and [500, 1000] (3 records). We then fetch them forward/backward to
check that the results are as expected, i.e. the reverse order should be 502, 501,
500, 1, 0.
   
   The behavior works as expected in `RocksDBWindowStore`. 
   
   
   Hope that's clear. 
   I also updated the PR description.
   Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13282) Draft final NamedTopology API and publish a KIP

2021-09-08 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13282:
--

 Summary: Draft final NamedTopology API and publish a KIP
 Key: KAFKA-13282
 URL: https://issues.apache.org/jira/browse/KAFKA-13282
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman


The pre-KIP experimental phase has left quite a few open questions around the
API of this new feature. We need to hash that out and then write it up
into a KIP before introducing this in the public interface.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12648) Experiment with resilient isomorphic topologies

2021-09-08 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12648:
---
Parent: KAFKA-13281
Issue Type: Sub-task  (was: New Feature)

> Experiment with resilient isomorphic topologies
> ---
>
> Key: KAFKA-12648
> URL: https://issues.apache.org/jira/browse/KAFKA-12648
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
>
> We're not ready to make this a public feature yet, but I want to start 
> experimenting with some ways to make Streams applications more resilient in 
> the face of isomorphic topological changes (eg adding/removing/reordering 
> subtopologies).
> If this turns out to be stable and useful, we can circle back on doing a KIP 
> to bring this feature into the public API



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13281) Support upgrades with dynamic addition/removal of disjoint "named" topologies

2021-09-08 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13281:
--

 Summary: Support upgrades with dynamic addition/removal of 
disjoint "named" topologies
 Key: KAFKA-13281
 URL: https://issues.apache.org/jira/browse/KAFKA-13281
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: A. Sophie Blee-Goldman






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides

2021-09-08 Thread GitBox


ableegoldman commented on a change in pull request #11272:
URL: https://github.com/apache/kafka/pull/11272#discussion_r704890418



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java
##
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.namedtopology;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static 
org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG;
+import static 
org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC;
+import static 
org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
+import static 
org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC;
+import static 
org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC;
+import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC;
+import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_DOC;
+
+/**
+ * Streams configs that apply at the topology level. The values in the {@link 
StreamsConfig} parameter of the
+ * {@link org.apache.kafka.streams.KafkaStreams} or {@link 
KafkaStreamsNamedTopologyWrapper} constructors will
+ * determine the defaults, which can then be overridden for specific 
topologies by passing them in when creating the
+ * topology via the {@link 
org.apache.kafka.streams.StreamsBuilder#build(Properties)} or
+ * {@link NamedTopologyStreamsBuilder#buildNamedTopology(Properties)} methods.
+ */
+public class TopologyConfig extends AbstractConfig {
+private static final ConfigDef CONFIG;
+static {
+CONFIG = new ConfigDef()
+ .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
+ Type.INT,
+ null,
+ Importance.LOW,
+ BUFFERED_RECORDS_PER_PARTITION_DOC)
+.define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
+Type.CLASS,
+null,
+Importance.MEDIUM,
+DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
+ .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+ Type.CLASS,
+ null,
+ Importance.MEDIUM,
+ DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC)
+ .define(MAX_TASK_IDLE_MS_CONFIG,
+ Type.LONG,
+ null,
+ Importance.MEDIUM,
+ MAX_TASK_IDLE_MS_DOC)
+ .define(TASK_TIMEOUT_MS_CONFIG,
+ Type.LONG,
+ null,
+ Importance.MEDIUM,
+ TASK_TIMEOUT_MS_DOC);
+}
+private final Logger log = LoggerFactory.getLogger(TopologyConfig.class);
+
+public final String topologyName;
+public final boolean eosEnabled;
+
+final long maxTaskIdleMs;
+final long taskTimeoutMs;
+final int maxBufferedSize;
+final Supplier timestampExtractorSupplier;
+final Supplier 
deserializationExceptionHandlerSupplier;
+
+public TopologyConfig(final String topologyName, final StreamsConfig 
globalAppConfigs, final Properties topologyOverrides) {
+super(CONFIG, topologyOverrides, false);
+
+

[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides

2021-09-08 Thread GitBox


ableegoldman commented on a change in pull request #11272:
URL: https://github.com/apache/kafka/pull/11272#discussion_r704888258



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java
##
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.namedtopology;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static 
org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG;
+import static 
org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC;
+import static 
org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
+import static 
org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC;
+import static 
org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC;
+import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC;
+import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_CONFIG;
+import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_DOC;
+
+/**
+ * Streams configs that apply at the topology level. The values in the {@link 
StreamsConfig} parameter of the
+ * {@link org.apache.kafka.streams.KafkaStreams} or {@link 
KafkaStreamsNamedTopologyWrapper} constructors will
+ * determine the defaults, which can then be overridden for specific 
topologies by passing them in when creating the
+ * topology via the {@link 
org.apache.kafka.streams.StreamsBuilder#build(Properties)} or
+ * {@link NamedTopologyStreamsBuilder#buildNamedTopology(Properties)} methods.
+ */
+public class TopologyConfig extends AbstractConfig {
+private static final ConfigDef CONFIG;
+static {
+CONFIG = new ConfigDef()
+ .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
+ Type.INT,
+ null,
+ Importance.LOW,
+ BUFFERED_RECORDS_PER_PARTITION_DOC)
+.define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,

Review comment:
   where's checkstyle when you need it  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides

2021-09-08 Thread GitBox


ableegoldman commented on a change in pull request #11272:
URL: https://github.com/apache/kafka/pull/11272#discussion_r704887980



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -359,15 +362,24 @@ public final InternalTopologyBuilder 
setApplicationId(final String applicationId
 return this;
 }
 
-public synchronized final InternalTopologyBuilder setStreamsConfig(final 
StreamsConfig config) {
-Objects.requireNonNull(config, "config can't be null");
-this.config = config;
+public synchronized final void setTopologyProperties(final Properties 
props) {
+this.topologyProperties = props;
+}
 
-return this;
+public synchronized final void setStreamsConfig(final StreamsConfig 
config) {
+Objects.requireNonNull(config, "config can't be null");
+this.applicationConfig = config;
+topologyConfigs = new TopologyConfig(
+topologyName,
+applicationConfig,
+topologyProperties == null ?

Review comment:
   Technically, Optional is not supposed to be used for fields, only for return
values...so they say. I'm definitely a little concerned about this pattern of
null fields, but it's already used throughout `InternalTopologyBuilder` and is
often unavoidable due to the particular order of things in Streams. So it's yet
another thing to clean up at a later time (I think Guozhang filed a ticket for
this already).
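
   For reference, a tiny sketch of the pattern being discussed (made-up names, not
the builder's actual fields): keep the nullable field internal and expose Optional
only at the boundary.
   ```
import java.util.Optional;
import java.util.Properties;

// Made-up names, only to illustrate nullable field vs. Optional return value.
class BuilderSketch {
    private Properties topologyProperties; // may legitimately be null until build(props) runs

    void setTopologyProperties(final Properties props) {
        this.topologyProperties = props;
    }

    // Optional is used at the boundary (return value) rather than as the field type.
    Optional<Properties> topologyProperties() {
        return Optional.ofNullable(topologyProperties);
    }
}
   ```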




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11272: KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides

2021-09-08 Thread GitBox


ableegoldman commented on a change in pull request #11272:
URL: https://github.com/apache/kafka/pull/11272#discussion_r704886569



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##
@@ -67,7 +68,7 @@
 stateDirectory,
 stateMgr,
 inputPartitions,
-config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG),

Review comment:
   Do you mean a separate public `TaskConfig` class that users configure the
same way they do `StreamsConfig`? I've been going back and
forth, but ultimately I think we should keep all configs under `StreamsConfig` so
that the configs for each part of the stack are handled in one place (e.g. it
would be confusing if the Consumer client had multiple sets of
configs). Still, I think we can clean this up eventually, when we do the KIP.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13269) Kafka Streams Aggregation data loss between instance restarts and rebalances

2021-09-08 Thread Rohit Bobade (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17412261#comment-17412261
 ] 

Rohit Bobade commented on KAFKA-13269:
--

Thanks [~ableegoldman], will check this and also debug further. Just to add 
details - I noticed the same behavior (data loss between instance restarts and 
rebalances) even without EOS

> Kafka Streams Aggregation data loss between instance restarts and rebalances
> 
>
> Key: KAFKA-13269
> URL: https://issues.apache.org/jira/browse/KAFKA-13269
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.2
>Reporter: Rohit Bobade
>Priority: Major
>
> Using Kafka Streams 2.6.2 and doing count based aggregation of messages. Also 
> setting Processing Guarantee - EXACTLY_ONCE_BETA and 
> NUM_STANDBY_REPLICAS_CONFIG = 1. Sending some messages and restarting 
> instances in middle while processing to test fault tolerance. The output 
> count is incorrect because of data loss while restoring state.
> It looks like the streams task becomes active and starts processing even when 
> the state is not fully restored but is within the acceptable recovery lag 
> (default is 1) This results in data loss
> {quote}A stateful active task is assigned to an instance only when its state 
> is within the configured acceptable.recovery.lag, if one exists
> {quote}
> [https://docs.confluent.io/platform/current/streams/developer-guide/running-app.html?_ga=2.33073014.912824567.1630441414-1598368976.1615841473#state-restoration-during-workload-rebalance]
> [https://docs.confluent.io/platform/current/installation/configuration/streams-configs.html#streamsconfigs_acceptable.recovery.lag]
> Setting acceptable.recovery.lag to 0 and re-running the chaos tests gives the 
> correct result.
> Related KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP441:SmoothScalingOutforKafkaStreams-Computingthemost-caught-upinstances]
> Just want to get some thoughts on this use case from the Kafka team or if 
> anyone has encountered similar issue



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

2021-09-08 Thread GitBox


guozhangwang commented on pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#issuecomment-915657410


   > but there was a relevant failure in the build: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11283/5/testReport/org.apache.kafka.streams.integration/EosIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldWriteLatestOffsetsToCheckpointOnShutdown_at_least_once_/
   
   > Guessing it's just some flakiness in the test, can you check that out 
before I merge?
   
   @hutchiko I looked at the test code, and it seems to me there's indeed some
timing-related flakiness. Could you try to fix it before we merge? (You can
first try to reproduce it, e.g. in the IDE with repeated runs, to see how often it
fails; after you fix it we would usually verify that after, say,
1000 runs there are no more failures.)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13272) KStream offset stuck after brokers outage

2021-09-08 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17412243#comment-17412243
 ] 

Guozhang Wang commented on KAFKA-13272:
---

cc [~hachikuji], have you observed this before?

> KStream offset stuck after brokers outage
> -
>
> Key: KAFKA-13272
> URL: https://issues.apache.org/jira/browse/KAFKA-13272
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
> Environment: Kafka running on Kubernetes
> centos
>Reporter: F Méthot
>Priority: Major
>
> Our KStream app offset stay stuck on 1 partition after outage possibly when 
> exactly_once is enabled.
> Running with KStream 2.8, kafka broker 2.8,
>  3 brokers.
> commands topic is 10 partitions (replication 2, min-insync 2)
>  command-expiry-store-changelog topic is 10 partitions (replication 2, 
> min-insync 2)
>  events topic is 10 partitions (replication 2, min-insync 2)
> with this topology
> Topologies:
>  
> {code:java}
> Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [commands])
>  --> KSTREAM-TRANSFORM-01
>  Processor: KSTREAM-TRANSFORM-01 (stores: [])
>  --> KSTREAM-TRANSFORM-02
>  <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-TRANSFORM-02 (stores: [command-expiry-store])
>  --> KSTREAM-SINK-03
>  <-- KSTREAM-TRANSFORM-01
>  Sink: KSTREAM-SINK-03 (topic: events)
>  <-- KSTREAM-TRANSFORM-02
> {code}
> h3.  
> h3. Attempt 1 at reproducing this issue
>  
> Our stream app runs with processing.guarantee *exactly_once* 
> After a Kafka test outage where all 3 brokers pod were deleted at the same 
> time,
> Brokers restarted and initialized succesfuly.
> When restarting the topology above, one of the tasks would never initialize 
> fully, the restore phase would keep outputting this messages every few 
> minutes:
>  
> {code:java}
> 2021-08-16 14:20:33,421 INFO stream-thread 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> Restoration in progress for 1 partitions. 
> {commands-processor-expiry-store-changelog-8: position=11775908, 
> end=11775911, totalRestored=2002076} 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> {code}
> Task for partition 8 would never initialize, no more data would be read from 
> the source commands topic for that partition.
>  
> In an attempt to recover, we restarted the stream app with stream 
> processing.guarantee back to at_least_once, than it proceed with reading the 
> changelog and restoring partition 8 fully.
> But we noticed afterward, for the next hour until we rebuilt the system, that 
> partition 8 from command-expiry-store-changelog would not be 
> cleaned/compacted by the log cleaner/compacter compared to other partitions. 
> (could be unrelated, because we have seen that before)
> So we resorted to delete/recreate our command-expiry-store-changelog topic 
> and events topic and regenerate it from the commands, reading from beginning.
> Things went back to normal
> h3. Attempt 2 at reproducing this issue
> kstream runs with *exactly-once*
> We force-deleted all 3 pod running kafka.
>  After that, one of the partition can’t be restored. (like reported in 
> previous attempt)
>  For that partition, we noticed these logs on the broker
> {code:java}
> [2021-08-27 17:45:32,799] INFO [Transaction Marker Channel Manager 1002]: 
> Couldn’t find leader endpoint for partitions Set(__consumer_offsets-11, 
> command-expiry-store-changelog-9) while trying to send transaction markers 
> for commands-processor-0_9, these partitions are likely deleted already and 
> hence can be skipped 
> (kafka.coordinator.transaction.TransactionMarkerChannelManager){code}
> Then
>  - we stop the kstream app,
>  - restarted kafka brokers cleanly
>  - Restarting the Kstream app, 
> Those logs messages showed up on the kstream app log:
>  
> {code:java}
> 2021-08-27 18:34:42,413 INFO [Consumer 
> clientId=commands-processor-76602c87-f682-4648-859b-8fa9b6b937f3-StreamThread-1-consumer,
>  groupId=commands-processor] The following partitions still have unstable 
> offsets which are not cleared on the broker side: [commands-9], this could be 
> either transactional offsets waiting for completion, or normal offsets 
> waiting for replication after appending to local log 
> [commands-processor-76602c87-f682-4648-859b-8fa9b6b937f3-StreamThread-1] 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
>  
> {code}
> This would cause our processor to not consume from that specific source 
> topic-partition.
>   Deleting downstream topic and replaying data would NOT fix the issue 
> (EXACTLY_ONCE or AT_LEAST_ONCE)
> Workaround found:
> Deleted the group 

[jira] [Commented] (KAFKA-13272) KStream offset stuck after brokers outage

2021-09-08 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17412242#comment-17412242
 ] 

Guozhang Wang commented on KAFKA-13272:
---

Hello [~fmethot], I looked at the ticket and I suspect it is caused by the same
issue as reported in https://issues.apache.org/jira/browse/KAFKA-13174. In
short, this seems to be a broker-side issue: after an unclean shutdown, the
producer's dangling txn is not aborted, which prevents the consumer from
consuming forward.

Note that on Streams the transaction timeout defaults to 10 secs
(please also double check that you did not override this config,
"transaction.timeout.ms"), i.e. once there is no activity on that dangling
transaction, the broker should proactively abort it so that
consumers can see the txn marker and proceed. However, after the broker's
unclean shutdown, that broker does not seem to be able to abort the txn after the
timeout has elapsed.
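
For reference, an override of that timeout would typically go through the Streams
producer prefix, roughly like the sketch below (just to show what to look for in
your configuration, not your app's actual code):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class TxnTimeoutCheck {
    public static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "commands-processor");
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        // Streams defaults the producer's transaction.timeout.ms to 10000 ms; an
        // override would have to go through the producer prefix like this.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 10_000);
        return props;
    }
}
{code}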

> KStream offset stuck after brokers outage
> -
>
> Key: KAFKA-13272
> URL: https://issues.apache.org/jira/browse/KAFKA-13272
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
> Environment: Kafka running on Kubernetes
> centos
>Reporter: F Méthot
>Priority: Major
>
> Our KStream app offset stay stuck on 1 partition after outage possibly when 
> exactly_once is enabled.
> Running with KStream 2.8, kafka broker 2.8,
>  3 brokers.
> commands topic is 10 partitions (replication 2, min-insync 2)
>  command-expiry-store-changelog topic is 10 partitions (replication 2, 
> min-insync 2)
>  events topic is 10 partitions (replication 2, min-insync 2)
> with this topology
> Topologies:
>  
> {code:java}
> Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [commands])
>  --> KSTREAM-TRANSFORM-01
>  Processor: KSTREAM-TRANSFORM-01 (stores: [])
>  --> KSTREAM-TRANSFORM-02
>  <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-TRANSFORM-02 (stores: [command-expiry-store])
>  --> KSTREAM-SINK-03
>  <-- KSTREAM-TRANSFORM-01
>  Sink: KSTREAM-SINK-03 (topic: events)
>  <-- KSTREAM-TRANSFORM-02
> {code}
> h3.  
> h3. Attempt 1 at reproducing this issue
>  
> Our stream app runs with processing.guarantee *exactly_once* 
> After a Kafka test outage where all 3 brokers pod were deleted at the same 
> time,
> Brokers restarted and initialized succesfuly.
> When restarting the topology above, one of the tasks would never initialize 
> fully, the restore phase would keep outputting this messages every few 
> minutes:
>  
> {code:java}
> 2021-08-16 14:20:33,421 INFO stream-thread 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> Restoration in progress for 1 partitions. 
> {commands-processor-expiry-store-changelog-8: position=11775908, 
> end=11775911, totalRestored=2002076} 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> {code}
> Task for partition 8 would never initialize, no more data would be read from 
> the source commands topic for that partition.
>  
> In an attempt to recover, we restarted the stream app with stream 
> processing.guarantee back to at_least_once, than it proceed with reading the 
> changelog and restoring partition 8 fully.
> But we noticed afterward, for the next hour until we rebuilt the system, that 
> partition 8 from command-expiry-store-changelog would not be 
> cleaned/compacted by the log cleaner/compacter compared to other partitions. 
> (could be unrelated, because we have seen that before)
> So we resorted to delete/recreate our command-expiry-store-changelog topic 
> and events topic and regenerate it from the commands, reading from beginning.
> Things went back to normal
> h3. Attempt 2 at reproducing this issue
> kstream runs with *exactly-once*
> We force-deleted all 3 pod running kafka.
>  After that, one of the partition can’t be restored. (like reported in 
> previous attempt)
>  For that partition, we noticed these logs on the broker
> {code:java}
> [2021-08-27 17:45:32,799] INFO [Transaction Marker Channel Manager 1002]: 
> Couldn’t find leader endpoint for partitions Set(__consumer_offsets-11, 
> command-expiry-store-changelog-9) while trying to send transaction markers 
> for commands-processor-0_9, these partitions are likely deleted already and 
> hence can be skipped 
> (kafka.coordinator.transaction.TransactionMarkerChannelManager){code}
> Then
>  - we stop the kstream app,
>  - restarted kafka brokers cleanly
>  - Restarting the Kstream app, 
> Those logs messages showed up on the kstream app log:
>  
> {code:java}
> 2021-08-27 18:34:42,413 INFO [Consumer 
> 

[GitHub] [kafka] hachikuji commented on a change in pull request #11313: MINOR: GroupMetadataManager#shutdown should remove metrics

2021-09-08 Thread GitBox


hachikuji commented on a change in pull request #11313:
URL: https://github.com/apache/kafka/pull/11313#discussion_r704847225



##
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -977,6 +977,9 @@ class GroupMetadataManager(brokerId: Int,
 shuttingDown.set(true)
 if (scheduler.isStarted)
   scheduler.shutdown()
+metrics.removeSensor(GroupMetadataManager.LoadTimeSensor)
+metrics.removeSensor("OffsetCommits")

Review comment:
   nit: maybe we could add constants for these two, similar to
`GroupMetadataManager.LoadTimeSensor`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

2021-09-08 Thread GitBox


guozhangwang commented on a change in pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#discussion_r704825002



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -565,7 +565,7 @@ public void closeCleanAndRecycleState() {
 protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
 // commitNeeded indicates we may have processed some records since 
last commit
 // and hence we need to refresh checkpointable offsets regardless 
whether we should checkpoint or not
-if (commitNeeded) {
+if (commitNeeded || enforceCheckpoint) {

Review comment:
   The reason that I added this check is that `checkpointableOffsets()` can 
potentially be expensive. I think the fix to have `commitNeeded || 
enforceCheckpoint` is actually elegant, as it does not introduce much 
unnecessary overhead: `enforceCheckpoint` is only true when closing the task.
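
   For reference, a minimal, self-contained sketch of the guard being discussed; 
the names are assumptions standing in for the StreamTask internals, and only the 
`commitNeeded || enforceCheckpoint` condition itself comes from the diff above.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Sketch only; not the actual StreamTask implementation.
public class CheckpointGuardSketch {

    private boolean commitNeeded;                       // records processed since last commit?
    private final Map<String, Long> offsets = new HashMap<>();

    // Stand-in for the potentially expensive checkpointableOffsets() call.
    private Map<String, Long> checkpointableOffsets() {
        return new HashMap<>(offsets);
    }

    void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
        // Refresh offsets only when something was processed (commitNeeded) or
        // when a checkpoint is forced, e.g. while closing the task.
        if (commitNeeded || enforceCheckpoint) {
            Map<String, Long> snapshot = checkpointableOffsets();
            System.out.println("writing checkpoint: " + snapshot);
        }
    }

    public static void main(String[] args) {
        CheckpointGuardSketch task = new CheckpointGuardSketch();
        task.maybeWriteCheckpoint(true);   // forced, e.g. on task close
        task.maybeWriteCheckpoint(false);  // skipped: nothing processed since last commit
    }
}
{code}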




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #11256: KAFKA-13224: Expose consistent broker.id and node.id in config values/originals maps

2021-09-08 Thread GitBox


cmccabe commented on pull request #11256:
URL: https://github.com/apache/kafka/pull/11256#issuecomment-915620653


   I found some issues with the current PR (for example, it doesn't cover stuff 
like originalsWithPrefix, etc.) so I opened 
https://github.com/apache/kafka/pull/11312


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe opened a new pull request #11312: KAFKA-13224: Ensure that broker.id is set in KafkaConfig#originals

2021-09-08 Thread GitBox


cmccabe opened a new pull request #11312:
URL: https://github.com/apache/kafka/pull/11312


   Some plugins make use of KafkaConfig#originals rather than the
   KafkaConfig object. We should ensure that these plugins see the
   correct value for broker.id if the broker is running in KRaft mode and
   node.id has been configured, but not broker.id.
   
   This PR does this by ensuring that both node.id and broker.id are set in
   the originals map if either one is set. We also check that they are set
   to the same value in KafkaConfig#validateValues.
   
   Co-author: Ron Dagostino 
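
   As a rough illustration of the idea (the real change lives in the Scala 
KafkaConfig class; the names below are hypothetical), a Java sketch of mirroring 
the two properties and rejecting conflicting values:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only, not the actual KafkaConfig code.
public class BrokerIdNormalizationSketch {

    static Map<String, Object> withNormalizedIds(Map<String, Object> originals) {
        Map<String, Object> props = new HashMap<>(originals);
        Object brokerId = props.get("broker.id");
        Object nodeId = props.get("node.id");
        if (brokerId == null && nodeId != null) {
            props.put("broker.id", nodeId);
        } else if (nodeId == null && brokerId != null) {
            props.put("node.id", brokerId);
        } else if (brokerId != null && nodeId != null
                && !String.valueOf(brokerId).equals(String.valueOf(nodeId))) {
            throw new IllegalArgumentException(
                "broker.id and node.id must be set to the same value");
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> original = new HashMap<>();
        original.put("node.id", "1");
        // Plugins reading the originals map now see broker.id as well.
        System.out.println(withNormalizedIds(original));
    }
}
{code}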


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #11278: KAFKA-12648: Enforce size limits for each task's cache

2021-09-08 Thread GitBox


guozhangwang commented on a change in pull request #11278:
URL: https://github.com/apache/kafka/pull/11278#discussion_r704796291



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -233,6 +234,14 @@ public boolean hasNamedTopologies() {
 return !builders.containsKey(UNNAMED_TOPOLOGY);
 }
 
+/**
+ * @return true iff the app is using named topologies, or was started up 
with no topology at all
+ * and the max buffer was set for the named topologies

Review comment:
   This may be related to @ableegoldman 's meta question: do we set 
`maxBufferSize = true` in the future if one of the named topologies has it 
overridden, or only when all topologies inside the `TopologyMetadata` have this 
config overridden?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -151,9 +152,26 @@ void handleRebalanceComplete() {
 
 releaseLockedUnassignedTaskDirectories();
 
+if (topologyMetadata.hasNamedTopologies()) {
+for (final Task task : tasks.allTasks()) {
+tasksTotalMaxBuffer.put(
+task.id().topologyName(),
+task.maxBuffer() + 
tasksTotalMaxBuffer.getOrDefault(task.id().topologyName(), 0L)

Review comment:
   Not sure I understand this logic: why do we add these two values to update 
the `tasksTotalMaxBuffer`? How would `task.maxBuffer()` be inferred in the 
future? Since for now they are only 0, I cannot tell how this would impact the 
update logic.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

2021-09-08 Thread GitBox


junrao commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r704755926



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -198,8 +199,23 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
   val cleanableLogs = dirtyLogs.filter { ltc =>
 (ltc.needCompactionNow && ltc.cleanableBytes > 0) || 
ltc.cleanableRatio > ltc.log.config.minCleanableRatio
   }
+
   if(cleanableLogs.isEmpty) {
-None
+val logsWithTombstonesExpired = dirtyLogs.filter {
+  case ltc => 
+// in this case, we are probably in a low throughput situation
+// therefore, we should take advantage of this fact and remove 
tombstones if we can
+// under the condition that the log's latest delete horizon is 
less than the current time
+// tracked
+ltc.log.latestDeleteHorizon != RecordBatch.NO_TIMESTAMP && 
ltc.log.latestDeleteHorizon <= time.milliseconds()

Review comment:
   We could store some additional stats related to tombstones in the 
log cleaner checkpoint file. It seems that to support downgrade, we can't change 
the version number, since the existing code expects the version in the file to 
match that in the code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe opened a new pull request #11311: KAFKA-13280: Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds

2021-09-08 Thread GitBox


cmccabe opened a new pull request #11311:
URL: https://github.com/apache/kafka/pull/11311


   Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds and
   KRaftMetadataCache#topicIdsToNames by returning a map subclass that
   exposes the TopicsImage data structures without copying them.
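
   A hypothetical Java sketch of the "map view without copying" idea (not the 
actual KRaftMetadataCache/TopicsImage code; names are assumptions): a read-only 
Map subclass that delegates lookups to the backing structure instead of copying 
all N entries up front.

{code:java}
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public class TopicNameToIdViewSketch {

    // Stand-in for the image's internal topic-name -> topic-id mapping.
    private final Map<String, UUID> topicsByName;

    public TopicNameToIdViewSketch(Map<String, UUID> topicsByName) {
        this.topicsByName = topicsByName;
    }

    // Returns a read-only Map view backed by the existing structure, so callers
    // that only do lookups or iterate never pay for an O(N) copy.
    public Map<String, UUID> topicNamesToIds() {
        return new AbstractMap<String, UUID>() {
            @Override
            public UUID get(Object topicName) {
                return topicsByName.get(topicName);
            }

            @Override
            public boolean containsKey(Object topicName) {
                return topicsByName.containsKey(topicName);
            }

            @Override
            public Set<Map.Entry<String, UUID>> entrySet() {
                return Collections.unmodifiableSet(topicsByName.entrySet());
            }
        };
    }

    public static void main(String[] args) {
        Map<String, UUID> backing = new HashMap<>();
        backing.put("foo", UUID.randomUUID());
        Map<String, UUID> view = new TopicNameToIdViewSketch(backing).topicNamesToIds();
        System.out.println(view.get("foo")); // constant-time lookup, no copying
    }
}
{code}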


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13280) Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds

2021-09-08 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-13280:


 Summary: Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds
 Key: KAFKA-13280
 URL: https://issues.apache.org/jira/browse/KAFKA-13280
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe
Assignee: Colin McCabe


Avoid O(N) behavior in KRaftMetadataCache#topicNamesToIds and 
KRaftMetadataCache#topicIdsToNames



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine merged pull request #11309: MINOR: Remove unsupported rsync and ssh commands from release.py

2021-09-08 Thread GitBox


kkonstantine merged pull request #11309:
URL: https://github.com/apache/kafka/pull/11309


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on pull request #11309: MINOR: Remove unsupported rsync and ssh commands from release.py

2021-09-08 Thread GitBox


kkonstantine commented on pull request #11309:
URL: https://github.com/apache/kafka/pull/11309#issuecomment-915522116


   @dajac I'd rather not include a dependency on a specific client right now. 
The standard client does not support recursive upload/download.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #11301: KAFKA-13276: Prefer KafkaFuture in admin Result constructors

2021-09-08 Thread GitBox


ijuma merged pull request #11301:
URL: https://github.com/apache/kafka/pull/11301


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #11301: KAFKA-13276: Prefer KafkaFuture in admin Result constructors

2021-09-08 Thread GitBox


ijuma commented on pull request #11301:
URL: https://github.com/apache/kafka/pull/11301#issuecomment-915500735


   LGTM, thanks. Merging to 3.0 and trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #11301: KAFKA-13276: Prefer KafkaFuture in admin Result constructors

2021-09-08 Thread GitBox


ijuma commented on a change in pull request #11301:
URL: https://github.com/apache/kafka/pull/11301#discussion_r704701395



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3178,7 +3178,8 @@ public DescribeConsumerGroupsResult 
describeConsumerGroups(final Collection

[GitHub] [kafka] junrao commented on a change in pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.

2021-09-08 Thread GitBox


junrao commented on a change in pull request #11033:
URL: https://github.com/apache/kafka/pull/11033#discussion_r704689875



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
##
@@ -129,37 +132,44 @@ public void 
updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmen
 }
 
 // Publish the message to the topic.
-
doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(),
 segmentMetadataUpdate);
+return 
doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(),
 segmentMetadataUpdate);
 } finally {
 lock.readLock().unlock();
 }
 }
 
 @Override
-public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata 
remotePartitionDeleteMetadata)
+public CompletableFuture 
putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata 
remotePartitionDeleteMetadata)
 throws RemoteStorageException {
 Objects.requireNonNull(remotePartitionDeleteMetadata, 
"remotePartitionDeleteMetadata can not be null");
 
 lock.readLock().lock();
 try {
 ensureInitializedAndNotClosed();
 
-
doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), 
remotePartitionDeleteMetadata);
+return 
doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), 
remotePartitionDeleteMetadata);
 } finally {
 lock.readLock().unlock();
 }
 }
 
-private void doPublishMetadata(TopicIdPartition topicIdPartition, 
RemoteLogMetadata remoteLogMetadata)
+private CompletableFuture doPublishMetadata(TopicIdPartition 
topicIdPartition,
+  RemoteLogMetadata 
remoteLogMetadata)
 throws RemoteStorageException {
 log.debug("Publishing metadata for partition: [{}] with context: 
[{}]", topicIdPartition, remoteLogMetadata);
 
 try {
 // Publish the message to the topic.
-RecordMetadata recordMetadata = 
producerManager.publishMessage(remoteLogMetadata);
-// Wait until the consumer catches up with this offset. This will 
ensure read-after-write consistency
-// semantics.
-consumerManager.waitTillConsumptionCatchesUp(recordMetadata);
+CompletableFuture produceFuture = new 
CompletableFuture<>();
+producerManager.publishMessage(remoteLogMetadata, produceFuture);
+return produceFuture.thenApplyAsync((Function) recordMetadata -> {
+try {
+
consumerManager.waitTillConsumptionCatchesUp(recordMetadata);

Review comment:
   Thanks for the explanation. Semantically, for the CompletableFuture 
returned from doPublishMetadata(), when do we expect it to complete? The 
implementation completes it after the metadata is acked in the producer. 
However, I thought it should be completed after the consumer has caught up on 
the offset? It would be useful to document this in the API clearly.
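
   To make the two possible completion points concrete, a small hedged sketch 
(all names here, such as publish() and waitTillCaughtUp(), are stand-ins rather 
than the real RLMM API): the returned future completes only after both the 
producer ack and the consumer catch-up, which is what gives callers 
read-after-write semantics.

{code:java}
import java.util.concurrent.CompletableFuture;

public class PublishCompletionSketch {

    static CompletableFuture<Long> publish() {
        // Completes when the producer acks the record (returns its offset).
        return CompletableFuture.supplyAsync(() -> 42L);
    }

    static void waitTillCaughtUp(long offset) {
        // Stand-in for blocking until the consumer has read up to `offset`.
    }

    public static void main(String[] args) {
        CompletableFuture<Void> addFuture = publish()
            .thenAcceptAsync(offset -> waitTillCaughtUp(offset));
        addFuture.join();
        System.out.println("metadata visible to readers");
    }
}
{code}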




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wycccccc commented on pull request #10881: KAFKA-12947 Replace EasyMock and PowerMock with Mockito for Streams…

2021-09-08 Thread GitBox


wycc commented on pull request #10881:
URL: https://github.com/apache/kafka/pull/10881#issuecomment-915451252


   @ijuma Sure, I have resolved the problem. If there are other problems, I will 
solve them immediately.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wycccccc commented on a change in pull request #10881: KAFKA-12947 Replace EasyMock and PowerMock with Mockito for Streams…

2021-09-08 Thread GitBox


wycc commented on a change in pull request #10881:
URL: https://github.com/apache/kafka/pull/10881#discussion_r704650439



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
##
@@ -42,77 +39,65 @@
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doThrow;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
 public class StateManagerUtilTest {
 
-@Mock(type = MockType.NICE)
+@Mock
 private ProcessorStateManager stateManager;
 
-@Mock(type = MockType.NICE)
+@Mock
 private StateDirectory stateDirectory;
 
-@Mock(type = MockType.NICE)
+@Mock
 private ProcessorTopology topology;
 
-@Mock(type = MockType.NICE)
+@Mock
 private InternalProcessorContext processorContext;
 
-private IMocksControl ctrl;
 
 private Logger logger = new LogContext("test").logger(AbstractTask.class);
 
 private final TaskId taskId = new TaskId(0, 0);
 
 @Before
 public void setup() {
-ctrl = createStrictControl();
-topology = ctrl.createMock(ProcessorTopology.class);
-processorContext = ctrl.createMock(InternalProcessorContext.class);
+topology = mock(ProcessorTopology.class);
+processorContext = mock(InternalProcessorContext.class);
 
-stateManager = ctrl.createMock(ProcessorStateManager.class);
-stateDirectory = ctrl.createMock(StateDirectory.class);
+stateManager = mock(ProcessorStateManager.class);
+stateDirectory = mock(StateDirectory.class);
 }
 
 @Test
 public void testRegisterStateStoreWhenTopologyEmpty() {
-expect(topology.stateStores()).andReturn(emptyList());
-
-ctrl.checkOrder(true);
-ctrl.replay();
+when(topology.stateStores()).thenReturn(emptyList());
+inOrder(topology);
 
 StateManagerUtil.registerStateStores(logger,
 "logPrefix:", topology, stateManager, stateDirectory, 
processorContext);
-
-ctrl.verify();
 }
 
 @Test
 public void testRegisterStateStoreFailToLockStateDirectory() {
-expect(topology.stateStores()).andReturn(singletonList(new 
MockKeyValueStore("store", false)));
+when(topology.stateStores()).thenReturn(singletonList(new 
MockKeyValueStore("store", false)));
+
+when(stateManager.taskId()).thenReturn(taskId);
 
-expect(stateManager.taskId()).andReturn(taskId);
+when(stateDirectory.lock(taskId)).thenReturn(false);
 
-expect(stateDirectory.lock(taskId)).andReturn(false);
-
-ctrl.checkOrder(true);
-ctrl.replay();
+inOrder(topology, stateManager, stateDirectory);

Review comment:
   I have figured out how to use inOrder, and the problem has been 
solved.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11309: MINOR: Remove unsupported rsync and ssh commands from release.py

2021-09-08 Thread GitBox


dajac commented on pull request #11309:
URL: https://github.com/apache/kafka/pull/11309#issuecomment-915448147


   @kkonstantine Could we use sftp in the script directly?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13279) Implement CreateTopicsPolicy for KRaft

2021-09-08 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-13279:


 Summary: Implement CreateTopicsPolicy for KRaft
 Key: KAFKA-13279
 URL: https://issues.apache.org/jira/browse/KAFKA-13279
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe
Assignee: Colin McCabe


Implement CreateTopicsPolicy for KRaft



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8406) kafka-topics throws wrong error on invalid configuration with bootstrap-server and alter config

2021-09-08 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski resolved KAFKA-8406.

Fix Version/s: 2.4.0
   Resolution: Fixed

> kafka-topics throws wrong error on invalid configuration with 
> bootstrap-server and alter config
> ---
>
> Key: KAFKA-8406
> URL: https://issues.apache.org/jira/browse/KAFKA-8406
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
> Fix For: 2.4.0
>
>
> Running
> {code:java}
> ./kafka-topics --bootstrap-server  --alter --config 
> retention.ms=360 --topic topic{code}
> Results in
> {code:java}
> Missing required argument "[partitions]"{code}
> Running
> {code:java}
> ./kafka-topics --bootstrap-server  --alter --config 
> retention.ms=360 --topic topic --partitions 25{code}
> Results in
> {code:java}
> Option combination "[bootstrap-server],[config]" can't be used with option 
> "[alter]"{code}
> For better clarity, we should just throw the last error outright.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8406) kafka-topics throws wrong error on invalid configuration with bootstrap-server and alter config

2021-09-08 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17412103#comment-17412103
 ] 

Stanislav Kozlovski commented on KAFKA-8406:


Fixed in 2.4 - https://github.com/apache/kafka/pull/6786/files

> kafka-topics throws wrong error on invalid configuration with 
> bootstrap-server and alter config
> ---
>
> Key: KAFKA-8406
> URL: https://issues.apache.org/jira/browse/KAFKA-8406
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
>
> Running
> {code:java}
> ./kafka-topics --bootstrap-server  --alter --config 
> retention.ms=360 --topic topic{code}
> Results in
> {code:java}
> Missing required argument "[partitions]"{code}
> Running
> {code:java}
> ./kafka-topics --bootstrap-server  --alter --config 
> retention.ms=360 --topic topic --partitions 25{code}
> Results in
> {code:java}
> Option combination "[bootstrap-server],[config]" can't be used with option 
> "[alter]"{code}
> For better clarity, we should just throw the last error outright.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-8406) kafka-topics throws wrong error on invalid configuration with bootstrap-server and alter config

2021-09-08 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-8406:
---
Comment: was deleted

(was: [~savulchik] are you sure? Can you try the exact same commands I listed 
in the description? I just tested this in 2.5 and it is still an issue)

> kafka-topics throws wrong error on invalid configuration with 
> bootstrap-server and alter config
> ---
>
> Key: KAFKA-8406
> URL: https://issues.apache.org/jira/browse/KAFKA-8406
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
>
> Running
> {code:java}
> ./kafka-topics --bootstrap-server  --alter --config 
> retention.ms=360 --topic topic{code}
> Results in
> {code:java}
> Missing required argument "[partitions]"{code}
> Running
> {code:java}
> ./kafka-topics --bootstrap-server  --alter --config 
> retention.ms=360 --topic topic --partitions 25{code}
> Results in
> {code:java}
> Option combination "[bootstrap-server],[config]" can't be used with option 
> "[alter]"{code}
> For better clarity, we should just throw the last error outright.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine opened a new pull request #11309: MINOR: Remove unsupported rsync and ssh commands from release.py

2021-09-08 Thread GitBox


kkonstantine opened a new pull request #11309:
URL: https://github.com/apache/kafka/pull/11309


   ssh and rsync access has been removed from home.apache.org.
   Removing the commands from release.py and replacing them with a note to make 
sure the release artifacts are uploaded manually with an sftp client instead. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #11302: KAFKA-13243: KIP-773 Differentiate metric latency measured in ms and ns

2021-09-08 Thread GitBox


jlprat commented on pull request #11302:
URL: https://github.com/apache/kafka/pull/11302#issuecomment-915413829


   Thanks for the reviews @tombentley and @guozhangwang 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley merged pull request #11302: KAFKA-13243: KIP-773 Differentiate metric latency measured in ms and ns

2021-09-08 Thread GitBox


tombentley merged pull request #11302:
URL: https://github.com/apache/kafka/pull/11302


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #11302: KAFKA-13243: KIP-773 Differentiate metric latency measured in ms and ns

2021-09-08 Thread GitBox


tombentley commented on pull request #11302:
URL: https://github.com/apache/kafka/pull/11302#issuecomment-915409290


   Test failures are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #11301: KAFKA-13276: Prefer KafkaFuture in admin Result constructors

2021-09-08 Thread GitBox


tombentley commented on pull request #11301:
URL: https://github.com/apache/kafka/pull/11301#issuecomment-915405715


   Any more comments @ijuma @dajac @showuon ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-13237) Add ActiveBrokerCount and FencedBrokerCount metrics to the ZK controller (KIP-748)

2021-09-08 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-13237.
-
Fix Version/s: 3.1.0
 Reviewer: Jason Gustafson
   Resolution: Fixed

> Add ActiveBrokerCount and FencedBrokerCount metrics to the ZK controller 
> (KIP-748)
> --
>
> Key: KAFKA-13237
> URL: https://issues.apache.org/jira/browse/KAFKA-13237
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 3.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac merged pull request #11273: KAFKA-13237; Add ActiveBrokerCount and FencedBrokerCount metrics to the ZK controller (KIP-748)

2021-09-08 Thread GitBox


dajac merged pull request #11273:
URL: https://github.com/apache/kafka/pull/11273


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

2021-09-08 Thread GitBox


rhauch commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-915395143


   @C0urante wrote:
   > @rhauch If this is all agreeable I think we're ready to start 
implementing. Since you've provided a lot of the code yourself I'm happy to let 
you take on that work if you'd like; otherwise, I'll get started and see if I 
can have a new PR with these changes out by early next week.
   
   Sounds good to me! I'm looking forward to your new PR; please link here and 
ping me. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11294: KAFKA-13266; `InitialFetchState` should be created after partition is removed from the fetchers

2021-09-08 Thread GitBox


dajac commented on pull request #11294:
URL: https://github.com/apache/kafka/pull/11294#issuecomment-915393112


   Merged to master and to 3.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac merged pull request #11294: KAFKA-13266; `InitialFetchState` should be created after partition is removed from the fetchers

2021-09-08 Thread GitBox


dajac merged pull request #11294:
URL: https://github.com/apache/kafka/pull/11294


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.

2021-09-08 Thread GitBox


satishd commented on a change in pull request #11033:
URL: https://github.com/apache/kafka/pull/11033#discussion_r704559745



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java
##
@@ -50,37 +51,38 @@ public 
ProducerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
 topicPartitioner = rlmmTopicPartitioner;
 }
 
-public RecordMetadata publishMessage(RemoteLogMetadata remoteLogMetadata) 
throws KafkaException {
+public CompletableFuture publishMessage(RemoteLogMetadata 
remoteLogMetadata) {
+CompletableFuture future = new CompletableFuture<>();
+
 TopicIdPartition topicIdPartition = 
remoteLogMetadata.topicIdPartition();
 int metadataPartitionNum = 
topicPartitioner.metadataPartition(topicIdPartition);
 log.debug("Publishing metadata message of partition:[{}] into metadata 
topic partition:[{}] with payload: [{}]",
-topicIdPartition, metadataPartitionNum, remoteLogMetadata);
+  topicIdPartition, metadataPartitionNum, remoteLogMetadata);
 if (metadataPartitionNum >= rlmmConfig.metadataTopicPartitionsCount()) 
{
 // This should never occur as long as metadata partitions always 
remain the same.
 throw new KafkaException("Chosen partition no " + 
metadataPartitionNum +
  " must be less than the partition 
count: " + rlmmConfig.metadataTopicPartitionsCount());
 }
 
-ProducerCallback callback = new ProducerCallback();
 try {
+Callback callback = new Callback() {
+@Override
+public void onCompletion(RecordMetadata metadata,
+ Exception exception) {
+if (exception != null) {
+future.completeExceptionally(exception);
+} else {
+future.complete(metadata);
+}
+}
+};
 producer.send(new 
ProducerRecord<>(rlmmConfig.remoteLogMetadataTopicName(), metadataPartitionNum, 
null,
-serde.serialize(remoteLogMetadata)), callback).get();
-} catch (KafkaException e) {
-throw e;
-} catch (Exception e) {
-throw new KafkaException("Exception occurred while publishing 
message for topicIdPartition: " + topicIdPartition, e);
+   
serde.serialize(remoteLogMetadata)), callback);
+} catch (Exception ex) {
+future.completeExceptionally(ex);
 }

Review comment:
   Throwing exception is removed as you can see in the diff. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2021-09-08 Thread GitBox


dongjinleekr commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-915363359


   @ashishpatil09 Many thanks for your interest in this feature. I think it 
will be released with AK 3.1, but I can't be certain yet. Please refer 
[here](http://home.apache.org/~dongjin/post/apache-kafka-log4j2-support/) if 
you need a preview or a custom patch for [2.6.1, 2.8.0].
   
   cc/ @emveee @svudutala @priyavj08


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.

2021-09-08 Thread GitBox


satishd commented on pull request #11033:
URL: https://github.com/apache/kafka/pull/11033#issuecomment-915357755


   Thanks @junrao for the review comments, addressed them with the latest 
commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on a change in pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.

2021-09-08 Thread GitBox


ccding commented on a change in pull request #11033:
URL: https://github.com/apache/kafka/pull/11033#discussion_r704546511



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java
##
@@ -50,37 +51,38 @@ public 
ProducerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
 topicPartitioner = rlmmTopicPartitioner;
 }
 
-public RecordMetadata publishMessage(RemoteLogMetadata remoteLogMetadata) 
throws KafkaException {
+public CompletableFuture publishMessage(RemoteLogMetadata 
remoteLogMetadata) {
+CompletableFuture future = new CompletableFuture<>();
+
 TopicIdPartition topicIdPartition = 
remoteLogMetadata.topicIdPartition();
 int metadataPartitionNum = 
topicPartitioner.metadataPartition(topicIdPartition);
 log.debug("Publishing metadata message of partition:[{}] into metadata 
topic partition:[{}] with payload: [{}]",
-topicIdPartition, metadataPartitionNum, remoteLogMetadata);
+  topicIdPartition, metadataPartitionNum, remoteLogMetadata);
 if (metadataPartitionNum >= rlmmConfig.metadataTopicPartitionsCount()) 
{
 // This should never occur as long as metadata partitions always 
remain the same.
 throw new KafkaException("Chosen partition no " + 
metadataPartitionNum +
  " must be less than the partition 
count: " + rlmmConfig.metadataTopicPartitionsCount());
 }
 
-ProducerCallback callback = new ProducerCallback();
 try {
+Callback callback = new Callback() {
+@Override
+public void onCompletion(RecordMetadata metadata,
+ Exception exception) {
+if (exception != null) {
+future.completeExceptionally(exception);
+} else {
+future.complete(metadata);
+}
+}
+};
 producer.send(new 
ProducerRecord<>(rlmmConfig.remoteLogMetadataTopicName(), metadataPartitionNum, 
null,
-serde.serialize(remoteLogMetadata)), callback).get();
-} catch (KafkaException e) {
-throw e;
-} catch (Exception e) {
-throw new KafkaException("Exception occurred while publishing 
message for topicIdPartition: " + topicIdPartition, e);
+   
serde.serialize(remoteLogMetadata)), callback);
+} catch (Exception ex) {
+future.completeExceptionally(ex);
 }

Review comment:
   do we want to remove printing the topic id in the exception?

##
File path: 
storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
##
@@ -62,16 +63,17 @@
  * @param remoteLogSegmentMetadata metadata about the remote log segment.
  * @throws RemoteStorageException   if there are any storage related 
errors occurred.
  * @throws IllegalArgumentException if the given metadata instance does 
not have the state as {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}
+ * @return a CompletableFuture which will complete once this operation is 
finished.
  */
-void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata) throws RemoteStorageException;
+CompletableFuture 
addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) 
throws RemoteStorageException;
 
 /**
- * This method is used to update the {@link RemoteLogSegmentMetadata}. 
Currently, it allows to update with the new
+ * This method is used to update the {@link RemoteLogSegmentMetadata} 
asynchronously. Currently, it allows to update with the new
  * state based on the life cycle of the segment. It can go through the 
below state transitions.
  * 
  * 
  * +-++--+
- * |COPY_SEGMENT_STARTED |---|COPY_SEGMENT_FINISHED |
+ * |COPY_SEGMENT_STARTED |--->|COPY_SEGMENT_FINISHED |

Review comment:
   Can you verify if this could result in a correct HTML doc?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wycccccc commented on a change in pull request #11017: KAFKA-12950 Replace EasyMock and PowerMock with Mockito for KafkaStream

2021-09-08 Thread GitBox


wycc commented on a change in pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#discussion_r704545691



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -111,9 +104,24 @@
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.atMostOnce;
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.withSettings;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.MockitoAnnotations.openMocks;
 
-@RunWith(PowerMockRunner.class)

Review comment:
   I remember that I have modified all of the streams tests and updated the 
module build dependencies.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

2021-09-08 Thread GitBox


C0urante commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-915350922


   > I also think that the behavior with the suggested approach and your option 
3 is still a lot better than the current situation.
   
   Agreed  
   
   > IIUC the `offset.flush.timeout.ms` would actually not be used anymore, as 
there actually are no timeouts as the offset commit thread doesn't block 
anymore.
   
   That's mostly correct--we wouldn't be waiting on a blocking operation while 
iterating through the deque(s), although we might still choose to block on 
the actual write to the offset topic in the [same way that we 
currently](https://github.com/apache/kafka/blob/fb77da941ac2a34513cf2cd5d11137ba9b275575/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L565-L586)
 do just for the sake of metrics and allowing users to monitor the health of 
the connection between the Connect worker and the offsets topic. Not a huge 
deal though, and the point that we wouldn't be blocking on the task's producer 
is still valid.
   
   I think the issue is less that we'd end up timing out and more that we'd end 
up violating the guarantee that's provided right now by the framework that each 
task gets to take up only `offset.flush.timeout.ms` milliseconds per offset 
commit attempt before aborting the attempt and yielding control to the next 
task. A deque-based approach may actually be worse than the current behavior 
in that regard if there's no check in place to ensure that iterating over the 
deque doesn't exceed the offset flush timeout. Probably worth the tradeoff, 
but we can probably satisfy both objectives with your suggestion:
   
   > another option might be to incur the iteration on the worker source task 
thread.
   
   I think this'd be great, especially with the snapshotting logic you mention, 
which should basically eliminate any blocking between the two threads except to 
prevent race conditions while simple operations like clearing a hash map or 
assigning a new value to an instance variable take place.
   
   One thing that gave me pause initially was the realization that we'd be 
double-iterating over every source record at this point: once to transform, 
convert, and dispatch the record to the producer, and then once to verify that 
it had been acknowledged while iterating over the deque it's in. But I can't 
imagine it'd make a serious difference with CPU utilization given that 
transformation, conversion, and dispatching to a producer are likely to be at 
least an order of magnitude more expensive than just checking a boolean flag 
and possibly inserting the record's offset into a hash map. And memory 
utilization should be very close to the existing approach, which already tracks 
every single unacknowledged record in the `outstandingMessages` and 
`outstandingMessagesBacklog` fields.
   
   I think this buys us enough that my earlier-mentioned option 2 (multiple 
threads for offset commits) isn't called for, since the only blocking operation 
that would be performed during offset commit at this point is a write to the 
offsets topic. If the offsets topic is unavailable, it's likely that the impact 
would be the same across all tasks (unless the task is using a separate offsets 
topic, which will become possible once the changes for KIP-618 are merged), and 
even if not, things wouldn't be made any worse than they already are: the 
offset flush timeout would expire, and the next task in line would get its 
chance to commit offsets.
   
   @rhauch If this is all agreeable I think we're ready to start implementing. 
Since you've provided a lot of the code yourself I'm happy to let you take on 
that work if you'd like; otherwise, I'll get started and see if I can have a 
new PR with these changes out by early next week.
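
   To make the proposal a bit more concrete, a rough, self-contained sketch of 
the per-task deque and the "commit only up to the first unacknowledged record" 
rule; every name in it is hypothetical rather than the actual WorkerSourceTask 
code, and the snapshotting/locking is reduced to simple synchronization.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class SubmittedRecordsSketch {

    static final class Submitted {
        final Map<String, ?> partition;
        final Map<String, ?> offset;
        volatile boolean acked; // flipped by the producer callback

        Submitted(Map<String, ?> partition, Map<String, ?> offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    private final Deque<Submitted> deque = new ArrayDeque<>();

    // Called from the task thread, in dispatch order.
    synchronized Submitted submit(Map<String, ?> partition, Map<String, ?> offset) {
        Submitted s = new Submitted(partition, offset);
        deque.addLast(s);
        return s;
    }

    // Called from the committer: drop fully-acked records from the head and
    // return the latest committable offset per source partition.
    synchronized Map<Map<String, ?>, Map<String, ?>> committableOffsets() {
        Map<Map<String, ?>, Map<String, ?>> committable = new HashMap<>();
        while (!deque.isEmpty() && deque.peekFirst().acked) {
            Submitted s = deque.pollFirst();
            committable.put(s.partition, s.offset);
        }
        return committable;
    }

    public static void main(String[] args) {
        SubmittedRecordsSketch records = new SubmittedRecordsSketch();
        Submitted first = records.submit(Map.of("table", "t1"), Map.of("pos", 1));
        Submitted second = records.submit(Map.of("table", "t1"), Map.of("pos", 2));
        second.acked = true; // acked out of order: must NOT be committed yet
        System.out.println(records.committableOffsets()); // {}
        first.acked = true;
        System.out.println(records.committableOffsets()); // commits up to pos=2
    }
}
{code}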


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason

2021-09-08 Thread John Gray (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411926#comment-17411926
 ] 

John Gray edited comment on KAFKA-10643 at 9/8/21, 2:53 PM:


We were having this same issue with our new static consumers once their 
changelog topics got large enough. The group would never stabilize because of 
these looping metadata updates. We ended up stabilizing our groups by 
increasing max.poll.interval.ms and metadata.max.age.ms in our streams apps to 
longer than however long we expected our restore consumer to take restoring our 
large stores. 30 minutes ended up working for us. I am not sure if it is 
expected that a metadata update should trigger a rebalance for a static 
consumer group with lots of restoring threads, but it certainly sent our groups 
with large state into a frenzy. It has been a while so you may have moved on 
from this, but I would be curious to see if these configs help your group, 
[~maatdeamon].


was (Author: gray.john):
We were having this same issue with our new static consumers once their 
changelog topics got large enough. The group would never stabilize because of 
these looping metadata updates. We ended up stabilizing our groups by 
increasing max.poll.record.ms and metadata.max.age.ms in our streams apps to 
longer than however long we expected our restore consumer to take restoring our 
large stores. 30 minutes ended up working for us. I am not sure if it is 
expected that a metadata update should trigger a rebalance for a static 
consumer group with lots of restoring threads, but it certainly sent our groups 
with large state into a frenzy. It has been a while so you may have moved on 
from this, but I would be curious to see if these configs help your group, 
[~maatdeamon].

> Static membership - repetitive PreparingRebalance with updating metadata for 
> member reason
> --
>
> Key: KAFKA-10643
> URL: https://issues.apache.org/jira/browse/KAFKA-10643
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Eran Levy
>Priority: Major
> Attachments: broker-4-11.csv, client-4-11.csv, 
> client-d-9-11-11-2020.csv
>
>
> Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka 
> streams app is healthy. 
> Configured with static membership. 
> Every 10 minutes (I assume cause of topic.metadata.refresh.interval.ms), I 
> see the following group coordinator log for different stream consumers: 
> INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in 
> state PreparingRebalance with old generation 12244 (__consumer_offsets-45) 
> (reason: Updating metadata for member 
> -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) 
> (kafka.coordinator.group.GroupCoordinator)
> and right after that the following log: 
> INFO [GroupCoordinator 2]: Assignment received from leader for group 
> **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator)
>  
> Looked a bit on the kafka code and Im not sure that I get why such a thing 
> happening - is this line described the situation that happens here re the 
> "reason:"?[https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311]
> I also dont see it happening too often in other kafka streams applications 
> that we have. 
> The only thing suspicious that I see around every hour that different pods of 
> that kafka streams application throw this exception: 
> {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
>  
> clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer,
>  groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) 
> to node 
> 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException:
>  null\n"}
> I came across this strange behaviour after stated to investigate a strange 
> stuck rebalancing state after one of the members left the group and caused 
> the rebalance to stuck - the only thing that I found is that maybe because 
> that too often preparing to rebalance states, the app might affected of this 
> bug - KAFKA-9752 ?
> I dont understand why it happens, it wasn't before I applied static 
> membership to that kafka streams application (since around 2 weeks ago). 
> Will be happy if you can help me
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason

2021-09-08 Thread John Gray (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411982#comment-17411982
 ] 

John Gray commented on KAFKA-10643:
---

[~maatdeamon] we did set both max.poll.interval.ms and metadata.max.age.ms 
equal, at 30 minutes, but I think the key is that it be larger than however long 
your state stores take to restore, assuming you have state stores to restore. 
We could certainly be running into similar but different problems.
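
For illustration, a minimal sketch of the configuration workaround described 
above, applied through the Streams consumer prefix; the application id, 
bootstrap servers, and the 30-minute values are only example values from this 
thread, not a general recommendation.

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StaticMembershipConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "commands-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Both set to longer than the expected state-store restore time.
        props.put(StreamsConfig.consumerPrefix("max.poll.interval.ms"), "1800000");
        props.put(StreamsConfig.consumerPrefix("metadata.max.age.ms"), "1800000");
        // props.put(StreamsConfig.consumerPrefix("group.instance.id"), "pod-0"); // static membership
        System.out.println(props);
    }
}
{code}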

> Static membership - repetitive PreparingRebalance with updating metadata for 
> member reason
> --
>
> Key: KAFKA-10643
> URL: https://issues.apache.org/jira/browse/KAFKA-10643
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Eran Levy
>Priority: Major
> Attachments: broker-4-11.csv, client-4-11.csv, 
> client-d-9-11-11-2020.csv
>
>
> Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka 
> streams app is healthy. 
> Configured with static membership. 
> Every 10 minutes (I assume cause of topic.metadata.refresh.interval.ms), I 
> see the following group coordinator log for different stream consumers: 
> INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in 
> state PreparingRebalance with old generation 12244 (__consumer_offsets-45) 
> (reason: Updating metadata for member 
> -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) 
> (kafka.coordinator.group.GroupCoordinator)
> and right after that the following log: 
> INFO [GroupCoordinator 2]: Assignment received from leader for group 
> **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator)
>  
> Looked a bit on the kafka code and Im not sure that I get why such a thing 
> happening - is this line described the situation that happens here re the 
> "reason:"?[https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311]
> I also dont see it happening too often in other kafka streams applications 
> that we have. 
> The only thing suspicious that I see around every hour that different pods of 
> that kafka streams application throw this exception: 
> {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
>  
> clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer,
>  groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) 
> to node 
> 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException:
>  null\n"}
> I came across this strange behaviour after stated to investigate a strange 
> stuck rebalancing state after one of the members left the group and caused 
> the rebalance to stuck - the only thing that I found is that maybe because 
> that too often preparing to rebalance states, the app might affected of this 
> bug - KAFKA-9752 ?
> I dont understand why it happens, it wasn't before I applied static 
> membership to that kafka streams application (since around 2 weeks ago). 
> Will be happy if you can help me
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13278) Deserialization behavior of the Fetcher class does not match up with API contract of the Deserializer interface

2021-09-08 Thread Julian Reichinger (Jira)
Julian Reichinger created KAFKA-13278:
-

 Summary: Deserialization behavior of the Fetcher class does not 
match up with API contract of the Deserializer interface
 Key: KAFKA-13278
 URL: https://issues.apache.org/jira/browse/KAFKA-13278
 Project: Kafka
  Issue Type: Bug
  Components: clients, documentation
Affects Versions: 2.6.0
Reporter: Julian Reichinger


The documentation of the
{noformat}
org.apache.kafka.common.serialization.Deserializer{noformat}
interface states that implementations have to expect null byte-arrays and 
should handle them in a meaningful way.

However, at least in the kafka client it seems to be impossible to actually get 
a null value into a deserializer because the class
{noformat}
org.apache.kafka.clients.consumer.internals.Fetcher{noformat}
does not call the registered deserializer in case of a null value.
{code:java}
private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
 RecordBatch batch,
 Record record) {
try {
long offset = record.offset();
long timestamp = record.timestamp();
Optional<Integer> leaderEpoch = 
maybeLeaderEpoch(batch.partitionLeaderEpoch());
TimestampType timestampType = batch.timestampType();
Headers headers = new RecordHeaders(record.headers());
ByteBuffer keyBytes = record.key();
byte[] keyByteArray = keyBytes == null ? null : 
Utils.toArray(keyBytes);
K key = keyBytes == null ? null : 
this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
ByteBuffer valueBytes = record.value();
byte[] valueByteArray = valueBytes == null ? null : 
Utils.toArray(valueBytes);
V value = valueBytes == null ? null : 
this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
return new ConsumerRecord<>(partition.topic(), 
partition.partition(), offset,
timestamp, timestampType, 
record.checksumOrNull(),
keyByteArray == null ? 
ConsumerRecord.NULL_SIZE : keyByteArray.length,
valueByteArray == null ? 
ConsumerRecord.NULL_SIZE : valueByteArray.length,
key, value, headers, leaderEpoch);
} catch (RuntimeException e) {
throw new SerializationException("Error deserializing key/value for 
partition " + partition +
" at offset " + record.offset() + ". If needed, please seek 
past the record to continue consumption.", e);
}
}
{code}
I implemented an ErrorHandlingDeserializer which I use to wrap the actual 
deserializers and which records the result (value or exception) in a container 
object.
{code:java}
/**
 * Handles exceptions during de-serializations thrown by a delegate {@link 
Deserializer}.
 *
 * @param  type of the deserialized object
 */
final class ErrorHandlingDeserializer implements Deserializer> 
{

  private final Deserializer> delegate;

  private ErrorHandlingDeserializer(Deserializer> delegate) {
this.delegate = requireNonNull(delegate);
  }

  static  ErrorHandlingDeserializer wrap(Deserializer> 
delegate) {
return new ErrorHandlingDeserializer<>(delegate);
  }

  @Override
  public ReadResult deserialize(String topic, @Nullable byte[] data) {
try {
  return ReadResult.successful(delegate.deserialize(topic, data));
} catch (Exception e) {
  return ReadResult.failed(e);
}
  }
}
{code}
This deserializer cannot produce a null value. However, because of the Fetcher 
behavior I still have to check for null values in the consumer records at every 
usage, and additionally I have to handle a null value inside the 
ReadResult container class, because the Deserializer API says so and I have no 
guarantee that the Fetcher behavior will never change.

In my opinion this behavior is a bug, because everyone implementing a 
Deserializer would expect to actually receive null values (for example in case 
of deletions). There should either be a guarantee on the client side that 
Deserializers always receive null values or that they never receive null values.
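
A minimal sketch of the mismatch, assuming a simple String-based deserializer 
(this class is illustrative and not part of the report itself): the 
implementation follows the interface contract and maps null payloads to a 
sentinel, but because the Fetcher code shown above never invokes deserialize() 
for null keys/values, consumers of ConsumerRecord still observe plain null.
{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Deserializer;

public class NullToleratingStringDeserializer implements Deserializer<String> {

    @Override
    public String deserialize(String topic, byte[] data) {
        // Meaningful handling of null, as the interface documentation asks for.
        return data == null ? "<tombstone>" : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        NullToleratingStringDeserializer d = new NullToleratingStringDeserializer();
        System.out.println(d.deserialize("events", null));                                   // "<tombstone>"
        System.out.println(d.deserialize("events", "v1".getBytes(StandardCharsets.UTF_8))); // "v1"
        // Through KafkaConsumer, a record with a null value yields value() == null;
        // the "<tombstone>" branch above is never reached.
    }
}
{code}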



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason

2021-09-08 Thread Maatari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411932#comment-17411932
 ] 

Maatari edited comment on KAFKA-10643 at 9/8/21, 1:36 PM:
--

[~gray.john] Thank you so much. This is still relevant to my team; we have not 
solved the problem. Quick question: how long did you set metadata.max.age.ms to? 
We already increased max.poll.interval.ms, so I am curious how metadata.max.age.ms 
actually impacts the situation, given that by default it is already 5 minutes. 
Should it be proportional to max.poll.interval.ms somehow?


was (Author: maatdeamon):
[~gray.john] Thank you so much. This is still relevant to my team; we have not 
solved the problem. Quick question: how long did you set metadata.max.age.ms to?

> Static membership - repetitive PreparingRebalance with updating metadata for 
> member reason
> --
>
> Key: KAFKA-10643
> URL: https://issues.apache.org/jira/browse/KAFKA-10643
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Eran Levy
>Priority: Major
> Attachments: broker-4-11.csv, client-4-11.csv, 
> client-d-9-11-11-2020.csv
>
>
> Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka 
> streams app is healthy. 
> Configured with static membership. 
> Every 10 minutes (I assume cause of topic.metadata.refresh.interval.ms), I 
> see the following group coordinator log for different stream consumers: 
> INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in 
> state PreparingRebalance with old generation 12244 (__consumer_offsets-45) 
> (reason: Updating metadata for member 
> -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) 
> (kafka.coordinator.group.GroupCoordinator)
> and right after that the following log: 
> INFO [GroupCoordinator 2]: Assignment received from leader for group 
> **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator)
>  
> Looked a bit on the kafka code and Im not sure that I get why such a thing 
> happening - is this line described the situation that happens here re the 
> "reason:"?[https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311]
> I also dont see it happening too often in other kafka streams applications 
> that we have. 
> The only thing suspicious that I see around every hour that different pods of 
> that kafka streams application throw this exception: 
> {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
>  
> clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer,
>  groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) 
> to node 
> 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException:
>  null\n"}
> I came across this strange behaviour after stated to investigate a strange 
> stuck rebalancing state after one of the members left the group and caused 
> the rebalance to stuck - the only thing that I found is that maybe because 
> that too often preparing to rebalance states, the app might affected of this 
> bug - KAFKA-9752 ?
> I dont understand why it happens, it wasn't before I applied static 
> membership to that kafka streams application (since around 2 weeks ago). 
> Will be happy if you can help me
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12857) Using Connect Sink with CooperativeStickyAssignor results in commit offsets failure

2021-09-08 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-12857.
---
  Assignee: (was: dgd_contributor)
Resolution: Duplicate

> Using Connect Sink with CooperativeStickyAssignor results in commit offsets 
> failure
> ---
>
> Key: KAFKA-12857
> URL: https://issues.apache.org/jira/browse/KAFKA-12857
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.1
> Environment: Linux
>Reporter: Oliver Hsu
>Priority: Major
>
> We are attempting to use a Kafka Connect Sink Connector with 
> {{CooperativeStickyAssignor}} assignment strategy.  When we use 
> {{CooperativeStickyAssignor}} offset commits sometimes fail with 
> {{[2021-05-26 22:03:36,435] WARN WorkerSinkTask\{id=sink-connector-7} 
> Ignoring invalid task provided offset 
> mytopic-0/OffsetAndMetadata\{offset=16305575, leaderEpoch=null, metadata=''} 
> – partition not assigned, assignment=[mytopic-0] 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:434)}}
> Note that the invalid partition in the warning message matches the partition 
> assignment.
> *Config changes*
> {{consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor}}
> *Cooperative vs Eager Assignment Strategy background*
>  
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP429:KafkaConsumerIncrementalRebalanceProtocol-ConsumerRebalanceListenerandConsumerPartitionAssignorSemantics]
> With eager assignment:
> {quote}Listener#onPartitionsAssigned: called on the full set of assigned 
> partitions (may have overlap with the partitions passed to 
> #onPartitionsRevoked
> {quote}
> With cooperative assignment:
> {quote}Listener#onPartitionsAssigned: called on the subset of assigned 
> partitions that were not previously owned before this rebalance. There should 
> be no overlap with the revoked partitions (if any). This will always be 
> called, even if there are no new partitions being assigned to a given member.
> {quote}
> This means with cooperative assignment, `onPartitionsAssigned` may be called 
> with a partial assignment or an empty collection.
> However, the 
> [WorkerSinkTask.HandleRebalance|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L669-L680]
>  class makes the assumption that `onPartitionsAssigned` is called with the 
> full set of assigned partitions which is true for eager but not coooperative.
> {code:java|title=WorkerSinkTask.HandleRebalance.java|borderStyle=solid}
> public void onPartitionsAssigned(Collection 
> partitions) {
> log.debug("{} Partitions assigned {}", WorkerSinkTask.this, 
> partitions);
> lastCommittedOffsets = new HashMap<>();
> currentOffsets = new HashMap<>();
> for (TopicPartition tp : partitions) {
> long pos = consumer.position(tp);
> lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
> currentOffsets.put(tp, new OffsetAndMetadata(pos));
> log.debug("{} Assigned topic partition {} with offset {}", 
> WorkerSinkTask.this, tp, pos);
> }
> {code}
> The {{onPartitionsAssigned}} creates a new empty {{HashMap}} and puts the 
> offsets of the {{partitions}} in that {{HashMap}}.
> In the logs we see
>  {{[2021-05-26 22:02:09,785] DEBUG WorkerSinkTask\{id=sink-connector-7} 
> Partitions assigned [myTopic-0] 
> (org.apache.kafka.connect.runtime.WorkerSinkTask:677)}}
>  {{[2021-05-26 22:02:13,063] DEBUG WorkerSinkTask\{id=sink-connector-7} 
> Partitions assigned [] (org.apache.kafka.connect.runtime.WorkerSinkTask:677)}}
>  {{[2021-05-26 22:02:16,074] DEBUG WorkerSinkTask\{id=sink-connector-7} }} 
> Partitions assigned [] (org.apache.kafka.connect.runtime.WorkerSinkTask:677)}}
> These logs show that the {{CooperativeStickyAssignor}} calls 
> {{onPartitionsAssigned}} first with the partition assigned to it followed by 
> additional calls with an empty {{partitions}} collection.
> When {{HandleRebalance.onPartitionsAssigned}} is called first with the 
> assigned partition followed by empty collections, the result will be 
> {{lastCommittedOffsets}} initialized to an empty {{HashMap}}.
> Inside 
> [WorkerSinkTask.commitOffsets|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L415-L419],
>  the current {{committableOffsets}} are based on the 
> {{lastCommittedOffsets}}, which is an empty {{HashMap}}:
> {code:java|title=WorkerSinkTask.java|borderStyle=solid}
> private void commitOffsets(long now, boolean closing) {
> ...
> 
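
Purely as an illustration of the cooperative semantics described above (an invented sketch, not the actual Connect fix), an assignment handler that accumulates state across incremental callbacks instead of resetting it could look like this:
{code:java}
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/** Sketch: track offsets per partition instead of rebuilding the maps wholesale. */
final class IncrementalOffsetTracker implements ConsumerRebalanceListener {

  private final Consumer<?, ?> consumer;
  private final Map<TopicPartition, OffsetAndMetadata> lastCommitted = new HashMap<>();

  IncrementalOffsetTracker(Consumer<?, ?> consumer) {
    this.consumer = consumer;
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // With the cooperative protocol this may be a partial or even empty set,
    // so only the newly assigned partitions are added; nothing is cleared.
    for (TopicPartition tp : partitions) {
      lastCommitted.put(tp, new OffsetAndMetadata(consumer.position(tp)));
    }
  }

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // Only the revoked partitions are dropped; the rest stay tracked.
    for (TopicPartition tp : partitions) {
      lastCommitted.remove(tp);
    }
  }
}
{code}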

[jira] [Commented] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason

2021-09-08 Thread Maatari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411932#comment-17411932
 ] 

Maatari commented on KAFKA-10643:
-

[~gray.john] Thank you so much. This is still relevant to my team; we have not 
solved the problem. Quick question: how long did you set metadata.max.age.ms to?

> Static membership - repetitive PreparingRebalance with updating metadata for 
> member reason
> --
>
> Key: KAFKA-10643
> URL: https://issues.apache.org/jira/browse/KAFKA-10643
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Eran Levy
>Priority: Major
> Attachments: broker-4-11.csv, client-4-11.csv, 
> client-d-9-11-11-2020.csv
>
>
> Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka 
> streams app is healthy. 
> Configured with static membership. 
> Every 10 minutes (I assume cause of topic.metadata.refresh.interval.ms), I 
> see the following group coordinator log for different stream consumers: 
> INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in 
> state PreparingRebalance with old generation 12244 (__consumer_offsets-45) 
> (reason: Updating metadata for member 
> -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) 
> (kafka.coordinator.group.GroupCoordinator)
> and right after that the following log: 
> INFO [GroupCoordinator 2]: Assignment received from leader for group 
> **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator)
>  
> Looked a bit on the kafka code and Im not sure that I get why such a thing 
> happening - is this line described the situation that happens here re the 
> "reason:"?[https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311]
> I also dont see it happening too often in other kafka streams applications 
> that we have. 
> The only thing suspicious that I see around every hour that different pods of 
> that kafka streams application throw this exception: 
> {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
>  
> clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer,
>  groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) 
> to node 
> 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException:
>  null\n"}
> I came across this strange behaviour after stated to investigate a strange 
> stuck rebalancing state after one of the members left the group and caused 
> the rebalance to stuck - the only thing that I found is that maybe because 
> that too often preparing to rebalance states, the app might affected of this 
> bug - KAFKA-9752 ?
> I dont understand why it happens, it wasn't before I applied static 
> membership to that kafka streams application (since around 2 weeks ago). 
> Will be happy if you can help me
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason

2021-09-08 Thread John Gray (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411926#comment-17411926
 ] 

John Gray edited comment on KAFKA-10643 at 9/8/21, 1:23 PM:


We were having this same issue with our new static consumers once their 
changelog topics got large enough. The group would never stabilize because of 
these looping metadata updates. We ended up stabilizing our groups by 
increasing max.poll.interval.ms and metadata.max.age.ms in our streams apps to 
longer than we expected our restore consumers to take to restore our large 
stores; 30 minutes ended up working for us. I am not sure if it is expected 
that a metadata update should trigger a rebalance for a static consumer group 
with lots of restoring threads, but it certainly sent our groups with large 
state into a frenzy. It has been a while, so you may have moved on from this, 
but I would be curious to see if these configs help your group, [~maatdeamon].


was (Author: gray.john):
We were having this same issue with our new static consumers once their 
changelog topics got large enough. The group would never stabilize because of 
these looping metadata updates. We ended up stabilizing our groups by 
increasing max.poll.interval.ms and metadata.max.age.ms in our streams apps to 
longer than we expected our restore consumers to take to restore our large 
stores; 30 minutes ended up working for us. I am not sure if this is expected 
that a metadata update should trigger a rebalance for a static consumer group 
with lots of restoring threads, but it certainly sent our groups with large 
state into a frenzy. It has been a while, so you may have moved on from this, 
but I would be curious to see if these configs help your group, [~maatdeamon].

> Static membership - repetitive PreparingRebalance with updating metadata for 
> member reason
> --
>
> Key: KAFKA-10643
> URL: https://issues.apache.org/jira/browse/KAFKA-10643
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Eran Levy
>Priority: Major
> Attachments: broker-4-11.csv, client-4-11.csv, 
> client-d-9-11-11-2020.csv
>
>
> Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka 
> streams app is healthy. 
> Configured with static membership. 
> Every 10 minutes (I assume cause of topic.metadata.refresh.interval.ms), I 
> see the following group coordinator log for different stream consumers: 
> INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in 
> state PreparingRebalance with old generation 12244 (__consumer_offsets-45) 
> (reason: Updating metadata for member 
> -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) 
> (kafka.coordinator.group.GroupCoordinator)
> and right after that the following log: 
> INFO [GroupCoordinator 2]: Assignment received from leader for group 
> **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator)
>  
> Looked a bit on the kafka code and Im not sure that I get why such a thing 
> happening - is this line described the situation that happens here re the 
> "reason:"?[https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311]
> I also dont see it happening too often in other kafka streams applications 
> that we have. 
> The only thing suspicious that I see around every hour that different pods of 
> that kafka streams application throw this exception: 
> {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
>  
> clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer,
>  groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) 
> to node 
> 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException:
>  null\n"}
> I came across this strange behaviour after stated to investigate a strange 
> stuck rebalancing state after one of the members left the group and caused 
> the rebalance to stuck - the only thing that I found is that maybe because 
> that too often preparing to rebalance states, the app might affected of this 
> bug - KAFKA-9752 ?
> I dont understand why it happens, it wasn't before I applied static 
> membership to that kafka streams application (since around 2 weeks ago). 
> Will be happy if you can help me
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason

2021-09-08 Thread John Gray (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411926#comment-17411926
 ] 

John Gray commented on KAFKA-10643:
---

We were having this same issue with our new static consumers once their 
changelog topics got large enough. The group would never stabilize because of 
these looping metadata updates. We ended up stabilizing our groups by 
increasing max.poll.interval.ms and metadata.max.age.ms in our streams apps to 
longer than we expected our restore consumers to take to restore our large 
stores; 30 minutes ended up working for us. I am not sure if this is expected 
that a metadata update should trigger a rebalance for a static consumer group 
with lots of restoring threads, but it certainly sent our groups with large 
state into a frenzy. It has been a while, so you may have moved on from this, 
but I would be curious to see if these configs help your group, [~maatdeamon].
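
For reference, a minimal sketch of the kind of configuration change described above; the application id, bootstrap servers and the 30-minute values are illustrative assumptions and should be sized to the expected restore time:
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class RestoreFriendlyStreamsConfig {
  public static Properties build() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");    // placeholder
    // Sized to comfortably exceed the expected state-restore time (30 minutes here).
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 60 * 1000);
    props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 30 * 60 * 1000);
    return props;
  }
}
{code}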

> Static membership - repetitive PreparingRebalance with updating metadata for 
> member reason
> --
>
> Key: KAFKA-10643
> URL: https://issues.apache.org/jira/browse/KAFKA-10643
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Eran Levy
>Priority: Major
> Attachments: broker-4-11.csv, client-4-11.csv, 
> client-d-9-11-11-2020.csv
>
>
> Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka 
> streams app is healthy. 
> Configured with static membership. 
> Every 10 minutes (I assume cause of topic.metadata.refresh.interval.ms), I 
> see the following group coordinator log for different stream consumers: 
> INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in 
> state PreparingRebalance with old generation 12244 (__consumer_offsets-45) 
> (reason: Updating metadata for member 
> -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) 
> (kafka.coordinator.group.GroupCoordinator)
> and right after that the following log: 
> INFO [GroupCoordinator 2]: Assignment received from leader for group 
> **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator)
>  
> Looked a bit on the kafka code and Im not sure that I get why such a thing 
> happening - is this line described the situation that happens here re the 
> "reason:"?[https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311]
> I also dont see it happening too often in other kafka streams applications 
> that we have. 
> The only thing suspicious that I see around every hour that different pods of 
> that kafka streams application throw this exception: 
> {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
>  
> clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer,
>  groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) 
> to node 
> 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException:
>  null\n"}
> I came across this strange behaviour after stated to investigate a strange 
> stuck rebalancing state after one of the members left the group and caused 
> the rebalance to stuck - the only thing that I found is that maybe because 
> that too often preparing to rebalance states, the app might affected of this 
> bug - KAFKA-9752 ?
> I dont understand why it happens, it wasn't before I applied static 
> membership to that kafka streams application (since around 2 weeks ago). 
> Will be happy if you can help me
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13257) KafkaStreams Support For Latest RocksDB Version

2021-09-08 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411910#comment-17411910
 ] 

Bruno Cadonna commented on KAFKA-13257:
---

If you want to know the progress of the AK 3.0 release you can follow the dev 
mailing list to which you can subscribe here. The release is in its last phase, 
but there is no specific date.

Can you not use a different OS image that provides glibc to move forward?

> KafkaStreams Support For Latest RocksDB Version
> ---
>
> Key: KAFKA-13257
> URL: https://issues.apache.org/jira/browse/KAFKA-13257
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Alagukannan
>Priority: Major
> Attachments: hs_err_pid6.log
>
>
> Hi,
>  Can you please let us know if there is any plan for adding the latest 
> versions of rocksDB in kafka streams. If your planning it what's the timeline 
> we are looking at. If not planning to upgrade what's the reason behind it. Is 
> there any significant impact on upgrading like backward combability etc.. 
> Just to remind this general query to know about the rocksdb upgrade and its 
> impact on streams application.
> The main pain point behind asking this upgrade is, We tried to build an 
> application with kafka streams 2.8.0 on an alpine based OS and the docker 
> base image is as follows  
> azul/zulu-openjdk-alpine:11.0.12-11.50.19-jre-headless.  The streams 
> application worked fine until it had an interaction with state 
> store(rocksdb). The jvm crashed with the following error:
>  #
>  # A fatal error has been detected by the Java Runtime Environment:
>  #
>  # SIGSEGV (0xb) at pc=0x7f9551951b27, pid=6, tid=207
>  #
>  # JRE version: OpenJDK Runtime Environment Zulu11.45+27-CA (11.0.10+9) 
> (build 11.0.10+9-LTS)
>  # Java VM: OpenJDK 64-Bit Server VM Zulu11.45+27-CA (11.0.10+9-LTS, mixed 
> mode, tiered, compressed oops, g1 gc, linux-amd64)
>  # Problematic frame:
>  # C [librocksdbjni15322693993163550519.so+0x271b27] 
> std::_Rb_tree, 
> std::less, std::allocator 
> >::_M_erase(std::_Rb_tree_node*)+0x27
> Then we found out rocksdb works well on glibc and not musl lib, where as 
> alpine supports musl lib alone for native dependencies. Further looking into 
> rocksdb for a solution we found that they have started supporting both glib 
> and musl native libs from 6.5.x versions.
>  But latest kafka streams(2.8.0) is having rocksdb(5.18.x) version. This is 
> the main reason behind asking for the rocksDB upgrade in kafka streams as 
> well.
> Have attached the PID log where JVM failures are happening.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jlprat commented on pull request #11302: KAFKA-13243: KIP-773 Differentiate metric latency measured in ms and ns

2021-09-08 Thread GitBox


jlprat commented on pull request #11302:
URL: https://github.com/apache/kafka/pull/11302#issuecomment-915206008


   Failure was https://issues.apache.org/jira/browse/KAFKA-13128 
   The issue was already closed, but now the test failed for a different reason.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Reopened] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread

2021-09-08 Thread Josep Prat (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josep Prat reopened KAFKA-13128:


Sorry to reopen this issue; it just occurred in this PR: 
[https://github.com/apache/kafka/pull/11302]

It's a different error though:

{code:java}
java.lang.AssertionError: Unexpected exception thrown while getting the value 
from store.
Expected: is (a string containing "Cannot get state store source-table because 
the stream thread is PARTITIONS_ASSIGNED, not RUNNING" or a string containing 
"The state store, source-table, may have migrated to another instance" or a 
string containing "Cannot get state store source-table because the stream 
thread is STARTING, not RUNNING")
  but: was "Cannot get state store source-table because the stream thread is 
PARTITIONS_REVOKED, not RUNNING"{code}
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11302/3/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldQueryStoresAfterAddingAndRemovingStreamThread/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed]

Let me know if I should have opened a new issue instead of reopening this one.


> Flaky Test 
> StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
> 
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0, 2.8.1
>Reporter: A. Sophie Blee-Goldman
>Assignee: Walker Carlson
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.1.0
>
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>  
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

2021-09-08 Thread GitBox


vamossagar12 commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r704376366



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
 }
 }
 
+private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
   @patrickstuedi , I resorted to creating only one DirectByteBuffer instance 
per operation (one for the putAll keys and values and one for range). I have 
checked in that code for reference. However, I don't see much of a difference 
in terms of throughput numbers. I ran it on the same setup (pushing 100 messages 
onto a single-partition topic and, for each message, pushing 1M keys using 
putAll and reading them back through the range() call).
   
   I did 3 runs per approach (original vs. ByteBuffer). Here are the numbers:
   
   ```
   ByteBuffer
   Operator: putAll, Real: 87.206 CPU: 371.460 GC: 11.048 GCCount: 30 avg 
throughput: 1206503.194 op/s p95 throughput: 1423370.427 op/s p99 throughput: 
1437098.797 op/s 
   Operator: range, Real: 115.121 CPU: 328.090 GC: 7.925 GCCount: 20 avg 
throughput: 923332.052 op/s p95 throughput: 1261332.193 op/s p99 throughput: 
1308671.917 op/s 
   
   Operator: putAll, Real: 87.274 CPU: 382.920 GC: 11.477 GCCount: 29 avg 
throughput: 1211901.123 op/s p95 throughput: 1432304.571 op/s p99 throughput: 
1447671.672 op/s 
   Operator: range, Real: 116.886 CPU: 335.610 GC: 8.230 GCCount: 21 avg 
throughput: 911159.182 op/s p95 throughput: 1234183.861 op/s p99 throughput: 
1255574.938 op/s
   
   putAll, Real: 84.438 CPU: 366.390 GC: 10.992 GCCount: 29 avg throughput: 
1254820.856 op/s p95 throughput: 1481654.517 op/s p99 throughput: 1508043.762 
op/s 
   Operator: range, Real: 114.877 CPU: 338.090 GC: 8.465 GCCount: 21 avg 
throughput: 935155.065 op/s p95 throughput: 1254738.341 op/s p99 throughput: 
1280340.688 op/s
   
   
   Original
   Operator: putAll, Real: 95.037 CPU: 378.850 GC: 11.078 GCCount: 29 avg 
throughput: 1100406.576 op/s p95 throughput: 1292393.480 op/s p99 throughput: 
1297963.396 op/s 
   Operator: range, Real: 111.177 CPU: 328.180 GC: 8.299 GCCount: 21 avg 
throughput: 967757.168 op/s p95 throughput: 1281079.326 op/s p99 throughput: 
1296172.722 op/s
   
   Operator: putAll, Real: 95.186 CPU: 356.040 GC: 10.132 GCCount: 28 avg 
throughput: 1092794.645 op/s p95 throughput: 1257861.812 op/s p99 throughput: 
1286016.834 op/s 
   Operator: range, Real: 112.568 CPU: 347.350 GC: 9.163 GCCount: 25 avg 
throughput: 952179.810 op/s p95 throughput: 1298717.792 op/s p99 throughput: 
1323234.359 op/s 
   
   Operator: putAll, Real: 97.332 CPU: 400.690 GC: 12.000 GCCount: 30 avg 
throughput: 1079682.386 op/s p95 throughput: 1283925.766 op/s p99 throughput: 
1290499.661 op/s 
   Operator: range, Real: 109.653 CPU: 319.020 GC: 7.995 GCCount: 20 avg 
throughput: 980557.905 op/s p95 throughput: 1298695.144 op/s p99 throughput: 
1313645.941 op/s
   ```
   
   One thing that I am noticing is that range is better with the original 
implementation. This is contrary to the initial numbers I had posted 
[here](https://github.com/apache/kafka/pull/10798#issuecomment-872398600), which 
were wrong due to a bug in my implementation back then.
   
   Also, regarding the discussion we had about capacity allocation: RocksDB 
places a limit of 8 MB on keys and 3 GB on values, and either way it doesn't 
recommend storing large keys/values. I wanted to know if there are any configs 
for key/value sizes for stores? I don't think I could find any.
   
   Lastly, with the pre-allocation approach for interactive queries, which 
allows multi-threaded access, do we want to restrict by N as you specified 
above?
   
   What do you and (@guozhangwang / @cadonna) think?
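   
   As a side note, a standalone sketch of the "allocate one direct buffer and reuse it" idea discussed above (the 16 KiB starting capacity and the grow-on-demand policy are assumptions of the sketch, not this PR's code):
   
   ```java
   import java.nio.ByteBuffer;
   
   /** Reuses a single direct buffer across puts instead of allocating per key/value. */
   final class ReusableDirectBuffer {
   
     private ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024);
   
     /** Copies the payload into the shared direct buffer, growing it if needed. */
     ByteBuffer fill(byte[] bytes) {
       if (buffer.capacity() < bytes.length) {
         // Grow to the next power of two that fits the payload.
         buffer = ByteBuffer.allocateDirect(Integer.highestOneBit(bytes.length - 1) << 1);
       }
       buffer.clear();
       buffer.put(bytes);
       buffer.flip();
       return buffer;
     }
   }
   ```
   
   Note that a single shared buffer like this is not safe for concurrent readers, which is why some bound N on pre-allocated buffers would be needed for the interactive-query path mentioned above.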




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

2021-09-08 Thread GitBox


vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-915194995


   > > Okay, let me re-trigger the tests.
   > 
   > Thanks.. This time there's a compilation error due to benchmarks. I will 
remove that class.
   
   @guozhangwang , I removed that class.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna merged pull request #11250: Kafka 12766 - Disabling WAL-related Options in RocksDB

2021-09-08 Thread GitBox


cadonna merged pull request #11250:
URL: https://github.com/apache/kafka/pull/11250


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #11250: Kafka 12766 - Disabling WAL-related Options in RocksDB

2021-09-08 Thread GitBox


cadonna commented on pull request #11250:
URL: https://github.com/apache/kafka/pull/11250#issuecomment-915167984


   Build failures are unrelated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on a change in pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2021-09-08 Thread GitBox


vamossagar12 commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r704335821



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##
@@ -292,13 +408,46 @@ public V fetch(final K key,
 time);
 }
 
+    private long getActualWindowStartTime(final long timeFrom) {
+        return Math.max(timeFrom, ((PersistentWindowStore) wrapped()).getObservedStreamTime() - retentionPeriod + 1);
+    }
+
+    private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) {
+        final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll();
+
+        final long observedStreamTime = ((PersistentWindowStore) wrapped()).getObservedStreamTime();
+        if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP)
+            return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);
+
+        final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;
+        final List<KeyValue<Windowed<Bytes>, byte[]>> windowedKeyValuesInBoundary = new ArrayList<>();
+
+        while (allWindowedKeyValueIterator.hasNext()) {
+            final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next();
+            if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) {
+                continue;
+            }
+            windowedKeyValuesInBoundary.add(next);
+        }
+        return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(windowedKeyValuesInBoundary.iterator()), fetchSensor, streamsMetrics, serdes, time);
+    }

Review comment:
   hey.. let me know what you guys think about the above comment of mine. 
Based upon that, I will proceed with fixing the issue with the test case(which 
doesn't seem possible with the way I have implemented).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon edited a comment on pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order

2021-09-08 Thread GitBox


showuon edited a comment on pull request #11292:
URL: https://github.com/apache/kafka/pull/11292#issuecomment-915106066


   @ableegoldman @guozhangwang , thanks for your comments. I agree that it 
doesn't need to be an integration test. I've moved it out of the integration 
tests. Thank you.
   
   Failed tests are unrelated.
   ```
   Build / JDK 8 and Scala 2.12 / 
kafka.server.KRaftClusterTest.testCreateClusterAndPerformReassignment()
   Build / JDK 8 and Scala 2.12 / 
kafka.server.KRaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
   Build / JDK 11 and Scala 2.13 / 
kafka.server.DelayedOperationTest.testDelayedFuture()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order

2021-09-08 Thread GitBox


showuon commented on pull request #11292:
URL: https://github.com/apache/kafka/pull/11292#issuecomment-915106066


   @ableegoldman @guozhangwang , thanks for your comments. I agree that it 
doesn't need to be an integration test. I've moved it out of the integration 
tests. Thank you.
   
   Failed tests are unrelated.
   ```
   Build / JDK 8 and Scala 2.12 / 
kafka.server.KRaftClusterTest.testCreateClusterAndPerformReassignment()
   Build / JDK 8 and Scala 2.12 / 
kafka.server.KRaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
   Build / JDK 11 and Scala 2.13 / 
kafka.server.DelayedOperationTest.testDelayedFuture()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-13260) FindCoordinator errorCounts does not handle v4

2021-09-08 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411824#comment-17411824
 ] 

Mickael Maison edited comment on KAFKA-13260 at 9/8/21, 9:25 AM:
-

As things stood last week, it was affecting 3.0.0 RC0 and it wasn't clear if 
the fix would be merged into 3.0. This is why I tagged this JIRA this way. 

Thanks for updating it!


was (Author: mimaison):
As things stood last week, it was affecting 3.0.0 RC0 and it wasn't clear if 
the fix would be merged into 3.0. This is why I tagged this JIRA this way. 

> FindCoordinator errorCounts does not handle v4
> --
>
> Key: KAFKA-13260
> URL: https://issues.apache.org/jira/browse/KAFKA-13260
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Blocker
> Fix For: 3.0.0
>
>
> When using batch find coordinator (>=v4), errorCounts() does not correctly 
> compute the error count.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13260) FindCoordinator errorCounts does not handle v4

2021-09-08 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411824#comment-17411824
 ] 

Mickael Maison commented on KAFKA-13260:


As things stood last week, it was affecting 3.0.0 RC0 and it wasn't clear if 
the fix would be merged into 3.0. This is why I tagged this JIRA this way. 

> FindCoordinator errorCounts does not handle v4
> --
>
> Key: KAFKA-13260
> URL: https://issues.apache.org/jira/browse/KAFKA-13260
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Blocker
> Fix For: 3.0.0
>
>
> When using batch find coordinator (>=v4), errorCounts() does not correctly 
> compute the error count.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison merged pull request #11288: KAFKA-13258/13259/13260: Fix error response generation

2021-09-08 Thread GitBox


mimaison merged pull request #11288:
URL: https://github.com/apache/kafka/pull/11288


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13256) Possible NPE in ConfigDef when rendering (enriched) RST or HTML when documentation is not set/NULL

2021-09-08 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17411820#comment-17411820
 ] 

René Kerner commented on KAFKA-13256:
-

[~mjsax] Currently there's no way to ensure that users of ConfigDef actually 
provide documentation.

I don't know who's consuming `org.apache.kafka.common.config.ConfigDef#toRst`, 
`org.apache.kafka.common.config.ConfigDef#toEnrichedRst` and 
`org.apache.kafka.common.config.ConfigDef#toHtml`, but calling these methods on 
a ConfigDef will fail with an NPE when there are ConfigKeys that don't have 
`documentation` set.

An alternative would be to add null checks to all 
`org.apache.kafka.common.config.ConfigDef#define` methods, but that would be a 
drastic, API-breaking change.
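
A minimal sketch of the null-guard variant (the helper name is invented for the example; this is not the proposed PR):
{code:java}
import org.apache.kafka.common.config.ConfigDef;

final class ConfigDocs {
    /** Illustrative fallback so rendering code never dereferences a null documentation field. */
    static String safeDocumentation(ConfigDef.ConfigKey key) {
        return key.documentation == null ? "No documentation available" : key.documentation;
    }
}
{code}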

> Possible NPE in ConfigDef when rendering (enriched) RST or HTML when 
> documentation is not set/NULL
> --
>
> Key: KAFKA-13256
> URL: https://issues.apache.org/jira/browse/KAFKA-13256
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.8.0
>Reporter: René Kerner
>Priority: Major
>   Original Estimate: 0.5h
>  Remaining Estimate: 0.5h
>
> While working on Debezium I discovered the following issue:
> When Kafka's ConfigDef renders the HTML or RST documentation representation 
> of the config definition, it requires `ConfigKey.documentation` member 
> variable to be a java.lang.String instance that's set to an actual value 
> different than NULL, else NPE happens:
> {code:java}
>  b.append(key.documentation.replaceAll("\n", ""));
> {code}
> {code:java}
>  for (String docLine : key.documentation.split("\n")) {
> {code}
>  
> When `documentation` is not set/NULL I suggest to either set a valid String 
> like "No documentation available" or skip that config key.
>  
> I could provide a PR to fix this soon.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison merged pull request #11300: KAFKA-13258/13259/13260: Fix error response generation

2021-09-08 Thread GitBox


mimaison merged pull request #11300:
URL: https://github.com/apache/kafka/pull/11300


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison merged pull request #11299: KAFKA-13258/KAFKA-13259: Fix error response generation

2021-09-08 Thread GitBox


mimaison merged pull request #11299:
URL: https://github.com/apache/kafka/pull/11299


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13277) Serialization of long tagged string in request/response throws BufferOverflowException

2021-09-08 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-13277:
---
Fix Version/s: 2.7.2

> Serialization of long tagged string in request/response throws 
> BufferOverflowException
> --
>
> Key: KAFKA-13277
> URL: https://issues.apache.org/jira/browse/KAFKA-13277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 3.0.0, 2.7.2, 2.8.1
>
>
> Size computation for tagged strings in the message generator is incorrect and 
> hence it works only for small strings (126 bytes or so) where the length 
> happens to be correct.
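
For background, the 126-byte boundary lines up with the single-byte range of the unsigned varint used for compact string lengths (length + 1 ≤ 127). A small standalone illustration of varint sizing, not the generator's code:
{code:java}
/** Standalone illustration of unsigned-varint sizing as used for compact string lengths. */
final class VarintSizes {

    /** Number of bytes needed to encode the value as an unsigned varint. */
    static int sizeOfUnsignedVarint(int value) {
        int bytes = 1;
        while ((value & 0xFFFFFF80) != 0) {   // more than 7 bits remaining
            bytes++;
            value >>>= 7;
        }
        return bytes;
    }

    public static void main(String[] args) {
        System.out.println(sizeOfUnsignedVarint(126 + 1)); // 1 byte: within the single-byte range
        System.out.println(sizeOfUnsignedVarint(127 + 1)); // 2 bytes: beyond it
    }
}
{code}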



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13277) Serialization of long tagged string in request/response throws BufferOverflowException

2021-09-08 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-13277:
---
Affects Version/s: 2.4.1
   2.5.1
   2.8.0
   2.7.1
   2.6.2

> Serialization of long tagged string in request/response throws 
> BufferOverflowException
> --
>
> Key: KAFKA-13277
> URL: https://issues.apache.org/jira/browse/KAFKA-13277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.1, 2.5.1, 2.8.0, 2.7.1, 2.6.2
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 3.0.0, 2.7.2, 2.8.1
>
>
> Size computation for tagged strings in the message generator is incorrect and 
> hence it works only for small strings (126 bytes or so) where the length 
> happens to be correct.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13264) backwardFetch in InMemoryWindowStore doesn't return in reverse order

2021-09-08 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-13264:
--
Description: 
While working on another PR, I found that the backwardFetch in 
InMemoryWindowStore currently doesn't return records in reverse order when 
there are records in the same window.

ex: window size = 500,

input records:

key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500] window

key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500] window

 

So, internally, the "a" and "b" will be in the same segment.

when fetch in forward order:

"a" -> "b", which is expected

when fetch in backward order:

"a" -> "b", which is NOT expected (because we didn't make the segment iterator 
as descendingMap)

  was:
When working on another PR, I found currently, the backwardFetch in 
InMemoryWindowStore doesn't return in reverse order when there are records in 
the same window.

ex: window size = 500,

input records:

key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500\] window

key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500\] window

when fetch in forward order:

"a" -> "b", which is expected

when fetch in backward order:

"a" -> "b", which is NOT expected


> backwardFetch in InMemoryWindowStore doesn't return in reverse order
> 
>
> Key: KAFKA-13264
> URL: https://issues.apache.org/jira/browse/KAFKA-13264
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> When working on another PR, I found currently, the backwardFetch in 
> InMemoryWindowStore doesn't return in reverse order when there are records in 
> the same window.
> ex: window size = 500,
> input records:
> key: "a", value: "aa", timestamp: 0 ==> will be in [0, 500] window
> key: "b", value: "bb", timestamp: 10 ==> will be in [0, 500] window
>  
> So, internally, the "a" and "b" will be in the same segment.
> when fetch in forward order:
> "a" -> "b", which is expected
> when fetch in backward order:
> "a" -> "b", which is NOT expected (because we didn't make the segment 
> iterator as descendingMap)
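
As a standalone illustration of the reverse-iteration point above (a plain NavigableMap, not the store's actual segment code):
{code:java}
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class DescendingIterationExample {
    public static void main(String[] args) {
        // Two records that land in the same window segment.
        NavigableMap<String, String> segment = new TreeMap<>();
        segment.put("a", "aa");
        segment.put("b", "bb");

        // Forward fetch: a -> b
        for (Map.Entry<String, String> e : segment.entrySet()) {
            System.out.println(e.getKey());
        }

        // Backward fetch must iterate the descending view to get b -> a.
        for (Map.Entry<String, String> e : segment.descendingMap().entrySet()) {
            System.out.println(e.getKey());
        }
    }
}
{code}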



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on a change in pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order

2021-09-08 Thread GitBox


showuon commented on a change in pull request #11292:
URL: https://github.com/apache/kafka/pull/11292#discussion_r704188256



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
##
@@ -80,6 +80,15 @@
 static final long SEGMENT_INTERVAL = 60_000L;
 static final long RETENTION_PERIOD = 2 * SEGMENT_INTERVAL;
 
+final long defaultStartTime = SEGMENT_INTERVAL - 4L;
+
+final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", defaultStartTime);
+final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", defaultStartTime + 1);
+final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", defaultStartTime + 2);
+final KeyValue<Windowed<Integer>, String> three = windowedPair(3, "three", defaultStartTime + 2);

Review comment:
   Thanks for your comment, but here I set it to `defaultStartTime + 2` on 
purpose, to test that when the window start time is the same, the 
forward/backward query APIs still return results in the expected order. I 
updated the PR description to make this clear. Thank you.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13277) Serialization of long tagged string in request/response throws BufferOverflowException

2021-09-08 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-13277:
---
Fix Version/s: 2.8.1

> Serialization of long tagged string in request/response throws 
> BufferOverflowException
> --
>
> Key: KAFKA-13277
> URL: https://issues.apache.org/jira/browse/KAFKA-13277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 3.0.0, 2.8.1
>
>
> Size computation for tagged strings in the message generator is incorrect and 
> hence it works only for small strings (126 bytes or so) where the length 
> happens to be correct.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] comptooladmin commented on a change in pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order

2021-09-08 Thread GitBox


comptooladmin commented on a change in pull request #11292:
URL: https://github.com/apache/kafka/pull/11292#discussion_r704180358



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
##
@@ -80,6 +80,15 @@
 static final long SEGMENT_INTERVAL = 60_000L;
 static final long RETENTION_PERIOD = 2 * SEGMENT_INTERVAL;
 
+final long defaultStartTime = SEGMENT_INTERVAL - 4L;
+
+final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", defaultStartTime);
+final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", defaultStartTime + 1);
+final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", defaultStartTime + 2);
+final KeyValue<Windowed<Integer>, String> three = windowedPair(3, "three", defaultStartTime + 2);

Review comment:
   Is this supposed to be defaultStartTime + **_3_**?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

2021-09-08 Thread GitBox


vamossagar12 commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-915026729


   > Okay, let me re-trigger the tests.
   
   Thanks.. This time there's a compilation error due to benchmarks. I will 
remove that class.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #11308: KAFKA-13277; Fix size calculation for tagged string fields in message generator

2021-09-08 Thread GitBox


rajinisivaram commented on pull request #11308:
URL: https://github.com/apache/kafka/pull/11308#issuecomment-915008347


   @cmccabe Thanks for reviewing and merging to master. @ijuma @dajac Yes, will 
cherry-pick to 3.0 and 2.8, thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11294: KAFKA-13266; `InitialFetchState` should be created after partition is removed from the fetchers

2021-09-08 Thread GitBox


dajac commented on pull request #11294:
URL: https://github.com/apache/kafka/pull/11294#issuecomment-915005946


   @hachikuji Thanks for your review. I have addressed your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11294: KAFKA-13266; `InitialFetchState` should be created after partition is removed from the fetchers

2021-09-08 Thread GitBox


dajac commented on a change in pull request #11294:
URL: https://github.com/apache/kafka/pull/11294#discussion_r704137955



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -3242,6 +3260,92 @@ class ReplicaManagerTest {
 }
   }
 
+  @Test
+  def testDeltaFollowerStopFetcherBeforeCreatingInitialFetchOffset(): Unit = {
+val localId = 1
+val otherId = localId + 1
+val topicPartition = new TopicPartition("foo", 0)
+
+val mockReplicaFetcherManager = 
Mockito.mock(classOf[ReplicaFetcherManager])
+val replicaManager = setupReplicaManagerWithMockedPurgatories(
+  timer = new MockTimer(time),
+  brokerId = localId,
+  mockReplicaFetcherManager = Some(mockReplicaFetcherManager)
+)
+
+try {
+  // The first call to removeFetcherForPartitions should be ignored.
+  Mockito.when(mockReplicaFetcherManager.removeFetcherForPartitions(
+Set(topicPartition))
+  ).thenReturn(Map.empty[TopicPartition, PartitionFetchState])
+
+  // Make the local replica the follower
+  var followerTopicsDelta = topicsCreateDelta(localId, false)
+  var followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+  replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+  // Check the state of that partition
+  val HostedPartition.Online(followerPartition) = 
replicaManager.getPartition(topicPartition)
+  assertFalse(followerPartition.isLeader)
+  assertEquals(0, followerPartition.getLeaderEpoch)
+  assertEquals(0, followerPartition.localLogOrException.logEndOffset)
+
+  // Verify that addFetcherForPartitions was called with the correct
+  // init offset.
+  Mockito.verify(mockReplicaFetcherManager, Mockito.times(1))
+.addFetcherForPartitions(
+  Map(topicPartition -> InitialFetchState(
+leader = BrokerEndPoint(otherId, "localhost", 9093),
+currentLeaderEpoch = 0,
+initOffset = 0
+  ))
+)
+
+  // The second call to removeFetcherForPartitions simulate the case
+  // where the fetcher write to the log before being shutdown.
+  Mockito.when(mockReplicaFetcherManager.removeFetcherForPartitions(
+Set(topicPartition))
+  ).thenAnswer { _ =>
+replicaManager.getPartition(topicPartition) match {
+  case HostedPartition.Online(partition) =>
+partition.appendRecordsToFollowerOrFutureReplica(
+  records = MemoryRecords.withRecords(CompressionType.NONE, 0,
+new SimpleRecord("first message".getBytes)),
+  isFuture = false
+)
+
+  case _ =>
+}
+
+Map.empty[TopicPartition, PartitionFetchState]
+  }
+
+  // Apply changes that bumps the leader epoch.
+  followerTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), 
localId, false)
+  followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+  replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+  assertFalse(followerPartition.isLeader)
+  assertEquals(1, followerPartition.getLeaderEpoch)
+  assertEquals(1, followerPartition.localLogOrException.logEndOffset)
+
+  // Verify that addFetcherForPartitions was called with the correct
+  // init offset.
+  Mockito.verify(mockReplicaFetcherManager, Mockito.times(1))
+.addFetcherForPartitions(
+  Map(topicPartition -> InitialFetchState(
+leader = BrokerEndPoint(otherId, "localhost", 9093),
+currentLeaderEpoch = 1,
+initOffset = 1
+  ))
+)
+} finally {
+  replicaManager.shutdown()
+}
+
+TestUtils.assertNoNonDaemonThreads(this.getClass.getName)

Review comment:
   I was also thinking about this as most of the tests have it but wanted 
to test/refactor this separately if you don't mind. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on a change in pull request #11302: KAFKA-13243: KIP-773 Differentiate metric latency measured in ms and ns

2021-09-08 Thread GitBox


jlprat commented on a change in pull request #11302:
URL: https://github.com/apache/kafka/pull/11302#discussion_r704133811



##
File path: docs/upgrade.html
##
@@ -22,9 +22,12 @@
 Notable changes in 
3.1.0
 
 Apache Kafka supports Java 17.
+The following metrics have been deprecated: 
bufferpool-wait-time-total, io-waittime-total,

Review comment:
   Added the link to the KIP!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #11308: KAFKA-13277; Fix size calculation for tagged string fields in message generator

2021-09-08 Thread GitBox


dajac commented on pull request #11308:
URL: https://github.com/apache/kafka/pull/11308#issuecomment-914988398


   We could also include it in 2.8.1.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org