[GitHub] [kafka] kamalcph edited a comment on pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-08-03 Thread GitBox


kamalcph edited a comment on pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#issuecomment-892406838


   @vvcephei 
   Thanks for figuring out the issue. Feel free to take over this patch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kamalcph commented on pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-08-03 Thread GitBox


kamalcph commented on pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#issuecomment-892406838


   @vvcephei 
   Thanks for figuring out the issue. Feel free to take over this patch!






[GitHub] [kafka] dielhennr removed a comment on pull request #11141: KAFKA-13142: Validate dynamic config alterations prior to forwarding them to the KRaft controller.

2021-08-03 Thread GitBox


dielhennr removed a comment on pull request #11141:
URL: https://github.com/apache/kafka/pull/11141#issuecomment-892359084


   @hachikuji @rajinisivaram since this build is green I’m fairly confident 
that dynamic configs in a zk cluster haven’t changed behavior. I am wondering 
if it would be better to have buggy validation than no validation at all for 
dynamic configs on KRaft brokers (assuming it is still buggy). I think with a 
few tests this week this will be good to go for 3.0. Do either of you 
anticipate problems here?






[GitHub] [kafka] dielhennr edited a comment on pull request #11141: KAFKA-13142: Validate dynamic config alterations prior to forwarding them to the KRaft controller.

2021-08-03 Thread GitBox


dielhennr edited a comment on pull request #11141:
URL: https://github.com/apache/kafka/pull/11141#issuecomment-892359084


   @hachikuji @rajinisivaram since this build is green I’m fairly confident 
that dynamic configs in a zk cluster haven’t changed behavior. I am wondering 
if it would be better to have buggy validation than no validation at all for 
dynamic configs on KRaft brokers (assuming it is still buggy). I think with a 
few tests this week this will be good to go for 3.0. Do either of you 
anticipate problems here?






[GitHub] [kafka] dielhennr commented on pull request #11141: KAFKA-13142: Validate dynamic config alterations prior to forwarding them to the KRaft controller.

2021-08-03 Thread GitBox


dielhennr commented on pull request #11141:
URL: https://github.com/apache/kafka/pull/11141#issuecomment-892359084


   @hachikuji @rajinisivaram since this build is green I’m fairly confident 
that dynamic configs in a zk cluster haven’t changed behavior. I am wondering 
if it would be better to have buggy validation than no validation at all for 
dynamic configs on KRaft brokers (assuming it is still buggy).






[GitHub] [kafka] ccding commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig

2021-08-03 Thread GitBox


ccding commented on a change in pull request #11110:
URL: https://github.com/apache/kafka/pull/11110#discussion_r682281792



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1434,6 +1434,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   val zkEnableSecureAcls: Boolean = getBoolean(KafkaConfig.ZkEnableSecureAclsProp)
   val zkMaxInFlightRequests: Int = getInt(KafkaConfig.ZkMaxInFlightRequestsProp)
 
+  private val _tieredKafkaConfig = new RemoteLogManagerConfig(this)

Review comment:
   I had a hard time naming this variable. Would `_remoteLogManagerConfig` 
be better?








[GitHub] [kafka] vvcephei commented on pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-08-03 Thread GitBox


vvcephei commented on pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#issuecomment-892354449


   I went ahead and kicked off a full system test build to evaluate the other 
tests that got changed in this PR: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4633/console






[GitHub] [kafka] vvcephei commented on pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-08-03 Thread GitBox


vvcephei commented on pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#issuecomment-892354208


   Ok, I figured it out. The problem was that this change was adding the new 
version(s) to the _metadata_ upgrade test, but there isn't a metadata upgrade 
between 2.x and 3.0. Since we're down to the wire now, I just went ahead and 
pushed a fix to this branch.
   
   It passes for me locally now:
   ```
   [john@arcturus kafka]$ TC_PATHS="tests/kafkatest/tests/streams/streams_application_upgrade_test.py::StreamsUpgradeTest.test_app_upgrade" _DUCKTAPE_OPTIONS='--parameters '\''{"from_version":"2.8.0","to_version":"3.1.0-SNAPSHOT","bounce_type":"full"}'\' bash tests/docker/run_tests.sh
   
   docker exec ducker01 bash -c "cd /opt/kafka-dev && ducktape --cluster-file /opt/kafka-dev/tests/docker/build/cluster.json  ./tests/kafkatest/tests/streams/streams_application_upgrade_test.py::StreamsUpgradeTest.test_app_upgrade --parameters '{"from_version":"2.8.0","to_version":"3.1.0-SNAPSHOT","bounce_type":"full"}'"
   [INFO:2021-08-03 21:27:59,868]: starting test run with session id 2021-08-03--012...
   [INFO:2021-08-03 21:27:59,868]: running 1 tests...
   [INFO:2021-08-03 21:27:59,869]: Triggering test 1 of 1...
   [INFO:2021-08-03 21:27:59,878]: RunnerClient: Loading test {'directory': '/opt/kafka-dev/tests/kafkatest/tests/streams', 'file_name': 'streams_application_upgrade_test.py', 'cls_name': 'StreamsUpgradeTest', 'method_name': 'test_app_upgrade', 'injected_args': {'from_version': '2.8.0', 'to_version': '3.1.0-SNAPSHOT', 'bounce_type': 'full'}}
   [INFO:2021-08-03 21:27:59,883]: RunnerClient: kafkatest.tests.streams.streams_application_upgrade_test.StreamsUpgradeTest.test_app_upgrade.from_version=2.8.0.to_version=3.1.0-SNAPSHOT.bounce_type=full: Setting up...
   [INFO:2021-08-03 21:27:59,884]: RunnerClient: kafkatest.tests.streams.streams_application_upgrade_test.StreamsUpgradeTest.test_app_upgrade.from_version=2.8.0.to_version=3.1.0-SNAPSHOT.bounce_type=full: Running...
   [INFO:2021-08-03 21:29:44,814]: RunnerClient: kafkatest.tests.streams.streams_application_upgrade_test.StreamsUpgradeTest.test_app_upgrade.from_version=2.8.0.to_version=3.1.0-SNAPSHOT.bounce_type=full: PASS
   [INFO:2021-08-03 21:29:44,816]: RunnerClient: kafkatest.tests.streams.streams_application_upgrade_test.StreamsUpgradeTest.test_app_upgrade.from_version=2.8.0.to_version=3.1.0-SNAPSHOT.bounce_type=full: Tearing down...
   [INFO:2021-08-03 21:30:03,898]: RunnerClient: kafkatest.tests.streams.streams_application_upgrade_test.StreamsUpgradeTest.test_app_upgrade.from_version=2.8.0.to_version=3.1.0-SNAPSHOT.bounce_type=full: Summary: 
   [INFO:2021-08-03 21:30:03,899]: RunnerClient: kafkatest.tests.streams.streams_application_upgrade_test.StreamsUpgradeTest.test_app_upgrade.from_version=2.8.0.to_version=3.1.0-SNAPSHOT.bounce_type=full: Data: None
   

   SESSION REPORT (ALL TESTS)
   ducktape version: 0.8.1
   session_id:       2021-08-03--012
   run time:         2 minutes 4.072 seconds
   tests run:        1
   passed:           1
   failed:           0
   ignored:          0
   

   test_id:    kafkatest.tests.streams.streams_application_upgrade_test.StreamsUpgradeTest.test_app_upgrade.from_version=2.8.0.to_version=3.1.0-SNAPSHOT.bounce_type=full
   status:     PASS
   run time:   2 minutes 4.014 seconds
   

   ```






[GitHub] [kafka] vvcephei commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-08-03 Thread GitBox


vvcephei commented on a change in pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#discussion_r682270517



##
File path: 
streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.time.Instant;
+
+public class SmokeTestUtil {
+
+final static int END = Integer.MAX_VALUE;
+
+static ProcessorSupplier printProcessorSupplier(final 
String topic) {
+return printProcessorSupplier(topic, "");
+}
+
+static ProcessorSupplier printProcessorSupplier(final 
String topic, final String name) {
+return new ProcessorSupplier() {
+@Override
+public Processor get() {
+return new AbstractProcessor() {
+private int numRecordsProcessed = 0;
+private long smallestOffset = Long.MAX_VALUE;
+private long largestOffset = Long.MIN_VALUE;
+
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+System.out.println("[DEV] initializing processor: 
topic=" + topic + " taskId=" + context.taskId());

Review comment:
   Good eye :) 








[GitHub] [kafka] vvcephei commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-08-03 Thread GitBox


vvcephei commented on a change in pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#discussion_r682270266



##
File path: 
streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+
+public class SmokeTestClient extends SmokeTestUtil {
+
+private final String name;
+
+private KafkaStreams streams;
+private boolean uncaughtException = false;
+private boolean started;
+private volatile boolean closed;
+
+private static void addShutdownHook(final String name, final Runnable 
runnable) {
+if (name != null) {
+Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, 
runnable));
+} else {
+Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+}
+}
+
+private static File tempDirectory() {
+final String prefix = "kafka-";
+final File file;
+try {
+file = Files.createTempDirectory(prefix).toFile();
+} catch (final IOException ex) {
+throw new RuntimeException("Failed to create a temp dir", ex);
+}
+file.deleteOnExit();
+
+addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+try {
+Utils.delete(file);
+} catch (final IOException e) {
+System.out.println("Error deleting " + file.getAbsolutePath());
+e.printStackTrace(System.out);
+}
+});
+
+return file;
+}
+
+public SmokeTestClient(final String name) {
+this.name = name;
+}
+
+public boolean started() {
+return started;
+}
+
+public boolean closed() {
+return closed;
+}
+
+public void start(final Properties streamsProperties) {
+final Topology build = getTopology();
+streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+final CountDownLatch countDownLatch = new CountDownLatch(1);
+streams.setStateListener((newState, oldState) -> {
+System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), 
oldState, newState);
+if (oldState == KafkaStreams.State.REBALANCING && newState == 
KafkaStreams.State.RUNNING) {
+started = true;
+countDownLatch.countDown();
+}
+
+if (newState == KafkaStreams.State.NOT_RUNNING) {
+closed = true;
+}
+});
+
+streams.setUncaughtExceptionHandler(e -> {
+System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+System.out.println(name + ": FAT

[GitHub] [kafka] dengziming commented on pull request #11173: MINOR: Support max timestamp in GetOffsetShell

2021-08-03 Thread GitBox


dengziming commented on pull request #11173:
URL: https://github.com/apache/kafka/pull/11173#issuecomment-892334259


   cc @dajac @thomaskwscott .🤝






[GitHub] [kafka] dengziming opened a new pull request #11173: MINOR: Support max timestamp in GetOffsetShell

2021-08-03 Thread GitBox


dengziming opened a new pull request #11173:
URL: https://github.com/apache/kafka/pull/11173


   Support max timestamp, which we added in KIP-734, in GetOffsetShell.
   






[GitHub] [kafka] ijuma commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig

2021-08-03 Thread GitBox


ijuma commented on a change in pull request #11110:
URL: https://github.com/apache/kafka/pull/11110#discussion_r682253716



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1434,6 +1434,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   val zkEnableSecureAcls: Boolean = getBoolean(KafkaConfig.ZkEnableSecureAclsProp)
   val zkMaxInFlightRequests: Int = getInt(KafkaConfig.ZkMaxInFlightRequestsProp)
 
+  private val _tieredKafkaConfig = new RemoteLogManagerConfig(this)

Review comment:
   Why is this prefixed with `tiered` while the class name is prefixed with 
`Remote`?








[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-03 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682238334



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -248,12 +249,29 @@ public ByteBuffer subscriptionUserData(final Set topics) {
 handleRebalanceStart(topics);
 uniqueField++;
 
+final Set currentNamedTopologies;
+final Map taskOffsetSums;
+try {
+taskManager.topologyMetadata().lock();

Review comment:
   The lock was less for thread-safety purposes and more about atomicity 
w.r.t. updating some bookkeeping/state. But I guess I removed a lot of that 
bookkeeping recently, so I'll do another pass and clean up this handling.
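
To illustrate the difference between thread-safety and atomicity mentioned here, a minimal sketch follows. It is not the PR's code: `BookkeepingSketch`, `Snapshot`, and their fields are hypothetical names. Each field could be made thread-safe on its own, but only the lock ensures that a reader sees the version and the set of names change together. The sketch also calls `lock()` before the `try` block, so `unlock()` runs only when the lock was actually acquired.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration only; not a class from the Kafka code base.
class BookkeepingSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private long version = 0L;
    private final Set<String> topologies = new HashSet<>();

    // Immutable view of both fields taken at a single point in time.
    static final class Snapshot {
        final long version;
        final Set<String> topologies;

        Snapshot(final long version, final Set<String> topologies) {
            this.version = version;
            this.topologies = topologies;
        }
    }

    // Adds a topology name and bumps the version as one atomic step.
    void add(final String name) {
        lock.lock(); // acquire before entering the try block
        try {
            topologies.add(name);
            version++;
        } finally {
            lock.unlock();
        }
    }

    // Readers take the same lock, so they never observe a half-applied update.
    Snapshot snapshot() {
        lock.lock();
        try {
            return new Snapshot(version, Collections.unmodifiableSet(new HashSet<>(topologies)));
        } finally {
            lock.unlock();
        }
    }
}
```

A caller would use `add("topology-a")` on the writer side and `snapshot()` on the reader side, and the returned version and set always belong to the same update.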








[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-03 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682237513



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -53,44 +56,162 @@
 private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
 private final StreamsConfig config;
-private final SortedMap builders; // Keep 
sorted by topology name for readability
+private final TopologyVersion version;
+
+private final ConcurrentNavigableMap 
builders; // Keep sorted by topology name for readability
 
 private ProcessorTopology globalTopology;
-private Map globalStateStores = new HashMap<>();
-final Set allInputTopics = new HashSet<>();
+private final Map globalStateStores = new HashMap<>();
+private final Set allInputTopics = new HashSet<>();
+
+public static class TopologyVersion {
+public AtomicLong topologyVersion = new AtomicLong(0L); // the local 
topology version
+public Set assignedNamedTopologies = new HashSet<>(); // the 
named topologies whose tasks are actively assigned
+public ReentrantLock topologyLock = new ReentrantLock();
+public Condition topologyCV = topologyLock.newCondition();
+}
 
-public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+public TopologyMetadata(final InternalTopologyBuilder builder,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
-builders = new TreeMap<>();
+builders = new ConcurrentSkipListMap<>();
 if (builder.hasNamedTopology()) {
 builders.put(builder.topologyName(), builder);
 } else {
 builders.put(UNNAMED_TOPOLOGY, builder);
 }
 }
 
-public TopologyMetadata(final SortedMap 
builders, final StreamsConfig config) {
+public TopologyMetadata(final ConcurrentNavigableMap builders,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
+
 this.builders = builders;
 if (builders.isEmpty()) {
-log.debug("Building KafkaStreams app with no empty topology");
+log.debug("Starting up empty KafkaStreams app with no topology");
 }
 }
 
-public int getNumStreamThreads(final StreamsConfig config) {
-final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+public void updateCurrentAssignmentTopology(final Set 
assignedNamedTopologies) {
+version.assignedNamedTopologies = assignedNamedTopologies;
+}
 
-// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
-if (builders.isEmpty()) {
-if (configuredNumStreamThreads != 0) {
-log.info("Overriding number of StreamThreads to zero for empty 
topology");
+/**
+ * @return the set of named topologies that the assignor distributed tasks 
for during the last rebalance
+ */
+public Set assignmentNamedTopologies() {
+return version.assignedNamedTopologies;
+}
+
+public long topologyVersion() {
+return version.topologyVersion.get();
+}
+
+public void lock() {

Review comment:
   This part of the code has been through a lot of refactoring and after 
the latest cleanup, I agree we should be able to avoid exposing it at all
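
One way to avoid exposing `lock()` and `unlock()` is to keep the lock and its condition private and offer only operations that acquire them internally. The sketch below assumes that shape; `EncapsulatedTopologySketch`, `register`, and `awaitNonEmpty` are hypothetical names, and the `String` builder values stand in for the real builder type. It is an illustration, not the cleanup that was eventually made in the PR.

```java
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration only; not the TopologyMetadata class from the PR.
class EncapsulatedTopologySketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition nonEmpty = lock.newCondition();
    private final ConcurrentSkipListMap<String, String> builders = new ConcurrentSkipListMap<>();
    private long version = 0L;

    // Registers a topology, bumps the version, and wakes any waiting threads.
    void register(final String name, final String builder) {
        lock.lock();
        try {
            builders.put(name, builder);
            version++;
            nonEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Waits until at least one topology exists or the timeout elapses.
    // Returns true if a topology is present, false on timeout or interruption.
    boolean awaitNonEmpty(final long timeout, final TimeUnit unit) {
        long remainingNanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (builders.isEmpty()) {
                if (remainingNanos <= 0L) {
                    return false;
                }
                remainingNanos = nonEmpty.awaitNanos(remainingNanos);
            }
            return true;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } finally {
            lock.unlock();
        }
    }

    long version() {
        lock.lock();
        try {
            return version;
        } finally {
            lock.unlock();
        }
    }
}
```

With this shape, callers cannot forget to take the lock before waiting, so a check like `isHeldByCurrentThread()` is no longer needed.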








[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-03 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682235008



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -935,6 +944,14 @@ void shutdown(final boolean clean) {
 return tasksToCloseDirty;
 }
 
+public void updateCurrentAssignmentTopology(final Set assignedNamedTopologies) {
+topologyMetadata.updateCurrentAssignmentTopology(assignedNamedTopologies);

Review comment:
   Removed until it's needed, if it is








[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-03 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682234726



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -53,44 +56,162 @@
 private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
 private final StreamsConfig config;
-private final SortedMap builders; // Keep 
sorted by topology name for readability
+private final TopologyVersion version;
+
+private final ConcurrentNavigableMap 
builders; // Keep sorted by topology name for readability
 
 private ProcessorTopology globalTopology;
-private Map globalStateStores = new HashMap<>();
-final Set allInputTopics = new HashSet<>();
+private final Map globalStateStores = new HashMap<>();
+private final Set allInputTopics = new HashSet<>();
+
+public static class TopologyVersion {
+public AtomicLong topologyVersion = new AtomicLong(0L); // the local 
topology version
+public Set assignedNamedTopologies = new HashSet<>(); // the 
named topologies whose tasks are actively assigned
+public ReentrantLock topologyLock = new ReentrantLock();
+public Condition topologyCV = topologyLock.newCondition();
+}
 
-public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+public TopologyMetadata(final InternalTopologyBuilder builder,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
-builders = new TreeMap<>();
+builders = new ConcurrentSkipListMap<>();
 if (builder.hasNamedTopology()) {
 builders.put(builder.topologyName(), builder);
 } else {
 builders.put(UNNAMED_TOPOLOGY, builder);
 }
 }
 
-public TopologyMetadata(final SortedMap 
builders, final StreamsConfig config) {
+public TopologyMetadata(final ConcurrentNavigableMap builders,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
+
 this.builders = builders;
 if (builders.isEmpty()) {
-log.debug("Building KafkaStreams app with no empty topology");
+log.debug("Starting up empty KafkaStreams app with no topology");
 }
 }
 
-public int getNumStreamThreads(final StreamsConfig config) {
-final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+public void updateCurrentAssignmentTopology(final Set 
assignedNamedTopologies) {
+version.assignedNamedTopologies = assignedNamedTopologies;
+}
 
-// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
-if (builders.isEmpty()) {
-if (configuredNumStreamThreads != 0) {
-log.info("Overriding number of StreamThreads to zero for empty 
topology");
+/**
+ * @return the set of named topologies that the assignor distributed tasks 
for during the last rebalance
+ */
+public Set assignmentNamedTopologies() {
+return version.assignedNamedTopologies;
+}
+
+public long topologyVersion() {
+return version.topologyVersion.get();
+}
+
+public void lock() {
+version.topologyLock.lock();
+}
+
+public void unlock() {
+version.topologyLock.unlock();
+}
+
+public InternalTopologyBuilder getBuilderForTopologyName(final String 
name) {
+return builders.get(name);
+}
+
+/**
+ * @throws IllegalStateException if the thread is not already holding the 
lock via TopologyMetadata#lock
+ */
+public void maybeWaitForNonEmptyTopology() {
+if (!version.topologyLock.isHeldByCurrentThread()) {
+throw new IllegalStateException("Must call lock() before 
attempting to wait on non-empty topology");
+}
+while (isEmpty()) {
+try {
+log.debug("Detected that the topology is currently empty, 
going to wait for something to be added");
+version.topologyCV.await();
+} catch (final InterruptedException e) {
+log.debug("StreamThread was interrupted while waiting on empty 
topology", e);
+}
+}
+}
+
+public void registerAndBuildNewTopology(final InternalTopologyBuilder 
newTopologyBuilder) {
+try {
+lock();
+version.topologyVersion.incrementAndGet();
+log.info("Adding NamedTopology {}, latest topology version is {}", 
newTopologyBuilder.topologyName(), version.topologyVersion.get());
+builders.put(newTopologyBuilder.topologyName(), 
newTopologyBuilder);
+buildAndVerifyTopology(newTopologyBuilder);
+version.topologyCV.signalAll();
+} finally {
+ 

[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-03 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682233727



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -53,44 +56,162 @@
 private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
 private final StreamsConfig config;
-private final SortedMap builders; // Keep 
sorted by topology name for readability
+private final TopologyVersion version;
+
+private final ConcurrentNavigableMap 
builders; // Keep sorted by topology name for readability
 
 private ProcessorTopology globalTopology;
-private Map globalStateStores = new HashMap<>();
-final Set allInputTopics = new HashSet<>();
+private final Map globalStateStores = new HashMap<>();
+private final Set allInputTopics = new HashSet<>();
+
+public static class TopologyVersion {
+public AtomicLong topologyVersion = new AtomicLong(0L); // the local 
topology version
+public Set assignedNamedTopologies = new HashSet<>(); // the 
named topologies whose tasks are actively assigned
+public ReentrantLock topologyLock = new ReentrantLock();
+public Condition topologyCV = topologyLock.newCondition();
+}
 
-public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+public TopologyMetadata(final InternalTopologyBuilder builder,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
-builders = new TreeMap<>();
+builders = new ConcurrentSkipListMap<>();
 if (builder.hasNamedTopology()) {
 builders.put(builder.topologyName(), builder);
 } else {
 builders.put(UNNAMED_TOPOLOGY, builder);
 }
 }
 
-public TopologyMetadata(final SortedMap 
builders, final StreamsConfig config) {
+public TopologyMetadata(final ConcurrentNavigableMap builders,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
+
 this.builders = builders;
 if (builders.isEmpty()) {
-log.debug("Building KafkaStreams app with no empty topology");
+log.debug("Starting up empty KafkaStreams app with no topology");
 }
 }
 
-public int getNumStreamThreads(final StreamsConfig config) {
-final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+public void updateCurrentAssignmentTopology(final Set 
assignedNamedTopologies) {
+version.assignedNamedTopologies = assignedNamedTopologies;
+}
 
-// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
-if (builders.isEmpty()) {
-if (configuredNumStreamThreads != 0) {
-log.info("Overriding number of StreamThreads to zero for empty 
topology");
+/**
+ * @return the set of named topologies that the assignor distributed tasks 
for during the last rebalance
+ */
+public Set assignmentNamedTopologies() {
+return version.assignedNamedTopologies;
+}
+
+public long topologyVersion() {
+return version.topologyVersion.get();
+}
+
+public void lock() {
+version.topologyLock.lock();
+}
+
+public void unlock() {
+version.topologyLock.unlock();
+}
+
+public InternalTopologyBuilder getBuilderForTopologyName(final String 
name) {
+return builders.get(name);
+}
+
+/**
+ * @throws IllegalStateException if the thread is not already holding the 
lock via TopologyMetadata#lock
+ */
+public void maybeWaitForNonEmptyTopology() {
+if (!version.topologyLock.isHeldByCurrentThread()) {
+throw new IllegalStateException("Must call lock() before 
attempting to wait on non-empty topology");
+}
+while (isEmpty()) {
+try {
+log.debug("Detected that the topology is currently empty, 
going to wait for something to be added");
+version.topologyCV.await();
+} catch (final InterruptedException e) {
+log.debug("StreamThread was interrupted while waiting on empty 
topology", e);
+}
+}
+}
+
+public void registerAndBuildNewTopology(final InternalTopologyBuilder 
newTopologyBuilder) {
+try {
+lock();
+version.topologyVersion.incrementAndGet();
+log.info("Adding NamedTopology {}, latest topology version is {}", 
newTopologyBuilder.topologyName(), version.topologyVersion.get());
+builders.put(newTopologyBuilder.topologyName(), 
newTopologyBuilder);
+buildAndVerifyTopology(newTopologyBuilder);
+version.topologyCV.signalAll();
+} finally {
+ 

[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-03 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682231789



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -105,31 +105,76 @@ private KafkaStreamsNamedTopologyWrapper(final 
Collection topolog
 (v1, v2) -> {
 throw new IllegalArgumentException("Topology names 
must be unique");
 },
-() -> new TreeMap<>())),
+() -> new ConcurrentSkipListMap<>())),
 config),
 config,
 clientSupplier
 );
-for (final NamedTopology topology : topologies) {
-nameToTopology.put(topology.name(), topology);
-}
 }
 
-public NamedTopology getTopologyByName(final String name) {
-if (nameToTopology.containsKey(name)) {
-return nameToTopology.get(name);
-} else {
-throw new IllegalArgumentException("Unable to locate a 
NamedTopology called " + name);
+/**
+ * @return the NamedTopology for the specific name, or Optional.empty() if 
the application has no NamedTopology of that name
+ */
+public Optional getTopologyByName(final String name) {
+return 
Optional.ofNullable(topologyMetadata.getBuilderForTopologyName(name)).map(InternalTopologyBuilder::namedTopology);
+}
+
+/**
+ * Add a new NamedTopology to a running Kafka Streams app. If multiple 
instances of the application are running,
+ * you should inform all of them by calling {@link 
#addNamedTopology(NamedTopology)} on each client in order for
+ * it to begin processing the new topology.
+ *
+ * @throws IllegalArgumentException if this topology name is already in use
+ * @throws IllegalStateException if streams has not been started or has 
already shut down
+ * @throws TopologyException if this topology subscribes to any 
input topics or pattern already in use
+ */
+public void addNamedTopology(final NamedTopology newTopology) {
+if (hasStartedOrFinishedShuttingDown()) {
+throw new IllegalStateException("Cannot add a NamedTopology while 
the state is " + super.state);
+} else if (getTopologyByName(newTopology.name()).isPresent()) {
+throw new IllegalArgumentException("Unable to add the new 
NamedTopology " + newTopology.name() +
+   " as another of the same 
name already exists");
 }
+
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
 }
 
-public void addNamedTopology(final NamedTopology topology) {
-nameToTopology.put(topology.name(), topology);
-throw new UnsupportedOperationException();
+/**
+ * Remove an existing NamedTopology from a running Kafka Streams app. If 
multiple instances of the application are
+ * running, you should inform all of them by calling {@link 
#removeNamedTopology(String)} on each client to ensure
+ * it stops processing the old topology.
+ *
+ * @throws IllegalArgumentException if this topology name cannot be found
+ * @throws IllegalStateException if streams has not been started or has 
already shut down
+ * @throws TopologyException if this topology subscribes to any 
input topics or pattern already in use
+ */
+public void removeNamedTopology(final String topologyToRemove) {
+if (!isRunningOrRebalancing()) {
+throw new IllegalStateException("Cannot remove a NamedTopology 
while the state is " + super.state);
+} else if (!getTopologyByName(topologyToRemove).isPresent()) {
+throw new IllegalArgumentException("Unable to locate for removal a 
NamedTopology called " + topologyToRemove);
+}
+
+topologyMetadata.unregisterTopology(topologyToRemove);
 }
 
-public void removeNamedTopology(final String namedTopology) {
-throw new UnsupportedOperationException();
+/**
+ * Do a clean up of the local state directory for this NamedTopology by 
deleting all data with regard to the
+ * {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} in the 
({@link StreamsConfig#STATE_DIR_CONFIG})
+ * 
+ * May be called while the Streams is in any state, but only on a {@link 
NamedTopology} that has already been
+ * removed via {@link #removeNamedTopology(String)}.
+ * 
+ * Calling this method triggers a restore of local {@link StateStore}s for 
this {@link NamedTopology} if it is
+ * ever re-added via {@link #addNamedTopology(NamedTopology)}.
+ *
+ * @throws IllegalStateException if this {@code NamedTopology} hasn't been 
removed
+ * @throws StreamsException if cleanup failed
+ */
+public void cleanUpNamedTopology(final String name) {
+if (getTopologyBy

[GitHub] [kafka] showuon commented on a change in pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

2021-08-03 Thread GitBox


showuon commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r682231711



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -100,17 +99,79 @@
 
 private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 private final String threadId = Thread.currentThread().getName();
+private final String topic = "topic";
+private final String defaultInOrderName = "InOrder";
+private final String defaultReverseName = "Reverse";
+private final long defaultWindowSize = 10L;
+private final long defaultRetentionPeriod = 5000L;
+
+private WindowBytesStoreSupplier getStoreSupplier(final boolean 
inOrderIterator,
+  final String inOrderName,
+  final String reverseName,
+  final long windowSize) {
+return inOrderIterator
+? new InOrderMemoryWindowStoreSupplier(inOrderName, 
defaultRetentionPeriod, windowSize, false)
+: Stores.inMemoryWindowStore(reverseName, 
ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void testAggregateSmallInputWithZeroTimeDifference() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+// We use CachingWindowStore to store the aggregated values 
internally, and then use TimeWindow to represent the "windowed KTable"
+// thus, the window size must be greater than 0 here
+final WindowBytesStoreSupplier storeSupplier = 
getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
   @mjsax , answering your review comments here. Please check the above 
discussion for more info.
   
   > Why window-agnostic ? In general, I am not sure why we need to change the 
existing JavaDocs? What information do you think is missing or wrong?
   
   > You add (inclusive) and (exclusive) in SessionWindow but remove it here. 
Seems inconsistent?
   
   > Why do you remove this check? A TimeWindow should not allow this case.
   
   > Why do we need to remove this temporarily?
   
   --> The answer to all of the above questions is that we can't create a store 
supplier with a window size of 0 here, because we use `TimeWindow` to represent 
the "windowed KTable" result. @ableegoldman and I both thought it doesn't make 
sense to use `TimeWindow` for this if `WindowStore` is used for both 
inclusive-exclusive and inclusive-inclusive windows; we should have a 
neutral time window for this case. That's why Sophie suggested a container 
class that does nothing but hold the start and end time, for use in 
window-agnostic cases like the `CachingWindowStore`. That container class 
could be named `TimeWindow`, since we were planning to rename the existing 
`TimeWindow` to `InclusiveExclusiveWindow` anyway, which is why I changed the 
JavaDoc, the start/end time check, and the test for it. 
   
   So, since we agreed that we won't rename the window, I'll revert those 
changes. But one question remains:  
   _We can't create a store supplier with window size of 0 here because we use 
`TimeWindow` to represent the "windowed KTable" result._
   Do you agree that we should have a container class that does nothing but 
hold the start and end time for use in window-agnostic cases like the 
`CachingWindowStore`? Or do you have any other suggestions?
   
   Thank you.
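
To make the question above concrete, here is a minimal sketch of what such a neutral container could look like. The name `WindowBounds` and its methods are hypothetical and are not part of Kafka Streams; the only point is that, unlike an inclusive-exclusive window, the container accepts `end == start` (a window size of 0).

```java
// Hypothetical sketch: WindowBounds is NOT a Kafka Streams class.
// It only holds the two timestamps and says nothing about whether
// the end bound is inclusive or exclusive.
final class WindowBounds {
    private final long startMs;
    private final long endMs;

    WindowBounds(final long startMs, final long endMs) {
        if (startMs < 0) {
            throw new IllegalArgumentException("Window start must not be negative");
        }
        // end == start is allowed here, so a window of size 0 can be represented
        if (endMs < startMs) {
            throw new IllegalArgumentException("Window end must not be before window start");
        }
        this.startMs = startMs;
        this.endMs = endMs;
    }

    long start() {
        return startMs;
    }

    long end() {
        return endMs;
    }

    long size() {
        return endMs - startMs;
    }
}

final class WindowBoundsDemo {
    public static void main(final String[] args) {
        final WindowBounds zeroSize = new WindowBounds(100L, 100L);
        System.out.println("size = " + zeroSize.size());
    }
}
```

Callers that care about inclusive or exclusive bounds would then layer that meaning on top of the container rather than inheriting it from `TimeWindow`.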
   
   
   








[GitHub] [kafka] ccding commented on pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig

2021-08-03 Thread GitBox


ccding commented on pull request #11110:
URL: https://github.com/apache/kafka/pull/11110#issuecomment-892299485


   Included the motivation and updated the PR. PTAL @ijuma @junrao @kowshik 
@satishd 






[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-03 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682230209



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##
@@ -64,6 +65,9 @@
 private final Map taskProducers;
 private final StreamThread.ProcessingMode processingMode;
 
+// tasks may be assigned for a NamedTopology that is not yet known by this 
host, and saved for later creation
+private final Map>  unknownTasksToBeCreated = 
new HashMap<>();

Review comment:
   I think we discussed this already, but in case anyone else is wondering: 
the leader will always assign tasks based on its view of the current named 
topologies and topics, i.e. it does not check individual subscriptions, since 
the group is assumed to be eventually consistent in this regard. (Note that 
this is actually no different from today: even if each instance of an app has 
a different input topic in its topology, they will all wind up receiving tasks 
for whichever topic the leader happened to have.)
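
The "saved for later creation" idea in the diff comment can be sketched roughly as follows. This is an illustration only: `PendingTaskRegistry`, its method names, and the use of plain strings for task IDs are all assumptions, not the actual `ActiveTaskCreator` code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: tasks assigned by the leader for a topology this
// host does not know yet are parked until the topology is added locally.
final class PendingTaskRegistry {
    private final Set<String> knownTopologies = new HashSet<>();
    private final Map<String, Set<String>> pendingTasksByTopology = new HashMap<>();

    // Returns the tasks that can be created immediately; the rest are parked.
    List<String> onAssignment(final Map<String, Set<String>> assignedTasksByTopology) {
        final List<String> creatable = new ArrayList<>();
        for (final Map.Entry<String, Set<String>> entry : assignedTasksByTopology.entrySet()) {
            if (knownTopologies.contains(entry.getKey())) {
                creatable.addAll(entry.getValue());
            } else {
                pendingTasksByTopology
                    .computeIfAbsent(entry.getKey(), name -> new TreeSet<>())
                    .addAll(entry.getValue());
            }
        }
        return creatable;
    }

    // Registers the topology and returns any tasks that were waiting for it.
    List<String> onTopologyAdded(final String topologyName) {
        knownTopologies.add(topologyName);
        final List<String> released = new ArrayList<>();
        final Set<String> pending = pendingTasksByTopology.remove(topologyName);
        if (pending != null) {
            released.addAll(pending);
        }
        return released;
    }
}
```

Because the group is only eventually consistent, the host that receives tasks for an unknown topology does not treat this as an error; it simply waits for the matching `addNamedTopology` call.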








[jira] [Resolved] (KAFKA-13145) Renaming the time interval window for better understanding

2021-08-03 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-13145.
---
Resolution: Won't Fix

> Renaming the time interval window for better understanding
> --
>
> Key: KAFKA-13145
> URL: https://issues.apache.org/jira/browse/KAFKA-13145
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
>  
> I have another thought, which is to rename the time-interval-related windows. 
> Currently, we have 3 types of time interval window:
>  {{TimeWindow}} -> has a {{[start,end)}} time interval
>  {{SessionWindow}} -> has a {{[start,end]}} time interval
>  {{UnlimitedWindow}} -> has a {{[start, MAX_VALUE)}} time interval
> I think the name {{SessionWindow}} is definitely not good here, especially 
> since we now want to use it in {{SlidingWindows}}, although it was previously 
> used only for {{SessionWindows}}. We should name them after the time interval 
> they represent, not after the streaming window function. Because these 3 
> window types are for internal use only, it is safe to rename them.
>  
> {{TimeWindow}} --> {{InclusiveExclusiveWindow}}
>  {{SessionWindow}} / {{SlidingWindow}} --> {{InclusiveInclusiveWindow}}
>  {{UnlimitedWindow}} --> {{InclusiveUnboundedWindow}}
> See the discussion here: 
> [https://github.com/apache/kafka/pull/11124#issuecomment-887989639]
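
The three interval conventions listed in the ticket differ only in how the end bound is treated. A small sketch of that difference, assuming a hypothetical `IntervalKind` enum that is not part of Kafka:

```java
// Hypothetical sketch: IntervalKind is NOT a Kafka class. The constants
// mirror the interval conventions named in the ticket.
enum IntervalKind {
    INCLUSIVE_EXCLUSIVE,  // [start, end), as TimeWindow
    INCLUSIVE_INCLUSIVE,  // [start, end], as SessionWindow
    INCLUSIVE_UNBOUNDED;  // [start, MAX_VALUE), as UnlimitedWindow

    boolean contains(final long start, final long end, final long timestamp) {
        switch (this) {
            case INCLUSIVE_EXCLUSIVE:
                return timestamp >= start && timestamp < end;
            case INCLUSIVE_INCLUSIVE:
                return timestamp >= start && timestamp <= end;
            case INCLUSIVE_UNBOUNDED:
                // the end argument is ignored for an unbounded interval
                return timestamp >= start && timestamp < Long.MAX_VALUE;
            default:
                throw new IllegalStateException("Unknown interval kind " + this);
        }
    }
}
```

With `start == end`, only the inclusive-inclusive kind contains any timestamp, which is why a window of size 0 cannot be expressed as a `[start,end)` interval.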



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13145) Renaming the time interval window for better understanding

2021-08-03 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392626#comment-17392626
 ] 

Luke Chen commented on KAFKA-13145:
---

I'm good. I'll close this ticket and add `SlidingWindow` class via KAFKA-12839. 
Thank you.



[GitHub] [kafka] hachikuji commented on a change in pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

2021-08-03 Thread GitBox


hachikuji commented on a change in pull request #11171:
URL: https://github.com/apache/kafka/pull/11171#discussion_r682206623



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -313,14 +313,20 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, 
offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Unit = {
-isFutureReplica match {
-  case true if futureLog.isEmpty =>
-val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId)
+val logOpt = if (isFutureReplica) futureLog else log
+if (logOpt.isEmpty) {
+  val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId)
+  if (isFutureReplica)
 this.futureLog = Option(log)
-  case false if log.isEmpty =>
-val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId)
+  else
 this.log = Option(log)
-  case _ => trace(s"${if (isFutureReplica) "Future Log" else "Log"} 
already exists.")
+} else {
+  trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.")
+  logOpt.foreach { log =>
+if (log.topicId.isEmpty) {

Review comment:
   By the time we get here, I think we have already validated that the 
topicid is consistent. Nevertheless, I wonder if it makes sense to let 
`assignTopicId` validate the passed topicId? Currently it will just override 
the value.
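
One way to read the suggestion above is that assignment should refuse to replace an ID that is already set to something different. A minimal sketch, assuming a hypothetical `TopicIdHolder` class and using `java.util.UUID` as a stand-in for Kafka's `Uuid`; this is not the actual `Log.assignTopicId` implementation:

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical sketch: validates instead of silently overriding.
final class TopicIdHolder {
    private UUID topicId; // null until the first assignment

    void assignTopicId(final UUID newTopicId) {
        if (topicId != null && !topicId.equals(newTopicId)) {
            throw new IllegalStateException(
                "Tried to assign topic ID " + newTopicId + " but the log already has topic ID " + topicId);
        }
        // first assignment, or re-assignment of the same ID (a no-op)
        topicId = newTopicId;
    }

    Optional<UUID> topicId() {
        return Optional.ofNullable(topicId);
    }
}
```

Re-assigning the same ID stays idempotent, so callers such as `createLogIfNotExists` could call it unconditionally.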

##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -313,14 +313,20 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, 
offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Unit = {
-isFutureReplica match {
-  case true if futureLog.isEmpty =>
-val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId)
+val logOpt = if (isFutureReplica) futureLog else log

Review comment:
   Any better?
   ```scala
 def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, 
offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Unit = {
   def maybeCreate(logOpt: Option[Log]): Log = {
 logOpt match {
   case Some(log) =>
 trace(s"${if (isFutureReplica) "Future Log" else "Log"} already 
exists.")
 topicId.foreach(log.assignTopicId)
 log
   case None =>
 createLog(isNew, isFutureReplica, offsetCheckpoints, topicId)
 }
   }
   
   if (isFutureReplica) {
 this.futureLog = Some(maybeCreate(this.futureLog))
   } else {
 this.log = Some(maybeCreate(this.log))
   }
 }
   ```








[GitHub] [kafka] ableegoldman commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

2021-08-03 Thread GitBox


ableegoldman commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-892279085


   1) Possibly -- you can check out the [Streams upgrade 
guide](https://kafka.apache.org/28/documentation/streams/upgrade-guide), which 
describes any public API changes in each version. The section for 3.0 won't 
appear until it's released, but you can always poke around the [source html for 
3.0 
changes](https://github.com/apache/kafka/blob/4eb72add11b548e3fe8fea72856af49dc950e444/docs/streams/upgrade-guide.html#L97)
 if you want to be prepared.
   
   2) Good question -- depends on how quickly the remaining open items can be 
addressed and if there are any new issues found during the two week testing 
period. The [3.0 release 
plan](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177046466)
 lists the unresolved issues targeted for 3.0, and you can subscribe to the 
mailing list if you want to keep up with the release progress.






[jira] [Updated] (KAFKA-13160) Fix the code that calls the broker’s config handler to pass the expected default resource name when using KRaft.

2021-08-03 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13160:
--
Summary: Fix the code that calls the broker’s config handler to pass the 
expected default resource name when using KRaft.  (was: Fix the code the calls 
the broker’s config handler to pass the expected default resource name for 
dynamic broker configs)

> Fix the code that calls the broker’s config handler to pass the expected 
> default resource name when using KRaft.
> 
>
> Key: KAFKA-13160
> URL: https://issues.apache.org/jira/browse/KAFKA-13160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In a ZK cluster, dynamic default broker configs are stored in the zNode 
> /brokers/<default>. Without this fix, when dynamic configs from snapshots are 
> processed by the KRaft brokers, the BrokerConfigHandler checks if the 
> resource name is "<default>" to do a default update and converts the resource 
> name to an integer otherwise to do a per-broker config update.
> In KRaft, dynamic default broker configs are serialized in metadata with 
> empty string instead of "<default>". This was causing the BrokerConfigHandler 
> to throw a NumberFormatException for dynamic default broker configs since the 
> resource name for them is not "<default>" or a single integer. The code that 
> calls the handler method for config changes should be fixed to pass 
> "<default>" instead of empty string to the handler method if using KRaft.
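
The failure mode and the proposed fix can be illustrated with a small sketch. It assumes the default sentinel is the string `<default>`; the class and method names are hypothetical and do not correspond to the real `BrokerConfigHandler` or its caller.

```java
import java.util.OptionalInt;

// Hypothetical sketch of the name handling described in the ticket.
final class BrokerConfigNames {
    // Assumed sentinel for "cluster-wide default" on the handler side.
    static final String DEFAULT_RESOURCE_NAME = "<default>";

    // Caller-side fix: KRaft metadata uses "" for the default entity,
    // so translate it before invoking the handler.
    static String toHandlerResourceName(final String metadataResourceName) {
        return metadataResourceName.isEmpty() ? DEFAULT_RESOURCE_NAME : metadataResourceName;
    }

    // Handler-side logic: empty result means "default update", otherwise a
    // per-broker update. Any name that is neither the sentinel nor an
    // integer (including "") makes Integer.parseInt throw NumberFormatException.
    static OptionalInt targetBroker(final String handlerResourceName) {
        if (DEFAULT_RESOURCE_NAME.equals(handlerResourceName)) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(Integer.parseInt(handlerResourceName));
    }
}
```

Passing the raw metadata name straight to `targetBroker` reproduces the `NumberFormatException`; routing it through `toHandlerResourceName` first avoids it.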





[jira] [Updated] (KAFKA-13160) Fix the code the calls the broker’s config handler to pass the expected default resource name for dynamic broker configs

2021-08-03 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13160:
--
Summary: Fix the code the calls the broker’s config handler to pass the 
expected default resource name for dynamic broker configs  (was: Fix 
BrokerConfigHandler to expect empty string as the resource name for dynamic 
default broker configs in KRaft)



[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal

2021-08-03 Thread GitBox


mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r682020580



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
* @param sourceRecords The dirty log segment
* @param dest The cleaned log segment
* @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be 
retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than 
version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as 
defined by log configuration
* @param maxLogMessageSize The maximum message size of the corresponding 
topic
* @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated

Review comment:
   ~~It seems like we don't need the return value at all, since we would 
only be using it to track the latestDeleteHorizon in the Log, but it doesn't 
seem like we need that either. I'm going to remove it~~
   
   ~~edit: going through the build errors and I see other usages. I will go 
through it more thoroughly~~

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -270,7 +270,8 @@ class Log(@volatile private var _dir: File,
   val producerStateManager: ProducerStateManager,
   logDirFailureChannel: LogDirFailureChannel,
   @volatile private var _topicId: Option[Uuid],
-  val keepPartitionMetadataFile: Boolean) extends Logging with 
KafkaMetricsGroup {
+  val keepPartitionMetadataFile: Boolean,
+  @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) 
extends Logging with KafkaMetricsGroup {

Review comment:
   ~~hmm it seems like we only use it in a test. That goes with the return 
value that was added into the `cleanInto` method in the LogCleaner. I'm going 
to remove these and see if I can take another approach in the testing~~
   
   ~~edit: going through the build errors and I see other usages. I will go 
through it more thoroughly~~

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -270,7 +270,8 @@ class Log(@volatile private var _dir: File,
   val producerStateManager: ProducerStateManager,
   logDirFailureChannel: LogDirFailureChannel,
   @volatile private var _topicId: Option[Uuid],
-  val keepPartitionMetadataFile: Boolean) extends Logging with 
KafkaMetricsGroup {
+  val keepPartitionMetadataFile: Boolean,
+  @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) 
extends Logging with KafkaMetricsGroup {

Review comment:
   sorry I jumped the gun a bit w/ the earlier changes
   
   There is an addition to the LogCleanerManager that allows the cleaner to 
check for cleaning logs that have tombstones past the deleteHorizon. The logic 
in the LogCleanerManager can be paraphrased to "if there are no eligible 
cleanable logs, we can see if there are logs that have tombstones that can be 
deleted by checking the Log's latestDeleteHorizon. We can enqueue those with 
tombstones eligible for cleaning"
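
The paraphrased selection logic can be sketched as a two-step choice. Everything here is an assumption made for illustration: `LogState`, `CleanerSelection`, the dirty-ratio threshold, and the `-1` sentinel standing in for `RecordBatch.NO_TIMESTAMP`; it is not the actual `LogCleanerManager` code.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical view of a log as seen by the cleaner.
final class LogState {
    final String name;
    final double dirtyRatio;
    final long latestDeleteHorizonMs;

    LogState(final String name, final double dirtyRatio, final long latestDeleteHorizonMs) {
        this.name = name;
        this.dirtyRatio = dirtyRatio;
        this.latestDeleteHorizonMs = latestDeleteHorizonMs;
    }
}

final class CleanerSelection {
    // Stand-in for "no delete horizon recorded yet".
    static final long NO_TIMESTAMP = -1L;

    static Optional<LogState> selectLogToClean(final List<LogState> logs,
                                               final double minDirtyRatio,
                                               final long nowMs) {
        // Step 1: prefer the dirtiest log that meets the usual threshold.
        final Optional<LogState> dirtiest = logs.stream()
            .filter(log -> log.dirtyRatio >= minDirtyRatio)
            .max(Comparator.comparingDouble((LogState log) -> log.dirtyRatio));
        if (dirtiest.isPresent()) {
            return dirtiest;
        }
        // Step 2: otherwise pick a log whose tombstones are past their horizon.
        return logs.stream()
            .filter(log -> log.latestDeleteHorizonMs != NO_TIMESTAMP
                && log.latestDeleteHorizonMs <= nowMs)
            .findFirst();
    }
}
```

The fallback only applies when no log qualifies on dirty ratio, so tombstone-driven cleaning does not compete with regular cleaning.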








[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal

2021-08-03 Thread GitBox


mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r682018753



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -270,7 +270,8 @@ class Log(@volatile private var _dir: File,
   val producerStateManager: ProducerStateManager,
   logDirFailureChannel: LogDirFailureChannel,
   @volatile private var _topicId: Option[Uuid],
-  val keepPartitionMetadataFile: Boolean) extends Logging with 
KafkaMetricsGroup {
+  val keepPartitionMetadataFile: Boolean,
+  @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) 
extends Logging with KafkaMetricsGroup {

Review comment:
   ~~hmm it seems like we only use it in a test. That goes with the return 
value that was added into the `cleanInto` method in the LogCleaner. I'm going 
to remove these and see if I can take another approach in the testing 
   
   edit: going through the build errors and I see other usages. I will go 
through it more thoroughly~~
   
   sorry I jumped the gun a bit w/ the earlier changes
   
   There is an addition to the LogCleanerManager that allows the cleaner to 
check for cleaning logs that have tombstones past the deleteHorizon. The logic 
in the LogCleanerManager can be paraphrased to "if there are no eligible 
cleanable logs, we can see if there are logs that have tombstones that can be 
deleted by checking the Log's latestDeleteHorizon. We can enqueue those with 
tombstones eligible for cleaning"









[jira] [Commented] (KAFKA-13145) Renaming the time interval window for better understanding

2021-08-03 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392615#comment-17392615
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13145:


Fine with me. FWIW the "InclusiveExclusiveWindow" name was my idea, but that 
was just to avoid using something called "SessionWindow" in the _Sliding_ 
window processor – making a new SlidingWindow class works too.

> Renaming the time interval window for better understanding
> --
>
> Key: KAFKA-13145
> URL: https://issues.apache.org/jira/browse/KAFKA-13145
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
>  
> I have another thought, which is to rename the time interval related windows. 
> Currently, we have 3 types of time interval window:
>  {{TimeWindow}} -> to have {{[start,end)}} time interval
>  {{SessionWindow}} -> to have {{[start,end]}} time interval
>  {{UnlimitedWindow}} -> to have {{[start, MAX_VALUE)}} time interval
> I think the name {{SessionWindow}} is definitely not good here, especially since we 
> want to use it in {{SlidingWindows}} now, although it was only used for 
> {{SessionWindows}} before. We should name them by their time-interval meaning, 
> not by the streaming window function's meaning. Because these 3 window types 
> are for internal use only, it is safe to rename them.
>  
> {{TimeWindow}} --> {{InclusiveExclusiveWindow}}
>  {{SessionWindow}} / {{SlidingWindow}} --> {{InclusiveInclusiveWindow}}
>  {{UnlimitedWindow}} --> {{InclusiveUnboundedWindow}}
> See the discussion here: 
> [https://github.com/apache/kafka/pull/11124#issuecomment-887989639]
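
To make the three interval types in the quoted description concrete, here is a minimal, self-contained Java sketch of their containment rules. It does not use the actual Kafka Streams window classes; the class and method names are hypothetical and only the interval bounds are taken from the ticket.

```java
final class IntervalSketch {
    // [start, end): the interval the ticket attributes to TimeWindow.
    static boolean inInclusiveExclusive(long start, long end, long ts) {
        return ts >= start && ts < end;
    }

    // [start, end]: the interval the ticket attributes to SessionWindow.
    static boolean inInclusiveInclusive(long start, long end, long ts) {
        return ts >= start && ts <= end;
    }

    // [start, MAX_VALUE): the interval the ticket attributes to UnlimitedWindow.
    static boolean inInclusiveUnbounded(long start, long ts) {
        return ts >= start && ts < Long.MAX_VALUE;
    }
}
```

One consequence worth noting: when start equals end, the half-open interval is empty while the closed interval still contains exactly one timestamp, which is presumably why a zero-size sliding window needs the inclusive variant.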



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13150) How is Kafkastream configured to consume data from a specified offset ?

2021-08-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392612#comment-17392612
 ] 

Matthias J. Sax commented on KAFKA-13150:
-

To subscribe, follow the instructions on the webpage: 
[https://kafka.apache.org/contact] 

> How is Kafkastream configured to consume data from a specified offset ?
> ---
>
> Key: KAFKA-13150
> URL: https://issues.apache.org/jira/browse/KAFKA-13150
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: wangjh
>Priority: Minor
>






[GitHub] [kafka] codefactor commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

2021-08-03 Thread GitBox


codefactor commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-892269776


   @ableegoldman ,
   Thanks a lot - do you have any links where I can find out the following:
   1. are there any breaking changes if I upgrade from version 2.6 to 3.0?
   2. when is version 3.0 scheduled to be released to maven central repository?






[jira] [Updated] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2021-08-03 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13152:

Labels: needs-kip  (was: )

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes" 
> ---
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip
>
> The current config "buffered.records.per.partition" controls the maximum number 
> of records to bookkeep per partition; once it is exceeded, we pause fetching from 
> that partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed is dependent on 
> the dynamic number of partitions assigned.
> * Record size could vary from case to case.
> Hence it's hard to bound the memory usage of this buffering. We should 
> consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", 
> which controls how many bytes in total are allowed to be buffered. This is 
> doable since we buffer the raw records in .
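
A minimal Java sketch of the global bound proposed above, assuming the limit is tracked as a running total of raw key and value sizes across all partitions. The class and its methods are hypothetical and are not part of Kafka Streams; only the config name "input.buffer.max.bytes" comes from the ticket.

```java
import java.util.HashMap;
import java.util.Map;

final class InputBufferSketch {
    // Would be populated from the proposed "input.buffer.max.bytes" config.
    private final long maxBytes;
    private final Map<String, Long> bytesPerPartition = new HashMap<>();
    private long totalBytes = 0L;

    InputBufferSketch(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Account for the raw key/value bytes of a newly buffered record.
    void add(String partition, byte[] key, byte[] value) {
        long size = (key == null ? 0 : key.length) + (value == null ? 0 : value.length);
        bytesPerPartition.merge(partition, size, Long::sum);
        totalBytes += size;
    }

    // Release the bytes of a record once it has been processed.
    void remove(String partition, long size) {
        bytesPerPartition.merge(partition, -size, Long::sum);
        totalBytes -= size;
    }

    // Bytes currently buffered for one partition.
    long buffered(String partition) {
        return bytesPerPartition.getOrDefault(partition, 0L);
    }

    // The bound applies to the total, regardless of how many partitions are assigned.
    boolean shouldPauseFetching() {
        return totalBytes > maxBytes;
    }
}
```

Because the check is on the total, adding partitions or receiving larger records cannot push memory use past the configured bound, which addresses both issues listed in the description.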





[jira] [Commented] (KAFKA-13145) Renaming the time interval window for better understanding

2021-08-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392611#comment-17392611
 ] 

Matthias J. Sax commented on KAFKA-13145:
-

Personally, I am not a fan of `InclusiveExclusiveWindow` as a name, but if you 
feel strongly about it, I can be convinced. Because the window classes in 
question are small and easy, I would prefer to just duplicate the code if 
necessary, and keep the semantically more meaningful names that we have now. 
I.e., we would just add a `SlidingWindow` (that also has inclusive upper/lower 
bounds, similar to `SessionWindow`) and call it a day. – Because the code is so 
simple, I am not worried about code duplication personally.

For this case, we could close this ticket as "won't fix" and just add a 
`SlidingWindow` class via KAFKA-12839 instead. Thoughts? \cc [~ableegoldman]

> Renaming the time interval window for better understanding
> --
>
> Key: KAFKA-13145
> URL: https://issues.apache.org/jira/browse/KAFKA-13145
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
>  
> I have another thought, which is to rename the time interval related windows. 
> Currently, we have 3 types of time interval window:
>  {{TimeWindow}} -> to have {{[start,end)}} time interval
>  {{SessionWindow}} -> to have {{[start,end]}} time interval
>  {{UnlimitedWindow}} -> to have {{[start, MAX_VALUE)}} time interval
> I think the name {{SessionWindow}} is definitely not good here, especially since we 
> want to use it in {{SlidingWindows}} now, although it was only used for 
> {{SessionWindows}} before. We should name them by their time-interval meaning, 
> not by the streaming window function's meaning. Because these 3 window types 
> are for internal use only, it is safe to rename them.
>  
> {{TimeWindow}} --> {{InclusiveExclusiveWindow}}
>  {{SessionWindow}} / {{SlidingWindow}} --> {{InclusiveInclusiveWindow}}
>  {{UnlimitedWindow}} --> {{InclusiveUnboundedWindow}}
> See the discussion here: 
> [https://github.com/apache/kafka/pull/11124#issuecomment-887989639]





[jira] [Commented] (KAFKA-12559) Add a top-level Streams config for bounding off-heap memory

2021-08-03 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392610#comment-17392610
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12559:


[~brz] I'd say give [~msundeq] another day or two to respond and if you don't 
hear back then feel free to assign this ticket to yourself. I just added you as 
a contributor on the project so you should be able to self-assign tickets from 
now on.

> Add a top-level Streams config for bounding off-heap memory
> ---
>
> Key: KAFKA-12559
> URL: https://issues.apache.org/jira/browse/KAFKA-12559
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Martin Sundeqvist
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> At the moment we provide an example of how to bound the memory usage of 
> rocksdb in the [Memory 
> Management|https://kafka.apache.org/27/documentation/streams/developer-guide/memory-mgmt.html#rocksdb]
>  section of the docs. This requires implementing a custom RocksDBConfigSetter 
> class and setting a number of rocksdb options for relatively advanced 
> concepts and configurations. It seems a fair number of users either fail to 
> find this or consider it to be for more advanced use cases/users. But RocksDB 
> can eat up a lot of off-heap memory and it's not uncommon for users to come 
> across a {{RocksDBException: Cannot allocate memory}}
> It would probably be a much better user experience if we implemented this 
> memory bound out-of-the-box and just gave users a top-level StreamsConfig to 
> tune the off-heap memory given to rocksdb, like we have for on-heap cache 
> memory with cache.max.bytes.buffering. More advanced users can continue to 
> fine-tune their memory bounding and apply other configs with a custom config 
> setter, while new or more casual users can cap the off-heap memory without 
> getting their hands dirty with rocksdb.
> I would propose to add the following top-level config:
> rocksdb.max.bytes.off.heap: medium priority, default to -1 (unbounded), valid 
> values are [0, inf]
> I'd also want to consider adding a second, lower priority top-level config to 
> give users a knob for adjusting how much of that total off-heap memory goes 
> to the block cache + index/filter blocks, and how much of it is afforded to 
> the write buffers. I'm struggling to come up with a good name for this 
> config, but it would be something like
> rocksdb.memtable.to.block.cache.off.heap.memory.ratio: low priority, default 
> to 0.5, valid values are [0, 1]
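
The arithmetic behind the two proposed configs can be sketched as below. This is an illustration only: neither config exists yet, the class and method are hypothetical, and the ticket does not say which side of the split the ratio refers to. The sketch assumes the ratio is the share given to the memtables (write buffers), with the remainder going to the block cache and index/filter blocks.

```java
final class OffHeapSplitSketch {
    // Returns {writeBufferBytes, blockCacheBytes}.
    // A negative total stands for the proposed default of -1 (unbounded).
    static long[] split(long maxBytesOffHeap, double memtableRatio) {
        if (maxBytesOffHeap < 0) {
            return new long[] {-1L, -1L};
        }
        if (memtableRatio < 0.0 || memtableRatio > 1.0) {
            throw new IllegalArgumentException("ratio must be in [0, 1]");
        }
        // Assumed direction: the ratio is the fraction afforded to write buffers.
        long writeBufferBytes = (long) (maxBytesOffHeap * memtableRatio);
        long blockCacheBytes = maxBytesOffHeap - writeBufferBytes;
        return new long[] {writeBufferBytes, blockCacheBytes};
    }
}
```

With the proposed default ratio of 0.5, a 1000-byte bound would be divided evenly between the two sides.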





[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal

2021-08-03 Thread GitBox


mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r682020580



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
* @param sourceRecords The dirty log segment
* @param dest The cleaned log segment
* @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be 
retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than 
version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as 
defined by log configuration
* @param maxLogMessageSize The maximum message size of the corresponding 
topic
* @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated

Review comment:
   It seems like we don't need the return value at all, since we would only 
be using it to track the latestDeleteHorizon in the Log, but it doesn't seem 
like we need that either. I'm going to remove it
   
   edit: going through the build errors and I see other usages. I will go 
through it more thoroughly








[GitHub] [kafka] mjsax commented on a change in pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

2021-08-03 Thread GitBox


mjsax commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r682192656



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
##
@@ -19,33 +19,24 @@
 import org.apache.kafka.streams.kstream.Window;
 
 /**
- * A {@link TimeWindow} covers a half-open time interval with its start 
timestamp as an inclusive boundary and its end
- * timestamp as exclusive boundary.
- * It is a fixed size window, i.e., all instances (of a single {@link 
org.apache.kafka.streams.kstream.TimeWindows
- * window specification}) will have the same size.
- * 
- * For time semantics, see {@link 
org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ * A {@link TimeWindow} is a time interval window container that holds the 
start and end time for use in window-agnostic cases,
+ * ex: in {@link org.apache.kafka.streams.state.WindowStore}, we'll store the 
aggregated values of any fixed-size types of time windows.
+ * We use {@link TimeWindow} to represent these time windows
  *
  * @see SessionWindow
  * @see UnlimitedWindow
- * @see org.apache.kafka.streams.kstream.TimeWindows
- * @see org.apache.kafka.streams.processor.TimestampExtractor
  */
 public class TimeWindow extends Window {
 
 /**
- * Create a new window for the given start time (inclusive) and end time 
(exclusive).
+ * Create a new window for the given start time and end time.
  *
- * @param startMs the start timestamp of the window (inclusive)
- * @param endMs   the end timestamp of the window (exclusive)
- * @throws IllegalArgumentException if {@code startMs} is negative or if 
{@code endMs} is smaller than or equal to
- * {@code startMs}
+ * @param startMs the start timestamp of the window

Review comment:
   You add `(inclusive)` and `(exclusive)` in `SessionWindow` but remove them 
here. Seems inconsistent?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -351,7 +351,8 @@ private void processEarly(final K key, final V value, final 
long inputRecordTime
 }
 
 if (combinedWindow == null) {
-final TimeWindow window = new TimeWindow(0, 
windows.timeDifferenceMs());
+// created a [start, end] time interval window via 
SessionWindow
+final SessionWindow window = new SessionWindow(0, 
windows.timeDifferenceMs());

Review comment:
   I would prefer to _first_ rename existing windows and not merge this PR 
using `SessionWindows` within `SlidingWindowAggregate`...

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
##
@@ -19,33 +19,24 @@
 import org.apache.kafka.streams.kstream.Window;
 
 /**
- * A {@link TimeWindow} covers a half-open time interval with its start 
timestamp as an inclusive boundary and its end
- * timestamp as exclusive boundary.
- * It is a fixed size window, i.e., all instances (of a single {@link 
org.apache.kafka.streams.kstream.TimeWindows
- * window specification}) will have the same size.
- * 
- * For time semantics, see {@link 
org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ * A {@link TimeWindow} is a time interval window container that holds the 
start and end time for use in window-agnostic cases,

Review comment:
   Why `window-agnostic`? In general, I am not sure why we need to change 
the existing JavaDocs. What information do you think is missing or wrong?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindow.java
##
@@ -19,33 +19,24 @@
 import org.apache.kafka.streams.kstream.Window;
 
 /**
- * A {@link TimeWindow} covers a half-open time interval with its start 
timestamp as an inclusive boundary and its end
- * timestamp as exclusive boundary.
- * It is a fixed size window, i.e., all instances (of a single {@link 
org.apache.kafka.streams.kstream.TimeWindows
- * window specification}) will have the same size.
- * 
- * For time semantics, see {@link 
org.apache.kafka.streams.processor.TimestampExtractor TimestampExtractor}.
+ * A {@link TimeWindow} is a time interval window container that holds the 
start and end time for use in window-agnostic cases,
+ * ex: in {@link org.apache.kafka.streams.state.WindowStore}, we'll store the 
aggregated values of any fixed-size types of time windows.
+ * We use {@link TimeWindow} to represent these time windows
  *
  * @see SessionWindow
  * @see UnlimitedWindow
- * @see org.apache.kafka.streams.kstream.TimeWindows
- * @see org.apache.kafka.streams.processor.TimestampExtractor
  */
 public class TimeWindow extends Window {
 
 /**
- * Create a new window for the given start time (inclusive) and end time 
(exclusive).
+ * Create a new window for the given start time and end time.
  *
- * @param startMs the start 

[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAKFA-8522] Streamline tombstone and transaction marker removal

2021-08-03 Thread GitBox


mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r682018753



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -270,7 +270,8 @@ class Log(@volatile private var _dir: File,
   val producerStateManager: ProducerStateManager,
   logDirFailureChannel: LogDirFailureChannel,
   @volatile private var _topicId: Option[Uuid],
-  val keepPartitionMetadataFile: Boolean) extends Logging with 
KafkaMetricsGroup {
+  val keepPartitionMetadataFile: Boolean,
+  @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) 
extends Logging with KafkaMetricsGroup {

Review comment:
   hmm it seems like we only use it in a test. That goes with the return 
value that was added into the `cleanInto` method in the LogCleaner. I'm going 
to remove these and see if I can take another approach in the testing 
   
   edit: going through the build errors and I see other usages. I will go 
through it more thoroughly








[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft

2021-08-03 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13160:
--
Description: 
In a ZK cluster, dynamic default broker configs are stored in the zNode 
/brokers/. Without this fix, when dynamic configs from snapshots are 
processed by the KRaft brokers, the BrokerConfigHandler checks if the resource 
name is "" to do a default update and converts the resource name to an 
integer otherwise to do a per-broker config update.

In KRaft, dynamic default broker configs are serialized in metadata with empty 
string instead of "". This was causing the BrokerConfigHandler to 
throw a NumberFormatException for dynamic default broker configs since the 
resource name for them is not "" or a single integer. The code that 
calls the handler method for config changes should be fixed to pass "" 
instead of empty string to the handler method if using KRaft.

  was:
In a ZK cluster, dynamic default broker configs are stored in the zNode 
/brokers/. Without this fix, when dynamic configs from snapshots are 
processed by the KRaft brokers, the BrokerConfigHandler checks if the resource 
name is "" to do a default update and converts the resource name to an 
integer otherwise to do a per-broker config update.

In KRaft, dynamic default broker configs are serialized in metadata with empty 
string instead of "". This was causing the BrokerConfigHandler to 
throw a NumberFormatException for dynamic default broker configs since the 
resource name for them is not "" or a single integer. The code that 
calls the handler method for config changes should be fixed to pass "" 
instead of empty string if using KRaft to the handler method.


> Fix BrokerConfigHandler to expect empty string as the resource name for 
> dynamic default broker configs in KRaft
> ---
>
> Key: KAFKA-13160
> URL: https://issues.apache.org/jira/browse/KAFKA-13160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In a ZK cluster, dynamic default broker configs are stored in the zNode 
> /brokers/. Without this fix, when dynamic configs from snapshots are 
> processed by the KRaft brokers, the BrokerConfigHandler checks if the 
> resource name is "" to do a default update and converts the resource 
> name to an integer otherwise to do a per-broker config update.
> In KRaft, dynamic default broker configs are serialized in metadata with 
> empty string instead of "". This was causing the BrokerConfigHandler 
> to throw a NumberFormatException for dynamic default broker configs since the 
> resource name for them is not "" or a single integer. The code that 
> calls the handler method for config changes should be fixed to pass 
> "" instead of empty string to the handler method if using KRaft.





[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft

2021-08-03 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13160:
--
Description: 
In a ZK cluster, dynamic default broker configs are stored in the zNode 
/brokers/. Without this fix, when dynamic configs from snapshots are 
processed by the KRaft brokers, the BrokerConfigHandler checks if the resource 
name is "" to do a default update and converts the resource name to an 
integer otherwise to do a per-broker config update.

In KRaft, dynamic default broker configs are serialized in metadata with empty 
string instead of "". This was causing the BrokerConfigHandler to 
throw a NumberFormatException for dynamic default broker configs since the 
resource name for them is not "" or a single integer. The code that 
calls the handler method for config changes should be fixed to pass "" 
instead of empty string if using KRaft.

  was:
In a ZK cluster, dynamic default broker configs are stored in the zNode 
/brokers/. Without this fix, when dynamic configs from snapshots are 
processed by the KRaft brokers, the BrokerConfigHandler checks if the resource 
name is "" to do a default update and converts the resource name to an 
integer otherwise to do a per-broker config update.

In KRaft, dynamic default broker configs are serialized in metadata with empty 
string instead of "". This was causing the BrokerConfigHandler to 
throw a NumberFormatException for dynamic default broker configs since the 
resource name for them is not "" or a single integer. The code that 
calls the handler should be fixed to pass "" instead of empty string 
if using KRaft.


> Fix BrokerConfigHandler to expect empty string as the resource name for 
> dynamic default broker configs in KRaft
> ---
>
> Key: KAFKA-13160
> URL: https://issues.apache.org/jira/browse/KAFKA-13160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In a ZK cluster, dynamic default broker configs are stored in the zNode 
> /brokers/. Without this fix, when dynamic configs from snapshots are 
> processed by the KRaft brokers, the BrokerConfigHandler checks if the 
> resource name is "" to do a default update and converts the resource 
> name to an integer otherwise to do a per-broker config update.
> In KRaft, dynamic default broker configs are serialized in metadata with 
> empty string instead of "". This was causing the BrokerConfigHandler 
> to throw a NumberFormatException for dynamic default broker configs since the 
> resource name for them is not "" or a single integer. The code that 
> calls the handler method for config changes should be fixed to pass 
> "" instead of empty string if using KRaft.





[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft

2021-08-03 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13160:
--
Description: 
In a ZK cluster, dynamic default broker configs are stored in the zNode 
/brokers/. Without this fix, when dynamic configs from snapshots are 
processed by the KRaft brokers, the BrokerConfigHandler checks if the resource 
name is "" to do a default update and converts the resource name to an 
integer otherwise to do a per-broker config update.

In KRaft, dynamic default broker configs are serialized in metadata with empty 
string instead of "". This was causing the BrokerConfigHandler to 
throw a NumberFormatException for dynamic default broker configs since the 
resource name for them is not "" or a single integer. The code that 
calls the handler method for config changes should be fixed to pass "" 
instead of empty string if using KRaft to the handler method.

  was:
In a ZK cluster, dynamic default broker configs are stored in the zNode 
/brokers/. Without this fix, when dynamic configs from snapshots are 
processed by the KRaft brokers, the BrokerConfigHandler checks if the resource 
name is "" to do a default update and converts the resource name to an 
integer otherwise to do a per-broker config update.

In KRaft, dynamic default broker configs are serialized in metadata with empty 
string instead of "". This was causing the BrokerConfigHandler to 
throw a NumberFormatException for dynamic default broker configs since the 
resource name for them is not "" or a single integer. The code that 
calls the handler method for config changes should be fixed to pass "" 
instead of empty string if using KRaft.


> Fix BrokerConfigHandler to expect empty string as the resource name for 
> dynamic default broker configs in KRaft
> ---
>
> Key: KAFKA-13160
> URL: https://issues.apache.org/jira/browse/KAFKA-13160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In a ZK cluster, dynamic default broker configs are stored in the zNode 
> /brokers/. Without this fix, when dynamic configs from snapshots are 
> processed by the KRaft brokers, the BrokerConfigHandler checks if the 
> resource name is "" to do a default update and converts the resource 
> name to an integer otherwise to do a per-broker config update.
> In KRaft, dynamic default broker configs are serialized in metadata with 
> empty string instead of "". This was causing the BrokerConfigHandler 
> to throw a NumberFormatException for dynamic default broker configs since the 
> resource name for them is not "" or a single integer. The code that 
> calls the handler method for config changes should be fixed to pass 
> "" instead of empty string if using KRaft to the handler method.





[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft

2021-08-03 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13160:
--
Description: 
In a ZK cluster, dynamic default broker configs are stored in the zNode 
/brokers/. Without this fix, when dynamic configs from snapshots are 
processed by the KRaft brokers, the BrokerConfigHandler checks if the resource 
name is "" to do a default update and converts the resource name to an 
integer otherwise to do a per-broker config update.

In KRaft, dynamic default broker configs are serialized in metadata with empty 
string instead of "". This was causing the BrokerConfigHandler to 
throw a NumberFormatException for dynamic default broker configs since the 
resource name for them is not "" or a single integer. The code that 
calls the handler should be fixed to pass "" instead of empty string 
if using KRaft.

  was:
In a ZK cluster, dynamic default broker configs are stored in the zNode 
/brokers/. Without this fix, when dynamic configs from snapshots are 
processed by the KRaft brokers, the BrokerConfigHandler checks if the resource 
name is "" to do a default update and converts the resource name to an 
integer otherwise to do a per-broker config update.

In KRaft, dynamic default broker configs are serialized in metadata with empty 
string instead of "". This was causing the BrokerConfigHandler to 
throw a NumberFormatException for dynamic default broker configs since the 
resource name for them is not "" or a single integer. This code that 
calls the handler should be fixed to pass "" instead of empty string 
if using KRaft.


> Fix BrokerConfigHandler to expect empty string as the resource name for 
> dynamic default broker configs in KRaft
> ---
>
> Key: KAFKA-13160
> URL: https://issues.apache.org/jira/browse/KAFKA-13160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In a ZK cluster, dynamic default broker configs are stored in the zNode 
> /brokers/. Without this fix, when dynamic configs from snapshots are 
> processed by the KRaft brokers, the BrokerConfigHandler checks if the 
> resource name is "" to do a default update and converts the resource 
> name to an integer otherwise to do a per-broker config update.
> In KRaft, dynamic default broker configs are serialized in metadata with 
> empty string instead of "". This was causing the BrokerConfigHandler 
> to throw a NumberFormatException for dynamic default broker configs since the 
> resource name for them is not "" or a single integer. The code that 
> calls the handler should be fixed to pass "" instead of empty string 
> if using KRaft.





[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft

2021-08-03 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13160:
--
Description: 
In a ZK cluster, dynamic default broker configs are stored in the zNode 
/brokers/. Without this fix, when dynamic configs from snapshots are 
processed by the KRaft brokers, the BrokerConfigHandler checks if the resource 
name is "" to do a default update and converts the resource name to an 
integer otherwise to do a per-broker config update.

In KRaft, dynamic default broker configs are serialized in metadata with empty 
string instead of "". This was causing the BrokerConfigHandler to 
throw a NumberFormatException for dynamic default broker configs since the 
resource name for them is not "" or a single integer. This code that 
calls the handler should be fixed to pass "" instead of empty string 
if using KRaft.

  was:
In a ZK cluster, dynamic default broker configs are stored in the zNode 
/brokers/. Without this fix, when dynamic configs from snapshots are 
processed by the KRaft brokers, the BrokerConfigHandler checks if the resource 
name is "" to do a default update and converts the resource name to an 
integer otherwise to do a per-broker config update.

In KRaft, dynamic default broker configs are serialized in metadata with empty 
string instead of "". This was causing the BrokerConfigHandler to 
throw a NumberFormatException for dynamic default broker configs since the 
resource name for them is not "" or a single integer. This handler 
should be fixed to expect empty string as the resource name for the dynamic 
default broker configs if using KRaft.


> Fix BrokerConfigHandler to expect empty string as the resource name for 
> dynamic default broker configs in KRaft
> ---
>
> Key: KAFKA-13160
> URL: https://issues.apache.org/jira/browse/KAFKA-13160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In a ZK cluster, dynamic default broker configs are stored in the zNode 
> /brokers/<default>. Without this fix, when dynamic configs from snapshots are 
> processed by the KRaft brokers, the BrokerConfigHandler checks if the 
> resource name is "<default>" to do a default update and converts the resource 
> name to an integer otherwise to do a per-broker config update.
> In KRaft, dynamic default broker configs are serialized in metadata with 
> empty string instead of "<default>". This was causing the BrokerConfigHandler 
> to throw a NumberFormatException for dynamic default broker configs since the 
> resource name for them is not "<default>" or a single integer. This code that 
> calls the handler should be fixed to pass "<default>" instead of empty string 
> if using KRaft.
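
The mismatch described above can be sketched in a few lines of Java. This is a minimal illustration, not the actual BrokerConfigHandler: the class and method names are hypothetical, and it assumes the ZK-side default entity name is the literal string `<default>` while KRaft metadata uses the empty string.

```java
import java.util.OptionalInt;

/** Hypothetical sketch of the resource-name handling; not Kafka's real code. */
final class ResourceNameSketch {
    // Assumption: name used on the ZK side for the cluster-wide default entity.
    static final String ZK_DEFAULT = "<default>";

    /** Handler-side branching: empty result means "default update", otherwise a broker id. */
    static OptionalInt brokerIdFor(String resourceName) {
        if (ZK_DEFAULT.equals(resourceName)) {
            return OptionalInt.empty();
        }
        // Integer.parseInt throws NumberFormatException for "" or any non-numeric name.
        return OptionalInt.of(Integer.parseInt(resourceName));
    }

    /** Caller-side translation: map the KRaft empty string to the name the handler expects. */
    static String toHandlerName(String kraftResourceName) {
        return kraftResourceName.isEmpty() ? ZK_DEFAULT : kraftResourceName;
    }
}
```

Passing the KRaft name straight to `brokerIdFor` reproduces the NumberFormatException; routing it through `toHandlerName` first corresponds to fixing the caller rather than the handler.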





[jira] [Created] (KAFKA-13162) ElectLeader API must be forwarded to Controller

2021-08-03 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13162:
---

 Summary: ElectLeader API must be forwarded to Controller
 Key: KAFKA-13162
 URL: https://issues.apache.org/jira/browse/KAFKA-13162
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson
 Fix For: 3.0.0


We're missing the logic to forward ElectLeaders requests to the controller. 
This means that `kafka-leader-election.sh` does not work correctly.





[GitHub] [kafka] mjsax commented on pull request #11157: MINOR: Replace EasyMock with Mockito in test-utils module

2021-08-03 Thread GitBox


mjsax commented on pull request #11157:
URL: https://github.com/apache/kafka/pull/11157#issuecomment-892262404


   Thanks for the PR @dengziming! Merged to `trunk`.






[GitHub] [kafka] mjsax merged pull request #11157: MINOR: Replace EasyMock with Mockito in test-utils module

2021-08-03 Thread GitBox


mjsax merged pull request #11157:
URL: https://github.com/apache/kafka/pull/11157


   






[jira] [Created] (KAFKA-13161) Follower leader and ISR state not updated after partition change in KRaft

2021-08-03 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13161:
---

 Summary: Follower leader and ISR state not updated after partition 
change in KRaft
 Key: KAFKA-13161
 URL: https://issues.apache.org/jira/browse/KAFKA-13161
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jose Armando Garcia Sancio
 Fix For: 3.0.0


In KRaft when we detect a partition change, we first verify whether any leader 
or follower transitions are needed. Depending on the case, we call either 
`applyLocalLeadersDelta` or `applyLocalFollowersDelta`. In the latter case, we 
are missing a call to `Partition.makeFollower` which is responsible for 
updating LeaderAndIsr state for the partitions. As a result of this, the 
partition state may be left stale. 

The specific consequences of this bug are 1) follower fetching fails since the 
epoch is never updated, and 2) a stale leader may continue to accept Produce 
requests. The latter is the bigger issue since it can lead to log divergence if 
we are appending from both the client and from the fetcher thread at the same 
time. I tested this locally and confirmed that it is possible.
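
Consequence 1 can be pictured with a simplified epoch comparison. This is only a sketch under stated assumptions: the fetch request carries the follower's current leader epoch and the receiving broker compares it with its own. The class and method names are hypothetical; the result names mirror Kafka's FENCED_LEADER_EPOCH and UNKNOWN_LEADER_EPOCH error codes.

```java
/** Hypothetical model of leader-epoch validation on a fetch request. */
final class LeaderEpochCheckSketch {
    enum Result { OK, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH }

    /** Compares the epoch sent by the follower with the epoch known locally. */
    static Result validate(int requestEpoch, int localEpoch) {
        if (requestEpoch < localEpoch) {
            return Result.FENCED_LEADER_EPOCH;   // the sender is behind
        }
        if (requestEpoch > localEpoch) {
            return Result.UNKNOWN_LEADER_EPOCH;  // the receiver is behind
        }
        return Result.OK;
    }
}
```

A follower whose epoch is never advanced keeps sending the old value, so every fetch after the leader change falls into the first branch.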





[GitHub] [kafka] mjsax commented on pull request #11172: MINOR: update stream-stream join docs

2021-08-03 Thread GitBox


mjsax commented on pull request #11172:
URL: https://github.com/apache/kafka/pull/11172#issuecomment-892239651


   \cc @kkonstantine 






[GitHub] [kafka] mjsax opened a new pull request #11172: MINOR: update stream-stream join docs

2021-08-03 Thread GitBox


mjsax opened a new pull request #11172:
URL: https://github.com/apache/kafka/pull/11172


   Call for review @JimGalasyn @spena @ableegoldman (cf 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics)
   
   Must be cherry-picked to `3.0` branch.






[GitHub] [kafka] guozhangwang commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-03 Thread GitBox


guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682090255



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
##
@@ -66,14 +72,26 @@
 );
 }
 
+Map> uncreatedTasksForTopologies(final 
Set currentTopologies) {
+return unknownTasksToBeCreated.entrySet().stream().filter(t -> 
currentTopologies.contains(t.getKey().namedTopology())).collect(Collectors.toMap(Entry::getKey,
 Entry::getValue));

Review comment:
   Ditto here.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -53,44 +56,162 @@
 private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
 private final StreamsConfig config;
-private final SortedMap builders; // Keep 
sorted by topology name for readability
+private final TopologyVersion version;
+
+private final ConcurrentNavigableMap 
builders; // Keep sorted by topology name for readability
 
 private ProcessorTopology globalTopology;
-private Map globalStateStores = new HashMap<>();
-final Set allInputTopics = new HashSet<>();
+private final Map globalStateStores = new HashMap<>();
+private final Set allInputTopics = new HashSet<>();
+
+public static class TopologyVersion {
+public AtomicLong topologyVersion = new AtomicLong(0L); // the local 
topology version
+public Set assignedNamedTopologies = new HashSet<>(); // the 
named topologies whose tasks are actively assigned
+public ReentrantLock topologyLock = new ReentrantLock();
+public Condition topologyCV = topologyLock.newCondition();
+}
 
-public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+public TopologyMetadata(final InternalTopologyBuilder builder,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
-builders = new TreeMap<>();
+builders = new ConcurrentSkipListMap<>();
 if (builder.hasNamedTopology()) {
 builders.put(builder.topologyName(), builder);
 } else {
 builders.put(UNNAMED_TOPOLOGY, builder);
 }
 }
 
-public TopologyMetadata(final SortedMap 
builders, final StreamsConfig config) {
+public TopologyMetadata(final ConcurrentNavigableMap builders,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
+
 this.builders = builders;
 if (builders.isEmpty()) {
-log.debug("Building KafkaStreams app with no empty topology");
+log.debug("Starting up empty KafkaStreams app with no topology");
 }
 }
 
-public int getNumStreamThreads(final StreamsConfig config) {
-final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+public void updateCurrentAssignmentTopology(final Set 
assignedNamedTopologies) {
+version.assignedNamedTopologies = assignedNamedTopologies;
+}
 
-// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
-if (builders.isEmpty()) {
-if (configuredNumStreamThreads != 0) {
-log.info("Overriding number of StreamThreads to zero for empty 
topology");
+/**
+ * @return the set of named topologies that the assignor distributed tasks 
for during the last rebalance
+ */
+public Set assignmentNamedTopologies() {
+return version.assignedNamedTopologies;
+}
+
+public long topologyVersion() {
+return version.topologyVersion.get();
+}
+
+public void lock() {
+version.topologyLock.lock();
+}
+
+public void unlock() {
+version.topologyLock.unlock();
+}
+
+public InternalTopologyBuilder getBuilderForTopologyName(final String 
name) {
+return builders.get(name);
+}
+
+/**
+ * @throws IllegalStateException if the thread is not already holding the 
lock via TopologyMetadata#lock
+ */
+public void maybeWaitForNonEmptyTopology() {
+if (!version.topologyLock.isHeldByCurrentThread()) {

Review comment:
   I feel a bit concerned about the "asymmetry" of this function: all other 
functions have the lock inside while this function is supposed to be called by 
a caller -- i.e. `handleTopologyUpdatesPhase`. It is quite vulnerable to bugs 
with additional edits.
   
   I'm wondering if we can move this logic out of `handleTopologyUpdatesPhase` 
instead: i.e. we first update the named topology, and then based on the new 
version we can either wait or re-subscribe and trigger rebalance. WDYT?
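
For illustration, the "lock inside" style that the other methods follow could look like the sketch below. It is not the PR's code and not necessarily the change being proposed: the class, its fields, and the timeout parameter are hypothetical. It only shows a wait method that acquires and releases the lock itself, so callers cannot forget to hold it.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical gate that blocks until at least one topology has been added. */
final class TopologyGateSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition nonEmpty = lock.newCondition();
    private int numTopologies = 0;

    /** Acquires the lock internally, updates the state, and wakes up waiters. */
    void addTopology() {
        lock.lock();
        try {
            numTopologies++;
            nonEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Also self-locking; returns false if still empty after the timeout or on interrupt. */
    boolean waitForNonEmpty(long timeoutMs) {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            while (numTopologies == 0) {
                if (remainingNanos <= 0L) {
                    return false;
                }
                // awaitNanos releases the lock while waiting and reacquires it before returning.
                remainingNanos = nonEmpty.awaitNanos(remainingNanos);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

With this shape there is no precondition to document or check with `isHeldByCurrentThread()`, at the cost of the caller no longer being able to combine the wait with other work under the same lock acquisition.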
   
   

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Top

[GitHub] [kafka] ableegoldman commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

2021-08-03 Thread GitBox


ableegoldman commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-892231916


   @codefactor it will be released in 3.0, which is currently in the final 
stages of testing and preparing for release. 






[GitHub] [kafka] cmccabe commented on pull request #11168: KAFKA-13160: Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft

2021-08-03 Thread GitBox


cmccabe commented on pull request #11168:
URL: https://github.com/apache/kafka/pull/11168#issuecomment-892230644


   I don't think this is the right place to make this change. 
BrokerMetadataPublisher should be calling `processConfigChanges` with the 
expected name, instead.






[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

2021-08-03 Thread GitBox


mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r682164477



##
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##
@@ -483,6 +486,49 @@ public void testBuildEndTxnMarker() {
 assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
 }
 
+/**
+ * This test is used to see if the base timestamp of the batch has been 
successfully
+ * converted to a delete horizon for the tombstones / transaction markers 
of the batch.
+ * It also verifies that the record timestamps remain correct as a delta 
relative to the delete horizon.
+ */
+@ParameterizedTest
+@ArgumentsSource(MemoryRecordsArgumentsProvider.class)
+public void testBaseTimestampToDeleteHorizonConversion(Args args) {
+int partitionLeaderEpoch = 998;
+if (args.magic >= RecordBatch.MAGIC_VALUE_V2) {
+ByteBuffer buffer = ByteBuffer.allocate(2048);
+MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
args.magic, args.compression, TimestampType.CREATE_TIME,
+0L, RecordBatch.NO_TIMESTAMP, partitionLeaderEpoch);
+builder.append(10L, "1".getBytes(), null);
+
+ByteBuffer filtered = ByteBuffer.allocate(2048);
+final long deleteHorizon = Integer.MAX_VALUE / 2;
+final RecordFilter recordFilter = new 
MemoryRecords.RecordFilter(deleteHorizon - 1, 1) {

Review comment:
   I've added a test to `MemoryRecordsBuilderTest` that is similar to this 
one in `MemoryRecordsTest`, but sets the `deleteHorizon` directly through the 
constructor. I think having both tests is useful.








[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft

2021-08-03 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13160:
--
Description: 
In a ZK cluster, dynamic default broker configs are stored in the zNode 
/brokers/<default>. Without this fix, when dynamic configs from snapshots are 
processed by the KRaft brokers, the BrokerConfigHandler checks if the resource 
name is "<default>" to do a default update and converts the resource name to an 
integer otherwise to do a per-broker config update.

In KRaft, dynamic default broker configs are serialized in metadata with empty 
string instead of "<default>". This was causing the BrokerConfigHandler to 
throw a NumberFormatException for dynamic default broker configs since the 
resource name for them is not "<default>" or a single integer. This handler 
should be fixed to expect empty string as the resource name for the dynamic 
default broker configs if using KRaft.

  was:
In a ZK cluster, dynamic default broker configs are stored in the zNode 
/brokers/<default>. Without this fix, when dynamic configs from snapshots are 
processed by the brokers in KRaft, the BrokerConfigHandler checks if the 
resource name is "<default>" to do a default update and converts the resource 
name to an integer otherwise to do a per-broker config update.

In KRaft, dynamic default broker configs are serialized in the quorum with 
empty string instead of "<default>". This was causing the BrokerConfigHandler 
to throw a NumberFormatException for dynamic default broker configs since the 
resource name for them is not "<default>" or a single integer. This handler 
should be fixed to expect empty string as the resource name for the dynamic 
default broker configs if using KRaft.


> Fix BrokerConfigHandler to expect empty string as the resource name for 
> dynamic default broker configs in KRaft
> ---
>
> Key: KAFKA-13160
> URL: https://issues.apache.org/jira/browse/KAFKA-13160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In a ZK cluster, dynamic default broker configs are stored in the zNode 
> /brokers/<default>. Without this fix, when dynamic configs from snapshots are 
> processed by the KRaft brokers, the BrokerConfigHandler checks if the 
> resource name is "<default>" to do a default update and converts the resource 
> name to an integer otherwise to do a per-broker config update.
> In KRaft, dynamic default broker configs are serialized in metadata with 
> empty string instead of "<default>". This was causing the BrokerConfigHandler 
> to throw a NumberFormatException for dynamic default broker configs since the 
> resource name for them is not "<default>" or a single integer. This handler 
> should be fixed to expect empty string as the resource name for the dynamic 
> default broker configs if using KRaft.





[GitHub] [kafka] jolshan opened a new pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

2021-08-03 Thread GitBox


jolshan opened a new pull request #11171:
URL: https://github.com/apache/kafka/pull/11171


   Most of [KAFKA-13132](https://issues.apache.org/jira/browse/KAFKA-13132) has 
been resolved, but there is one part of one case not covered.
   From the ticket:
   `2. We only assign the topic ID when we are associating the log with the 
partition in replicamanager for the first time`
   
   We covered the case where the log already exists and the leader epoch 
is _equal_ (i.e., no updates besides the topic ID), but we don't cover the update 
case where the leader epoch is bumped and we already have the log associated with 
the partition. 
   
   This PR ensures we correctly assign topic ID in the makeLeaders/Followers 
path when the log already exists.
   I've also added a test for the bumped leader epoch scenario.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] codefactor commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.19.3

2021-08-03 Thread GitBox


codefactor commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-892207657


   Are there any plans to release this?
   
   It looks like this upgrade happens to include the following issue fix in 
rocksdb:
   https://github.com/facebook/rocksdb/issues/6703
   
   That fixes 2 CVEs:
   https://nvd.nist.gov/vuln/detail/CVE-2019-12900
   https://nvd.nist.gov/vuln/detail/CVE-2016-3189
   
   My application is blocked by security scans due to these, and we need a new 
release of kafka-streams to get those fixes.






[jira] [Assigned] (KAFKA-12395) Drop topic mapKey in DeleteTopics response

2021-08-03 Thread Hiro Kuwabara (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hiro Kuwabara reassigned KAFKA-12395:
-

Assignee: Hiro Kuwabara

> Drop topic mapKey in DeleteTopics response
> --
>
> Key: KAFKA-12395
> URL: https://issues.apache.org/jira/browse/KAFKA-12395
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Hiro Kuwabara
>Priority: Major
>
> Now that DeleteTopic requests/responses may be keyed by topicId, the use of 
> the topic name as a map key in the response makes less sense. We should 
> consider dropping it. 





[GitHub] [kafka] mjsax commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-08-03 Thread GitBox


mjsax commented on a change in pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#discussion_r682120783



##
File path: 
streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.time.Instant;
+
+public class SmokeTestUtil {
+
+final static int END = Integer.MAX_VALUE;
+
+static ProcessorSupplier printProcessorSupplier(final 
String topic) {
+return printProcessorSupplier(topic, "");
+}
+
+static ProcessorSupplier printProcessorSupplier(final 
String topic, final String name) {
+return new ProcessorSupplier() {
+@Override
+public Processor get() {
+return new AbstractProcessor() {
+private int numRecordsProcessed = 0;
+private long smallestOffset = Long.MAX_VALUE;
+private long largestOffset = Long.MIN_VALUE;
+
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+System.out.println("[DEV] initializing processor: 
topic=" + topic + " taskId=" + context.taskId());

Review comment:
   Why `DEV` -- should be `2.8`?








[GitHub] [kafka] jolshan opened a new pull request #11170: KAFKA-13102: Topic IDs not propagated to metadata cache quickly enough for Fetch path

2021-08-03 Thread GitBox


jolshan opened a new pull request #11170:
URL: https://github.com/apache/kafka/pull/11170


   Previously, we used the metadata cache to determine whether or not to use topic 
IDs. Unfortunately, metadata cache updates with ZK controllers are in a 
separate request and may be too slow for the fetcher thread. This results in 
switching between topic names and topic IDs for topics that could just use IDs. 
   
   This change adds topic IDs to FetcherState created in LeaderAndIsr requests. 
It also supports updating this state for follower threads as soon as a 
LeaderAndIsr request provides a topic ID. 
   
   I've opted to only update replica fetcher threads. Alter Log Dir threads 
will use either topic name or topic ID depending on what was present when they 
were created. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] mjsax commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-08-03 Thread GitBox


mjsax commented on a change in pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#discussion_r682119892



##
File path: 
streams/upgrade-system-tests-28/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
##
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+
+public class SmokeTestClient extends SmokeTestUtil {
+
+private final String name;
+
+private KafkaStreams streams;
+private boolean uncaughtException = false;
+private boolean started;
+private volatile boolean closed;
+
+private static void addShutdownHook(final String name, final Runnable 
runnable) {
+if (name != null) {
+Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, 
runnable));
+} else {
+Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+}
+}
+
+private static File tempDirectory() {
+final String prefix = "kafka-";
+final File file;
+try {
+file = Files.createTempDirectory(prefix).toFile();
+} catch (final IOException ex) {
+throw new RuntimeException("Failed to create a temp dir", ex);
+}
+file.deleteOnExit();
+
+addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+try {
+Utils.delete(file);
+} catch (final IOException e) {
+System.out.println("Error deleting " + file.getAbsolutePath());
+e.printStackTrace(System.out);
+}
+});
+
+return file;
+}
+
+public SmokeTestClient(final String name) {
+this.name = name;
+}
+
+public boolean started() {
+return started;
+}
+
+public boolean closed() {
+return closed;
+}
+
+public void start(final Properties streamsProperties) {
+final Topology build = getTopology();
+streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+final CountDownLatch countDownLatch = new CountDownLatch(1);
+streams.setStateListener((newState, oldState) -> {
+System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), 
oldState, newState);
+if (oldState == KafkaStreams.State.REBALANCING && newState == 
KafkaStreams.State.RUNNING) {
+started = true;
+countDownLatch.countDown();
+}
+
+if (newState == KafkaStreams.State.NOT_RUNNING) {
+closed = true;
+}
+});
+
+streams.setUncaughtExceptionHandler(e -> {
+System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+System.out.println(name + ": FATAL:

[GitHub] [kafka] mjsax commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-08-03 Thread GitBox


mjsax commented on a change in pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#discussion_r682119041



##
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##
@@ -25,15 +25,17 @@
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.streams.utils import extract_generation_from_logs, 
extract_generation_id
 from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, 
LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
-LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, 
LATEST_2_6, LATEST_2_7, DEV_BRANCH, DEV_VERSION, KafkaVersion
+LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, 
LATEST_2_6, LATEST_2_7, LATEST_2_8, DEV_BRANCH, DEV_VERSION, KafkaVersion
 
 # broker 0.10.0 is not compatible with newer Kafka Streams versions
 broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), 
str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), \
str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), 
str(LATEST_2_3), \
-   str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), 
str(LATEST_2_7), str(DEV_BRANCH)]
+   str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), 
str(LATEST_2_7), str(LATEST_2_8), str(DEV_BRANCH)]
 
 metadata_1_versions = [str(LATEST_0_10_0)]
 metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), 
str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
+metadata_3_10_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), 
str(LATEST_2_3), str(LATEST_2_4),

Review comment:
   What is `3_10` ?








[jira] [Reopened] (KAFKA-13132) Upgrading to topic IDs in LISR requests has gaps introduced in 3.0

2021-08-03 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reopened KAFKA-13132:


Found an issue where we don't sufficiently cover case 2. I have a plan to 
cover it properly.

 cc: [~kkonstantine] 

> Upgrading to topic IDs in LISR requests has gaps introduced in 3.0
> --
>
> Key: KAFKA-13132
> URL: https://issues.apache.org/jira/browse/KAFKA-13132
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.0.0
>
>
> With the change in 3.0 to how topic IDs are assigned to logs, a bug was 
> inadvertently introduced. Now, topic IDs will only be assigned on the load of 
> the log to a partition in LISR requests. This means we will only assign topic 
> IDs for newly created topics/partitions, on broker startup, or potentially 
> when a partition is reassigned.
>  
> In the case of upgrading from an IBP before 2.8, we may have a scenario where 
> we upgrade the controller to IBP 3.0 (or even 2.8) last (i.e., the controller 
> is on IBP < 2.8 and all other brokers are on the newest IBP). Upon the last 
> broker upgrading, we will elect a new controller but its LISR request will 
> not result in topic IDs being assigned to logs of existing topics. They will 
> only be assigned in the cases mentioned above.
> *Keep in mind, in this scenario, topic IDs will still be assigned in the 
> controller/ZK to all new and pre-existing topics and will show up in 
> metadata.*  This means we no longer have the same guarantees we had in 2.8. 
> *It is just the LISR/partition.metadata part of the code that is affected.* 
>  
> The problem is two-fold:
>  1. We ignore LISR requests when the partition leader epoch has not increased 
> (previously we assigned the ID before this check).
>  2. We only assign the topic ID when we are associating the log with the 
> partition in ReplicaManager for the first time. In the scenario described 
> above, however, we have logs already associated with partitions that need 
> to be upgraded.
>  
> We should check whether the LISR request results in a topic ID addition 
> and add logic for logs already associated with partitions in ReplicaManager.
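
The two gaps described in the ticket can be modelled in a few lines. This is only a sketch in Python, not Kafka's code: `Log`, `Partition`, `handle_lisr_with_gaps` and `handle_lisr_proposed` are hypothetical names, the real logic lives in the broker's Scala code, and the "proposed" variant is just one reading of the suggestion in the last paragraph of the description.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Log:
    # Stands in for the topic ID persisted alongside the log (partition.metadata).
    topic_id: Optional[str] = None


@dataclass
class Partition:
    leader_epoch: int
    log: Optional[Log] = None  # None until a log is associated with the partition


def handle_lisr_with_gaps(partition: Partition, epoch: int, topic_id: str) -> None:
    """Models the behaviour the ticket describes (hypothetical simplification)."""
    if epoch <= partition.leader_epoch:
        return  # gap 1: the request is ignored before any topic ID is assigned
    partition.leader_epoch = epoch
    if partition.log is None:
        # gap 2: the ID is only set when the log is first associated
        partition.log = Log(topic_id=topic_id)


def handle_lisr_proposed(partition: Partition, epoch: int, topic_id: str) -> None:
    """Models the suggested direction: also cover logs that are already associated."""
    if partition.log is not None and partition.log.topic_id is None:
        partition.log.topic_id = topic_id  # done before the epoch check
    if epoch <= partition.leader_epoch:
        return
    partition.leader_epoch = epoch
    if partition.log is None:
        partition.log = Log(topic_id=topic_id)


# A pre-existing topic: its log was associated before any topic ID was known.
old = Partition(leader_epoch=5, log=Log())
handle_lisr_with_gaps(old, epoch=5, topic_id="abc")
print(old.log.topic_id)  # None

fixed = Partition(leader_epoch=5, log=Log())
handle_lisr_proposed(fixed, epoch=5, topic_id="abc")
print(fixed.log.topic_id)  # abc
```

In this model, even a request with a higher epoch leaves the existing log without an ID in the first variant, which corresponds to point 2 of the description.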



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] Borzoo opened a new pull request #11169: KAFKA-10859: Add test annotation to FileStreamSourceTaskTest.testInvalidFile and speed up the test

2021-08-03 Thread GitBox


Borzoo opened a new pull request #11169:
URL: https://github.com/apache/kafka/pull/11169


   Added the missing @Test annotation to a test in FileStreamSourceTaskTest. 
The test used to loop 100 times, each time blocking for 1 second. Checking the 
assertion more than once is unnecessary for this test.
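
The cost of the old loop is easy to see in a small model. The sketch below is a Python illustration, not the actual JUnit test: `FakeTask` and `check_invalid_file` are hypothetical stand-ins, and the blocking time is scaled down so the example runs quickly.

```python
import time


class FakeTask:
    """Hypothetical stand-in for a source task whose poll() blocks, then returns nothing."""

    def __init__(self, block_seconds: float) -> None:
        self.block_seconds = block_seconds
        self.polls = 0

    def poll(self):
        self.polls += 1
        time.sleep(self.block_seconds)  # the real test blocked for about 1 second here
        return None


def check_invalid_file(task: FakeTask, attempts: int) -> int:
    """Asserts that poll() yields nothing on each attempt; returns the number of polls made."""
    for _ in range(attempts):
        assert task.poll() is None
    return task.polls


# Old shape: 100 blocking polls. New shape: a single poll makes the same assertion.
old_polls = check_invalid_file(FakeTask(block_seconds=0.001), attempts=100)
new_polls = check_invalid_file(FakeTask(block_seconds=0.001), attempts=1)
print(old_polls, new_polls)  # 100 1
```

With a one-second block per poll, the first shape spends roughly 100 seconds waiting and the second roughly one.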
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   






[jira] [Updated] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633

2021-08-03 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12994:
---
Labels: kip-633 newbie newbie++  (was: kip kip-633)

> Migrate all Tests to New API and Remove Suppression for Deprecation Warnings 
> related to KIP-633
> ---
>
> Key: KAFKA-12994
> URL: https://issues.apache.org/jira/browse/KAFKA-12994
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 3.0.0
>Reporter: Israel Ekpo
>Priority: Major
>  Labels: kip-633, newbie, newbie++
> Fix For: 3.1.0
>
>
> Due to the API changes for KIP-633 a lot of deprecation warnings have been 
> generated in tests that are using the old deprecated APIs. There are a lot of 
> tests using the deprecated methods. We should absolutely migrate them all to 
> the new APIs and then get rid of all the applicable annotations for 
> suppressing the deprecation warnings.
> This applies to all Java and Scala examples and tests using the deprecated 
> APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows 
> classes.
>  
> This is based on the feedback from reviewers in this PR
>  
> https://github.com/apache/kafka/pull/10926





[jira] [Assigned] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633

2021-08-03 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-12994:
--

Assignee: A. Sophie Blee-Goldman

> Migrate all Tests to New API and Remove Suppression for Deprecation Warnings 
> related to KIP-633
> ---
>
> Key: KAFKA-12994
> URL: https://issues.apache.org/jira/browse/KAFKA-12994
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 3.0.0
>Reporter: Israel Ekpo
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: kip-633, newbie, newbie++
> Fix For: 3.1.0
>
>
> Due to the API changes for KIP-633 a lot of deprecation warnings have been 
> generated in tests that are using the old deprecated APIs. There are a lot of 
> tests using the deprecated methods. We should absolutely migrate them all to 
> the new APIs and then get rid of all the applicable annotations for 
> suppressing the deprecation warnings.
> This applies to all Java and Scala examples and tests using the deprecated 
> APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows 
> classes.
>  
> This is based on the feedback from reviewers in this PR
>  
> https://github.com/apache/kafka/pull/10926





[jira] [Assigned] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633

2021-08-03 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-12994:
--

Assignee: (was: Israel Ekpo)

> Migrate all Tests to New API and Remove Suppression for Deprecation Warnings 
> related to KIP-633
> ---
>
> Key: KAFKA-12994
> URL: https://issues.apache.org/jira/browse/KAFKA-12994
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 3.0.0
>Reporter: Israel Ekpo
>Priority: Major
>  Labels: kip, kip-633
> Fix For: 3.1.0
>
>
> Due to the API changes for KIP-633 a lot of deprecation warnings have been 
> generated in tests that are using the old deprecated APIs. There are a lot of 
> tests using the deprecated methods. We should absolutely migrate them all to 
> the new APIs and then get rid of all the applicable annotations for 
> suppressing the deprecation warnings.
> This applies to all Java and Scala examples and tests using the deprecated 
> APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows 
> classes.
>  
> This is based on the feedback from reviewers in this PR
>  
> https://github.com/apache/kafka/pull/10926





[jira] [Assigned] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633

2021-08-03 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-12994:
--

Assignee: (was: A. Sophie Blee-Goldman)

> Migrate all Tests to New API and Remove Suppression for Deprecation Warnings 
> related to KIP-633
> ---
>
> Key: KAFKA-12994
> URL: https://issues.apache.org/jira/browse/KAFKA-12994
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 3.0.0
>Reporter: Israel Ekpo
>Priority: Major
>  Labels: kip-633, newbie, newbie++
> Fix For: 3.1.0
>
>
> Due to the API changes for KIP-633 a lot of deprecation warnings have been 
> generated in tests that are using the old deprecated APIs. There are a lot of 
> tests using the deprecated methods. We should absolutely migrate them all to 
> the new APIs and then get rid of all the applicable annotations for 
> suppressing the deprecation warnings.
> This applies to all Java and Scala examples and tests using the deprecated 
> APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows 
> classes.
>  
> This is based on the feedback from reviewers in this PR
>  
> https://github.com/apache/kafka/pull/10926





[jira] [Commented] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633

2021-08-03 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392530#comment-17392530
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12994:


Hey [~iekpo], I'm unassigning this in case someone else wants to pick it up. If 
you already started working on this and have a partial PR ready with some 
subset of the tests migrated over, you can just open that PR and we can merge 
this in pieces. It seems like a lot of tests so splitting it up into multiple 
PRs is probably a good idea anyway. (And obviously feel free to re-assign it to 
yourself if you want to continue working on it, and/or split this ticket up 
into sub-tasks covering different sets of tests)

> Migrate all Tests to New API and Remove Suppression for Deprecation Warnings 
> related to KIP-633
> ---
>
> Key: KAFKA-12994
> URL: https://issues.apache.org/jira/browse/KAFKA-12994
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 3.0.0
>Reporter: Israel Ekpo
>Assignee: Israel Ekpo
>Priority: Major
>  Labels: kip, kip-633
> Fix For: 3.1.0
>
>
> Due to the API changes for KIP-633 a lot of deprecation warnings have been 
> generated in tests that are using the old deprecated APIs. There are a lot of 
> tests using the deprecated methods. We should absolutely migrate them all to 
> the new APIs and then get rid of all the applicable annotations for 
> suppressing the deprecation warnings.
> This applies to all Java and Scala examples and tests using the deprecated 
> APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows 
> classes.
>  
> This is based on the feedback from reviewers in this PR
>  
> https://github.com/apache/kafka/pull/10926





[jira] [Updated] (KAFKA-12994) Migrate all Tests to New API and Remove Suppression for Deprecation Warnings related to KIP-633

2021-08-03 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12994:
---
Fix Version/s: (was: 3.0.0)
   3.1.0

> Migrate all Tests to New API and Remove Suppression for Deprecation Warnings 
> related to KIP-633
> ---
>
> Key: KAFKA-12994
> URL: https://issues.apache.org/jira/browse/KAFKA-12994
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 3.0.0
>Reporter: Israel Ekpo
>Assignee: Israel Ekpo
>Priority: Major
>  Labels: kip, kip-633
> Fix For: 3.1.0
>
>
> Due to the API changes for KIP-633 a lot of deprecation warnings have been 
> generated in tests that are using the old deprecated APIs. There are a lot of 
> tests using the deprecated methods. We should absolutely migrate them all to 
> the new APIs and then get rid of all the applicable annotations for 
> suppressing the deprecation warnings.
> This applies to all Java and Scala examples and tests using the deprecated 
> APIs in the JoinWindows, SessionWindows, TimeWindows and SlidingWindows 
> classes.
>  
> This is based on the feedback from reviewers in this PR
>  
> https://github.com/apache/kafka/pull/10926





[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft

2021-08-03 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13160:
--
Description: 
In a ZK cluster, dynamic default broker configs are stored in the zNode 
/brokers/<default>. Without this fix, when dynamic configs from snapshots are 
processed by the brokers in KRaft, the BrokerConfigHandler checks if the 
resource name is "<default>" to do a default update and converts the resource 
name to an integer otherwise to do a per-broker config update.

In KRaft, dynamic default broker configs are serialized in the quorum with 
empty string instead of "<default>". This was causing the BrokerConfigHandler 
to throw a NumberFormatException for dynamic default broker configs since the 
resource name for them is not "<default>" or a single integer. This handler 
should be fixed to expect empty string as the resource name for the dynamic 
default broker configs if using KRaft.

  was:
In a ZK cluster, dynamic default broker configs are stored in the zNode 
/brokers/<default>. Without this fix, when dynamic configs from snapshots are 
processed by the brokers in KRaft, the BrokerConfigHandler checks if the 
resource name is "<default>" to do a default update and converts the resource 
name to an integer otherwise to do a per-broker config update.

In KRaft dynamic default broker configs are serialized in the quorum with empty 
string instead of "<default>". This was causing the BrokerConfigHandler to 
throw a NumberFormatException for dynamic default broker configs since the 
resource name for them is not "<default>" or a single integer. This handler 
should be fixed to expect empty string as the resource name for the dynamic 
default broker configs if using KRaft.


> Fix BrokerConfigHandler to expect empty string as the resource name for 
> dynamic default broker configs in KRaft
> ---
>
> Key: KAFKA-13160
> URL: https://issues.apache.org/jira/browse/KAFKA-13160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In a ZK cluster, dynamic default broker configs are stored in the zNode 
> /brokers/<default>. Without this fix, when dynamic configs from snapshots are 
> processed by the brokers in KRaft, the BrokerConfigHandler checks if the 
> resource name is "<default>" to do a default update and converts the resource 
> name to an integer otherwise to do a per-broker config update.
> In KRaft, dynamic default broker configs are serialized in the quorum with 
> empty string instead of "<default>". This was causing the BrokerConfigHandler 
> to throw a NumberFormatException for dynamic default broker configs since the 
> resource name for them is not "<default>" or a single integer. This handler 
> should be fixed to expect empty string as the resource name for the dynamic 
> default broker configs if using KRaft.
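
The failure mode in the description can be reproduced with a small model. This is a Python sketch, not the Scala handler: `classify_before_fix` and `classify_after_fix` are hypothetical names, and it assumes the ZK-mode default resource name is the literal `<default>`. In Python the failed conversion raises `ValueError`; on the JVM the equivalent failure is the NumberFormatException named in the ticket.

```python
ZK_DEFAULT = "<default>"  # assumed resource name for cluster-wide defaults in ZK mode
KRAFT_DEFAULT = ""        # KRaft serializes the same resource with an empty string


def classify_before_fix(resource_name: str):
    """Only recognises the ZK-style default name; everything else must be a broker ID."""
    if resource_name == ZK_DEFAULT:
        return ("default", None)
    return ("per-broker", int(resource_name))  # int("") raises ValueError


def classify_after_fix(resource_name: str):
    """Also treats the empty string as the cluster-wide default."""
    if resource_name in (ZK_DEFAULT, KRAFT_DEFAULT):
        return ("default", None)
    return ("per-broker", int(resource_name))


try:
    classify_before_fix("")
except ValueError as err:
    print("before fix:", type(err).__name__)  # before fix: ValueError

print(classify_after_fix(""))   # ('default', None)
print(classify_after_fix("3"))  # ('per-broker', 3)
```

Whether the real handler should keep accepting the ZK-style name alongside the empty string is a design choice this sketch does not settle; it accepts both only to keep the two cases side by side.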





[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft

2021-08-03 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13160:
--
Description: 
In a ZK cluster, dynamic default broker configs are stored in the zNode 
/brokers/<default>. Without this fix, when dynamic configs from snapshots are 
processed by the brokers in KRaft, the BrokerConfigHandler checks if the 
resource name is "<default>" to do a default update and converts the resource 
name to an integer otherwise to do a per-broker config update.

In KRaft dynamic default broker configs are serialized in the quorum with empty 
string instead of "<default>". This was causing the BrokerConfigHandler to 
throw a NumberFormatException for dynamic default broker configs since the 
resource name for them is not "<default>" or a single integer. This handler 
should be fixed to expect empty string as the resource name for the dynamic 
default broker configs if using KRaft.

  was:In a ZK cluster, dynamic default broker configs are stored in the zNode 
/brokers/<default>. Without this fix, when dynamic configs from snapshots are 
processed by the brokers in KRaft, the BrokerConfigHandler checks if the 
resource name is "<default>" to do a default update and converts the resource 
name to an integer otherwise to do a per-broker config update. In KRaft dynamic 
default broker configs are serialized in the quorum with empty string instead 
of "<default>". This was causing the BrokerConfigHandler to throw a 
NumberFormatException for dynamic default broker configs since the resource 
name for them is not "<default>" or a single integer. This handler should be 
fixed to expect empty string as the resource name for the dynamic default 
broker configs if using KRaft.


> Fix BrokerConfigHandler to expect empty string as the resource name for 
> dynamic default broker configs in KRaft
> ---
>
> Key: KAFKA-13160
> URL: https://issues.apache.org/jira/browse/KAFKA-13160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In a ZK cluster, dynamic default broker configs are stored in the zNode 
> /brokers/<default>. Without this fix, when dynamic configs from snapshots are 
> processed by the brokers in KRaft, the BrokerConfigHandler checks if the 
> resource name is "<default>" to do a default update and converts the resource 
> name to an integer otherwise to do a per-broker config update.
> In KRaft dynamic default broker configs are serialized in the quorum with 
> empty string instead of "<default>". This was causing the BrokerConfigHandler 
> to throw a NumberFormatException for dynamic default broker configs since the 
> resource name for them is not "<default>" or a single integer. This handler 
> should be fixed to expect empty string as the resource name for the dynamic 
> default broker configs if using KRaft.





[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft

2021-08-03 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13160:
--
Description: In a ZK cluster, dynamic default broker configs are stored in 
the zNode /brokers/<default>. Without this fix, when dynamic configs from 
snapshots are processed by the brokers in KRaft, the BrokerConfigHandler checks 
if the resource name is "<default>" to do a default update and converts the 
resource name to an integer otherwise to do a per-broker config update. In 
KRaft dynamic default broker configs are serialized in the quorum with empty 
string instead of "<default>". This was causing the BrokerConfigHandler to 
throw a NumberFormatException for dynamic default broker configs since the 
resource name for them is not "<default>" or a single integer. This handler 
should be fixed to expect empty string as the resource name for the dynamic 
default broker configs if using KRaft.  (was: In a ZK cluster, dynamic default 
broker configs are stored in the zNode /brokers/<default>. When dynamic configs 
from snapshots are processed by the brokers, the BrokerConfigHandler checks if 
the resource name is "<default>" to do a default update and converts the 
resource name to an integer otherwise to do a per-broker config update. In 
KRaft dynamic default broker configs are serialized in the quorum with empty 
string instead of "<default>". This was causing the BrokerConfigHandler to 
throw a NumberFormatException for dynamic default broker configs since the 
resource name for them is not "<default>" or a single integer. This handler 
should be fixed to expect empty string as the resource name for the dynamic 
default broker configs if using KRaft.)

> Fix BrokerConfigHandler to expect empty string as the resource name for 
> dynamic default broker configs in KRaft
> ---
>
> Key: KAFKA-13160
> URL: https://issues.apache.org/jira/browse/KAFKA-13160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In a ZK cluster, dynamic default broker configs are stored in the zNode 
> /brokers/<default>. Without this fix, when dynamic configs from snapshots are 
> processed by the brokers in KRaft, the BrokerConfigHandler checks if the 
> resource name is "<default>" to do a default update and converts the resource 
> name to an integer otherwise to do a per-broker config update. In KRaft 
> dynamic default broker configs are serialized in the quorum with empty string 
> instead of "<default>". This was causing the BrokerConfigHandler to throw a 
> NumberFormatException for dynamic default broker configs since the resource 
> name for them is not "<default>" or a single integer. This handler should be 
> fixed to expect empty string as the resource name for the dynamic default 
> broker configs if using KRaft.





[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft

2021-08-03 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13160:
--
Description: In a ZK cluster, dynamic default broker configs are stored in 
the zNode /brokers/<default>. When dynamic configs from snapshots are processed 
by the brokers, the BrokerConfigHandler checks if the resource name is 
"<default>" to do a default update and converts the resource name to an integer 
otherwise to do a per-broker config update. In KRaft dynamic default broker 
configs are serialized in the quorum with empty string instead of "<default>". 
This was causing the BrokerConfigHandler to throw a NumberFormatException for 
dynamic default broker configs since the resource name for them is not 
"<default>" or a single integer. This handler should be fixed to expect empty 
string as the resource name for the dynamic default broker configs if using 
KRaft.  (was: In a ZK cluster, dynamic default broker configs are stored in the 
zNode /brokers/<default>. When these config snapshots are processed by the 
brokers, the BrokerConfigHandler checks if the resource name is "<default>" to 
do a default update and converts the resource name to an integer otherwise to 
do a per-broker config update. In KRaft dynamic default broker configs are 
serialized in the quorum with empty string instead of "<default>". This was 
causing the BrokerConfigHandler to throw a NumberFormatException for dynamic 
default broker configs since the resource name for them is not "<default>" or a 
single integer. This handler should be fixed to expect empty string as the 
resource name for the dynamic default broker configs if using KRaft.)

> Fix BrokerConfigHandler to expect empty string as the resource name for 
> dynamic default broker configs in KRaft
> ---
>
> Key: KAFKA-13160
> URL: https://issues.apache.org/jira/browse/KAFKA-13160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In a ZK cluster, dynamic default broker configs are stored in the zNode 
> /brokers/<default>. When dynamic configs from snapshots are processed by the 
> brokers, the BrokerConfigHandler checks if the resource name is "<default>" 
> to do a default update and converts the resource name to an integer otherwise 
> to do a per-broker config update. In KRaft dynamic default broker configs are 
> serialized in the quorum with empty string instead of "<default>". This was 
> causing the BrokerConfigHandler to throw a NumberFormatException for dynamic 
> default broker configs since the resource name for them is not "<default>" or 
> a single integer. This handler should be fixed to expect empty string as the 
> resource name for the dynamic default broker configs if using KRaft.





[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft

2021-08-03 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13160:
--
Description: In a ZK cluster, dynamic default broker configs are stored in 
the zNode /brokers/<default>. When these config snapshots are processed by the 
brokers, the BrokerConfigHandler checks if the resource name is "<default>" to 
do a default update and converts the resource name to an integer otherwise to 
do a per-broker config update. In KRaft dynamic default broker configs are 
serialized in the quorum with empty string instead of "<default>". This was 
causing the BrokerConfigHandler to throw a NumberFormatException for dynamic 
default broker configs since the resource name for them is not "<default>" or a 
single integer. This handler should be fixed to expect empty string as the 
resource name for the dynamic default broker configs if using KRaft.  (was: In 
a ZK cluster, dynamic default broker configs are stored in the zNode 
/brokers/<default>. When these configs are sent to the brokers in a fetch 
response, the BrokerConfigHandler checks if the resource name is "<default>" to 
do a default update and converts the resource name to an integer otherwise to 
do a per-broker config update. In KRaft dynamic default broker configs are 
serialized in the quorum with empty string instead of "<default>". This was 
causing the BrokerConfigHandler to throw a NumberFormatException for dynamic 
default broker configs since the resource name for them is not "<default>" or a 
singular integer. This handler should be fixed to expect empty string as the 
resource name for the dynamic default broker configs if using KRaft.)

> Fix BrokerConfigHandler to expect empty string as the resource name for 
> dynamic default broker configs in KRaft
> ---
>
> Key: KAFKA-13160
> URL: https://issues.apache.org/jira/browse/KAFKA-13160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In a ZK cluster, dynamic default broker configs are stored in the zNode 
> /brokers/<default>. When these config snapshots are processed by the brokers, 
> the BrokerConfigHandler checks if the resource name is "<default>" to do a 
> default update and converts the resource name to an integer otherwise to do a 
> per-broker config update. In KRaft dynamic default broker configs are 
> serialized in the quorum with empty string instead of "<default>". This was 
> causing the BrokerConfigHandler to throw a NumberFormatException for dynamic 
> default broker configs since the resource name for them is not "<default>" or 
> a single integer. This handler should be fixed to expect empty string as the 
> resource name for the dynamic default broker configs if using KRaft.





[GitHub] [kafka] mumrah edited a comment on pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft

2021-08-03 Thread GitBox


mumrah edited a comment on pull request #11166:
URL: https://github.com/apache/kafka/pull/11166#issuecomment-892069170


   Streams smoke test: 
~https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4630/~
   https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4631/






[jira] [Resolved] (KAFKA-12646) Implement snapshot generation on brokers

2021-08-03 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio resolved KAFKA-12646.

Resolution: Fixed

> Implement snapshot generation on brokers
> 
>
> Key: KAFKA-12646
> URL: https://issues.apache.org/jira/browse/KAFKA-12646
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller
>Reporter: Jose Armando Garcia Sancio
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
>






[jira] [Resolved] (KAFKA-12647) Implement loading snapshot in the broker

2021-08-03 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio resolved KAFKA-12647.

Resolution: Fixed

> Implement loading snapshot in the broker
> 
>
> Key: KAFKA-12647
> URL: https://issues.apache.org/jira/browse/KAFKA-12647
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jose Armando Garcia Sancio
>Assignee: Colin McCabe
>Priority: Major
>  Labels: kip-500
>






[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

2021-08-03 Thread GitBox


mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r682020580



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
* @param sourceRecords The dirty log segment
* @param dest The cleaned log segment
* @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be 
retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than 
version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as 
defined by log configuration
* @param maxLogMessageSize The maximum message size of the corresponding 
topic
* @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated

Review comment:
   It seems like we don't need the return value at all, since we would only 
be using it to track the latestDeleteHorizon in the Log, but it doesn't seem 
like we need that either. I'm removing it

##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
* @param sourceRecords The dirty log segment
* @param dest The cleaned log segment
* @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be 
retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than 
version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as 
defined by log configuration
* @param maxLogMessageSize The maximum message size of the corresponding 
topic
* @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated

Review comment:
   It seems like we don't need the return value at all, since we would only 
be using it to track the latestDeleteHorizon in the Log, but it doesn't seem 
like we need that either. I'm going to remove it








[jira] [Updated] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft

2021-08-03 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn updated KAFKA-13160:
--
Description: In a ZK cluster, dynamic default broker configs are stored in 
the zNode /brokers/. When these configs are sent to the brokers in a 
fetch response, the BrokerConfigHandler checks if the resource name is 
"" to do a default update and converts the resource name to an integer 
otherwise to do a per-broker config update. In KRaft dynamic default broker 
configs are serialized in the quorum with empty string instead of "". 
This was causing the BrokerConfigHandler to throw a NumberFormatException for 
dynamic default broker configs since the resource name for them is not 
"" or a singular integer. This handler should be fixed to expect empty 
string as the resource name for the dynamic default broker configs if using 
KRaft.  (was: In a ZK cluster, dynamic default broker configs are stored in the 
zNode /brokers/. When these configs are sent to the brokers in a fetch 
response, the BrokerConfigHandler checks if the resource name is "" to 
do a default update and converts the resource name to an integer otherwise to 
do a per-broker config update. In KRaft dynamic default broker configs are 
serialized in the quorum with empty string instead of "". This was 
causing the BrokerConfigHandler to throw a NumberFormatException for dynamic 
default broker configs since the resource name for them is not "" or a 
singular integer. This handler should be fixed to expect empty string for the 
dynamic default broker configs if using KRaft.)

> Fix BrokerConfigHandler to expect empty string as the resource name for 
> dynamic default broker configs in KRaft
> ---
>
> Key: KAFKA-13160
> URL: https://issues.apache.org/jira/browse/KAFKA-13160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In a ZK cluster, dynamic default broker configs are stored in the zNode 
> /brokers/. When these configs are sent to the brokers in a fetch 
> response, the BrokerConfigHandler checks if the resource name is "" 
> to do a default update and converts the resource name to an integer otherwise 
> to do a per-broker config update. In KRaft dynamic default broker configs are 
> serialized in the quorum with empty string instead of "". This was 
> causing the BrokerConfigHandler to throw a NumberFormatException for dynamic 
> default broker configs since the resource name for them is not "" or 
> a singular integer. This handler should be fixed to expect empty string as 
> the resource name for the dynamic default broker configs if using KRaft.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dielhennr opened a new pull request #11168: KAFKA-13160: Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft

2021-08-03 Thread GitBox


dielhennr opened a new pull request #11168:
URL: https://github.com/apache/kafka/pull/11168


   The KRaft brokers are throwing NumberFormatException when processing dynamic 
default broker config updates because they expect the default entity name that 
was used in zookeeper to be the resource name for dynamic default broker 
configs instead of empty string. 
   
   https://issues.apache.org/jira/browse/KAFKA-13160
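
The behaviour described in this PR can be illustrated with a minimal Python sketch. It is not the actual `BrokerConfigHandler` code (which is Scala); the function name, the `kraft` flag, and the return shape are hypothetical. The sketch also assumes the ZooKeeper default entity name is the literal `"<default>"`, which the archived text does not show.

```python
# Hypothetical sketch of the resource-name handling described above.
# ASSUMPTION: the ZooKeeper default entity name is "<default>".
ZK_DEFAULT_NAME = "<default>"


def classify_broker_resource(resource_name, kraft):
    """Return ("default", None) or ("per-broker", broker_id)."""
    # In KRaft the dynamic default is serialized with an empty resource name.
    default_name = "" if kraft else ZK_DEFAULT_NAME
    if resource_name == default_name:
        return ("default", None)
    # Anything else is expected to be a single broker id. int("") raises
    # ValueError, the Python analogue of the NumberFormatException reported.
    return ("per-broker", int(resource_name))
```

With `kraft=False`, an empty resource name falls through to `int("")` and fails, which mirrors the reported bug; with `kraft=True`, the same name is treated as the default update.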






[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

2021-08-03 Thread GitBox


mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r682018753



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -270,7 +270,8 @@ class Log(@volatile private var _dir: File,
   val producerStateManager: ProducerStateManager,
   logDirFailureChannel: LogDirFailureChannel,
   @volatile private var _topicId: Option[Uuid],
-  val keepPartitionMetadataFile: Boolean) extends Logging with 
KafkaMetricsGroup {
+  val keepPartitionMetadataFile: Boolean,
+  @volatile var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP) 
extends Logging with KafkaMetricsGroup {

Review comment:
   hmm it seems like we only use it in a test. That goes with the return 
value that was added into the `cleanInto` method in the LogCleaner. I'm going 
to remove these and see if I can take another approach in the testing 








[GitHub] [kafka] wycccccc opened a new pull request #11167: Kafka-13158 Replace EasyMock and PowerMock with Mockito for ConnectClusterStateImpl Test and ConnectorPluginsResourceTest

2021-08-03 Thread GitBox


wycccccc opened a new pull request #11167:
URL: https://github.com/apache/kafka/pull/11167


   Development of EasyMock and PowerMock has stagnated while Mockito continues 
to be actively developed. With the new Java cadence, it's a problem to depend 
on libraries that do bytecode generation and are not actively maintained. In 
addition, Mockito is also easier to use.
   [KAFKA-7438](https://issues.apache.org/jira/browse/KAFKA-7438)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Created] (KAFKA-13160) Fix BrokerConfigHandler to expect empty string as the resource name for dynamic default broker configs in KRaft

2021-08-03 Thread Ryan Dielhenn (Jira)
Ryan Dielhenn created KAFKA-13160:
-

 Summary: Fix BrokerConfigHandler to expect empty string as the 
resource name for dynamic default broker configs in KRaft
 Key: KAFKA-13160
 URL: https://issues.apache.org/jira/browse/KAFKA-13160
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.0.0
Reporter: Ryan Dielhenn
Assignee: Ryan Dielhenn
 Fix For: 3.0.0


In a ZK cluster, dynamic default broker configs are stored in the zNode 
/brokers/. When these configs are sent to the brokers in a fetch 
response, the BrokerConfigHandler checks if the resource name is "" to 
do a default update and converts the resource name to an integer otherwise to 
do a per-broker config update. In KRaft dynamic default broker configs are 
serialized in the quorum with empty string instead of "". This was 
causing the BrokerConfigHandler to throw a NumberFormatException for dynamic 
default broker configs since the resource name for them is not "" or a 
singular integer. This handler should be fixed to expect empty string for the 
dynamic default broker configs if using KRaft.





[GitHub] [kafka] mattwong949 commented on a change in pull request #10914: [KAFKA-8522] Streamline tombstone and transaction marker removal

2021-08-03 Thread GitBox


mattwong949 commented on a change in pull request #10914:
URL: https://github.com/apache/kafka/pull/10914#discussion_r682012434



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -628,26 +634,43 @@ private[log] class Cleaner(val id: Int,
* @param sourceRecords The dirty log segment
* @param dest The cleaned log segment
* @param map The key=>offset mapping
-   * @param retainDeletesAndTxnMarkers Should tombstones and markers be 
retained while cleaning this segment
+   * @param retainLegacyDeletesAndTxnMarkers Should tombstones (lower than 
version 2) and markers be retained while cleaning this segment
+   * @param deleteRetentionMs Defines how long a tombstone should be kept as 
defined by log configuration
* @param maxLogMessageSize The maximum message size of the corresponding 
topic
* @param stats Collector for cleaning statistics
+   * @param currentTime The time at which the clean was initiated
*/
   private[log] def cleanInto(topicPartition: TopicPartition,
  sourceRecords: FileRecords,
  dest: LogSegment,
  map: OffsetMap,
- retainDeletesAndTxnMarkers: Boolean,
+ retainLegacyDeletesAndTxnMarkers: Boolean,
+ deleteRetentionMs: Long,
  maxLogMessageSize: Int,
  transactionMetadata: CleanedTransactionMetadata,
  lastRecordsOfActiveProducers: Map[Long, 
LastRecord],
- stats: CleanerStats): Unit = {
-val logCleanerFilter: RecordFilter = new RecordFilter {
+ stats: CleanerStats,
+ currentTime: Long): Long = {
+var latestDeleteHorizon: Long = RecordBatch.NO_TIMESTAMP
+
+val logCleanerFilter: RecordFilter = new RecordFilter(currentTime, 
deleteRetentionMs) {
   var discardBatchRecords: Boolean = _
 
-  override def checkBatchRetention(batch: RecordBatch): BatchRetention = {
+  override def checkBatchRetention(batch: RecordBatch): 
RecordFilter.BatchRetentionResult = {
 // we piggy-back on the tombstone retention logic to delay deletion of 
transaction markers.
 // note that we will never delete a marker until all the records from 
that transaction are removed.
-discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, 
retainTxnMarkers = retainDeletesAndTxnMarkers)
+val canDiscardBatch = shouldDiscardBatch(batch, transactionMetadata)
+
+if (batch.isControlBatch) {
+  if (batch.magic() < RecordBatch.MAGIC_VALUE_V2) {

Review comment:
   ah right, I didn't catch this. Seems like we don't need this block then, 
and we can just move into this check if it's a Control Batch then
   ```
   discardBatchRecords = canDiscardBatch && batch.deleteHorizonMs().isPresent 
&& batch.deleteHorizonMs().getAsLong <= currentTime
   ```
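
The one-line condition above can be restated as a small Python sketch to make the three requirements explicit. The function and parameter names are hypothetical, and `None` stands in for an empty `deleteHorizonMs` optional; this is an illustration of the proposed check, not the cleaner's code.

```python
from typing import Optional


def should_discard_control_batch(can_discard_batch: bool,
                                 delete_horizon_ms: Optional[int],
                                 current_time_ms: int) -> bool:
    """Discard only if all three conditions from the comment above hold."""
    return (can_discard_batch                        # batch is eligible for removal
            and delete_horizon_ms is not None        # a delete horizon has been set
            and delete_horizon_ms <= current_time_ms)  # and it has already passed
```

A batch with no delete horizon is therefore always retained on this pass, regardless of the other two inputs.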








[jira] [Updated] (KAFKA-13159) Enable system tests for transactions in KRaft mode

2021-08-03 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-13159:
-
Fix Version/s: 3.1.0

> Enable system tests for transactions in KRaft mode
> --
>
> Key: KAFKA-13159
> URL: https://issues.apache.org/jira/browse/KAFKA-13159
> Project: Kafka
>  Issue Type: Test
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Critical
> Fix For: 3.0.0, 3.1.0
>
>
> Previously, we disabled several system tests involving transactions in KRaft 
> mode. Now that KIP-730 is complete and transactions work in KRaft, we need to 
> re-enable these tests.





[GitHub] [kafka] rondagostino commented on a change in pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft

2021-08-03 Thread GitBox


rondagostino commented on a change in pull request #11166:
URL: https://github.com/apache/kafka/pull/11166#discussion_r682008773



##
File path: tests/kafkatest/tests/core/replication_test.py
##
@@ -122,14 +122,16 @@ def min_cluster_size(self):
 @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", 
"hard_bounce"],
 broker_type=["leader"],
 security_protocol=["PLAINTEXT"],
-enable_idempotence=[True])
+enable_idempotence=[True],
+metadata_quorum=quorum.all_non_upgrade)
 @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", 
"hard_bounce"],
 broker_type=["leader"],
 security_protocol=["PLAINTEXT", "SASL_SSL"],
 metadata_quorum=quorum.all_non_upgrade)
 @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", 
"hard_bounce"],
 broker_type=["controller"],
-security_protocol=["PLAINTEXT", "SASL_SSL"])
+security_protocol=["PLAINTEXT", "SASL_SSL"],
+metadata_quorum=quorum.all_non_upgrade)

Review comment:
   I wonder if this is going to fail.  The below code seems to be incorrect:
   ```
   if failure_mode == "controller" and metadata_quorum != quorum.zk:
   raise Exception("There is no controller broker when using KRaft 
metadata quorum")
   ```
   
   That maybe should be checking for `broker_type` instead of `failure_mode`.  








[jira] [Commented] (KAFKA-13159) Enable system tests for transactions in KRaft mode

2021-08-03 Thread David Arthur (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392468#comment-17392468
 ] 

David Arthur commented on KAFKA-13159:
--

The main transactions_test.py tests were enabled on the 3.0 branch here: 
https://github.com/apache/kafka/pull/11165

> Enable system tests for transactions in KRaft mode
> --
>
> Key: KAFKA-13159
> URL: https://issues.apache.org/jira/browse/KAFKA-13159
> Project: Kafka
>  Issue Type: Test
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Critical
> Fix For: 3.0.0, 3.1.0
>
>
> Previously, we disabled several system tests involving transactions in KRaft 
> mode. Now that KIP-730 is complete and transactions work in KRaft, we need to 
> re-enable these tests.





[jira] [Created] (KAFKA-13159) Enable system tests for transactions in KRaft mode

2021-08-03 Thread David Arthur (Jira)
David Arthur created KAFKA-13159:


 Summary: Enable system tests for transactions in KRaft mode
 Key: KAFKA-13159
 URL: https://issues.apache.org/jira/browse/KAFKA-13159
 Project: Kafka
  Issue Type: Test
Reporter: David Arthur
Assignee: David Arthur
 Fix For: 3.0.0


Previously, we disabled several system tests involving transactions in KRaft 
mode. Now that KIP-730 is complete and transactions work in KRaft, we need to 
re-enable these tests.





[jira] [Resolved] (KAFKA-12997) Expose log record append time to the controller/broker

2021-08-03 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio resolved KAFKA-12997.

Resolution: Fixed

> Expose log record append time to the controller/broker
> --
>
> Key: KAFKA-12997
> URL: https://issues.apache.org/jira/browse/KAFKA-12997
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Niket Goel
>Assignee: Jose Armando Garcia Sancio
>Priority: Minor
>  Labels: kip-500
>
> The snapshot records are generated by each individual quorum participant 
> which also stamps the append time in the records. These append times are 
> generated from a different clock (except in the case of the quorum leader) as 
> compared to the metadata log records (where timestamps are stamped by the 
> leader).
> To enable having a single clock to compare timestamps, 
> https://issues.apache.org/jira/browse/KAFKA-12952 adds a timestamp field to 
> the snapshot header which should contain the append time of the highest 
> record contained in the snapshot (which will be in leader time).
> This JIRA tracks exposing and wiring the batch timestamp such that it can be 
> provided to the SnapshotWriter at the time of snapshot creation.
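
The idea in the description can be sketched in Python as follows, assuming a hypothetical `Record` tuple of offset and leader-stamped append time. This is not the `SnapshotWriter` API; it only illustrates choosing the append time of the highest record covered by the snapshot.

```python
from typing import Iterable, NamedTuple


class Record(NamedTuple):
    offset: int
    append_time_ms: int  # assumed to be stamped by the quorum leader


def snapshot_timestamp(records: Iterable[Record]) -> int:
    """Return the append time of the highest-offset record in the snapshot."""
    highest = max(records, key=lambda r: r.offset)
    return highest.append_time_ms
```

Because the value comes from a log record rather than from the local clock of the participant writing the snapshot, it stays comparable with the timestamps in the metadata log.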





[jira] [Created] (KAFKA-13158) Replace EasyMock and PowerMock with Mockito for ConnectClusterStateImpl Test and ConnectorPluginsResourceTest

2021-08-03 Thread YI-CHEN WANG (Jira)
YI-CHEN WANG created KAFKA-13158:


 Summary: Replace EasyMock and PowerMock with Mockito for 
ConnectClusterStateImpl Test and ConnectorPluginsResourceTest
 Key: KAFKA-13158
 URL: https://issues.apache.org/jira/browse/KAFKA-13158
 Project: Kafka
  Issue Type: Sub-task
Reporter: YI-CHEN WANG
Assignee: YI-CHEN WANG








[GitHub] [kafka] mumrah commented on pull request #11166: MINOR Enable additional transaction system tests in KRaft

2021-08-03 Thread GitBox


mumrah commented on pull request #11166:
URL: https://github.com/apache/kafka/pull/11166#issuecomment-892069170


   Streams smoke test: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4630/






[GitHub] [kafka] mumrah commented on pull request #11166: MINOR Enable additional transaction system tests in KRaft

2021-08-03 Thread GitBox


mumrah commented on pull request #11166:
URL: https://github.com/apache/kafka/pull/11166#issuecomment-892068893


   Group mode transactions: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4629/






[jira] [Assigned] (KAFKA-12973) Update KIP and dev mailing list

2021-08-03 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio reassigned KAFKA-12973:
--

Assignee: Jose Armando Garcia Sancio

> Update KIP and dev mailing list
> ---
>
> Key: KAFKA-12973
> URL: https://issues.apache.org/jira/browse/KAFKA-12973
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> Update KIP-630 and the Kafka mailing list based on the small implementation 
> deviations from what is documented in the KIP.





[GitHub] [kafka] mumrah commented on pull request #11166: MINOR Enable additional transaction system tests in KRaft

2021-08-03 Thread GitBox


mumrah commented on pull request #11166:
URL: https://github.com/apache/kafka/pull/11166#issuecomment-892068421


   Produce bench: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4628/






[GitHub] [kafka] mumrah commented on pull request #11166: MINOR Enable additional transaction system tests in KRaft

2021-08-03 Thread GitBox


mumrah commented on pull request #11166:
URL: https://github.com/apache/kafka/pull/11166#issuecomment-892067696


   Replication test run: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4627/






[GitHub] [kafka] mumrah opened a new pull request #11166: MINOR Enable additional transaction system tests in KRaft

2021-08-03 Thread GitBox


mumrah opened a new pull request #11166:
URL: https://github.com/apache/kafka/pull/11166


   






[GitHub] [kafka] mumrah commented on pull request #11165: MINOR Enable transaction system tests for KRaft in 3.0

2021-08-03 Thread GitBox


mumrah commented on pull request #11165:
URL: https://github.com/apache/kafka/pull/11165#issuecomment-892065979


   Opening a follow-on PR for the additional tests mentioned by @rondagostino 






[GitHub] [kafka] mumrah merged pull request #11165: MINOR Enable transaction system tests for KRaft in 3.0

2021-08-03 Thread GitBox


mumrah merged pull request #11165:
URL: https://github.com/apache/kafka/pull/11165


   






[jira] [Created] (KAFKA-13157) Kafka-dump-log needs to support snapshot records

2021-08-03 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13157:
--

 Summary: Kafka-dump-log needs to support snapshot records
 Key: KAFKA-13157
 URL: https://issues.apache.org/jira/browse/KAFKA-13157
 Project: Kafka
  Issue Type: Sub-task
  Components: kraft
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio


Extend the kafka-dump-log tool to allow the user to view and print KRaft 
snapshot files.




