[GitHub] [kafka] ttapjinda opened a new pull request #9006: KAFKA-3370 nearest offset reset
ttapjinda opened a new pull request #9006: URL: https://github.com/apache/kafka/pull/9006 - Introduce nearest.offset.reset option on the Consumer, which will only be used on OffsetOutOfRangeException. On enable, the offset will be reset to the earliest if out-of-range offset is not higher than the earliest offset, otherwise it will be reset to the latest offset. *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
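A minimal Java sketch of the reset rule described in this pull request, assuming the partition's earliest and latest offsets are already known. The class and method names are hypothetical and are not taken from the PR.

```java
final class NearestOffsetReset {
    private NearestOffsetReset() { }

    /**
     * Hypothetical helper: picks the reset target for an out-of-range offset.
     * An offset that is not higher than the earliest offset resets to earliest;
     * any other out-of-range offset resets to latest.
     */
    static long resolve(final long outOfRangeOffset, final long earliest, final long latest) {
        return outOfRangeOffset <= earliest ? earliest : latest;
    }
}
```

For a log spanning offsets 100 to 500, `resolve(42, 100, 500)` picks 100 and `resolve(900, 100, 500)` picks 500.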
[GitHub] [kafka] ijuma commented on pull request #8913: KAFKA-10191 fix flaky StreamsOptimizedTest
ijuma commented on pull request #8913: URL: https://github.com/apache/kafka/pull/8913#issuecomment-656511435 We need this in 2.6 for the reason you stated @ableegoldman
[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
ijuma commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r452634516 ## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -239,9 +239,13 @@ class GroupMetadataManager(brokerId: Int, } } + /** + * @return Returning a map of successfully appended topic partitions and a flag indicting whether the HWM has been + * incremented. The caller ought to complete delayed requests for those returned partitions. + */ def storeGroup(group: GroupMetadata, groupAssignment: Map[String, Array[Byte]], - responseCallback: Errors => Unit): Unit = { + responseCallback: Errors => Unit): Map[TopicPartition, LeaderHWChange] = { Review comment: It's weird to have a method that invokes a callback and returns a result. Do we need both? We have a number of other methods that do something similar. It would be good to reconsider that as it's difficult to reason about usage in such cases.
[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
ijuma commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r452634516 ## File path: core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ## @@ -239,9 +239,13 @@ class GroupMetadataManager(brokerId: Int, } } + /** + * @return Returning a map of successfully appended topic partitions and a flag indicting whether the HWM has been + * incremented. The caller ought to complete delayed requests for those returned partitions. + */ def storeGroup(group: GroupMetadata, groupAssignment: Map[String, Array[Byte]], - responseCallback: Errors => Unit): Unit = { + responseCallback: Errors => Unit): Map[TopicPartition, LeaderHWChange] = { Review comment: It's weird to have a method that invokes a callback and returns a result. Do we need both? ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -65,13 +65,24 @@ import scala.compat.java8.OptionConverters._ /* * Result metadata of a log append operation on the log */ -case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = None) { +case class LogAppendResult(info: LogAppendInfo, + exception: Option[Throwable] = None, + leaderHWChange: LeaderHWChange = LeaderHWChange.None) { def error: Errors = exception match { case None => Errors.NONE case Some(e) => Errors.forException(e) } } +/** + * a flag indicting whether the HWM has been changed. + */ +sealed trait LeaderHWChange +object LeaderHWChange { + case object LeaderHWIncremented extends LeaderHWChange Review comment: The usual naming convention is to only capitalize the first letter, eg LeaderHwChange.
[GitHub] [kafka] mjsax commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted
mjsax commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r452622160 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -193,6 +196,26 @@ private void closeAndRevive(final Map> taskWith log.error("Error suspending corrupted task {} ", task.id(), swallow); } task.closeDirty(); +if (task.isActive()) { +// Pause so we won't poll any more records for this task until it has been re-initialized +// Note, closeDirty already clears the partitiongroup for the task. +final Set currentAssignment = mainConsumer().assignment(); +final Set assignedToPauseAndReset = +Utils.intersection(HashSet::new, currentAssignment, task.inputPartitions()); + +mainConsumer().pause(assignedToPauseAndReset); +final Map committed = mainConsumer().committed(assignedToPauseAndReset); +for (final Map.Entry committedEntry : committed.entrySet()) { +final OffsetAndMetadata offsetAndMetadata = committedEntry.getValue(); +if (offsetAndMetadata != null) { +mainConsumer().seek(committedEntry.getKey(), offsetAndMetadata); + assignedToPauseAndReset.remove(committedEntry.getKey()); +} +} +final Set remainder = resetter.apply(assignedToPauseAndReset); +// If anything didn't have a configured policy, reset to beginning +mainConsumer().seekToBeginning(remainder); Review comment: I think we should fail for this case, because if the user configures "none" they request that we fail if we lose track of the offset.
[GitHub] [kafka] mjsax commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted
mjsax commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r452621814 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -193,6 +196,26 @@ private void closeAndRevive(final Map> taskWith log.error("Error suspending corrupted task {} ", task.id(), swallow); } task.closeDirty(); +if (task.isActive()) { +// Pause so we won't poll any more records for this task until it has been re-initialized +// Note, closeDirty already clears the partitiongroup for the task. +final Set currentAssignment = mainConsumer().assignment(); +final Set assignedToPauseAndReset = +Utils.intersection(HashSet::new, currentAssignment, task.inputPartitions()); Review comment: I agree that it should never happen. Maybe it's a test setup issue? Should we be worried about masking a potential bug by writing the code like this? Or should we rather fail hard (if we assume it would indicate a bug) and make sure to update the tests?
[GitHub] [kafka] mjsax commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted
mjsax commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r452620905 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -768,9 +770,30 @@ void runOnce() { private void resetInvalidOffsets(final InvalidOffsetException e) { final Set partitions = e.partitions(); +final Set notReset = resetOffsets(partitions); +if (!notReset.isEmpty()) { +final String notResetString = +notReset.stream() +.map(tp -> "topic " + tp.topic() + "(partition " + tp.partition() + ")") +.collect(Collectors.joining(",")); Review comment: Nit: As reset policy is set on a per topic basis, it's sufficient to list the topic names -- it does not add value if we list the partitions, because all assigned partitions would be affected anyway.
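The nit above could be addressed roughly as follows. This is only a sketch: `Tp` is a hypothetical stand-in for Kafka's `TopicPartition` so the example has no Kafka dependency, and deduplicating and sorting the topic names is an assumption, not something the review asks for.

```java
import java.util.Collection;
import java.util.stream.Collectors;

final class NotResetTopics {
    private NotResetTopics() { }

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    static final class Tp {
        final String topic;
        final int partition;

        Tp(final String topic, final int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Lists each affected topic once; partitions are omitted because the
    // reset policy applies per topic.
    static String topicList(final Collection<Tp> notReset) {
        return notReset.stream()
            .map(tp -> tp.topic)
            .distinct()
            .sorted()
            .collect(Collectors.joining(","));
    }
}
```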
[GitHub] [kafka] mjsax commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted
mjsax commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r452620343 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -95,7 +96,7 @@ * | | Assigned (3)| <+ * | +-+---+ | * || | - * || | + * ||--+ Review comment: \cc @vvcephei
[jira] [Updated] (KAFKA-10262) StateDirectory is not thread-safe
[ https://issues.apache.org/jira/browse/KAFKA-10262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-10262: Description: As explicitly stated in the StateDirectory javadocs, "This class is not thread-safe." Despite this, a single StateDirectory is shared among all the StreamThreads of a client. Some of the more "dangerous" methods are indeed synchronized, but others are not. For example, the innocent-sounding #directoryForTask is not thread-safe and is called in a number of places. We call it during task creation, and we call it during task closure (through StateDirectory#lock). It's not uncommon for one thread to be closing a task while another is creating it after a rebalance. In fact, we saw exactly that happen in our test application. This ultimately led to the following exception {code:java} org.apache.kafka.streams.errors.ProcessorStateException: task directory [/mnt/run/streams/state/stream-soak-test/1_0] doesn't exist and couldn't be created at org.apache.kafka.streams.processor.internals.StateDirectory.directoryForTask(StateDirectory.java:112) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:187) at org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createTasks(StandbyTaskCreator.java:85) at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:337) {code} The exception arises from this line in StateDirectory#directoryForTask: {code:java} if (hasPersistentStores && !taskDir.exists() && !taskDir.mkdir()) {code} Presumably, if the taskDir did not exist when the two threads began this method, then they would both attempt to create the directory. One of them will get there first, leaving the other to return unsuccessfully from mkdir and ultimately throw the above ProcessorStateException. I've only confirmed that this affects 2.6 so far, but the unsafe methods are present in earlier versions. 
It's possible we made the problem worse somehow during "The Refactor" so that it's easier to hit this race condition. was: As explicitly stated in the StateDirectory javadocs, "This class is not thread-safe." Despite this, a single StateDirectory is shared among all the StreamThreads of a client. Some of the more "dangerous" methods are indeed synchronized, but others are not. For example, the innocent-sounding #directoryForTask is not thread-safe and is called in a number of places. We call it during task creation, and we call it during task closure when we check `directoryForTaskIsEmpty`. It's not uncommon for one thread to be closing a task while another is creating it after a rebalance. In fact, we saw exactly that happen in our test application. This ultimately led to the following exception {code:java} org.apache.kafka.streams.errors.ProcessorStateException: task directory [/mnt/run/streams/state/stream-soak-test/1_0] doesn't exist and couldn't be created at org.apache.kafka.streams.processor.internals.StateDirectory.directoryForTask(StateDirectory.java:112) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:187) at org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createTasks(StandbyTaskCreator.java:85) at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:337) {code} The exception arises from this line in StateDirectory#directoryForTask: {code:java} if (hasPersistentStores && !taskDir.exists() && !taskDir.mkdir()) {code} Presumably, if the taskDir did not exist when the two threads began this method, then they would both attempt to create the directory. One of them will get there first, leaving the other to return unsuccessfully from mkdir and ultimately throw the above ProcessorStateException. I've only confirmed that this affects 2.6 so far, but the unsafe methods are present in earlier versions. 
It's possible we made the problem worse somehow during "The Refactor" so that it's easier to hit this race condition. > StateDirectory is not thread-safe > - > > Key: KAFKA-10262 > URL: https://issues.apache.org/jira/browse/KAFKA-10262 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Sophie Blee-Goldman >Priority: Major > > As explicitly stated in the StateDirectory javadocs, "This class is not > thread-safe." > Despite this, a single StateDirectory is shared among all the StreamThreads > of a client. Some of the more "dangerous" methods are indeed synchronized, > but others are not. For example, the innocent-sounding #directoryForTask is > not thread-safe and is called in a number of places. We call it during task > creation, and we call it during task closure (throug
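One way to tolerate the mkdir race described in this ticket is to re-check for the directory after `mkdir()` reports failure. The sketch below is an illustration under that assumption, with a hypothetical class and method name; it is not the fix that was applied to StateDirectory.

```java
import java.io.File;

final class TaskDirs {
    private TaskDirs() { }

    /**
     * Returns true if the directory exists once this call completes,
     * whether this thread or a concurrent one created it.
     * File#mkdir returns false when the directory already exists, so a
     * thread that loses the race re-checks instead of treating that as an error.
     */
    static boolean ensureDirectory(final File dir) {
        return dir.isDirectory() || dir.mkdir() || dir.isDirectory();
    }
}
```

A caller would throw only when `ensureDirectory(taskDir)` returns false, which then means the directory is genuinely missing and could not be created.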
[GitHub] [kafka] vvcephei commented on pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with shims
vvcephei commented on pull request #9004: URL: https://github.com/apache/kafka/pull/9004#issuecomment-656446145 Note, I didn't bother testing the shims directly, since they are very effectively tested by literally all the tests in Streams.
[GitHub] [kafka] vvcephei commented on a change in pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with shims
vvcephei commented on a change in pull request #9004: URL: https://github.com/apache/kafka/pull/9004#discussion_r452588767 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseShim.java ## @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.internals.ThreadCache; + +import java.io.File; +import java.time.Duration; +import java.util.Map; + +public final class ProcessorContextReverseShim implements InternalProcessorContext { +final InternalApiProcessorContext delegate; + +static InternalProcessorContext shim(final InternalApiProcessorContext delegate) { +if (delegate instanceof ProcessorContextShim) { +return ((ProcessorContextShim) delegate).delegate; Review comment: You'll see this block in all the shims. There are times when the internal code would wind up converting new to old and then back to new. This block prevents us from jumping through multiple layers in that case. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java ## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.api; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.streams.processor.To; + +import java.io.File; +import java.time.Duration; +import java.util.Map; + +/** + * Processor context interface. + * + * @param > a bound on the types of keys that may be forwarded + * @param > a bound on the types of values that may be forwarded + */ +public interface ProcessorContext { Review comment: I'll have to update the KIP. Replacing ProcessorContext instead of just adding the generic parameters is going to avoid the Scala compatibility issue we faced last time. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorShim.java ## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See
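The unwrapping block mentioned in the first review comment of this message can be illustrated in isolation. In the sketch below, `OldContext`, `NewContext`, and both shim classes are hypothetical stand-ins, not the Kafka Streams types; the point is only that each `shim` factory returns the original object when handed the opposite shim, so adapters never stack.

```java
// Hypothetical stand-ins for the old and new processor-context APIs.
interface OldContext { String applicationId(); }
interface NewContext { String applicationId(); }

// Adapts an old-API object to the new API.
final class OldToNewShim implements NewContext {
    final OldContext delegate;

    private OldToNewShim(final OldContext delegate) {
        this.delegate = delegate;
    }

    static NewContext shim(final OldContext delegate) {
        // If the argument is itself a reverse shim, unwrap it instead of adding a layer.
        if (delegate instanceof NewToOldShim) {
            return ((NewToOldShim) delegate).delegate;
        }
        return new OldToNewShim(delegate);
    }

    @Override
    public String applicationId() {
        return delegate.applicationId();
    }
}

// Adapts a new-API object to the old API.
final class NewToOldShim implements OldContext {
    final NewContext delegate;

    private NewToOldShim(final NewContext delegate) {
        this.delegate = delegate;
    }

    static OldContext shim(final NewContext delegate) {
        // Same unwrapping in the other direction.
        if (delegate instanceof OldToNewShim) {
            return ((OldToNewShim) delegate).delegate;
        }
        return new NewToOldShim(delegate);
    }

    @Override
    public String applicationId() {
        return delegate.applicationId();
    }
}
```

Converting new to old and back to new therefore yields the very same object rather than a doubly wrapped one.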
[GitHub] [kafka] vvcephei commented on pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted
vvcephei commented on pull request #8994: URL: https://github.com/apache/kafka/pull/8994#issuecomment-656441714 Hi @abbccdda @ableegoldman @mjsax , I've rebased on trunk to resolve conflicts with Sophie's parallel fix. I also dropped a few unnecessary side-fixes. Do you mind giving this a final pass?
[jira] [Commented] (KAFKA-10249) In-memory stores are skipped when checkpointing but not skipped when reading the checkpoint
[ https://issues.apache.org/jira/browse/KAFKA-10249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17155039#comment-17155039 ] John Roesler commented on KAFKA-10249: -- cc [~rhauch] , I've just pushed the fix to 2.6. > In-memory stores are skipped when checkpointing but not skipped when reading > the checkpoint > --- > > Key: KAFKA-10249 > URL: https://issues.apache.org/jira/browse/KAFKA-10249 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.6.0 > > > As the title suggests, offsets for in-memory stores (including the > suppression buffer) are not written to the checkpoint file. However, when > reading from the checkpoint file during task initialization, we do not check > StateStore#persistent. We attempt to look up the offsets for in-memory stores > in the checkpoint file, and obviously do not find them. > With eos we have to conclude that the existing state is dirty and thus throw > a TaskCorruptedException. So pretty much any task with in-memory state will > always hit this exception when reinitializing from the checkpoint, forcing it > to clear the entire state directory and build up all of its state again from > scratch (both persistent and in-memory). > This is especially unfortunate for KIP-441, as we will hit this any time a > task is moved from one thread to another. -- This message was sent by Atlassian Jira (v8.3.4#803005)
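The mismatch described in this ticket can be sketched with plain maps. This is an assumption-laden illustration, not the actual fix: the class, the method, and the idea of passing a "persistent" flag per store name are all hypothetical. It shows a read-side check that only expects checkpointed offsets for persistent stores, mirroring what the write side does.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class CheckpointCheck {
    private CheckpointCheck() { }

    /**
     * Returns the stores whose offsets should be in the checkpoint but are not.
     * In-memory (non-persistent) stores are skipped, since their offsets are
     * never written to the checkpoint in the first place.
     */
    static Set<String> storesMissingOffsets(final Map<String, Boolean> persistentByStore,
                                            final Map<String, Long> checkpointedOffsets) {
        final Set<String> missing = new TreeSet<>();
        for (final Map.Entry<String, Boolean> store : persistentByStore.entrySet()) {
            final boolean persistent = store.getValue();
            if (persistent && !checkpointedOffsets.containsKey(store.getKey())) {
                missing.add(store.getKey());
            }
        }
        return missing;
    }
}
```

Under this sketch, a task would be treated as corrupted only when the returned set is non-empty, so a task that merely has in-memory state would no longer be wiped on every reinitialization.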
[jira] [Updated] (KAFKA-10249) In-memory stores are skipped when checkpointing but not skipped when reading the checkpoint
[ https://issues.apache.org/jira/browse/KAFKA-10249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10249: - Fix Version/s: 2.6.0 > In-memory stores are skipped when checkpointing but not skipped when reading > the checkpoint > --- > > Key: KAFKA-10249 > URL: https://issues.apache.org/jira/browse/KAFKA-10249 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.6.0 > > > As the title suggests, offsets for in-memory stores (including the > suppression buffer) are not written to the checkpoint file. However, when > reading from the checkpoint file during task initialization, we do not check > StateStore#persistent. We attempt to look up the offsets for in-memory stores > in the checkpoint file, and obviously do not find them. > With eos we have to conclude that the existing state is dirty and thus throw > a TaskCorruptedException. So pretty much any task with in-memory state will > always hit this exception when reinitializing from the checkpoint, forcing it > to clear the entire state directory and build up all of its state again from > scratch (both persistent and in-memory). > This is especially unfortunate for KIP-441, as we will hit this any time a > task is moved from one thread to another.
[GitHub] [kafka] vvcephei merged pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
vvcephei merged pull request #8996: URL: https://github.com/apache/kafka/pull/8996
[GitHub] [kafka] vvcephei removed a comment on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
vvcephei removed a comment on pull request #8996: URL: https://github.com/apache/kafka/pull/8996#issuecomment-656435203 Unrelated test failures: * kafka.api.TransactionsTest.testBumpTransactionalEpoch * org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true] * org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1
[GitHub] [kafka] vvcephei commented on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
vvcephei commented on pull request #8996: URL: https://github.com/apache/kafka/pull/8996#issuecomment-656435203 Unrelated test failures: * kafka.api.TransactionsTest.testBumpTransactionalEpoch * org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true] * org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1
[GitHub] [kafka] showuon commented on pull request #8885: KAFKA-8264: decrease the record size for flaky test
showuon commented on pull request #8885: URL: https://github.com/apache/kafka/pull/8885#issuecomment-656432089 @hachikuji , could you help review this small PR, which fixes a flaky test? Thanks.
[GitHub] [kafka] ableegoldman commented on a change in pull request #9005: KAFKA-10263: Do not assign standby for revoking stateless tasks
ableegoldman commented on a change in pull request #9005: URL: https://github.com/apache/kafka/pull/9005#discussion_r452570044 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java ## @@ -1402,9 +1404,13 @@ public void shouldTriggerImmediateRebalanceOnTasksRevoked() { final Map assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); // Verify at least one partition was revoked -assertThat(assignment.get(CONSUMER_1).partitions(), not(allTasks)); +assertThat(assignment.get(CONSUMER_1).partitions(), not(allPartitions)); Review comment: Whoops
[GitHub] [kafka] ableegoldman commented on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
ableegoldman commented on pull request #8996: URL: https://github.com/apache/kafka/pull/8996#issuecomment-656424125 Three flaky test failures: `MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1` `TransactionsTest.testBumpTransactionalEpoch` `EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]`
[jira] [Commented] (KAFKA-10255) Fix flaky testOneWayReplicationWithAutorOffsetSync1 test
[ https://issues.apache.org/jira/browse/KAFKA-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17155024#comment-17155024 ] Sophie Blee-Goldman commented on KAFKA-10255: - Failed again h3. Stacktrace java.lang.AssertionError: consumer record size is not zero expected:<0> but was:<3> at org.junit.Assert.fail(Assert.java:89) at org.junit.Assert.failNotEquals(Assert.java:835) at org.junit.Assert.assertEquals(Assert.java:647) at org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349) > Fix flaky testOneWayReplicationWithAutorOffsetSync1 test > > > Key: KAFKA-10255 > URL: https://issues.apache.org/jira/browse/KAFKA-10255 > Project: Kafka > Issue Type: Test >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/279/log/?start=0 > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > > testOneWayReplicationWithAutorOffsetSync1 STARTED > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1 > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-trunk-jdk14/connect/mirror/build/reports/testOutput/org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1.test.stdout > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest > > testOneWayReplicationWithAutorOffsetSync1 FAILED > java.lang.AssertionError: consumer record size is not zero expected:<0> but > was:<2> > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.failNotEquals(Assert.java:835) > at org.junit.Assert.assertEquals(Assert.java:647) > at > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1(MirrorConnectorsIntegrationTest.java:349)
[jira] [Created] (KAFKA-10264) Flaky Test TransactionsTest.testBumpTransactionalEpoch
Sophie Blee-Goldman created KAFKA-10264: --- Summary: Flaky Test TransactionsTest.testBumpTransactionalEpoch Key: KAFKA-10264 URL: https://issues.apache.org/jira/browse/KAFKA-10264 Project: Kafka Issue Type: Bug Components: core Reporter: Sophie Blee-Goldman h3. Stacktrace java.lang.AssertionError: Unexpected exception cause org.apache.kafka.common.KafkaException: The client hasn't received acknowledgment for some previously sent messages and can no longer retry them. It is safe to abort the transaction and continue. at org.junit.Assert.fail(Assert.java:89) at org.junit.Assert.assertTrue(Assert.java:42) at org.apache.kafka.test.TestUtils.assertFutureThrows(TestUtils.java:557) at kafka.api.TransactionsTest.testBumpTransactionalEpoch(TransactionsTest.scala:637)
[GitHub] [kafka] guozhangwang commented on a change in pull request #9005: KAFKA-10263: Do not assign standby for revoking stateless tasks
guozhangwang commented on a change in pull request #9005: URL: https://github.com/apache/kafka/pull/9005#discussion_r452562468 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java ## @@ -1402,9 +1404,13 @@ public void shouldTriggerImmediateRebalanceOnTasksRevoked() { final Map assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); // Verify at least one partition was revoked -assertThat(assignment.get(CONSUMER_1).partitions(), not(allTasks)); +assertThat(assignment.get(CONSUMER_1).partitions(), not(allPartitions)); assertThat(assignment.get(CONSUMER_2).partitions(), equalTo(emptyList())); +// Verify that stateless revoked tasks would not be assigned as standbys + assertThat(AssignmentInfo.decode(assignment.get(CONSUMER_2).userData()).activeTasks(), equalTo(emptyList())); + assertThat(AssignmentInfo.decode(assignment.get(CONSUMER_2).userData()).standbyTasks(), equalTo(emptyMap())); Review comment: Verified that without the fix, this line will fail. ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java ## @@ -1402,9 +1404,13 @@ public void shouldTriggerImmediateRebalanceOnTasksRevoked() { final Map assignment = partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)).groupAssignment(); // Verify at least one partition was revoked -assertThat(assignment.get(CONSUMER_1).partitions(), not(allTasks)); +assertThat(assignment.get(CONSUMER_1).partitions(), not(allPartitions)); Review comment: I found it was always true because we are comparing a list of tasks with a list of partitions ;)
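The vacuous assertion noted in the last review comment can be reproduced without Hamcrest, given that Hamcrest's `not(value)` is shorthand for `not(equalTo(value))`, which rests on `equals`. The sketch below uses plain `List#equals` with hypothetical string task ids and integer partitions as stand-ins for the real types.

```java
import java.util.List;

final class VacuousAssertion {
    private VacuousAssertion() { }

    // Mirrors what a "not equal to" matcher checks: the two lists differ.
    static boolean differs(final List<?> actual, final List<?> expected) {
        return !actual.equals(expected);
    }
}
```

Two non-empty lists whose elements have unrelated types can never be equal, so `differs` is true whatever the assignment looks like. An assertion built on it cannot fail, which is why the test had to compare partitions against partitions.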
[GitHub] [kafka] guozhangwang opened a new pull request #9005: KAFKA-10263: Do not assign standby for revoking stateless tasks
guozhangwang opened a new pull request #9005: URL: https://github.com/apache/kafka/pull/9005 Also piggy-back a small fix to use TreeMap rather than HashMap to preserve iteration ordering. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-10263) Do not create standbys for those revoking active tasks if it is not stateful
Guozhang Wang created KAFKA-10263: - Summary: Do not create standbys for those revoking active tasks if it is not stateful Key: KAFKA-10263 URL: https://issues.apache.org/jira/browse/KAFKA-10263 Project: Kafka Issue Type: Improvement Reporter: Guozhang Wang Assignee: Guozhang Wang Today in StreamsPartitionAssignor, if an intended active task has not yet been revoked from its old owner, we do not give it to the newly assigned owner; instead we assign it as a standby task to the new owner so that it can start restoring a bit early. However, if that task is not stateful, there is no point in letting it restore at all. This should be avoided in the assignor. -- This message was sent by Atlassian Jira (v8.3.4#803005)
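The rule described in this ticket can be sketched as a simple filter. This is not the StreamsPartitionAssignor code; the class name, the method name, and the use of plain strings as task IDs are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: task IDs are plain strings, not Kafka's TaskId.
final class StandbySketch {
    // Of the tasks still owned by their previous consumer, only the stateful
    // ones are worth handing to the new owner as standbys, because only they
    // have state to restore.
    static Set<String> standbysFor(Collection<String> notYetRevoked, Set<String> statefulTasks) {
        Set<String> standbys = new TreeSet<>(); // sorted, so iteration order is deterministic
        for (String task : notYetRevoked) {
            if (statefulTasks.contains(task)) {
                standbys.add(task);
            }
        }
        return standbys;
    }

    public static void main(String[] args) {
        Set<String> stateful = new HashSet<>(Arrays.asList("0_1"));
        System.out.println(standbysFor(Arrays.asList("0_0", "0_1"), stateful)); // prints [0_1]
    }
}
```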
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement KIP-584 write path
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r452398464 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java ## @@ -0,0 +1,26 @@ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +@InterfaceStability.Evolving +public class DescribeFeaturesOptions extends AbstractOptions { +private boolean shouldUseControllerAsDestination = false; + +public DescribeFeaturesOptions shouldUseControllerAsDestination(boolean shouldUse) { +shouldUseControllerAsDestination = shouldUse; +return this; +} + +public boolean shouldUseControllerAsDestination() { Review comment: remove word 'should' ## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ## @@ -1214,6 +1215,10 @@ default AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options); +DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options); + +UpdateFinalizedFeaturesResult updateFinalizedFeatures(Set featureUpdates, UpdateFinalizedFeaturesOptions options); Review comment: add doc ## File path: clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java ## @@ -0,0 +1,26 @@ +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +@InterfaceStability.Evolving +public class DescribeFeaturesOptions extends AbstractOptions { +private boolean shouldUseControllerAsDestination = false; + +public DescribeFeaturesOptions shouldUseControllerAsDestination(boolean shouldUse) { Review comment: add doc ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -3979,6 +3986,98 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } +@Override +public DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options) { Review 
comment: 1. add test code in `KafkaAdminClientTest` 2. final variable names ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -3979,6 +3986,98 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } +@Override +public DescribeFeaturesResult describeFeatures(DescribeFeaturesOptions options) { +final KafkaFutureImpl future = new KafkaFutureImpl<>(); +final long now = time.milliseconds(); +Call callViaLeastLoadedNode = new Call("describeFeatures", calcDeadlineMs(now, options.timeoutMs()), +new LeastLoadedNodeProvider()) { + +@Override +ApiVersionsRequest.Builder createRequest(int timeoutMs) { +return new ApiVersionsRequest.Builder(); +} + +@Override +void handleResponse(AbstractResponse response) { +ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response; +if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) { +future.complete( +new FeatureMetadata( +apiVersionsResponse.finalizedFeatures(), +apiVersionsResponse.finalizedFeaturesEpoch(), +apiVersionsResponse.supportedFeatures())); +} else { +future.completeExceptionally( + Errors.forCode(apiVersionsResponse.data.errorCode()).exception()); +} +} + +@Override +void handleFailure(Throwable throwable) { +completeAllExceptionally(Collections.singletonList(future), throwable); +} +}; + +Call call = callViaLeastLoadedNode; +if (options.shouldUseControllerAsDestination()) { +call = new Call("describeFeatures", calcDeadlineMs(now, options.timeoutMs()), +new ControllerNodeProvider()) { + +@Override +ApiVersionsRequest.Builder createRequest(int timeoutMs) { +return (ApiVersionsRequest.Builder) callViaLeastLoadedNode.createRequest(timeoutMs); +} + +@Override +void handleResponse(AbstractResponse response) { +callViaLeastLoadedNode.handleResponse(response); +} + +@Override +void handleFailure(Throwable throwable) { +callViaLeastLoadedNode.handleFailure(throwable); +} +}; +} +runnable.call(call, 
now); +return new DescribeFeaturesResult(future); +} + +
[GitHub] [kafka] abbccdda commented on a change in pull request #9002: MINOR: Add ApiMessageTypeGenerator
abbccdda commented on a change in pull request #9002: URL: https://github.com/apache/kafka/pull/9002#discussion_r452542243 ## File path: generator/src/main/java/org/apache/kafka/message/TypeClassGenerator.java ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.message; + +import java.io.BufferedWriter; +import java.io.IOException; + +public interface TypeClassGenerator { +/** + * The short name of the type class file we are generating. For example, + * ApiMessageType.java. + */ +String outputName(); + +/** + * Registers a message spec with the generator. + * + * @param spec The spec to register. + */ +void registerMessageType(MessageSpec spec); + +/** + * Write out the internal state. Review comment: Generate the type and write it out to the internal state. ## File path: generator/src/main/java/org/apache/kafka/message/MessageGenerator.java ## @@ -267,10 +283,10 @@ public static void main(String[] args) throws Exception { if (args.length == 0) { System.out.println(USAGE); System.exit(0); -} else if (args.length != 3) { +} else if (args.length != 4) { Review comment: Why we need separate exit code for mis-used arguments? 
## File path: generator/src/main/java/org/apache/kafka/message/MessageGenerator.java ## @@ -169,20 +185,20 @@ public static void processDirectories(String packageName, String outputDir, Stri generator.write(writer); } numProcessed++; -messageTypeGenerator.registerMessageType(spec); +if (typeClassGenerator != null) { +typeClassGenerator.registerMessageType(spec); +} } catch (Exception e) { throw new RuntimeException("Exception while processing " + inputPath.toString(), e); } } } -if (messageTypeGenerator.hasRegisteredTypes()) { -Path factoryOutputPath = Paths.get(outputDir, API_MESSAGE_TYPE_JAVA); -outputFileNames.add(API_MESSAGE_TYPE_JAVA); +if (typeClassGenerator != null) { Review comment: Instead of using a null pointer, maybe we could add a `NoOpTypeClassGenerator` that does nothing in `generateAndWrite`, and remove `outputName()` so that there is one single API. Something like ``` String outputFileName = typeClassGenerator.generateAndWrite(outputDir, writer); ``` I don't feel strongly about this change; ideally it would look cleaner, but it's up to you.
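The null-object idea suggested in this review could look roughly like the sketch below. The interface here is a simplified, hypothetical stand-in (`TypeClassGeneratorSketch`, string spec names, an `Optional` return value); it is not the interface from the PR, and the signature of `generateAndWrite` is an assumption.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Optional;

// Hypothetical, simplified interface; not the TypeClassGenerator from the PR.
interface TypeClassGeneratorSketch {
    void registerMessageType(String specName);

    // Writes the generated class and returns the output file name,
    // or an empty Optional when there is nothing to generate.
    Optional<String> generateAndWrite(Writer writer) throws IOException;
}

// Null object: callers invoke it unconditionally instead of checking for null.
final class NoOpTypeClassGenerator implements TypeClassGeneratorSketch {
    @Override
    public void registerMessageType(String specName) {
        // intentionally does nothing
    }

    @Override
    public Optional<String> generateAndWrite(Writer writer) {
        return Optional.empty();
    }
}

final class NoOpDemo {
    public static void main(String[] args) throws IOException {
        TypeClassGeneratorSketch generator = new NoOpTypeClassGenerator();
        generator.registerMessageType("SomeRequest");
        System.out.println(generator.generateAndWrite(new StringWriter()).isPresent()); // prints false
    }
}
```

The trade-off is one extra class in exchange for removing the `!= null` branches at each call site.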
[jira] [Created] (KAFKA-10262) StateDirectory is not thread-safe
Sophie Blee-Goldman created KAFKA-10262: --- Summary: StateDirectory is not thread-safe Key: KAFKA-10262 URL: https://issues.apache.org/jira/browse/KAFKA-10262 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.6.0 Reporter: Sophie Blee-Goldman As explicitly stated in the StateDirectory javadocs, "This class is not thread-safe." Despite this, a single StateDirectory is shared among all the StreamThreads of a client. Some of the more "dangerous" methods are indeed synchronized, but others are not. For example, the innocent-sounding #directoryForTask is not thread-safe and is called in a number of places. We call it during task creation, and we call it during task closure when we check `directoryForTaskIsEmpty`. It's not uncommon for one thread to be closing a task while another is creating it after a rebalance. In fact, we saw exactly that happen in our test application. This ultimately led to the following exception {code:java} org.apache.kafka.streams.errors.ProcessorStateException: task directory [/mnt/run/streams/state/stream-soak-test/1_0] doesn't exist and couldn't be created at org.apache.kafka.streams.processor.internals.StateDirectory.directoryForTask(StateDirectory.java:112) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:187) at org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createTasks(StandbyTaskCreator.java:85) at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:337) {code} The exception arises from this line in StateDirectory#directoryForTask: {code:java} if (hasPersistentStores && !taskDir.exists() && !taskDir.mkdir()) {code} Presumably, if the taskDir did not exist when the two threads began this method, then they would both attempt to create the directory. One of them will get there first, leaving the other to return unsuccessfully from mkdir and ultimately throw the above ProcessorStateException. 
I've only confirmed that this affects 2.6 so far, but the unsafe methods are present in earlier versions. It's possible we made the problem worse somehow during "The Refactor" so that it's easier to hit this race condition.
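To make the race concrete, here is a minimal sketch of one way to tolerate it: re-check for the directory after `mkdir()` reports failure. This is not the fix that went into Kafka; `TaskDirs`, `ensureDir` and `raceOnce` are hypothetical names, and synchronizing the method would be another option.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not the actual StateDirectory code.
final class TaskDirs {
    // File.mkdir() returns false when the directory already exists, including
    // when another thread created it a moment earlier. Checking isDirectory()
    // again distinguishes "lost the race" from a genuine failure.
    static File ensureDir(File dir) {
        if (!dir.isDirectory() && !dir.mkdir() && !dir.isDirectory()) {
            throw new IllegalStateException(
                "task directory [" + dir + "] doesn't exist and couldn't be created");
        }
        return dir;
    }

    // Lets two threads create the same directory at once; returns true when
    // neither call threw and the directory exists afterwards.
    static boolean raceOnce() {
        try {
            File parent = Files.createTempDirectory("state-sketch").toFile();
            File taskDir = new File(parent, "1_0");
            AtomicInteger failures = new AtomicInteger();
            Runnable create = () -> {
                try {
                    ensureDir(taskDir);
                } catch (IllegalStateException e) {
                    failures.incrementAndGet();
                }
            };
            Thread first = new Thread(create);
            Thread second = new Thread(create);
            first.start();
            second.start();
            first.join();
            second.join();
            return failures.get() == 0 && taskDir.isDirectory();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(raceOnce());
    }
}
```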
[GitHub] [kafka] vvcephei opened a new pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with shims
vvcephei opened a new pull request #9004: URL: https://github.com/apache/kafka/pull/9004 Adds the new Processor and ProcessorContext interfaces as proposed in KIP-478. To integrate in a staged fashion with the code base, shims are included to convert back and forth between the new and old APIs. ProcessorNode is converted to the new APIs. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-10261) Introduce the KIP-478 processors with shims
John Roesler created KAFKA-10261: Summary: Introduce the KIP-478 processors with shims Key: KAFKA-10261 URL: https://issues.apache.org/jira/browse/KAFKA-10261 Project: Kafka Issue Type: Sub-task Components: streams Reporter: John Roesler Assignee: John Roesler
[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-656382338 > Do you want to rebase again so that I can run system tests one more time? sure!
[GitHub] [kafka] apovzner commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…
apovzner commented on a change in pull request #8768: URL: https://github.com/apache/kafka/pull/8768#discussion_r452507990 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1308,18 +1410,27 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging { val value = maxConnections(configs) if (value <= 0) throw new ConfigException("Invalid max.connections $listenerMax") Review comment: yes, good catch. Fixed.
[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-656365594 @chia7712 : All 3 PRs you fixed above have been merged. Do you want to rebase again so that I can run system tests one more time?
[GitHub] [kafka] apovzner commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…
apovzner commented on a change in pull request #8768: URL: https://github.com/apache/kafka/pull/8768#discussion_r452502811 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -1289,15 +1311,95 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging { private def maxListenerConnections(listenerName: ListenerName): Int = maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue) + /** + * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide + * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp + * + * @param listenerName listener for which calculate the delay + * @param timeMs current time in milliseconds + * @return delay in milliseconds + */ + private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = { +val listenerThrottleTimeMs = maxConnectionsPerListener + .get(listenerName) + .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs)) + .getOrElse(0) + +if (protectedListener(listenerName)) { + listenerThrottleTimeMs +} else { + val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs) + math.max(brokerThrottleTimeMs, listenerThrottleTimeMs) +} + } + + private def recordAndGetThrottleTimeMs(sensor: Sensor, timeMs: Long): Int = { +try { + sensor.record(1.0, timeMs) + 0 +} catch { + case e: QuotaViolationException => +val throttleTimeMs = QuotaUtils.boundedThrottleTime( + e.value, e.bound, QuotaUtils.rateMetricWindowSize(e.metric, timeMs), maxThrottleTimeMs).toInt +debug(s"Quota violated for sensor (${sensor.name}). Delay time: $throttleTimeMs ms") +throttleTimeMs +} + } + + /** + * Creates sensor for tracking the connection creation rate and corresponding connection rate quota for a given + * listener or broker-wide, if listener is not provided. 
+ * @param quotaLimit connection creation rate quota + * @param listenerOpt listener name if sensor is for a listener + */ + private def createConnectionRateQuotaSensor(quotaLimit: Int, listenerOpt: Option[String] = None): Sensor = { +val quotaEntity = listenerOpt.getOrElse("broker") +val sensor = metrics.sensor(s"ConnectionCreationRate-$quotaEntity", rateQuotaMetricConfig(quotaLimit)) +sensor.add(connectionRateMetricName(listenerOpt), new Rate, null, false) +info(s"Created ConnectionCreationRate-$quotaEntity sensor, quotaLimit=$quotaLimit") +sensor + } + + private def updateConnectionRateQuota(quotaLimit: Int, listenerOpt: Option[String] = None): Unit = { +val metric = metrics.metric(connectionRateMetricName((listenerOpt))) +metric.config(rateQuotaMetricConfig(quotaLimit)) +info(s"Updated ${listenerOpt.getOrElse("broker")} max connection creation rate to $quotaLimit") + } + + private def connectionRateMetricName(listenerOpt: Option[String]): MetricName = { +val quotaEntity = listenerOpt.getOrElse("broker") +metrics.metricName( + s"connection-creation-rate-$quotaEntity", + "connection-quota-no-jmx", + s"Tracking $quotaEntity connection creation rate", + rateQuotaMetricTags(listenerOpt)) + } + + private def rateQuotaMetricConfig(quotaLimit: Int): MetricConfig = { +new MetricConfig() + .timeWindow(config.quotaWindowSizeSeconds.toLong, TimeUnit.SECONDS) + .samples(config.numQuotaSamples) + .quota(new Quota(quotaLimit, true)) + } + + private def rateQuotaMetricTags(listenerOpt: Option[String]): util.Map[String, String] = { +val tags = new util.LinkedHashMap[String, String] +listenerOpt.foreach(listener => tags.put("listener", listener)) +tags Review comment: I realized that I don't need tags here anymore, since the name of the metric contains the name of the listener, so the metrics are already distinct per listener (and broker-wide). I removed that method altogether. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
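The way `recordConnectionAndGetThrottleTimeMs` in the diff above combines the two delays can be restated in a few lines. The sketch below mirrors only that combining step; the class and parameter names are made up for illustration, and the per-sensor delays are taken as inputs rather than computed from quota metrics.

```java
// Illustration of the combining step only; the delays themselves are inputs here.
final class ThrottleSketch {
    // A protected listener is throttled by its own quota alone. Any other
    // listener waits for whichever of the listener-level and broker-wide
    // delays is longer.
    static long throttleTimeMs(long listenerDelayMs, long brokerDelayMs, boolean protectedListener) {
        if (protectedListener) {
            return listenerDelayMs;
        }
        return Math.max(listenerDelayMs, brokerDelayMs);
    }

    public static void main(String[] args) {
        System.out.println(throttleTimeMs(50, 200, false)); // prints 200
        System.out.println(throttleTimeMs(50, 200, true));  // prints 50
    }
}
```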
[jira] [Updated] (KAFKA-10253) Kafka Connect gets into an infinite rebalance loop
[ https://issues.apache.org/jira/browse/KAFKA-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantin Lalafaryan updated KAFKA-10253: -- Description: Hello everyone! We are running kafka-connect cluster (3 workers) and very often it gets into an infinite rebalance loop. {code:java} 2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,733 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655831 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655831 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,735 INFO [Worker 
clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,736 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655832 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655832 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= kafka-connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,740 INFO [Worker clientId=connect-1, groupId= kafka-connect] Was selected to perform assignments, but do not have latest config found in sync request. Returning an empty configuration to trigger re-sync. 
(org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= kafka-connect] Successfully joined group with generation 305655833 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= kafka-connect] Joined group at generation 305655833 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1] 2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= kafka-connect] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) [DistributedHerder-connect-1-1] 2020-07-09
[GitHub] [kafka] mjsax commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
mjsax commented on a change in pull request #8996: URL: https://github.com/apache/kafka/pull/8996#discussion_r452494073 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java ## @@ -104,26 +104,27 @@ static void closeStateManager(final Logger log, if (stateDirectory.lock(id)) { try { stateMgr.close(); - -if (wipeStateStore) { -log.debug("Wiping state stores for {} task {}", taskType, id); -// we can just delete the whole dir of the task, including the state store images and the checkpoint files, -// and then we write an empty checkpoint file indicating that the previous close is graceful and we just -// need to re-bootstrap the restoration from the beginning -Utils.delete(stateMgr.baseDir()); -} } catch (final ProcessorStateException e) { firstException.compareAndSet(null, e); } finally { -stateDirectory.unlock(id); +try { +if (wipeStateStore) { +log.debug("Wiping state stores for {} task {}", taskType, id); +// we can just delete the whole dir of the task, including the state store images and the checkpoint files, +// and then we write an empty checkpoint file indicating that the previous close is graceful and we just +// need to re-bootstrap the restoration from the beginning +Utils.delete(stateMgr.baseDir()); +} +} finally { +stateDirectory.unlock(id); +} Review comment: IMHO, the code is good as-is. Thanks for rewriting to a nested `try-finally` structure!
[GitHub] [kafka] ableegoldman commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
ableegoldman commented on a change in pull request #8996: URL: https://github.com/apache/kafka/pull/8996#discussion_r452491173 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java ## @@ -104,26 +104,27 @@ static void closeStateManager(final Logger log, if (stateDirectory.lock(id)) { try { stateMgr.close(); - -if (wipeStateStore) { -log.debug("Wiping state stores for {} task {}", taskType, id); -// we can just delete the whole dir of the task, including the state store images and the checkpoint files, -// and then we write an empty checkpoint file indicating that the previous close is graceful and we just -// need to re-bootstrap the restoration from the beginning -Utils.delete(stateMgr.baseDir()); -} } catch (final ProcessorStateException e) { firstException.compareAndSet(null, e); } finally { -stateDirectory.unlock(id); +try { +if (wipeStateStore) { +log.debug("Wiping state stores for {} task {}", taskType, id); +// we can just delete the whole dir of the task, including the state store images and the checkpoint files, +// and then we write an empty checkpoint file indicating that the previous close is graceful and we just +// need to re-bootstrap the restoration from the beginning +Utils.delete(stateMgr.baseDir()); +} +} finally { +stateDirectory.unlock(id); +} Review comment: I can't use `ExceptionUtils#executeAll` because the compiler complains that we don't handle the `IOException` unless we surround each Runnable with its own try-catch block, at which point `#executeAll` isn't really doing anything
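The control flow of the nested `try`/`finally` in this diff can be traced with a small stand-alone sketch. It records step names instead of touching real state, so `CloseOrderSketch` and its boolean switches are purely illustrative and none of it is Kafka code.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Illustration only: records which steps run, in which order.
final class CloseOrderSketch {
    static List<String> close(boolean closeFails, boolean wipe, boolean wipeFails) {
        List<String> steps = new ArrayList<>();
        try {
            try {
                steps.add("close");
                if (closeFails) {
                    throw new IllegalStateException("close failed");
                }
            } catch (IllegalStateException e) {
                steps.add("recorded close failure");
            } finally {
                // The wipe is attempted even when close failed, and the inner
                // finally guarantees the unlock even when the wipe throws.
                try {
                    if (wipe) {
                        steps.add("wipe");
                        if (wipeFails) {
                            throw new IOException("wipe failed");
                        }
                    }
                } finally {
                    steps.add("unlock");
                }
            }
        } catch (IOException e) {
            steps.add("recorded wipe failure");
        }
        return steps;
    }

    public static void main(String[] args) {
        // prints [close, recorded close failure, wipe, unlock, recorded wipe failure]
        System.out.println(close(true, true, true));
    }
}
```

The checked `IOException` is also why wrapping each step in a `Runnable` is awkward: `Runnable.run()` declares no checked exceptions, so every lambda would need its own try-catch.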
[GitHub] [kafka] vvcephei commented on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
vvcephei commented on pull request #8996: URL: https://github.com/apache/kafka/pull/8996#issuecomment-656349040 It looks like the tests never ran before.
[GitHub] [kafka] vvcephei commented on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
vvcephei commented on pull request #8996: URL: https://github.com/apache/kafka/pull/8996#issuecomment-656348955 Retest this please
[GitHub] [kafka] vvcephei commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
vvcephei commented on a change in pull request #8996: URL: https://github.com/apache/kafka/pull/8996#discussion_r452487075 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -222,8 +222,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) { log.trace("Loaded offsets from the checkpoint file: {}", loadedCheckpoints); for (final StateStoreMetadata store : stores.values()) { +if (store.corrupted) { +log.error("Tried to initialize store offsets for corrupted store {}", store); +throw new IllegalStateException("Should not initialize offsets for a corrupted task"); +} + if (store.changelogPartition == null) { log.info("State store {} is not logged and hence would not be restored", store.stateStore.name()); +} else if (!store.stateStore.persistent()) { Review comment: Fair enough.
[GitHub] [kafka] JimGalasyn commented on a change in pull request #8995: Restore stream-table duality description
JimGalasyn commented on a change in pull request #8995: URL: https://github.com/apache/kafka/pull/8995#discussion_r452486723 ## File path: docs/streams/core-concepts.html ## @@ -170,13 +150,59 @@ Duality of or to run interactive queries against your application's latest processing results. And, beyond its internal usage, the Kafka Streams API also allows developers to exploit this duality in their own applications. - + - + Before we discuss concepts such as aggregations in Kafka Streams, we must first introduce tables in more detail, and talk about the aforementioned stream-table duality. - Essentially, this duality means that a stream can be viewed as a table, and a table can be viewed as a stream. - + Essentially, this duality means that a stream can be viewed as a table, and a table can be viewed as a stream. Kafka's log compaction feature, for example, exploits this duality. + + + +A simple form of a table is a collection of key-value pairs, also called a map or associative array. Such a table may look as follows: + + + +The stream-table duality describes the close relationship between streams and tables. + +Stream as Table: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise, and it can be easily turned into a "real" table by replaying the changelog from beginning to end to reconstruct the table. Similarly, in a more general analogy, aggregating data records in a stream - such as computing the total number of pageviews by user from a stream of pageview events - will return a table (here with the key and the value being the user and its corresponding pageview count, respectively). +Table as Stream: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream's data records are key-value pairs). 
A table is thus a stream in disguise, and it can be easily turned into a "real" stream by iterating over each key-value entry in the table. + + + +Let's illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time - and different revisions of the table - can be represented as a changelog stream (second column). + + + + +Interestingly, because of the stream-table duality, the same stream can be used to reconstruct the original table (third column): + + + + +The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault-tolerance. +The stream-table duality is such an important concept that Kafka Streams models it explicitly via the KStream, KTable, and GlobalKTable interfaces. + + +Aggregations + +An aggregation operation takes one input stream or table, and yields a new table by combining multiple input records into a single output record. Examples of aggregations are computing counts or sum. + + + +In the Kafka Streams DSL, an input stream of an aggregation can be a KStream or a KTable, but the output stream will always be a KTable. This allows Kafka Streams to update an aggregate value upon the out-of-order arrival of further records after the value was produced and emitted. When such out-of-order arrival happens, the aggregating KStream or KTable emits a new aggregate value. Because the output is a KTable, the new value is considered to overwrite the old value with the same key in subsequent processing steps. Review comment: kk cool, I'll open a ticket. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
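For readers following the quoted docs text, the stream-table duality can be sketched in plain Java with no Kafka dependency. The class and method names (`StreamTableDualitySketch`, `replay`, `toStream`) and the sample users are made up for this illustration; this is not how Kafka Streams implements `KTable`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StreamTableDualitySketch {
    /** Stream as table: replay the changelog from the beginning; the latest value per key wins. */
    public static Map<String, Long> replay(final List<Map.Entry<String, Long>> changelog) {
        final Map<String, Long> table = new LinkedHashMap<>();
        for (final Map.Entry<String, Long> record : changelog) {
            table.put(record.getKey(), record.getValue());
        }
        return table;
    }

    /** Table as stream: iterate over the key-value entries and emit each one as a record. */
    public static List<Map.Entry<String, Long>> toStream(final Map<String, Long> table) {
        final List<Map.Entry<String, Long>> stream = new ArrayList<>();
        for (final Map.Entry<String, Long> entry : table.entrySet()) {
            stream.add(new SimpleEntry<>(entry.getKey(), entry.getValue()));
        }
        return stream;
    }

    public static void main(final String[] args) {
        // Sample changelog of pageview counts per user (illustrative data only).
        final List<Map.Entry<String, Long>> changelog = new ArrayList<>();
        changelog.add(new SimpleEntry<>("alice", 1L));
        changelog.add(new SimpleEntry<>("charlie", 1L));
        changelog.add(new SimpleEntry<>("alice", 2L));
        final Map<String, Long> table = replay(changelog);
        System.out.println(table);
        System.out.println(toStream(table));
    }
}
```

Replaying the same changelog twice yields the same table, which is the property the docs rely on when they mention restoring state stores from changelog topics.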
[GitHub] [kafka] vvcephei commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)
vvcephei commented on pull request #8993: URL: https://github.com/apache/kafka/pull/8993#issuecomment-656347070 The new test passed: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-07-09--001.1594322852--vvcephei--kafka-10173-upgrade-smoke-system-test-2-5--91174c3b2/report.html The all-streams tests failed because of the Jenkins restart. Kicked it off again: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4021/
[GitHub] [kafka] ableegoldman commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
ableegoldman commented on a change in pull request #8996: URL: https://github.com/apache/kafka/pull/8996#discussion_r452483525 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -222,8 +222,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) { log.trace("Loaded offsets from the checkpoint file: {}", loadedCheckpoints); for (final StateStoreMetadata store : stores.values()) { +if (store.corrupted) { +log.error("Tried to initialize store offsets for corrupted store {}", store); +throw new IllegalStateException("Should not initialize offsets for a corrupted task"); +} + if (store.changelogPartition == null) { log.info("State store {} is not logged and hence would not be restored", store.stateStore.name()); +} else if (!store.stateStore.persistent()) { Review comment: I'm not sure I'd call it a spurious warning -- if we don't expect to have checkpointed in-memory stores, and we happen to have an offset for one in the checkpoint file, it seems reasonable to log a warning This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax merged pull request #8995: Restore stream-table duality description
mjsax merged pull request #8995: URL: https://github.com/apache/kafka/pull/8995
[GitHub] [kafka] mjsax commented on a change in pull request #8995: Restore stream-table duality description
mjsax commented on a change in pull request #8995: URL: https://github.com/apache/kafka/pull/8995#discussion_r452481312 ## File path: docs/streams/core-concepts.html ## @@ -170,13 +150,59 @@ Duality of or to run interactive queries against your application's latest processing results. And, beyond its internal usage, the Kafka Streams API also allows developers to exploit this duality in their own applications. - + - + Before we discuss concepts such as aggregations in Kafka Streams, we must first introduce tables in more detail, and talk about the aforementioned stream-table duality. - Essentially, this duality means that a stream can be viewed as a table, and a table can be viewed as a stream. - + Essentially, this duality means that a stream can be viewed as a table, and a table can be viewed as a stream. Kafka's log compaction feature, for example, exploits this duality. + + + +A simple form of a table is a collection of key-value pairs, also called a map or associative array. Such a table may look as follows: + + + +The stream-table duality describes the close relationship between streams and tables. + +Stream as Table: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise, and it can be easily turned into a "real" table by replaying the changelog from beginning to end to reconstruct the table. Similarly, in a more general analogy, aggregating data records in a stream - such as computing the total number of pageviews by user from a stream of pageview events - will return a table (here with the key and the value being the user and its corresponding pageview count, respectively). +Table as Stream: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream's data records are key-value pairs). 
A table is thus a stream in disguise, and it can be easily turned into a "real" stream by iterating over each key-value entry in the table. + + + +Let's illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time - and different revisions of the table - can be represented as a changelog stream (second column). + + + + +Interestingly, because of the stream-table duality, the same stream can be used to reconstruct the original table (third column): + + + + +The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault-tolerance. +The stream-table duality is such an important concept that Kafka Streams models it explicitly via the KStream, KTable, and GlobalKTable interfaces. + + +Aggregations + +An aggregation operation takes one input stream or table, and yields a new table by combining multiple input records into a single output record. Examples of aggregations are computing counts or sum. + + + +In the Kafka Streams DSL, an input stream of an aggregation can be a KStream or a KTable, but the output stream will always be a KTable. This allows Kafka Streams to update an aggregate value upon the out-of-order arrival of further records after the value was produced and emitted. When such out-of-order arrival happens, the aggregating KStream or KTable emits a new aggregate value. Because the output is a KTable, the new value is considered to overwrite the old value with the same key in subsequent processing steps. Review comment: Unfortunately, and I try to get it into better shape incrementally (reading GitHub diffs with long lines is just a pain). 
Would be awesome if somebody (*cough*) could do a PR just fixing it throughout the docs -- the current lazy approach is somewhat tiring.
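The aggregation paragraph in the quoted diff (input stream in, continuously updated table out) can be illustrated with a small plain-Java sketch. `CountAggregationSketch` and `countByUser` are hypothetical names, and the map stands in for a `KTable`; the real DSL is not used here.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CountAggregationSketch {
    /**
     * Counts pageviews per user. The table holds the current aggregate per key,
     * and the returned list is the stream of updates, one record per input event.
     */
    public static List<Map.Entry<String, Long>> countByUser(final List<String> pageviewUsers,
                                                           final Map<String, Long> table) {
        final List<Map.Entry<String, Long>> updates = new ArrayList<>();
        for (final String user : pageviewUsers) {
            // merge() stores and returns the new count, overwriting the old value for this key.
            final Long newCount = table.merge(user, 1L, Long::sum);
            updates.add(new SimpleEntry<>(user, newCount));
        }
        return updates;
    }

    public static void main(final String[] args) {
        final Map<String, Long> table = new LinkedHashMap<>();
        System.out.println(countByUser(Arrays.asList("alice", "charlie", "alice"), table));
        System.out.println(table);
    }
}
```

Each update record overwrites the previous value for its key, which mirrors the statement that the newer aggregate replaces the older one in subsequent processing steps.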
[GitHub] [kafka] vvcephei commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)
vvcephei commented on pull request #8993: URL: https://github.com/apache/kafka/pull/8993#issuecomment-656341862 Just a note that this is the backport of https://github.com/apache/kafka/pull/8938
[GitHub] [kafka] ableegoldman commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
ableegoldman commented on a change in pull request #8996: URL: https://github.com/apache/kafka/pull/8996#discussion_r452478882 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java ## @@ -104,26 +104,27 @@ static void closeStateManager(final Logger log, if (stateDirectory.lock(id)) { try { stateMgr.close(); - -if (wipeStateStore) { -log.debug("Wiping state stores for {} task {}", taskType, id); -// we can just delete the whole dir of the task, including the state store images and the checkpoint files, -// and then we write an empty checkpoint file indicating that the previous close is graceful and we just -// need to re-bootstrap the restoration from the beginning -Utils.delete(stateMgr.baseDir()); -} } catch (final ProcessorStateException e) { firstException.compareAndSet(null, e); } finally { -stateDirectory.unlock(id); +try { +if (wipeStateStore) { +log.debug("Wiping state stores for {} task {}", taskType, id); +// we can just delete the whole dir of the task, including the state store images and the checkpoint files, +// and then we write an empty checkpoint file indicating that the previous close is graceful and we just +// need to re-bootstrap the restoration from the beginning +Utils.delete(stateMgr.baseDir()); +} +} finally { +stateDirectory.unlock(id); +} Review comment: Well, I figured it didn't matter since these both just throw IOException which we catch in the outer block. The point was to make sure we unlock it. But I'll check out `ExceptionUtils#executeAll` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10226) KStream without SASL information should return error in confluent cloud
[ https://issues.apache.org/jira/browse/KAFKA-10226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154913#comment-17154913 ] Apurva Mehta commented on KAFKA-10226: -- It would be good to add a `producer.send(...); producer.flush(); producer.close()` to that test case and see if it throws an exception. > KStream without SASL information should return error in confluent cloud > --- > > Key: KAFKA-10226 > URL: https://issues.apache.org/jira/browse/KAFKA-10226 > Project: Kafka > Issue Type: Bug > Components: clients, streams > Affects Versions: 2.5.0 > Reporter: Werner Daehn > Priority: Minor > > I have created a KStream against the Confluent cloud and wondered why no data > has been received from the source. The reason was that I forgot to add the SASL > API keys and secrets. > > For end users this might lead to usability issues. If the KStream wants to > read from a topic and is not allowed to, this should raise an error, not be > silently ignored. > > How do producer/consumer clients handle that situation? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10226) KStream without SASL information should return error in confluent cloud
[ https://issues.apache.org/jira/browse/KAFKA-10226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154909#comment-17154909 ] John Roesler commented on KAFKA-10226: -- Hi [~wdaehn], I'm no expert on this stuff, but I think that the constructor alone may not actually connect to the server. Do you also get no exception if you try to send and close? IIRC, the constructor does validate that the provided host is resolvable by DNS, which is probably why you get an immediate exception on a bad hostname. Thanks, -John > KStream without SASL information should return error in confluent cloud > --- > > Key: KAFKA-10226 > URL: https://issues.apache.org/jira/browse/KAFKA-10226 > Project: Kafka > Issue Type: Bug > Components: clients, streams > Affects Versions: 2.5.0 > Reporter: Werner Daehn > Priority: Minor > > I have created a KStream against the Confluent cloud and wondered why no data > has been received from the source. The reason was that I forgot to add the SASL > API keys and secrets. > > For end users this might lead to usability issues. If the KStream wants to > read from a topic and is not allowed to, this should raise an error, not be > silently ignored. > > How do producer/consumer clients handle that situation?
[jira] [Assigned] (KAFKA-9584) Removing headers causes ConcurrentModificationException
[ https://issues.apache.org/jira/browse/KAFKA-9584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Micah Ramos reassigned KAFKA-9584: -- Assignee: (was: Micah Ramos) > Removing headers causes ConcurrentModificationException > --- > > Key: KAFKA-9584 > URL: https://issues.apache.org/jira/browse/KAFKA-9584 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.0.0 > Reporter: Micah Ramos > Priority: Minor > > The consumer record that is used during punctuate is static; this can cause > java.util.ConcurrentModificationException when modifying the headers. > Using a single instance of ConsumerRecord for all punctuates causes other > strange behavior: > # Headers are shared across partitions. > # A topology that adds a single header could append an infinite number of > headers (one per punctuate iteration), causing memory problems in the current > topology as well as downstream consumers since the headers are written with > the record when it is produced to a topic. > > I would expect that each invocation of punctuate would be initialized with a > new header object.
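The header growth described in the ticket can be reproduced in miniature without Kafka. In this sketch a plain `List<String>` stands in for the record headers, and `PunctuateHeadersSketch` with its two methods is a hypothetical illustration, not the Streams internals.

```java
import java.util.ArrayList;
import java.util.List;

public class PunctuateHeadersSketch {
    // Stand-in for the single record instance reused across punctuate calls (assumption).
    private final List<String> sharedHeaders = new ArrayList<>();

    /** Reuses one header collection: every call appends to what earlier calls left behind. */
    public int punctuateWithSharedHeaders() {
        sharedHeaders.add("trace-id");
        return sharedHeaders.size();
    }

    /** Starts from a fresh header collection on each call, as the reporter expects. */
    public int punctuateWithFreshHeaders() {
        final List<String> headers = new ArrayList<>();
        headers.add("trace-id");
        return headers.size();
    }

    public static void main(final String[] args) {
        final PunctuateHeadersSketch sketch = new PunctuateHeadersSketch();
        for (int i = 0; i < 3; i++) {
            System.out.println("shared=" + sketch.punctuateWithSharedHeaders()
                + " fresh=" + sketch.punctuateWithFreshHeaders());
        }
    }
}
```

The shared variant grows by one header per call, while the fresh variant always carries exactly one.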
[jira] [Resolved] (KAFKA-10256) Create a server gradle module for Java code needed only by servers
[ https://issues.apache.org/jira/browse/KAFKA-10256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-10256. -- Resolution: Won't Fix > Create a server gradle module for Java code needed only by servers > -- > > Key: KAFKA-10256 > URL: https://issues.apache.org/jira/browse/KAFKA-10256 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > > It doesn't really make sense to have a "server" directory in the "clients" > gradle module. The client is not the server. Instead, we should have a > separate gradle module for code which is server-specific. > This will avoid polluting the client CLASSPATH with code which is internal to > the server, and make the functional division of the code clearer. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dajac commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set
dajac commented on a change in pull request #8999: URL: https://github.com/apache/kafka/pull/8999#discussion_r452440483 ## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ## @@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) { } /** - * Return the Set of nodes whose connection setup has timed out. + * Return the List of nodes whose connection setup has timed out. * @param now the current time in ms */ -public Set nodesWithConnectionSetupTimeout(long now) { +public List nodesWithConnectionSetupTimeout(long now) { return connectingNodes.stream() .filter(id -> isConnectionSetupTimeout(id, now)) -.collect(Collectors.toSet()); +.collect(Collectors.toCollection(LinkedList::new)); Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
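A minimal, self-contained sketch of the collector change under review, collecting into a `LinkedList` via `Collectors.toCollection` instead of `Collectors.toSet()`. The class name, the `startMs` map, and the timeout rule are assumptions for illustration and do not reproduce `ClusterConnectionStates`.

```java
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class TimedOutNodesSketch {
    /** Hypothetical rule: timed out once more than timeoutMs has passed since the attempt started. */
    static boolean isTimedOut(final long startMs, final long timeoutMs, final long now) {
        return now - startMs > timeoutMs;
    }

    /** Returns the timed-out node ids as a List; the source is a Set, so ids stay unique. */
    public static List<String> nodesWithTimeout(final Set<String> connectingNodes,
                                                final Map<String, Long> startMs,
                                                final long timeoutMs,
                                                final long now) {
        return connectingNodes.stream()
            .filter(id -> isTimedOut(startMs.get(id), timeoutMs, now))
            .collect(Collectors.toCollection(LinkedList::new));
    }

    public static void main(final String[] args) {
        final Map<String, Long> startMs = new LinkedHashMap<>();
        startMs.put("a", 0L);
        startMs.put("b", 50L);
        startMs.put("c", 10L);
        System.out.println(nodesWithTimeout(startMs.keySet(), startMs, 100L, 120L));
    }
}
```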
[GitHub] [kafka] vvcephei commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted
vvcephei commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r452440068 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) { log.debug("State store {} initialized from checkpoint with offset {} at changelog {}", store.stateStore.name(), store.offset, store.changelogPartition); } else { -// with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file +// With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file // and hence we are uncertain that the current local state only contains committed data; // in that case we need to treat it as a task-corrupted exception -if (eosEnabled && !storeDirIsEmpty) { + +// Note, this is a little overzealous, since we aren't checking whether the store's specific +// directory is nonempty, only if there are any directories for any stores. So if there are +// two stores in a task, and one is correctly written and checkpointed, while the other is +// neither written nor checkpointed, we _could_ correctly load the first and recover the second +// but instead we'll consider the whole task corrupted and discard the first and recover both. Review comment: https://issues.apache.org/jira/browse/KAFKA-10260 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10260) Streams could recover stores in a task independently
John Roesler created KAFKA-10260: Summary: Streams could recover stores in a task independently Key: KAFKA-10260 URL: https://issues.apache.org/jira/browse/KAFKA-10260 Project: Kafka Issue Type: Improvement Components: streams Reporter: John Roesler Currently, ProcessorStateManager checks for corrupted tasks by checking, for each persistent store, if its checkpoint is missing, then the task directory must also be empty. This is a little overzealous, since we aren't checking whether the store's specific directory is nonempty, only if there are any directories for any stores. So if there are two stores in a task, and one is correctly written and checkpointed, while the other is neither written nor checkpointed, we _could_ correctly load the first and recover the second but instead we'll consider the whole task corrupted and discard the first and recover both. The fix would be to check, for each persistent store that doesn't have a checkpoint, that its _specific_ store directory is also missing. Such a store will be restored from the changelog and we don't need to consider the task corrupted. See ProcessorStateManager#initializeStoreOffsetsFromCheckpoint -- This message was sent by Atlassian Jira (v8.3.4#803005)
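The difference between the task-level check and the per-store check proposed in the ticket can be sketched as follows. `StoreInfo` is a simplified, hypothetical stand-in for the real store metadata, and the sketch assumes EOS is enabled throughout.

```java
import java.util.Arrays;
import java.util.List;

public class StoreRecoverySketch {
    /** Hypothetical, simplified view of one store; not the real StateStoreMetadata. */
    public static final class StoreInfo {
        final boolean persistent;
        final boolean hasCheckpoint;
        final boolean storeDirIsEmpty;

        public StoreInfo(final boolean persistent, final boolean hasCheckpoint, final boolean storeDirIsEmpty) {
            this.persistent = persistent;
            this.hasCheckpoint = hasCheckpoint;
            this.storeDirIsEmpty = storeDirIsEmpty;
        }
    }

    /** Task-level check: a missing checkpoint is fatal if any store in the task has data. */
    public static boolean corruptedTaskLevel(final List<StoreInfo> stores) {
        final boolean taskDirIsEmpty = stores.stream().allMatch(s -> s.storeDirIsEmpty);
        return stores.stream().anyMatch(s -> s.persistent && !s.hasCheckpoint && !taskDirIsEmpty);
    }

    /** Per-store check: a missing checkpoint is fatal only if that store itself has data. */
    public static boolean corruptedStoreLevel(final List<StoreInfo> stores) {
        return stores.stream().anyMatch(s -> s.persistent && !s.hasCheckpoint && !s.storeDirIsEmpty);
    }

    public static void main(final String[] args) {
        // Store 1 is written and checkpointed; store 2 is neither written nor checkpointed.
        final List<StoreInfo> stores = Arrays.asList(
            new StoreInfo(true, true, false),
            new StoreInfo(true, false, true));
        System.out.println("task-level: " + corruptedTaskLevel(stores));
        System.out.println("store-level: " + corruptedStoreLevel(stores));
    }
}
```

With the two-store scenario from the ticket, the task-level check flags corruption while the per-store check does not, so the first store could be loaded and only the second restored from its changelog.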
[GitHub] [kafka] omkreddy closed pull request #8937: MINOR: Create ChannelBuilder for each connection in ConnectionStressWorker workload
omkreddy closed pull request #8937: URL: https://github.com/apache/kafka/pull/8937
[GitHub] [kafka] vvcephei commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
vvcephei commented on a change in pull request #8996: URL: https://github.com/apache/kafka/pull/8996#discussion_r452433188 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java ## @@ -104,26 +104,27 @@ static void closeStateManager(final Logger log, if (stateDirectory.lock(id)) { try { stateMgr.close(); - -if (wipeStateStore) { -log.debug("Wiping state stores for {} task {}", taskType, id); -// we can just delete the whole dir of the task, including the state store images and the checkpoint files, -// and then we write an empty checkpoint file indicating that the previous close is graceful and we just -// need to re-bootstrap the restoration from the beginning -Utils.delete(stateMgr.baseDir()); -} } catch (final ProcessorStateException e) { firstException.compareAndSet(null, e); } finally { -stateDirectory.unlock(id); +try { +if (wipeStateStore) { +log.debug("Wiping state stores for {} task {}", taskType, id); +// we can just delete the whole dir of the task, including the state store images and the checkpoint files, +// and then we write an empty checkpoint file indicating that the previous close is graceful and we just +// need to re-bootstrap the restoration from the beginning +Utils.delete(stateMgr.baseDir()); +} +} finally { +stateDirectory.unlock(id); +} Review comment: I take it this block can also throw an exception? We shouldn't throw exceptions inside a finally block because it's not defined when the exception will be thrown, or in the case where the first try block threw, which exception is ultimately thrown is also undefined. 
To make this simpler to grapple with, we added org.apache.kafka.streams.state.internals.ExceptionUtils#executeAll ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -222,8 +222,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) { log.trace("Loaded offsets from the checkpoint file: {}", loadedCheckpoints); for (final StateStoreMetadata store : stores.values()) { +if (store.corrupted) { +log.error("Tried to initialize store offsets for corrupted store {}", store); +throw new IllegalStateException("Should not initialize offsets for a corrupted task"); +} + if (store.changelogPartition == null) { log.info("State store {} is not logged and hence would not be restored", store.stateStore.name()); +} else if (!store.stateStore.persistent()) { Review comment: I think we should also remove the changelogPartition from loadedCheckpoints, if it exists. Otherwise, we'll spuriously warn in L267. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
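As background for the `finally` discussion: per the Java language rules, an exception thrown from a `finally` block replaces the one thrown from the `try` block, so the original failure is lost. The sketch below shows that, next to a hypothetical `runAll` helper that keeps the first failure and attaches later ones as suppressed. `runAll` and `Step` are invented for illustration and are not the actual `ExceptionUtils#executeAll`.

```java
import java.io.IOException;

public class CloseThenUnlockSketch {
    /** Hypothetical stand-in for a close, wipe, or unlock step that may fail. */
    public interface Step {
        void run() throws Exception;
    }

    static void close() {
        throw new IllegalStateException("close failed");
    }

    static void wipe() throws IOException {
        throw new IOException("wipe failed");
    }

    /** Nested try/finally: the exception from the finally block is the one that propagates. */
    public static String nestedFinally() {
        try {
            try {
                close();
            } finally {
                wipe();
            }
        } catch (final Exception e) {
            return e.getMessage();
        }
        return "no failure";
    }

    /** Runs every step, returns the first failure (or null) with later failures suppressed on it. */
    public static Exception runAll(final Step... steps) {
        Exception first = null;
        for (final Step step : steps) {
            try {
                step.run();
            } catch (final Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        return first;
    }

    public static void main(final String[] args) {
        System.out.println(nestedFinally());
        final Exception e = runAll(CloseThenUnlockSketch::close, CloseThenUnlockSketch::wipe, () -> { });
        System.out.println(e.getMessage() + ", suppressed=" + e.getSuppressed().length);
    }
}
```

In both variants the last step still runs; the difference is only which failure the caller gets to see.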
[GitHub] [kafka] cadonna commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
cadonna commented on a change in pull request #8902: URL: https://github.com/apache/kafka/pull/8902#discussion_r452425472 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ## @@ -45,24 +45,23 @@ protected ProcessorNode currentNode; private long currentSystemTimeMs; -final StateManager stateManager; Review comment: If I understand you correctly, you propose to have a processor state manager reference in the `AbstractProcessorContext` and in `ProcessorContextImpl` instead of in the `GlobalProcessorContextImpl` and in `ProcessorContextImpl`. Moreover, you want to have a method `stateManager()` in `AbstractProcessorContext` that is overridden only in `ProcessorContextImpl`. FWIW, I think it is cleaner to have the references in each child and an abstract method `stateManager()` in `AbstractProcessorContext` that is overridden in both children. My reasoning is that both children have a state manager that is used in `AbstractProcessorContext` (i.e., both should provide a method `stateManager()`) but each child uses a different type of state manager internally. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
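The design argued for in this comment, an abstract `stateManager()` in the base class that each child overrides with its own manager type, might look roughly like this. All class names and the topic-naming strings are simplified placeholders, not the real Streams classes.

```java
public class StateManagerAccessSketch {
    /** Common view that the abstract context needs. */
    public interface StateManager {
        String changelogFor(String storeName);
    }

    /** Placeholder for the per-task manager; the naming scheme here is illustrative. */
    public static final class TaskStateManager implements StateManager {
        @Override
        public String changelogFor(final String storeName) {
            return "app-" + storeName + "-changelog";
        }
    }

    /** Placeholder for the global manager; the naming scheme here is illustrative. */
    public static final class GlobalStateManager implements StateManager {
        @Override
        public String changelogFor(final String storeName) {
            return storeName + "-source-topic";
        }
    }

    public abstract static class AbstractContext {
        /** Each child supplies its own manager; the base class holds no reference. */
        public abstract StateManager stateManager();

        public String changelogTopic(final String storeName) {
            return stateManager().changelogFor(storeName);
        }
    }

    public static final class TaskContext extends AbstractContext {
        private final TaskStateManager stateManager = new TaskStateManager();

        @Override
        public TaskStateManager stateManager() { // covariant return keeps the concrete type
            return stateManager;
        }
    }

    public static final class GlobalContext extends AbstractContext {
        private final GlobalStateManager stateManager = new GlobalStateManager();

        @Override
        public GlobalStateManager stateManager() {
            return stateManager;
        }
    }

    public static void main(final String[] args) {
        System.out.println(new TaskContext().changelogTopic("counts"));
        System.out.println(new GlobalContext().changelogTopic("counts"));
    }
}
```

The covariant return types let each child work with its concrete manager internally while the base class only depends on the shared interface.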
[GitHub] [kafka] vvcephei commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted
vvcephei commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r452424595 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) { log.debug("State store {} initialized from checkpoint with offset {} at changelog {}", store.stateStore.name(), store.offset, store.changelogPartition); } else { -// with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file +// With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file // and hence we are uncertain that the current local state only contains committed data; // in that case we need to treat it as a task-corrupted exception -if (eosEnabled && !storeDirIsEmpty) { + +// Note, this is a little overzealous, since we aren't checking whether the store's specific +// directory is nonempty, only if there are any directories for any stores. So if there are +// two stores in a task, and one is correctly written and checkpointed, while the other is +// neither written nor checkpointed, we _could_ correctly load the first and recover the second +// but instead we'll consider the whole task corrupted and discard the first and recover both. +if (store.stateStore.persistent() && eosEnabled && !taskDirIsEmpty) { Review comment: Oh, I put this comment on the wrong line: > FYI, this is also fixed (better) in #8996 In other words, I agree, I like your fix better as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted
vvcephei commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r452419917 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -193,6 +196,26 @@ private void closeAndRevive(final Map> taskWith log.error("Error suspending corrupted task {} ", task.id(), swallow); } task.closeDirty(); +if (task.isActive()) { +// Pause so we won't poll any more records for this task until it has been re-initialized +// Note, closeDirty already clears the partitiongroup for the task. +final Set currentAssignment = mainConsumer().assignment(); +final Set assignedToPauseAndReset = +Utils.intersection(HashSet::new, currentAssignment, task.inputPartitions()); Review comment: FYI, this is also fixed (better) in https://github.com/apache/kafka/pull/8996
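The diff above narrows the set of partitions to pause down to those that are both currently assigned to the consumer and inputs of the corrupted task. A minimal sketch of that selection, using only the JDK and plain strings in place of `TopicPartition` (the class and partition names are made up for illustration, and the helper is a stand-in rather than Kafka's own `Utils.intersection`):

```java
import java.util.HashSet;
import java.util.Set;

final class PauseSelectionSketch {

    // Returns the elements present in both sets without modifying either input.
    static <T> Set<T> intersection(Set<T> first, Set<T> second) {
        Set<T> result = new HashSet<>(first);
        result.retainAll(second);
        return result;
    }

    public static void main(String[] args) {
        // Partition names are plain strings here; the real code uses TopicPartition.
        Set<String> currentAssignment = Set.of("input-0", "input-1", "other-0");
        Set<String> taskInputPartitions = Set.of("input-1", "input-2");
        // Only "input-1" is both assigned to the consumer and an input of the task.
        System.out.println(intersection(currentAssignment, taskInputPartitions));
    }
}
```

Restricting the set this way presumably avoids asking the consumer to pause a partition it no longer owns.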
[GitHub] [kafka] vvcephei commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted
vvcephei commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r452423901 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) { log.debug("State store {} initialized from checkpoint with offset {} at changelog {}", store.stateStore.name(), store.offset, store.changelogPartition); } else { -// with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file +// With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file // and hence we are uncertain that the current local state only contains committed data; // in that case we need to treat it as a task-corrupted exception -if (eosEnabled && !storeDirIsEmpty) { + +// Note, this is a little overzealous, since we aren't checking whether the store's specific +// directory is nonempty, only if there are any directories for any stores. So if there are +// two stores in a task, and one is correctly written and checkpointed, while the other is +// neither written nor checkpointed, we _could_ correctly load the first and recover the second +// but instead we'll consider the whole task corrupted and discard the first and recover both. Review comment: IIRC, @guozhangwang tried to include this during the February refactor, but it's harder to get right than it sounds. Still, it would be very nice to have it, and I agree it would be good to file a ticket.
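To make the "overzealous" case in the code comment concrete, here is a small sketch of the condition from the diff, applied to a store that has no checkpointed offset. The class and method names are hypothetical; only the boolean expression is taken from the diff.

```java
final class CorruptionCheckSketch {

    // Mirrors the condition in the diff: a store without a checkpointed offset is treated
    // as corrupted when it is persistent, EOS is enabled, and the task directory is non-empty.
    static boolean missingCheckpointMeansCorrupted(boolean persistent,
                                                   boolean eosEnabled,
                                                   boolean taskDirIsEmpty) {
        return persistent && eosEnabled && !taskDirIsEmpty;
    }

    public static void main(String[] args) {
        // Store A wrote data, so the task directory is non-empty. Store B wrote nothing and
        // has no checkpoint, yet the task-level flag still marks the task as corrupted.
        System.out.println(missingCheckpointMeansCorrupted(true, true, false));  // true
        // An in-memory (non-persistent) store never triggers the check.
        System.out.println(missingCheckpointMeansCorrupted(false, true, false)); // false
    }
}
```

A per-store directory check, as discussed in the thread, would replace the task-level `taskDirIsEmpty` flag with one computed for the individual store.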
[GitHub] [kafka] vvcephei merged pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
vvcephei merged pull request #8902: URL: https://github.com/apache/kafka/pull/8902
[jira] [Created] (KAFKA-10259) KIP-554: Add Broker-side SCRAM Config API
Ron Dagostino created KAFKA-10259: - Summary: KIP-554: Add Broker-side SCRAM Config API Key: KAFKA-10259 URL: https://issues.apache.org/jira/browse/KAFKA-10259 Project: Kafka Issue Type: New Feature Reporter: Ron Dagostino Assignee: Ron Dagostino -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)
vvcephei commented on pull request #8993: URL: https://github.com/apache/kafka/pull/8993#issuecomment-656291661 Testing again: the new test: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4018/ all streams tests: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4019/
[GitHub] [kafka] vvcephei commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
vvcephei commented on pull request #8902: URL: https://github.com/apache/kafka/pull/8902#issuecomment-656290123 Thanks, all. I think we can merge this now.
[GitHub] [kafka] cadonna commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
cadonna commented on a change in pull request #8902: URL: https://github.com/apache/kafka/pull/8902#discussion_r452415850 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -103,8 +104,12 @@ public void init(final ProcessorContext context, @SuppressWarnings("unchecked") void initStoreSerde(final ProcessorContext context) { +final String storeName = name(); +final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName); serdes = new StateSerdes<>( -ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), + changelogTopic != null ? +changelogTopic : + ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName), Review comment: I would not put any code that is not related to casts of `ProcessorContext` into `ProcessorContextUtils`. I think the goal of `ProcessorContextUtils` is to contain all the code that we want to get rid of in the future once the casts are fixed. We could move the `null` check into the constructor of `StateSerdes`, since we already do a `null` check there.
[jira] [Commented] (KAFKA-10253) Kafka Connect gets into an infinite rebalance loop
[ https://issues.apache.org/jira/browse/KAFKA-10253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154790#comment-17154790 ] Chris Egerton commented on KAFKA-10253: --- [~klalafaryan] can you provide the configurations for all three of your workers? It would also be helpful if you could restart one of the workers and capture the logs for it from startup through the first few iterations of this rebalance loop. > Kafka Connect gets into an infinite rebalance loop > -- > > Key: KAFKA-10253 > URL: https://issues.apache.org/jira/browse/KAFKA-10253 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.0 >Reporter: Konstantin Lalafaryan >Priority: Blocker > > Hello everyone! > > We are running kafka-connect cluster (3 workers) and very often it gets into > an infinite rebalance loop. > > {code:java} > 2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Rebalance started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,731 INFO [Worker clientId=connect-1, groupId= > kafka-connect] (Re-)joining group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,733 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Was selected to perform assignments, but do not have latest > config found in sync request. Returning an empty configuration to trigger > re-sync. 
> (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Successfully joined group with generation 305655831 > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Joined group at generation 305655831 with protocol version 2 > and got assignment: Assignment{error=1, > leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', > leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], > taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Rebalance started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,735 INFO [Worker clientId=connect-1, groupId= > kafka-connect] (Re-)joining group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,736 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Was selected to perform assignments, but do not have latest > config found in sync request. Returning an empty configuration to trigger > re-sync. 
> (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Successfully joined group with generation 305655832 > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Joined group at generation 305655832 with protocol version 2 > and got assignment: Assignment{error=1, > leader='connect-1-0008abc5-a152-42fe-a697-a4a4641f72bb', > leaderUrl='http://10.20.30.221:8083/', offset=12, connectorIds=[], > taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Rebalance started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,739 INFO [Worker clientId=connect-1, groupId= > kafka-connect] (Re-)joining group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,740 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Was selected to perform assignments, but do not have latest > config found in sync request. Returning an empty configuration to trigger > re-sync. > (org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor) > [DistributedHerder-connect-1-1] > 2020-07-09 08:51:25,742 INFO [Worker clientId=connect-1, groupId= > kafka-connect] Successfully joined gr
[GitHub] [kafka] mjsax commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted
mjsax commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r452410965 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -95,7 +96,7 @@ * | | Assigned (3)| <+ * | +-+---+ | * || | - * || | + * ||--+ Review comment: Where does this self-transition happen exactly? And could/should we detect this case and not call `setState()` for this case instead of allowing the transition?
[GitHub] [kafka] mjsax commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted
mjsax commented on a change in pull request #8994: URL: https://github.com/apache/kafka/pull/8994#discussion_r452408268 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) { log.debug("State store {} initialized from checkpoint with offset {} at changelog {}", store.stateStore.name(), store.offset, store.changelogPartition); } else { -// with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file +// With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file // and hence we are uncertain that the current local state only contains committed data; // in that case we need to treat it as a task-corrupted exception -if (eosEnabled && !storeDirIsEmpty) { + +// Note, this is a little overzealous, since we aren't checking whether the store's specific +// directory is nonempty, only if there are any directories for any stores. So if there are +// two stores in a task, and one is correctly written and checkpointed, while the other is +// neither written nor checkpointed, we _could_ correctly load the first and recover the second +// but instead we'll consider the whole task corrupted and discard the first and recover both. Review comment: Sounds like something we should fix. Can you file a ticket?
[jira] [Updated] (KAFKA-6453) Document timestamp propagation semantics
[ https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6453: --- Description: Atm, Kafka Streams only has a defined "contract" about timestamp propagation at the Processor API level: all processor within a sub-topology, see the timestamp from the input topic record and this timestamp will be used for all result record when writing them to an topic, too. The DSL, inherits this "contract" atm. From a DSL point of view, it would be desirable to provide a different contract to the user. To allow this, we need to do the following: - extend Processor API to allow manipulation timestamps (ie, a Processor can set a new timestamp for downstream records) - define a DSL "contract" for timestamp propagation for each DSL operator - document the DSL "contract" - implement the DSL "contract" using the new/extended Processor API Changing the DSL contract etc was done via KIP-258. This ticket is about documenting the contract. was: Atm, Kafka Streams only has a defined "contract" about timestamp propagation at the Processor API level: all processor within a sub-topology, see the timestamp from the input topic record and this timestamp will be used for all result record when writing them to an topic, too. The DSL, inherits this "contract" atm. From a DSL point of view, it would be desirable to provide a different contract to the user. To allow this, we need to do the following: - extend Processor API to allow manipulation timestamps (ie, a Processor can set a new timestamp for downstream records) - define a DSL "contract" for timestamp propagation for each DSL operator - document the DSL "contract" - implement the DSL "contract" using the new/extended Processor API > Document timestamp propagation semantics > > > Key: KAFKA-6453 > URL: https://issues.apache.org/jira/browse/KAFKA-6453 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. 
Sax >Assignee: James Galasyn >Priority: Major > Labels: needs-kip > > Atm, Kafka Streams only has a defined "contract" about timestamp propagation > at the Processor API level: all processor within a sub-topology, see the > timestamp from the input topic record and this timestamp will be used for all > result record when writing them to an topic, too. > The DSL, inherits this "contract" atm. > From a DSL point of view, it would be desirable to provide a different > contract to the user. To allow this, we need to do the following: > - extend Processor API to allow manipulation timestamps (ie, a Processor can > set a new timestamp for downstream records) > - define a DSL "contract" for timestamp propagation for each DSL operator > - document the DSL "contract" > - implement the DSL "contract" using the new/extended Processor API > Changing the DSL contract etc was done via KIP-258. This ticket is about > documenting the contract. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-6453) Document timestamp propagation semantics
[ https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6453: --- Summary: Document timestamp propagation semantics (was: Reconsider timestamp propagation semantics) > Document timestamp propagation semantics > > > Key: KAFKA-6453 > URL: https://issues.apache.org/jira/browse/KAFKA-6453 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: James Galasyn >Priority: Major > Labels: needs-kip > > Atm, Kafka Streams only has a defined "contract" about timestamp propagation > at the Processor API level: all processor within a sub-topology, see the > timestamp from the input topic record and this timestamp will be used for all > result record when writing them to an topic, too. > The DSL, inherits this "contract" atm. > From a DSL point of view, it would be desirable to provide a different > contract to the user. To allow this, we need to do the following: > - extend Processor API to allow manipulation timestamps (ie, a Processor can > set a new timestamp for downstream records) > - define a DSL "contract" for timestamp propagation for each DSL operator > - document the DSL "contract" > - implement the DSL "contract" using the new/extended Processor API -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-6453) Reconsider timestamp propagation semantics
[ https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-6453: -- Assignee: James Galasyn (was: Victoria Bialas) > Reconsider timestamp propagation semantics > -- > > Key: KAFKA-6453 > URL: https://issues.apache.org/jira/browse/KAFKA-6453 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: James Galasyn >Priority: Major > Labels: needs-kip > > Atm, Kafka Streams only has a defined "contract" about timestamp propagation > at the Processor API level: all processor within a sub-topology, see the > timestamp from the input topic record and this timestamp will be used for all > result record when writing them to an topic, too. > The DSL, inherits this "contract" atm. > From a DSL point of view, it would be desirable to provide a different > contract to the user. To allow this, we need to do the following: > - extend Processor API to allow manipulation timestamps (ie, a Processor can > set a new timestamp for downstream records) > - define a DSL "contract" for timestamp propagation for each DSL operator > - document the DSL "contract" > - implement the DSL "contract" using the new/extended Processor API -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dajac commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set
dajac commented on a change in pull request #8999: URL: https://github.com/apache/kafka/pull/8999#discussion_r452400759 ## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ## @@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) { } /** - * Return the Set of nodes whose connection setup has timed out. + * Return the List of nodes whose connection setup has timed out. * @param now the current time in ms */ -public Set nodesWithConnectionSetupTimeout(long now) { +public List nodesWithConnectionSetupTimeout(long now) { return connectingNodes.stream() .filter(id -> isConnectionSetupTimeout(id, now)) -.collect(Collectors.toSet()); +.collect(Collectors.toCollection(LinkedList::new)); Review comment: That’s good to know. I always thought that the iteration was more or less equivalent. I have learnt something today. Let me update that.
[GitHub] [kafka] ijuma commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set
ijuma commented on a change in pull request #8999: URL: https://github.com/apache/kafka/pull/8999#discussion_r452396471 ## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ## @@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) { } /** - * Return the Set of nodes whose connection setup has timed out. + * Return the List of nodes whose connection setup has timed out. * @param now the current time in ms */ -public Set nodesWithConnectionSetupTimeout(long now) { +public List nodesWithConnectionSetupTimeout(long now) { return connectingNodes.stream() .filter(id -> isConnectionSetupTimeout(id, now)) -.collect(Collectors.toSet()); +.collect(Collectors.toCollection(LinkedList::new)); Review comment: FYI https://twitter.com/joshbloch/status/583813919019573248 :)
[GitHub] [kafka] ijuma commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set
ijuma commented on a change in pull request #8999: URL: https://github.com/apache/kafka/pull/8999#discussion_r452395964 ## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ## @@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) { } /** - * Return the Set of nodes whose connection setup has timed out. + * Return the List of nodes whose connection setup has timed out. * @param now the current time in ms */ -public Set nodesWithConnectionSetupTimeout(long now) { +public List nodesWithConnectionSetupTimeout(long now) { return connectingNodes.stream() .filter(id -> isConnectionSetupTimeout(id, now)) -.collect(Collectors.toSet()); +.collect(Collectors.toCollection(LinkedList::new)); Review comment: Not at all. This is a perfect case for ArrayList. LinkedList iteration is very slow due to pointer chasing (in comparison).
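For reference, the `ArrayList` variant suggested above might look like the following sketch. It is not the code that was eventually committed; the class name, the `Predicate` parameter, and the node ids are placeholders so the example runs on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class TimedOutNodesSketch {

    // Collects matching ids into an ArrayList, which stores its elements in a contiguous array.
    static List<String> timedOut(Set<String> connectingNodes, Predicate<String> isTimedOut) {
        return connectingNodes.stream()
            .filter(isTimedOut)
            .collect(Collectors.toCollection(ArrayList::new));
    }

    public static void main(String[] args) {
        Set<String> connecting = new TreeSet<>(Set.of("1", "2", "3"));
        // Hypothetical predicate: pretend every node except "2" has timed out.
        System.out.println(timedOut(connecting, id -> !id.equals("2"))); // prints [1, 3]
    }
}
```

`Collectors.toCollection(ArrayList::new)` makes the list type explicit; `Collectors.toList()` would also work where the concrete type does not matter, since its contract makes no guarantee about the implementation returned.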
[GitHub] [kafka] mjsax commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
mjsax commented on a change in pull request #8902: URL: https://github.com/apache/kafka/pull/8902#discussion_r452394510 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -103,8 +104,12 @@ public void init(final ProcessorContext context, @SuppressWarnings("unchecked") void initStoreSerde(final ProcessorContext context) { +final String storeName = name(); +final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName); serdes = new StateSerdes<>( -ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), + changelogTopic != null ? +changelogTopic : + ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName), Review comment: Ah thanks. I missed this case. However, should we move both `null` checks into `ProcessorContextUtils.changelogFor()` for this case? It seems we do the same "outer" `null` check each time we call the method, so why not do it in a single place in the code?
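The "single place" idea from the comment above could be sketched as below. Everything here is hypothetical: the helper takes the already-looked-up topic as a plain argument instead of a `ProcessorContext`, and the default naming rule (`<applicationId>-<storeName>-changelog`) is an assumption about what `ProcessorStateManager.storeChangelogTopic` produces, so check the real method before relying on it.

```java
final class ChangelogNameSketch {

    // Default name used when no explicit changelog topic is known. Assumed to follow
    // "<applicationId>-<storeName>-changelog"; see ProcessorStateManager for the real rule.
    static String defaultChangelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Hypothetical helper: resolves the topic in one place so callers need no null check.
    static String changelogFor(String knownTopic, String applicationId, String storeName) {
        return knownTopic != null ? knownTopic : defaultChangelogTopic(applicationId, storeName);
    }

    public static void main(String[] args) {
        // A topic supplied by the context wins over the derived default.
        System.out.println(changelogFor("custom-topic", "my-app", "counts"));
        // Without one, the name is derived from the application id and store name.
        System.out.println(changelogFor(null, "my-app", "counts"));
    }
}
```

With the fallback folded into the helper, each store's `initStoreSerde` would pass the result straight to the serdes constructor instead of repeating the ternary.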
[GitHub] [kafka] dajac commented on a change in pull request #8999: MINOR; Return timed out connections as a List instead of a Set
dajac commented on a change in pull request #8999: URL: https://github.com/apache/kafka/pull/8999#discussion_r452393587 ## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ## @@ -443,13 +444,13 @@ public boolean isConnectionSetupTimeout(String id, long now) { } /** - * Return the Set of nodes whose connection setup has timed out. + * Return the List of nodes whose connection setup has timed out. * @param now the current time in ms */ -public Set nodesWithConnectionSetupTimeout(long now) { +public List nodesWithConnectionSetupTimeout(long now) { return connectingNodes.stream() .filter(id -> isConnectionSetupTimeout(id, now)) -.collect(Collectors.toSet()); +.collect(Collectors.toCollection(LinkedList::new)); Review comment: We only add elements to the List and then iterate over it. A LinkedList seems slightly better for this case.
[GitHub] [kafka] mjsax commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
mjsax commented on a change in pull request #8902: URL: https://github.com/apache/kafka/pull/8902#discussion_r452391941 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java ## @@ -45,24 +45,23 @@ protected ProcessorNode currentNode; private long currentSystemTimeMs; -final StateManager stateManager; Review comment: I see. Would it not be sufficient to just keep a ("duplicate") reference of `ProcessorStateManager` within `ProcessorContextImpl`? Just to clarify: I am ok with the proposed changes. Just wondering if it's really the best structure.
[GitHub] [kafka] ableegoldman commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
ableegoldman commented on pull request #8902: URL: https://github.com/apache/kafka/pull/8902#issuecomment-656265143 Aw...the EosBetaUpgradeIntegrationTest failed again?? :/
[GitHub] [kafka] cadonna commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes
cadonna commented on pull request #8902: URL: https://github.com/apache/kafka/pull/8902#issuecomment-656263751 Unrelated test failure: ``` org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true] ```
[GitHub] [kafka] ableegoldman commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
ableegoldman commented on a change in pull request #8996: URL: https://github.com/apache/kafka/pull/8996#discussion_r452372357 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ## @@ -784,7 +784,7 @@ public void close() { } @Test -public void shouldThrowTaskCorruptedWithoutCheckpointNonEmptyDir() throws IOException { +public void shouldThrowTaskCorruptedWithoutPersistentStoreCheckpointAndNonEmptyDir() throws IOException { final long checkpointOffset = 10L; final Map offsets = mkMap( Review comment: Sorry, yeah, the relevant part doesn't show up on GitHub. Basically we register ``` stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback); stateMgr.registerStore(persistentStoreTwo, persistentStoreTwo.stateRestoreCallback); stateMgr.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback); ``` but only write the checkpoint for the `persistentStorePartition`, `nonPersistentStorePartition` and `irrelevantPartition`. I think the point of the `irrelevantPartition` is to make sure that we detect that the `persistentStoreTwoPartition` offset is missing even though the checkpoint technically has the correct number of offsets in total, i.e., that we actually map the offsets to a registered changelog.
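The point about mapping offsets to registered changelogs, rather than counting them, can be illustrated with a small stand-alone sketch. The partition names come from the comment above; the class, the method, and the use of strings and a `Boolean` persistence flag are simplifications invented for this example and do not reflect how `ProcessorStateManager` is actually implemented.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class CheckpointCoverageSketch {

    // Returns the registered persistent changelog partitions that have no checkpointed offset.
    // Checkpoint entries for partitions that are not registered are simply ignored.
    static Set<String> persistentPartitionsMissingOffset(Map<String, Boolean> registeredPartitions,
                                                         Map<String, Long> checkpointedOffsets) {
        Set<String> missing = new TreeSet<>();
        for (Map.Entry<String, Boolean> entry : registeredPartitions.entrySet()) {
            boolean persistent = entry.getValue();
            if (persistent && !checkpointedOffsets.containsKey(entry.getKey())) {
                missing.add(entry.getKey());
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Partition name -> whether the backing store is persistent.
        Map<String, Boolean> registered = Map.of(
            "persistentStorePartition", true,
            "persistentStoreTwoPartition", true,
            "nonPersistentStorePartition", false);
        // Three offsets for three stores, but one entry belongs to an unregistered partition.
        Map<String, Long> checkpoint = Map.of(
            "persistentStorePartition", 10L,
            "nonPersistentStorePartition", 10L,
            "irrelevantPartition", 10L);
        System.out.println(persistentPartitionsMissingOffset(registered, checkpoint));
        // prints [persistentStoreTwoPartition]
    }
}
```

A check that only compared `checkpoint.size()` with the number of registered stores would pass here, which is the situation the extra `irrelevantPartition` entry in the test is meant to catch.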
[GitHub] [kafka] mjsax commented on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
mjsax commented on pull request #8996: URL: https://github.com/apache/kafka/pull/8996#issuecomment-656261358 Retest this please.
[GitHub] [kafka] mjsax commented on pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
mjsax commented on pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#issuecomment-656261242

Retest this please.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8994: KAFKA-10247: Correctly reset state when task is corrupted
ableegoldman commented on a change in pull request #8994:
URL: https://github.com/apache/kafka/pull/8994#discussion_r452381474

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ##
@@ -232,10 +232,16 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
 log.debug("State store {} initialized from checkpoint with offset {} at changelog {}",
 store.stateStore.name(), store.offset, store.changelogPartition);
 } else {
-// with EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
+// With EOS, if the previous run did not shutdown gracefully, we may lost the checkpoint file
 // and hence we are uncertain that the current local state only contains committed data;
 // in that case we need to treat it as a task-corrupted exception
-if (eosEnabled && !storeDirIsEmpty) {
+
+// Note, this is a little overzealous, since we aren't checking whether the store's specific
+// directory is nonempty, only if there are any directories for any stores. So if there are
+// two stores in a task, and one is correctly written and checkpointed, while the other is
+// neither written nor checkpointed, we _could_ correctly load the first and recover the second
+// but instead we'll consider the whole task corrupted and discard the first and recover both.
+if (store.stateStore.persistent() && eosEnabled && !taskDirIsEmpty) {

Review comment:
I like my fix better :P
But seriously, no need to block this PR on mine if it's suddenly causing tests to fail. Mysterious...
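The condition in the diff above can be read as a small truth table. The sketch below is a hypothetical, self-contained restatement of that check and not the actual `ProcessorStateManager` code: the class and method names are invented, and the three booleans stand in for `store.stateStore.persistent()`, `eosEnabled`, and `taskDirIsEmpty`. It assumes the check only runs for a store that has no checkpointed offset, as in the `else` branch shown.

```java
public class TaskCorruptionCheckSketch {

    // Hypothetical restatement of the check for a store WITHOUT a checkpointed offset.
    static boolean treatAsCorrupted(final boolean storeIsPersistent,
                                    final boolean eosEnabled,
                                    final boolean taskDirIsEmpty) {
        return storeIsPersistent && eosEnabled && !taskDirIsEmpty;
    }

    public static void main(final String[] args) {
        // Persistent store, EOS on, files present in the task directory: corrupted.
        System.out.println(treatAsCorrupted(true, true, false));
        // Non-persistent store: the persistent() guard excludes it from this check.
        System.out.println(treatAsCorrupted(false, true, false));
        // Empty task directory: the check does not fire.
        System.out.println(treatAsCorrupted(true, true, true));
        // EOS disabled: the check does not fire.
        System.out.println(treatAsCorrupted(true, false, false));
    }
}
```

Because the third input describes the whole task directory rather than one store's directory, a single un-checkpointed persistent store is enough to mark the task corrupted, which is the overzealous case the code comment describes.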
[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r452374925

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ##
@@ -994,30 +1023,29 @@ public void onSuccess(ClientResponse response, RequestFuture f
 * value of each partition may be null only for v0. In v1 and later the ListOffset API would not
 * return a null timestamp (-1 is returned instead when necessary).
 */
-private void handleListOffsetResponse(Map timestampsToSearch,
+private void handleListOffsetResponse(Map timestampsToSearch,
 ListOffsetResponse listOffsetResponse,
 RequestFuture future) {
 Map fetchedOffsets = new HashMap<>();
 Set partitionsToRetry = new HashSet<>();
 Set unauthorizedTopics = new HashSet<>();
-for (Map.Entry entry : timestampsToSearch.entrySet()) {
+Map partitionsData = byTopicPartitions(listOffsetResponse.responseData());

Review comment:
We have now added logic in the AdminClient to handle partial responses from brokers (based on https://github.com/apache/kafka/pull/8295#discussion_r449575550). Shouldn't we do the same here instead of assuming the response is always complete? I'm not even sure we should retry if a resource is missing from the response, but we could at least log it instead of hitting an NPE.
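One way to avoid the NPE described above is to look up each requested partition in the response and record the ones that are absent. The following is a minimal sketch under stated assumptions, not the `Fetcher` implementation: partitions are plain strings, offsets are `Long` values, and `PartialResponseSketch`, `Result`, and `collect` are hypothetical names. Whether missing partitions should be retried is left open, as in the comment; the sketch only logs and reports them.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;

public class PartialResponseSketch {
    private static final Logger LOG = Logger.getLogger(PartialResponseSketch.class.getName());

    // Hypothetical result holder: offsets that were returned, and partitions that were not.
    static final class Result {
        final Map<String, Long> fetchedOffsets = new HashMap<>();
        final Set<String> missingPartitions = new LinkedHashSet<>();
    }

    // Walks the REQUESTED partitions and tolerates a response that omits some of them.
    static Result collect(final Set<String> requestedPartitions,
                          final Map<String, Long> responseData) {
        final Result result = new Result();
        for (final String partition : requestedPartitions) {
            final Long offset = responseData.get(partition);
            if (offset == null) {
                // Log instead of dereferencing a null entry.
                LOG.warning("Response is missing requested partition " + partition);
                result.missingPartitions.add(partition);
            } else {
                result.fetchedOffsets.put(partition, offset);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final Set<String> requested = new LinkedHashSet<>();
        requested.add("topic-0");
        requested.add("topic-1");

        final Map<String, Long> response = new HashMap<>();
        response.put("topic-0", 42L);

        final Result result = collect(requested, response);
        System.out.println(result.fetchedOffsets);
        System.out.println(result.missingPartitions);
    }
}
```

Iterating over the requested set rather than the response is the design choice that matters here: it is the only side that knows which entries were expected.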
[GitHub] [kafka] vinothchandar commented on pull request #8948: KAFKA-10174: Prefer --bootstrap-server for configs command in ducker tests
vinothchandar commented on pull request #8948:
URL: https://github.com/apache/kafka/pull/8948#issuecomment-656251801

@cmccabe this PR is a sub-task under KAFKA-10131, which tracks the bigger goal. I filed a new issue explicitly targeting removal of `use_zk_connection`. For this PR, the public methods it affects, `alter_message_format`/`set_unclean_leader_election`, don't offer this flag. So this PR should be fine for this scope?
[jira] [Created] (KAFKA-10258) Get rid of use_zk_connection flag in kafka.py public methods
Vinoth Chandar created KAFKA-10258:
--------------------------------------

             Summary: Get rid of use_zk_connection flag in kafka.py public methods
                 Key: KAFKA-10258
                 URL: https://issues.apache.org/jira/browse/KAFKA-10258
             Project: Kafka
          Issue Type: Sub-task
            Reporter: Vinoth Chandar

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)
vvcephei commented on pull request #8993:
URL: https://github.com/apache/kafka/pull/8993#issuecomment-656249347

Ah, and the other smoke test usages are messed up:
```
test_id: kafkatest.tests.streams.streams_eos_test.StreamsEosTest.test_rebalance_simple
status: FAIL
run time: 1 minute 28.121 seconds

__init__() takes exactly 4 arguments (3 given)
Traceback (most recent call last):
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/tests/runner_client.py", line 134, in run
    data = self.run_test()
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/tests/runner_client.py", line 192, in run_test
    return self.test_context.function(self.test)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_eos_test.py", line 41, in test_rebalance_simple
    self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka),
TypeError: __init__() takes exactly 4 arguments (3 given)
```
[GitHub] [kafka] ableegoldman commented on a change in pull request #8996: KAFKA-10249: don't try to read un-checkpointed offsets of in-memory stores
ableegoldman commented on a change in pull request #8996:
URL: https://github.com/apache/kafka/pull/8996#discussion_r452370517

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java ##
@@ -104,17 +104,16 @@ static void closeStateManager(final Logger log,
 if (stateDirectory.lock(id)) {
 try {
 stateMgr.close();
-
+} catch (final ProcessorStateException e) {
+firstException.compareAndSet(null, e);
+} finally {
 if (wipeStateStore) {
 log.debug("Wiping state stores for {} task {}", taskType, id);
 // we can just delete the whole dir of the task, including the state store images and the checkpoint files,
 // and then we write an empty checkpoint file indicating that the previous close is graceful and we just
 // need to re-bootstrap the restoration from the beginning
 Utils.delete(stateMgr.baseDir());

Review comment:
Right, it's not a correctness issue, but it's additional needless overhead to go through the whole cycle of initializing a task, getting a TaskCorrupted, wiping it then, and finally restarting it. Of course, if we keep hitting an issue during `closeDirty` then we might never wipe the state, which does seem like a real problem. For example, if there's some issue with the state, like the files are actually corrupted or something.
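The restructuring in the diff above moves the wipe into a `finally` block so that it still runs when `close()` throws. A minimal sketch of that control flow follows, assuming hypothetical stand-ins: `closeAndMaybeWipe`, the `Runnable` arguments, and the use of `RuntimeException` in place of `ProcessorStateException` are all invented for illustration and are not Kafka's `StateManagerUtil` code.

```java
import java.util.concurrent.atomic.AtomicReference;

public class CloseThenWipeSketch {

    // Runs close, remembers the first failure, and wipes in finally when requested.
    static RuntimeException closeAndMaybeWipe(final Runnable close,
                                              final Runnable wipe,
                                              final boolean wipeStateStore) {
        final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
        try {
            close.run();
        } catch (final RuntimeException e) {
            // Keep only the first exception; later ones do not overwrite it.
            firstException.compareAndSet(null, e);
        } finally {
            if (wipeStateStore) {
                wipe.run();
            }
        }
        return firstException.get();
    }

    public static void main(final String[] args) {
        final boolean[] wiped = {false};
        final RuntimeException failure = closeAndMaybeWipe(
            () -> { throw new IllegalStateException("close failed"); },
            () -> wiped[0] = true,
            true);
        System.out.println(failure.getMessage());
        System.out.println(wiped[0]);
    }
}
```

With the wipe inside `finally`, a task whose close keeps failing still has its state deleted, which is the situation the review comment is worried about.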
[jira] [Assigned] (KAFKA-10213) Prefer --bootstrap-server in ducktape tests for Kafka clients
     [ https://issues.apache.org/jira/browse/KAFKA-10213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vinoth Chandar reassigned KAFKA-10213:
--------------------------------------

    Assignee: Ron Dagostino  (was: Vinoth Chandar)

> Prefer --bootstrap-server in ducktape tests for Kafka clients
> -------------------------------------------------------------
>
>                 Key: KAFKA-10213
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10213
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Vinoth Chandar
>            Assignee: Ron Dagostino
>            Priority: Major
>
[GitHub] [kafka] C0urante commented on pull request #8973: KAFKA-10218: Stop reading config topic in every subsequent tick if catchup fails once
C0urante commented on pull request #8973:
URL: https://github.com/apache/kafka/pull/8973#issuecomment-656247489

@gharris1727 @chia7712 @ncliang would any of you be interested in reviewing this?
[GitHub] [kafka] C0urante commented on pull request #9003: KAFKA-10240: Stop throwing WakeupExceptions during sink task shutdown
C0urante commented on pull request #9003:
URL: https://github.com/apache/kafka/pull/9003#issuecomment-656247517

@gharris1727 @chia7712 @ncliang would any of you be interested in reviewing this?
[jira] [Updated] (KAFKA-10235) Fix flaky transactions_test.py
     [ https://issues.apache.org/jira/browse/KAFKA-10235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismael Juma updated KAFKA-10235:
--------------------------------
    Fix Version/s:     (was: 3.0.0)
                   2.7.0

> Fix flaky transactions_test.py
> ------------------------------
>
>                 Key: KAFKA-10235
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10235
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Chia-Ping Tsai
>            Assignee: Chia-Ping Tsai
>            Priority: Major
>             Fix For: 2.7.0
>
>
> {code}
> =hard_bounce.bounce_target=clients.check_order=False.use_group_metadata=False: FAIL: copier-1 : Message copier didn't make enough progress in 30s. Current progress: 0
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", line 134, in run
>     data = self.run_test()
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/tests/runner_client.py", line 192, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/mark/_mark.py", line 429, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/transactions_test.py", line 254, in test_transactions
>     num_messages_to_copy=self.num_seed_messages, use_group_metadata=use_group_metadata)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/transactions_test.py", line 195, in copy_messages_transactionally
>     self.bounce_copiers(copiers, clean_shutdown)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/transactions_test.py", line 120, in bounce_copiers
>     % (copier.transactional_id, str(copier.progress_percent(
>   File "/usr/local/lib/python2.7/dist-packages/ducktape/utils/util.py", line 41, in wait_until
>     raise TimeoutError(err_msg() if callable(err_msg) else err_msg)
> TimeoutError: copier-1 : Message copier didn't make enough progress in 30s. Current progress: 0
> {code}
[GitHub] [kafka] vvcephei commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)
vvcephei commented on pull request #8993:
URL: https://github.com/apache/kafka/pull/8993#issuecomment-656247859

Looks like there was a spurious failure of the new test:
```
test_id: kafkatest.tests.streams.streams_application_upgrade_test.StreamsUpgradeTest.test_app_upgrade.to_version=2.5.0-SNAPSHOT.from_version=2.0.1.bounce_type=full
status: FAIL
run time: 1 minute 43.164 seconds

Server connection dropped:
Traceback (most recent call last):
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/tests/runner_client.py", line 134, in run
    data = self.run_test()
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/tests/runner_client.py", line 192, in run_test
    return self.test_context.function(self.test)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/mark/_mark.py", line 429, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_application_upgrade_test.py", line 108, in test_app_upgrade
    self.restart_all_nodes_with(to_version)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/tests/streams/streams_application_upgrade_test.py", line 173, in restart_all_nodes_with
    self.processor2.start_node(self.processor2.node)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/tests/kafkatest/services/streams.py", line 305, in start_node
    node.account.create_file(self.CONFIG_FILE, prop_file)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/cluster/remoteaccount.py", line 588, in create_file
    with self.sftp_client.open(path, "w") as f:
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/paramiko-2.6.0-py2.7.egg/paramiko/sftp_client.py", line 372, in open
    t, msg = self._request(CMD_OPEN, filename, imode, attrblock)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/paramiko-2.6.0-py2.7.egg/paramiko/sftp_client.py", line 813, in _request
    return self._read_response(num)
  File "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python2.7/site-packages/paramiko-2.6.0-py2.7.egg/paramiko/sftp_client.py", line 845, in _read_response
    raise SSHException("Server connection dropped: {}".format(e))
SSHException: Server connection dropped:
```
I'll re-run it.
[GitHub] [kafka] C0urante opened a new pull request #9003: KAFKA-10240: Stop throwing WakeupExceptions during sink task shutdown
C0urante opened a new pull request #9003:
URL: https://github.com/apache/kafka/pull/9003

A benign `WakeupException` can be thrown by a sink task's consumer if it's scheduled for shutdown by the worker. This is caught and handled gracefully if the exception is thrown when calling `poll` on the consumer, but not if it is thrown when calling `commitSync`, which is invoked by a task during shutdown and also when its partition assignment is updated.

If thrown during a partition assignment update, the `WakeupException` is caught and handled gracefully as part of the task's `iteration` loop. If thrown during shutdown, however, it is not caught and instead leads to the scary-looking log message "Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted.".

These changes catch the `WakeupException` during shutdown and handle it gracefully with a `TRACE`-level log message.

A unit test is added to verify this behavior by simulating a thrown `WakeupException` during `Consumer::commitSync`, running through the `WorkerSinkTask::execute` method, and confirming that it does not throw a `WakeupException` itself.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
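The shutdown behavior described in this pull request can be sketched without the Kafka client library. In the sketch below every name is a hypothetical stand-in: `WakeupSignal` plays the role of `WakeupException`, `Committer` the role of the consumer's `commitSync`, and `closeTask` the role of the task's shutdown path. It is not the `WorkerSinkTask` code and only illustrates the idea of swallowing the benign exception during shutdown.

```java
import java.util.ArrayList;
import java.util.List;

public class ShutdownWakeupSketch {

    // Hypothetical stand-in for the benign exception raised when a consumer is woken up.
    static final class WakeupSignal extends RuntimeException { }

    // Hypothetical stand-in for a synchronous offset commit.
    interface Committer {
        void commitSync();
    }

    // Commits during shutdown; a wakeup here is expected and is only recorded as a trace message.
    static void closeTask(final Committer committer, final List<String> traceLog) {
        try {
            committer.commitSync();
        } catch (final WakeupSignal e) {
            traceLog.add("Ignoring wakeup during shutdown commit");
        }
    }

    public static void main(final String[] args) {
        final List<String> traceLog = new ArrayList<>();
        // The commit is interrupted by a wakeup, but closeTask returns normally.
        closeTask(() -> { throw new WakeupSignal(); }, traceLog);
        System.out.println(traceLog);
    }
}
```

Only the wakeup type is caught, so any other failure in the commit would still propagate to the caller.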
[jira] [Commented] (KAFKA-10174) Prefer --bootstrap-server ducktape tests using kafka_configs.sh
     [ https://issues.apache.org/jira/browse/KAFKA-10174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154741#comment-17154741 ]

Vinoth Chandar commented on KAFKA-10174:
----------------------------------------

[https://github.com/apache/kafka/pull/8948]

> Prefer --bootstrap-server ducktape tests using kafka_configs.sh
> ---------------------------------------------------------------
>
>                 Key: KAFKA-10174
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10174
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Vinoth Chandar
>            Assignee: Vinoth Chandar
>            Priority: Major
>
[GitHub] [kafka] omkreddy commented on pull request #8878: MINOR: Generator config-specific HTML ids
omkreddy commented on pull request #8878:
URL: https://github.com/apache/kafka/pull/8878#issuecomment-656246737

I agree, it's a useful fix. I think it's OK to break existing links. I have not seen those links used anywhere. Let's see what others think.