[GitHub] [kafka] satishd closed pull request #11197: 28x TS changes
satishd closed pull request #11197: URL: https://github.com/apache/kafka/pull/11197 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on pull request #11198: MINOR: Use compressionType parameter in SnapshotWriter
dengziming commented on pull request #11198: URL: https://github.com/apache/kafka/pull/11198#issuecomment-80080 @showuon Thank you, I am thinking about how I can test this case.
[GitHub] [kafka] mjsax commented on a change in pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0
mjsax commented on a change in pull request #11124: URL: https://github.com/apache/kafka/pull/11124#discussion_r690013977 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ##
```diff
@@ -100,17 +99,78 @@
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // We use TimeWindow to represent the "windowed KTable" internally, so, the window size must be greater than 0 here
+        final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);
```
Review comment: Hmmm... Seems to be an issue... The actual final return type is `KTable<Windowed<K>, V>` and thus is window-type agnostic. So we already have such a "container". -- However, `windowedBy(SlidingWindows)` returns a `TimeWindowedKStream`... Return types are not easy to change... And I don't think we can just switch from `TimeWindow` to `SlidingWindow` as the concrete type for the sliding-window case either... Maybe we are stuck and cannot fix the bug without a breaking change?
For this case, we would indeed need to carry on with the KIP (but we could only do it in 4.0...), but I am wondering if it's worth fixing given the impact? Also: we have a few issues with the current DSL that we cannot fix easily (e.g., KIP-300). Thus, a long-term solution could be to leave the current API as-is and build a new DSL in parallel (we did this in the past when we introduced `StreamsBuilder`). This way, we can change the API in any way, but it would be a long-term solution only. It might also help with regard to the new PAPI that uses `Record` instead of a `<K, V>` pair, and that is not easily adopted for `transform()` (and siblings). We could change the whole DSL to `Record` (i.e., `KStream<Record<K, V>>` -- of course we don't need `Record` in the generic type -- it's just for illustrative purposes). It would also cover the "add headers" KIP and fix KIP-300; we could introduce a `PartitionedKStream` (cf. the current KIP-759 discussion) and fix a few other minor issues (like renaming `KGroupedStream` to `GroupedKStream`) all at once... And we could clean up the topology optimization step and the operator naming rules (where it is a big mess to understand which `Named` object overwrites which...) -- We can also get rid of the wrappers for `KeyValueStore` to `TimestampedKeyValueStore` and change the interface from `Materialized` to `Materialized
[GitHub] [kafka] showuon commented on a change in pull request #11218: MINOR: optimize performAssignment to skip unnecessary check
showuon commented on a change in pull request #11218: URL: https://github.com/apache/kafka/pull/11218#discussion_r690007620 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java ##
```diff
@@ -48,6 +53,16 @@ public class ConsumerConfig extends AbstractConfig {
     private static final ConfigDef CONFIG;
+    // a list contains all the in-product assignor names. Should be updated when new assignor added.
+    // This is to help optimize the ConsumerCoordinator#performAssignment
+    public static final List<String> IN_PRODUCT_ASSIGNOR_NAMES =
+        Collections.unmodifiableList(Arrays.asList(
+            RANGE_ASSIGNOR_NAME,
+            ROUNDROBIN_ASSIGNOR_NAME,
+            STICKY_ASSIGNOR_NAME,
+            COOPERATIVE_STICKY_ASSIGNOR_NAME
```
Review comment: Question: Should we add `StreamsPartitionAssignor` here?
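A minimal standalone sketch of the idea behind `IN_PRODUCT_ASSIGNOR_NAMES`, assuming the check that `performAssignment` skips is a comparison of subscribed versus assigned topics: the coordinator only double-checks the output of assignors that are not built in. This is not the PR's code; `AssignmentCheckSketch`, `shouldValidate`, and `performAssignmentCheck` are hypothetical names, and the strings are hardcoded copies of the names the built-in assignors report rather than references to the real constants.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class AssignmentCheckSketch {
    // Names reported by the built-in consumer assignors, hardcoded for a standalone sketch.
    static final List<String> IN_PRODUCT_ASSIGNOR_NAMES = Collections.unmodifiableList(Arrays.asList(
        "range", "roundrobin", "sticky", "cooperative-sticky"));

    // Hypothetical helper: only custom assignors get their output double-checked.
    static boolean shouldValidate(String assignorName) {
        return !IN_PRODUCT_ASSIGNOR_NAMES.contains(assignorName);
    }

    // Hypothetical check: subscribed topics that no member was assigned.
    static Set<String> unassignedTopics(Set<String> subscribed, Set<String> assigned) {
        Set<String> missing = new TreeSet<>(subscribed);
        missing.removeAll(assigned);
        return missing;
    }

    // Skips the (assumed) validation entirely for built-in assignors.
    static Set<String> performAssignmentCheck(String assignorName, Set<String> subscribed, Set<String> assigned) {
        if (!shouldValidate(assignorName)) {
            return Collections.emptySet();
        }
        return unassignedTopics(subscribed, assigned);
    }
}
```

In this sketch, leaving `StreamsPartitionAssignor` out of the list simply means Streams assignments would still go through the check, which is the trade-off behind the review question.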
[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned
jsancio commented on a change in pull request #11216: URL: https://github.com/apache/kafka/pull/11216#discussion_r689978096 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ##
```diff
@@ -2074,48 +2073,23 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
-  private[kafka] def calculateDeltaChanges(delta: TopicsDelta)
-    : (mutable.HashMap[TopicPartition, Boolean],
-       mutable.HashMap[TopicPartition, LocalLeaderInfo],
-       mutable.HashMap[TopicPartition, LocalLeaderInfo]) = {
-    val deleted = new mutable.HashMap[TopicPartition, Boolean]()
-    delta.deletedTopicIds().forEach { topicId =>
-      val topicImage = delta.image().getTopic(topicId)
-      topicImage.partitions().keySet().forEach { partitionId =>
-        deleted.put(new TopicPartition(topicImage.name(), partitionId), true)
-      }
-    }
-    val newLocalLeaders = new mutable.HashMap[TopicPartition, LocalLeaderInfo]()
-    val newLocalFollowers = new mutable.HashMap[TopicPartition, LocalLeaderInfo]()
-    delta.changedTopics().values().forEach { topicDelta =>
-      topicDelta.newLocalLeaders(config.nodeId).forEach { e =>
-        newLocalLeaders.put(new TopicPartition(topicDelta.name(), e.getKey),
-          LocalLeaderInfo(topicDelta.id(), e.getValue))
-      }
-      topicDelta.newLocalFollowers(config.nodeId).forEach { e =>
-        newLocalFollowers.put(new TopicPartition(topicDelta.name(), e.getKey),
-          LocalLeaderInfo(topicDelta.id(), e.getValue))
-      }
-    }
-    (deleted, newLocalLeaders, newLocalFollowers)
-  }
-
   /**
    * Apply a KRaft topic change delta.
    *
    * @param newImage        The new metadata image.
    * @param delta           The delta to apply.
    */
   def applyDelta(newImage: MetadataImage, delta: TopicsDelta): Unit = {
-    // Before taking the lock, build some hash maps that we will need.
-    val (deleted, newLocalLeaders, newLocalFollowers) = calculateDeltaChanges(delta)
+    // Before taking the lock, compute the local changes
+    val localChanges = delta.localChanges(config.nodeId)
     replicaStateChangeLock.synchronized {
       // Handle deleted partitions. We need to do this first because we might subsequently
       // create new partitions with the same names as the ones we are deleting here.
-      if (!deleted.isEmpty) {
-        stateChangeLogger.info(s"Deleting ${deleted.size} partition(s).")
-        stopPartitions(deleted).foreach { case (topicPartition, e) =>
+      if (!localChanges.deletes.isEmpty) {
```
Review comment: If that happens, then `TopicsDelta` would remove it from `changedTopics`: https://github.com/apache/kafka/pull/11216/files#diff-521beb14ca284c8e5ed56e92271216167da342c44b86992add15f33c39128ecbR93
```
public String replay(RemoveTopicRecord record) {
    TopicDelta topicDelta = changedTopics.remove(record.topicId());
    ...
}
```
Let me write a test that shows this.
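To illustrate the behavior described in the reply, here is a minimal model of a delta that tracks changed topics by ID: replaying a remove record drops any pending change for that topic, so a topic created and then removed within one delta never reaches the leader/follower handling. This is a simplified sketch, not Kafka's `TopicsDelta`; `DeltaSketch`, `replayCreate`, and `replayRemove` are hypothetical, and the rule that only topics present in the base image are recorded as deleted is an assumption.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class DeltaSketch {
    private final Set<String> baseImageTopicIds;               // topic IDs in the starting image
    final Map<String, String> changedTopics = new HashMap<>(); // topic ID -> topic name
    final Set<String> deletedTopicIds = new HashSet<>();

    DeltaSketch(Set<String> baseImageTopicIds) {
        this.baseImageTopicIds = new HashSet<>(baseImageTopicIds);
    }

    // Analogous to replaying a topic-creation record: the topic now shows up as changed.
    void replayCreate(String topicId, String name) {
        changedTopics.put(topicId, name);
    }

    // Analogous to replay(RemoveTopicRecord): forget any pending change for this topic.
    void replayRemove(String topicId) {
        changedTopics.remove(topicId);
        // Assumption: only topics that existed before this delta need a local delete.
        if (baseImageTopicIds.contains(topicId)) {
            deletedTopicIds.add(topicId);
        }
    }
}
```

Under these assumptions, a create followed by a remove in the same delta leaves both collections empty, so the broker neither creates nor keeps a log for that topic.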
[GitHub] [kafka] junrao commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned
junrao commented on a change in pull request #11216: URL: https://github.com/apache/kafka/pull/11216#discussion_r689974956 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ##
```diff
@@ -2074,48 +2073,23 @@ class ReplicaManager(val config: KafkaConfig,
       }
     }
-  private[kafka] def calculateDeltaChanges(delta: TopicsDelta)
-    : (mutable.HashMap[TopicPartition, Boolean],
-       mutable.HashMap[TopicPartition, LocalLeaderInfo],
-       mutable.HashMap[TopicPartition, LocalLeaderInfo]) = {
-    val deleted = new mutable.HashMap[TopicPartition, Boolean]()
-    delta.deletedTopicIds().forEach { topicId =>
-      val topicImage = delta.image().getTopic(topicId)
-      topicImage.partitions().keySet().forEach { partitionId =>
-        deleted.put(new TopicPartition(topicImage.name(), partitionId), true)
-      }
-    }
-    val newLocalLeaders = new mutable.HashMap[TopicPartition, LocalLeaderInfo]()
-    val newLocalFollowers = new mutable.HashMap[TopicPartition, LocalLeaderInfo]()
-    delta.changedTopics().values().forEach { topicDelta =>
-      topicDelta.newLocalLeaders(config.nodeId).forEach { e =>
-        newLocalLeaders.put(new TopicPartition(topicDelta.name(), e.getKey),
-          LocalLeaderInfo(topicDelta.id(), e.getValue))
-      }
-      topicDelta.newLocalFollowers(config.nodeId).forEach { e =>
-        newLocalFollowers.put(new TopicPartition(topicDelta.name(), e.getKey),
-          LocalLeaderInfo(topicDelta.id(), e.getValue))
-      }
-    }
-    (deleted, newLocalLeaders, newLocalFollowers)
-  }
-
   /**
    * Apply a KRaft topic change delta.
    *
    * @param newImage        The new metadata image.
    * @param delta           The delta to apply.
    */
   def applyDelta(newImage: MetadataImage, delta: TopicsDelta): Unit = {
-    // Before taking the lock, build some hash maps that we will need.
-    val (deleted, newLocalLeaders, newLocalFollowers) = calculateDeltaChanges(delta)
+    // Before taking the lock, compute the local changes
+    val localChanges = delta.localChanges(config.nodeId)
     replicaStateChangeLock.synchronized {
       // Handle deleted partitions. We need to do this first because we might subsequently
       // create new partitions with the same names as the ones we are deleting here.
-      if (!deleted.isEmpty) {
-        stateChangeLogger.info(s"Deleting ${deleted.size} partition(s).")
-        stopPartitions(deleted).foreach { case (topicPartition, e) =>
+      if (!localChanges.deletes.isEmpty) {
```
Review comment: Is it always correct to process the deletes first? For example, a topic could first be created and then deleted within the same delta. If we process the deletes first, wouldn't we keep the partition and the log when they should be deleted?
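The ordering rationale in the diff's own comment ("we might subsequently create new partitions with the same names") can be shown with a toy model in which hosted partitions are keyed by name only, as `TopicPartition` is. If a topic is deleted and recreated under the same name (with a new topic ID) in one delta, applying the deletes first leaves the recreated partition hosted, whereas applying them last would wipe it out. `ApplyOrderSketch` and its parameters are hypothetical; this is not the `ReplicaManager` code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ApplyOrderSketch {
    // Applies deletes and creates to a map of "topic-partition" name -> topic ID.
    static Map<String, String> apply(Map<String, String> hosted, List<String> deletes,
                                     Map<String, String> creates, boolean deletesFirst) {
        Map<String, String> result = new HashMap<>(hosted);
        if (deletesFirst) {
            deletes.forEach(result::remove); // drop the old incarnation
            result.putAll(creates);          // then host the recreated partition
        } else {
            result.putAll(creates);
            deletes.forEach(result::remove); // also removes the freshly created partition
        }
        return result;
    }
}
```

The create-then-delete case raised in the review is a different situation: there, the question is whether the delta reports the topic as a create at all.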
[GitHub] [kafka] showuon commented on a change in pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0
showuon commented on a change in pull request #11124: URL: https://github.com/apache/kafka/pull/11124#discussion_r689953353 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ##
```diff
@@ -100,17 +99,78 @@
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // We use TimeWindow to represent the "windowed KTable" internally, so, the window size must be greater than 0 here
+        final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);
```
Review comment: @mjsax, this is the one remaining question in this PR. Because we use `TimeWindow` to represent the windowed key, we can only set a window size > 0. One solution, suggested by @ableegoldman, is to have a window container that only holds the start and end time. What do you think?
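The window-container idea can be sketched as a small value type that stores only the start and end timestamps and, unlike a strictly positive-size window, accepts start == end, so a sliding window with a time difference of 0 has a representation. As the comment in the test code notes, the `TimeWindow` used today requires a size greater than 0. `SimpleWindowSketch` and `WindowedKeySketch` are hypothetical names, not Kafka Streams classes.

```java
import java.util.Objects;

// Hypothetical window container: only start and end, zero-size windows allowed.
final class SimpleWindowSketch {
    final long startMs;
    final long endMs;

    SimpleWindowSketch(long startMs, long endMs) {
        if (startMs < 0) {
            throw new IllegalArgumentException("startMs must be non-negative");
        }
        if (endMs < startMs) {
            throw new IllegalArgumentException("endMs must not be smaller than startMs");
        }
        this.startMs = startMs;
        this.endMs = endMs;
    }

    long sizeMs() {
        return endMs - startMs;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SimpleWindowSketch)) return false;
        SimpleWindowSketch other = (SimpleWindowSketch) o;
        return startMs == other.startMs && endMs == other.endMs;
    }

    @Override
    public int hashCode() {
        return Objects.hash(startMs, endMs);
    }
}

// Hypothetical windowed key pairing a record key with its window.
final class WindowedKeySketch<K> {
    final K key;
    final SimpleWindowSketch window;

    WindowedKeySketch(K key, SimpleWindowSketch window) {
        this.key = Objects.requireNonNull(key);
        this.window = Objects.requireNonNull(window);
    }
}
```

The only validation that changes relative to a strictly positive-size window is `endMs < startMs` instead of `endMs <= startMs`; whether such a type can be exposed without breaking the `TimeWindowedKStream` return type is the open question in the thread.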
[GitHub] [kafka] mjsax commented on a change in pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0
mjsax commented on a change in pull request #11124: URL: https://github.com/apache/kafka/pull/11124#discussion_r689950309 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ##
```diff
@@ -100,17 +99,78 @@
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // We use TimeWindow to represent the "windowed KTable" internally, so, the window size must be greater than 0 here
+        final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);
```
Review comment: Size should be `0L`?
## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ##
```diff
@@ -100,17 +99,78 @@
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final String threadId = Thread.currentThread().getName();
+    private final String topic = "topic";
+    private final String defaultInOrderName = "InOrder";
+    private final String defaultReverseName = "Reverse";
+    private final long defaultWindowSize = 10L;
+    private final long defaultRetentionPeriod = 5000L;
+
+    private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                      final String inOrderName,
+                                                      final String reverseName,
+                                                      final long windowSize) {
+        return inOrderIterator
+            ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+            : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testAggregateSmallInputWithZeroTimeDifference() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // We use TimeWindow to represent the "windowed KTable" internally, so, the window size must be greater than 0 here
```
Review comment: Seems this comment needs to be updated?
[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned
jsancio commented on a change in pull request #11216: URL: https://github.com/apache/kafka/pull/11216#discussion_r689940858 ## File path: metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java ## @@ -126,6 +133,158 @@ private static TopicImage newTopicImage(String name, Uuid id, PartitionRegistrat IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), newTopicsByNameMap(topics2)); } +private PartitionRegistration newPartition(int[] replicas) { +return new PartitionRegistration(replicas, replicas, Replicas.NONE, Replicas.NONE, replicas[0], 1, 1); +} + +@Test +public void testLocalReplicaChanges() { +int localId = 3; +Uuid newFooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w"); + +List topics = new ArrayList<>(TOPIC_IMAGES1); +topics.add( +newTopicImage( +"foo", +newFooId, +newPartition(new int[] {0, 1, 3}), +newPartition(new int[] {3, 1, 2}), +newPartition(new int[] {0, 1, 3}), +newPartition(new int[] {3, 1, 2}), +newPartition(new int[] {0, 1, 2}), +newPartition(new int[] {0, 1, 2}) +) +); + +TopicsImage image = new TopicsImage(newTopicsByIdMap(topics), newTopicsByNameMap(topics)); + +List topicRecords = new ArrayList<>(DELTA1_RECORDS); +// foo-0 - follower to leader +topicRecords.add( +new ApiMessageAndVersion( +new PartitionChangeRecord() + .setTopicId(newFooId) + .setPartitionId(0) + .setLeader(3), +PARTITION_CHANGE_RECORD.highestSupportedVersion() +) +); +// foo-1 - leader to follower +topicRecords.add( +new ApiMessageAndVersion( +new PartitionChangeRecord() + .setTopicId(newFooId) + .setPartitionId(1) + .setLeader(1), +PARTITION_CHANGE_RECORD.highestSupportedVersion() +) +); +// foo-2 - follower to removed +topicRecords.add( +new ApiMessageAndVersion( +new PartitionChangeRecord() + .setTopicId(newFooId) + .setPartitionId(2) + .setIsr(Arrays.asList(0, 1, 2)) + .setReplicas(Arrays.asList(0, 1, 2)), +PARTITION_CHANGE_RECORD.highestSupportedVersion() +) +); +// foo-3 - leader to removed +topicRecords.add( +new ApiMessageAndVersion( +new 
PartitionChangeRecord() + .setTopicId(newFooId) + .setPartitionId(3) + .setLeader(0) + .setIsr(Arrays.asList(0, 1, 2)) + .setReplicas(Arrays.asList(0, 1, 2)), +PARTITION_CHANGE_RECORD.highestSupportedVersion() +) +); +// foo-4 - not replica to leader +topicRecords.add( +new ApiMessageAndVersion( +new PartitionChangeRecord() + .setTopicId(newFooId) + .setPartitionId(4) + .setLeader(3) + .setIsr(Arrays.asList(3, 1, 2)) + .setReplicas(Arrays.asList(3, 1, 2)), +PARTITION_CHANGE_RECORD.highestSupportedVersion() +) +); +// foo-5 - not replica to follower +topicRecords.add( +new ApiMessageAndVersion( +new PartitionChangeRecord() + .setTopicId(newFooId) + .setPartitionId(5) + .setIsr(Arrays.asList(0, 1, 3)) + .setReplicas(Arrays.asList(0, 1, 3)), +PARTITION_CHANGE_RECORD.highestSupportedVersion() +) +); + +/* Changes already include in DELTA1_RECORDS: + * foo - topic id deleted + * bar-0 - stay as follower with different partition epoch + * baz-0 - new topic to leader + */ + +// baz-1 - new topic to follower +topicRecords.add( +new ApiMessageAndVersion( +new PartitionRecord() +.setPartitionId(1) +.setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg")) +.setReplicas(Arrays.asList(4, 2, 3)) +.setIsr(Arrays.asList(4, 2, 3)) +.setLeader(4) +.setLeaderEpoch(2) +.setPartitionEpoch(1), +PARTITION_RECORD.highestSupportedVersion() +) +); + +TopicsDelta delta = new TopicsDelta(image); +RecordTestUtils.replayAll(delta, topicRecords); + +LocalReplicaChanges changes = delta.localChanges(localId); +assertEquals( +new HashSet<>( +
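The cases named in the test comments (follower to leader, leader to follower, follower or leader to removed, not replica to leader or follower) can be derived by comparing the old and new replica assignment and the new leader against the local broker ID. Below is a minimal classification sketch under that assumption; `LocalChangeSketch` and `Role` are hypothetical and do not reproduce `TopicsDelta.localChanges`.

```java
import java.util.Arrays;

final class LocalChangeSketch {
    enum Role { LEADER, FOLLOWER, DELETE, NONE }

    private static boolean contains(int[] replicas, int brokerId) {
        return Arrays.stream(replicas).anyMatch(r -> r == brokerId);
    }

    // Classifies what the local broker must do for one changed partition.
    static Role classify(int localId, int[] oldReplicas, int[] newReplicas, int newLeader) {
        if (contains(newReplicas, localId)) {
            // Still (or newly) a replica: become leader or follower.
            return newLeader == localId ? Role.LEADER : Role.FOLLOWER;
        }
        // No longer a replica: stop and delete only if it was hosted before.
        return contains(oldReplicas, localId) ? Role.DELETE : Role.NONE;
    }
}
```

With `localId = 3` and the `foo-0` through `foo-5` assignments from the test, this yields LEADER, FOLLOWER, DELETE, DELETE, LEADER, FOLLOWER. For a brand-new topic such as `baz-1`, pass an empty array as `oldReplicas`.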
[jira] [Created] (KAFKA-13208) Use TopicIdPartition instead of TopicPartition when computing the topic delta
Jose Armando Garcia Sancio created KAFKA-13208: -- Summary: Use TopicIdPartition instead of TopicPartition when computing the topic delta Key: KAFKA-13208 URL: https://issues.apache.org/jira/browse/KAFKA-13208 Project: Kafka Issue Type: Improvement Components: kraft, replication Reporter: Jose Armando Garcia Sancio Assignee: Jose Armando Garcia Sancio {{TopicPartition}} is used as the key when computing the local changes in {{TopicsDelta}}. The topic id is included in the Map value returned by {{localChanges}}. I think that the handling of this code and the corresponding code in {{ReplicaManager}} could be simplified if {{localChanges}} instead returned something like
{code:java}
{
  deletes: Set[TopicIdPartition],
  leaders: Map[TopicIdPartition, PartitionRegistration],
  followers: Map[TopicIdPartition, PartitionRegistration]
}{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
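The return shape proposed in the issue could look roughly like the following Java sketch. `TopicIdPartitionSketch`, `PartitionInfoSketch`, and `LocalChangesSketch` are hypothetical stand-ins (the topic name is omitted from the key for brevity, and the registration is reduced to two fields); the point is that keying by topic ID plus partition removes the need to carry the topic ID separately in the map value.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical key: topic ID plus partition index.
final class TopicIdPartitionSketch {
    final String topicId;
    final int partition;

    TopicIdPartitionSketch(String topicId, int partition) {
        this.topicId = Objects.requireNonNull(topicId);
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicIdPartitionSketch)) return false;
        TopicIdPartitionSketch other = (TopicIdPartitionSketch) o;
        return topicId.equals(other.topicId) && partition == other.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(topicId, partition);
    }
}

// Hypothetical stand-in for a partition registration: only the fields used here.
final class PartitionInfoSketch {
    final int leader;
    final int leaderEpoch;

    PartitionInfoSketch(int leader, int leaderEpoch) {
        this.leader = leader;
        this.leaderEpoch = leaderEpoch;
    }
}

// The three collections from the issue, all keyed by topic ID and partition.
final class LocalChangesSketch {
    final Set<TopicIdPartitionSketch> deletes;
    final Map<TopicIdPartitionSketch, PartitionInfoSketch> leaders;
    final Map<TopicIdPartitionSketch, PartitionInfoSketch> followers;

    LocalChangesSketch(Set<TopicIdPartitionSketch> deletes,
                       Map<TopicIdPartitionSketch, PartitionInfoSketch> leaders,
                       Map<TopicIdPartitionSketch, PartitionInfoSketch> followers) {
        this.deletes = Collections.unmodifiableSet(new HashSet<>(deletes));
        this.leaders = Collections.unmodifiableMap(new HashMap<>(leaders));
        this.followers = Collections.unmodifiableMap(new HashMap<>(followers));
    }
}
```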
[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned
jsancio commented on a change in pull request #11216: URL: https://github.com/apache/kafka/pull/11216#discussion_r689937435 ## File path: core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala ##
```diff
@@ -410,6 +421,32 @@ class KRaftClusterTest {
       }
     }
+  private def checkReplicaManager(cluster: KafkaClusterTestKit, expectedHosting: List[(Int, List[Boolean])]): Unit = {
+    for ((brokerId, partitionsIsHosted) <- expectedHosting) {
+      val broker = cluster.brokers().get(brokerId)
+      // lock and unlock so we can read the replica manager
```
Review comment: I think we need to do this because `var replicaManager` in `BrokerServer` is set in a different thread by `KafkaClusterTestKit`. If my understanding of the Java Memory Model is correct, we need to lock/synchronize so that we force a memory barrier.
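The visibility reasoning can be shown with a small Java sketch. Under the Java Memory Model, an unlock of a monitor happens-before every subsequent lock of that same monitor, so a write made while holding a lock is visible to a thread that later acquires the same lock. This only holds if the writing thread also synchronizes on that lock (declaring the field `volatile` is the other common option); a reader-side lock/unlock alone does not create the happens-before edge. `PublishedRef` is hypothetical and is not the `BrokerServer` code.

```java
// Hypothetical holder for a reference written by one thread and read by another.
final class PublishedRef<T> {
    private final Object lock = new Object();
    private T value; // guarded by lock

    void set(T newValue) {
        synchronized (lock) { // the writer must synchronize on the same lock
            value = newValue;
        }
    }

    T get() {
        synchronized (lock) { // sees every write made before the last unlock of lock
            return value;
        }
    }
}
```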
[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned
jsancio commented on a change in pull request #11216: URL: https://github.com/apache/kafka/pull/11216#discussion_r689870784 ## File path: metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java ## @@ -126,6 +133,158 @@ private static TopicImage newTopicImage(String name, Uuid id, PartitionRegistrat IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), newTopicsByNameMap(topics2)); } +private PartitionRegistration newPartition(int[] replicas) { +return new PartitionRegistration(replicas, replicas, Replicas.NONE, Replicas.NONE, replicas[0], 1, 1); +} + +@Test +public void testLocalReplicaChanges() { +int localId = 3; +Uuid newFooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w"); + +List topics = new ArrayList<>(TOPIC_IMAGES1); +topics.add( +newTopicImage( +"foo", +newFooId, +newPartition(new int[] {0, 1, 3}), +newPartition(new int[] {3, 1, 2}), +newPartition(new int[] {0, 1, 3}), +newPartition(new int[] {3, 1, 2}), +newPartition(new int[] {0, 1, 2}), +newPartition(new int[] {0, 1, 2}) +) +); + +TopicsImage image = new TopicsImage(newTopicsByIdMap(topics), newTopicsByNameMap(topics)); + +List topicRecords = new ArrayList<>(DELTA1_RECORDS); +// foo-0 - follower to leader +topicRecords.add( +new ApiMessageAndVersion( +new PartitionChangeRecord() + .setTopicId(newFooId) + .setPartitionId(0) + .setLeader(3), +PARTITION_CHANGE_RECORD.highestSupportedVersion() +) +); +// foo-1 - leader to follower +topicRecords.add( +new ApiMessageAndVersion( +new PartitionChangeRecord() + .setTopicId(newFooId) + .setPartitionId(1) + .setLeader(1), +PARTITION_CHANGE_RECORD.highestSupportedVersion() +) +); +// foo-2 - follower to removed +topicRecords.add( +new ApiMessageAndVersion( +new PartitionChangeRecord() + .setTopicId(newFooId) + .setPartitionId(2) + .setIsr(Arrays.asList(0, 1, 2)) + .setReplicas(Arrays.asList(0, 1, 2)), +PARTITION_CHANGE_RECORD.highestSupportedVersion() +) +); +// foo-3 - leader to removed +topicRecords.add( +new ApiMessageAndVersion( +new 
PartitionChangeRecord() + .setTopicId(newFooId) + .setPartitionId(3) + .setLeader(0) + .setIsr(Arrays.asList(0, 1, 2)) + .setReplicas(Arrays.asList(0, 1, 2)), +PARTITION_CHANGE_RECORD.highestSupportedVersion() +) +); +// foo-4 - not replica to leader +topicRecords.add( +new ApiMessageAndVersion( +new PartitionChangeRecord() + .setTopicId(newFooId) + .setPartitionId(4) + .setLeader(3) + .setIsr(Arrays.asList(3, 1, 2)) + .setReplicas(Arrays.asList(3, 1, 2)), +PARTITION_CHANGE_RECORD.highestSupportedVersion() +) +); +// foo-5 - not replica to follower +topicRecords.add( +new ApiMessageAndVersion( +new PartitionChangeRecord() + .setTopicId(newFooId) + .setPartitionId(5) + .setIsr(Arrays.asList(0, 1, 3)) + .setReplicas(Arrays.asList(0, 1, 3)), +PARTITION_CHANGE_RECORD.highestSupportedVersion() +) +); + +/* Changes already include in DELTA1_RECORDS: + * foo - topic id deleted + * bar-0 - stay as follower with different partition epoch + * baz-0 - new topic to leader + */ + +// baz-1 - new topic to follower +topicRecords.add( +new ApiMessageAndVersion( +new PartitionRecord() +.setPartitionId(1) +.setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg")) +.setReplicas(Arrays.asList(4, 2, 3)) +.setIsr(Arrays.asList(4, 2, 3)) +.setLeader(4) +.setLeaderEpoch(2) +.setPartitionEpoch(1), +PARTITION_RECORD.highestSupportedVersion() +) +); + +TopicsDelta delta = new TopicsDelta(image); +RecordTestUtils.replayAll(delta, topicRecords); + +LocalReplicaChanges changes = delta.localChanges(localId); +assertEquals( +new HashSet<>( +
[jira] [Commented] (KAFKA-12713) Report "REAL" follower/consumer fetch latency
[ https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400039#comment-17400039 ] Ismael Juma commented on KAFKA-12713:
I had missed that. Worth bumping that thread for feedback.
> Report "REAL" follower/consumer fetch latency
> ---------------------------------------------
>
>                 Key: KAFKA-12713
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12713
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Ming Liu
>            Assignee: Kai Huang
>            Priority: Major
>
> The fetch latency is an important metric to monitor for cluster
> performance. With ACK=ALL, the produce latency is affected primarily by
> broker fetch latency.
> However, the currently reported fetch latency doesn't reflect the true fetch
> latency, because a fetch sometimes needs to stay in purgatory and wait for
> replica.fetch.wait.max.ms when data is not available. This greatly affects the
> real P50, P99, etc.
> I'd like to propose a KIP to track the real fetch latency for both
> broker followers and consumers.
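The distinction the issue draws comes down to simple arithmetic: if a fetch request spends part of its lifetime parked in purgatory waiting for data (bounded by `replica.fetch.wait.max.ms` for followers, or `fetch.max.wait.ms` for consumers), subtracting that wait from the total request time gives a latency closer to the work actually done. This is a sketch of one possible definition, not the metric the proposed KIP defines; `FetchLatencySketch` and its timestamp parameters are hypothetical.

```java
final class FetchLatencySketch {
    // All timestamps are in milliseconds; parameter names are hypothetical.
    static long totalLatencyMs(long receivedMs, long completedMs) {
        return completedMs - receivedMs;
    }

    // Latency excluding the time the request waited in purgatory for data.
    static long realLatencyMs(long receivedMs, long completedMs,
                              long purgatoryEnterMs, long purgatoryExitMs) {
        long waitedMs = Math.max(0L, purgatoryExitMs - purgatoryEnterMs);
        return Math.max(0L, totalLatencyMs(receivedMs, completedMs) - waitedMs);
    }
}
```

For a request received at 0 ms that waits in purgatory from 2 ms to 502 ms and completes at 505 ms, the total is 505 ms but the adjusted value is 5 ms. Idle waits like this are what inflate the reported P50 and P99.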
[jira] [Commented] (KAFKA-12713) Report "REAL" follower/consumer fetch latency
[ https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400037#comment-17400037 ] Kai Huang commented on KAFKA-12713: [~ijuma] here is the [discussion thread|https://lists.apache.org/thread.html/r261915b64c819129bc6017adaa12e8f9a0feb74c24ba331f4a08f30c%40%3Cdev.kafka.apache.org%3E] that Ming created a while ago.
[GitHub] [kafka] junrao commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned
junrao commented on a change in pull request #11216: URL: https://github.com/apache/kafka/pull/11216#discussion_r689907759 ## File path: metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java ## @@ -126,6 +133,158 @@ private static TopicImage newTopicImage(String name, Uuid id, PartitionRegistrat IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), newTopicsByNameMap(topics2)); } +private PartitionRegistration newPartition(int[] replicas) { +return new PartitionRegistration(replicas, replicas, Replicas.NONE, Replicas.NONE, replicas[0], 1, 1); +} + +@Test +public void testLocalReplicaChanges() { +int localId = 3; +Uuid newFooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w"); + +List topics = new ArrayList<>(TOPIC_IMAGES1); +topics.add( +newTopicImage( +"foo", +newFooId, +newPartition(new int[] {0, 1, 3}), +newPartition(new int[] {3, 1, 2}), +newPartition(new int[] {0, 1, 3}), +newPartition(new int[] {3, 1, 2}), +newPartition(new int[] {0, 1, 2}), +newPartition(new int[] {0, 1, 2}) +) +); + +TopicsImage image = new TopicsImage(newTopicsByIdMap(topics), newTopicsByNameMap(topics)); + +List topicRecords = new ArrayList<>(DELTA1_RECORDS); +// foo-0 - follower to leader +topicRecords.add( +new ApiMessageAndVersion( +new PartitionChangeRecord() + .setTopicId(newFooId) + .setPartitionId(0) + .setLeader(3), +PARTITION_CHANGE_RECORD.highestSupportedVersion() +) +); +// foo-1 - leader to follower +topicRecords.add( +new ApiMessageAndVersion( +new PartitionChangeRecord() + .setTopicId(newFooId) + .setPartitionId(1) + .setLeader(1), +PARTITION_CHANGE_RECORD.highestSupportedVersion() +) +); +// foo-2 - follower to removed +topicRecords.add( +new ApiMessageAndVersion( +new PartitionChangeRecord() + .setTopicId(newFooId) + .setPartitionId(2) + .setIsr(Arrays.asList(0, 1, 2)) + .setReplicas(Arrays.asList(0, 1, 2)), +PARTITION_CHANGE_RECORD.highestSupportedVersion() +) +); +// foo-3 - leader to removed +topicRecords.add( +new ApiMessageAndVersion( +new 
PartitionChangeRecord() + .setTopicId(newFooId) + .setPartitionId(3) + .setLeader(0) + .setIsr(Arrays.asList(0, 1, 2)) + .setReplicas(Arrays.asList(0, 1, 2)), +PARTITION_CHANGE_RECORD.highestSupportedVersion() +) +); +// foo-4 - not replica to leader +topicRecords.add( +new ApiMessageAndVersion( +new PartitionChangeRecord() + .setTopicId(newFooId) + .setPartitionId(4) + .setLeader(3) + .setIsr(Arrays.asList(3, 1, 2)) + .setReplicas(Arrays.asList(3, 1, 2)), +PARTITION_CHANGE_RECORD.highestSupportedVersion() +) +); +// foo-5 - not replica to follower +topicRecords.add( +new ApiMessageAndVersion( +new PartitionChangeRecord() + .setTopicId(newFooId) + .setPartitionId(5) + .setIsr(Arrays.asList(0, 1, 3)) + .setReplicas(Arrays.asList(0, 1, 3)), +PARTITION_CHANGE_RECORD.highestSupportedVersion() +) +); + +/* Changes already include in DELTA1_RECORDS: + * foo - topic id deleted + * bar-0 - stay as follower with different partition epoch + * baz-0 - new topic to leader + */ + +// baz-1 - new topic to follower +topicRecords.add( +new ApiMessageAndVersion( +new PartitionRecord() +.setPartitionId(1) +.setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg")) +.setReplicas(Arrays.asList(4, 2, 3)) +.setIsr(Arrays.asList(4, 2, 3)) +.setLeader(4) +.setLeaderEpoch(2) +.setPartitionEpoch(1), +PARTITION_RECORD.highestSupportedVersion() +) +); + +TopicsDelta delta = new TopicsDelta(image); +RecordTestUtils.replayAll(delta, topicRecords); + +LocalReplicaChanges changes = delta.localChanges(localId); +assertEquals( +new HashSet<>( +
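The six `foo` cases in the test above (follower to leader, leader to follower, follower to removed, leader to removed, not replica to leader, not replica to follower) reduce to a small decision table for one broker id. Below is a minimal Java sketch of that table. It uses a hypothetical `Partition` class that carries only the replica list and the leader (the real `PartitionRegistration` also has ISR, epochs, etc.), and it ignores epochs entirely. It is a simplified model, not the PR's `TopicDelta` implementation.

```java
import java.util.Arrays;

/**
 * Simplified model (not the PR's TopicDelta code) of how one partition change
 * is classified from the point of view of a single broker. Epochs and ISR are ignored.
 */
public class LocalChangeSketch {

    enum LocalChange { BECOME_LEADER, BECOME_FOLLOWER, REMOVE, NONE }

    // Hypothetical stand-in for PartitionRegistration: replica ids plus the leader id.
    static final class Partition {
        final int[] replicas;
        final int leader;

        Partition(int[] replicas, int leader) {
            this.replicas = replicas;
            this.leader = leader;
        }

        boolean hasReplica(int brokerId) {
            return Arrays.stream(replicas).anyMatch(r -> r == brokerId);
        }
    }

    // Convenience factory: p(leader, replica ids...).
    static Partition p(int leader, int... replicas) {
        return new Partition(replicas, leader);
    }

    // `before` is null when the partition is new in this delta.
    static LocalChange classify(Partition before, Partition after, int brokerId) {
        if (!after.hasReplica(brokerId)) {
            // Only a broker that used to host the partition has something to remove.
            boolean wasReplica = before != null && before.hasReplica(brokerId);
            return wasReplica ? LocalChange.REMOVE : LocalChange.NONE;
        }
        return after.leader == brokerId ? LocalChange.BECOME_LEADER : LocalChange.BECOME_FOLLOWER;
    }

    public static void main(String[] args) {
        int localId = 3;
        // The six "foo" transitions from the test; the initial leader is replicas[0].
        Partition[][] cases = {
            {p(0, 0, 1, 3), p(3, 0, 1, 3)}, // foo-0: follower to leader
            {p(3, 3, 1, 2), p(1, 3, 1, 2)}, // foo-1: leader to follower
            {p(0, 0, 1, 3), p(0, 0, 1, 2)}, // foo-2: follower to removed
            {p(3, 3, 1, 2), p(0, 0, 1, 2)}, // foo-3: leader to removed
            {p(0, 0, 1, 2), p(3, 3, 1, 2)}, // foo-4: not replica to leader
            {p(0, 0, 1, 2), p(0, 0, 1, 3)}, // foo-5: not replica to follower
        };
        for (int i = 0; i < cases.length; i++) {
            System.out.println("foo-" + i + " -> " + classify(cases[i][0], cases[i][1], localId));
        }
    }
}
```

Because this model ignores epochs, a partition that stays a follower with a new partition epoch (the `bar-0` case) would simply be reported as `BECOME_FOLLOWER` again.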
[GitHub] [kafka] hachikuji commented on a change in pull request #11221: KAFKA-13207: Don't partition state on fetch response with diverging epoch if partition removed from fetcher
hachikuji commented on a change in pull request #11221: URL: https://github.com/apache/kafka/pull/11221#discussion_r689886267 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -268,7 +262,10 @@ abstract class AbstractFetcherThread(name: String, val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState] val partitionsWithError = mutable.HashSet.empty[TopicPartition] -fetchedEpochs.forKeyValue { (tp, leaderEpochOffset) => +// Partitions may have been removed from the fetcher while the thread was waiting for fetch +// response. Filter out removed partitions while holding `partitionMapLock` to ensure that we +// don't update state for any partition that may have already been migrated to another thread. +fetchedEpochs.filter { case (tp, _) => partitionStates.contains(tp) }.forKeyValue { (tp, leaderEpochOffset) => Review comment: Related to David's comment, but I think the `filter` here still builds a collection. Alternatively, we could move the check into `forKeyValue`. Perhaps it would even be useful having a `trace` level log message in the `else` case when we ignore the result.
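Here is a Java analogue of the suggestion above: check membership inside the loop instead of materializing a filtered collection, and report skipped partitions. `processAssigned`, its parameters, and the sample partitions are all hypothetical. The `onSkipped` callback stands in for the suggested `trace` log, and the `Lock` argument stands in for `partitionMapLock`. This is a sketch of the pattern, not the Scala code in the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class FilterInLoopSketch {

    // Handles only entries whose key is still assigned, without allocating a filtered copy.
    // The membership check runs under the same lock that guards the assignment.
    static <K, V> int processAssigned(Lock lock, Map<K, V> fetched, Set<K> assigned,
                                      BiConsumer<K, V> handler, Consumer<K> onSkipped) {
        lock.lock();
        try {
            int handled = 0;
            for (Map.Entry<K, V> entry : fetched.entrySet()) {
                if (assigned.contains(entry.getKey())) {
                    handler.accept(entry.getKey(), entry.getValue());
                    handled++;
                } else {
                    // Removed from this fetcher while the request was in flight: ignore it.
                    onSkipped.accept(entry.getKey());
                }
            }
            return handled;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        // Hypothetical fetched results: partition -> leader epoch.
        Map<String, Integer> fetchedEpochs = new LinkedHashMap<>();
        fetchedEpochs.put("foo-0", 5);
        fetchedEpochs.put("foo-1", 7);
        fetchedEpochs.put("bar-0", 3);
        // foo-1 was migrated to another fetcher thread in the meantime.
        Set<String> stillAssigned = Set.of("foo-0", "bar-0");

        List<String> skipped = new ArrayList<>();
        int handled = processAssigned(new ReentrantLock(), fetchedEpochs, stillAssigned,
            (tp, epoch) -> System.out.println("handle " + tp + " (leader epoch " + epoch + ")"),
            skipped::add);
        System.out.println("handled=" + handled + ", skipped=" + skipped);
    }
}
```

Doing the check inside the loop keeps one pass over the response. It also gives a natural place for the `else` branch that the review comment suggests logging.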
[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned
jsancio commented on a change in pull request #11216: URL: https://github.com/apache/kafka/pull/11216#discussion_r689874304 ## File path: metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned
jsancio commented on a change in pull request #11216: URL: https://github.com/apache/kafka/pull/11216#discussion_r689871542 ## File path: metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned
jsancio commented on a change in pull request #11216: URL: https://github.com/apache/kafka/pull/11216#discussion_r689870784 ## File path: metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
[GitHub] [kafka] rajinisivaram commented on a change in pull request #11221: KAFKA-13207: Don't partition state on fetch response with diverging epoch if partition removed from fetcher
rajinisivaram commented on a change in pull request #11221: URL: https://github.com/apache/kafka/pull/11221#discussion_r689860826 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -229,9 +229,16 @@ abstract class AbstractFetcherThread(name: String, } } - protected def truncateOnFetchResponse(epochEndOffsets: Map[TopicPartition, EpochEndOffset]): Unit = { + // Visibility for unit tests + protected[server] def truncateOnFetchResponse(epochEndOffsets: Map[TopicPartition, EpochEndOffset]): Unit = { inLock(partitionMapLock) { - val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, Map.empty) + // Partitions may have been removed from the fetcher while the thread was waiting for fetch + // response. Filter out removed partitions while holding `partitionMapLock` to ensure that we + // don't update state for any partition that may have already been migrated to another thread. + val filteredEpochEndOffsets = epochEndOffsets.filter { case (tp, _) => +partitionStates.contains(tp) + } Review comment: @dajac Thanks for the review, updated.
[GitHub] [kafka] junrao commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned
junrao commented on a change in pull request #11216: URL: https://github.com/apache/kafka/pull/11216#discussion_r689841279 ## File path: metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
[jira] [Commented] (KAFKA-12713) Report "REAL" follower/consumer fetch latency
[ https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1730#comment-1730 ] Ismael Juma commented on KAFKA-12713: - [~kaihuang] Thanks for the KIP. I suggest starting a discuss thread for KIP-736 and we can discuss it further over there. :)
[jira] [Commented] (KAFKA-13206) shutting down broker needs to stop fetching as a follower in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-13206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399978#comment-17399978 ] Jun Rao commented on KAFKA-13206: - It's probably after the controller has acted on the controlled shutdown request. > shutting down broker needs to stop fetching as a follower in KRaft mode > --- > > Key: KAFKA-13206 > URL: https://issues.apache.org/jira/browse/KAFKA-13206 > Project: Kafka > Issue Type: Bug > Components: core, kraft, replication >Affects Versions: 3.0.0 >Reporter: Jun Rao >Priority: Major > Labels: kip-500 > > In ZK mode, the controller sends a StopReplica request (with the deletion flag set to > false) to the shutting-down broker so that it stops its followers > from fetching. In KRaft mode, we don't have corresponding logic. This results in > unnecessary rejected follower fetch requests during controlled shutdown. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13206) shutting down broker needs to stop fetching as a follower in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-13206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio updated KAFKA-13206: --- Component/s: replication kraft
[jira] [Commented] (KAFKA-13206) shutting down broker needs to stop fetching as a follower in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-13206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399976#comment-17399976 ] Jose Armando Garcia Sancio commented on KAFKA-13206: Thanks for the issue [~junrao]. When is it safe to "stop" this replica without deleting it? Is it when the {{ReplicaManager}} is {{isShuttingDown}} and the replica is a follower?
[GitHub] [kafka] rondagostino opened a new pull request #11222: MINOR: Test ReplicaManager MBean names
rondagostino opened a new pull request #11222: URL: https://github.com/apache/kafka/pull/11222 This patch closes a testing gap by confirming that ReplicaManager metrics are exposed with the expected MBean names. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
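One way a test could confirm MBean names is to query JMX with an `ObjectName` pattern. The Java sketch below does this against a private `MBeanServer` and registers dummy gauges itself so that it runs standalone. The three metric names are assumed examples of `kafka.server:type=ReplicaManager` MBeans. The PR's own test is not shown here and may work differently.

```java
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;
import javax.management.StandardMBean;

public class MBeanNameSketch {

    // Minimal management interface for a dummy gauge; JMX requires it to be public.
    public interface GaugeMBean {
        long getValue();
    }

    // Example metric names, assumed for illustration.
    static final String[] METRICS = {"LeaderCount", "PartitionCount", "UnderReplicatedPartitions"};

    // Registers one dummy gauge per example name, standing in for what a broker would register.
    static void registerDummyGauges(MBeanServer server) throws Exception {
        for (String metric : METRICS) {
            GaugeMBean gauge = () -> 0L;
            server.registerMBean(new StandardMBean(gauge, GaugeMBean.class),
                new ObjectName("kafka.server:type=ReplicaManager,name=" + metric));
        }
    }

    // Every MBean in the kafka.server domain whose "type" key is ReplicaManager.
    static Set<ObjectName> replicaManagerNames(MBeanServer server) throws Exception {
        return server.queryNames(new ObjectName("kafka.server:type=ReplicaManager,*"), null);
    }

    public static void main(String[] args) throws Exception {
        // A private MBean server keeps the sketch independent of the JVM-wide platform server.
        MBeanServer server = MBeanServerFactory.newMBeanServer();
        registerDummyGauges(server);
        Set<ObjectName> found = replicaManagerNames(server);
        for (String metric : METRICS) {
            ObjectName expected = new ObjectName("kafka.server:type=ReplicaManager,name=" + metric);
            // ObjectName equality ignores key order, so "name=..,type=.." also matches.
            if (!found.contains(expected)) {
                throw new AssertionError("missing MBean " + expected);
            }
        }
        System.out.println("found " + found.size() + " ReplicaManager MBeans");
    }
}
```

Comparing `ObjectName` instances rather than raw strings avoids false failures when the key properties are listed in a different order.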
[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned
jsancio commented on a change in pull request #11216: URL: https://github.com/apache/kafka/pull/11216#discussion_r689787245 ## File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java ## @@ -162,4 +163,39 @@ public boolean topicWasDeleted(String topicName) { public Set deletedTopicIds() { return deletedTopicIds; } + +/** + * Find the topic partitions that have change base on the replica given. + * + * The changes identified are: + * 1. topic partitions for which the broker is not a replica anymore + * 2. topic partitions for which the broker is now the leader + * 3. topic partitions for which the broker is now a follower + * + * @param brokerId the broker id + * @return the list of topic partitions which the broker should remove, become leader or become follower. + */ +public LocalReplicaChanges localChanges(int brokerId) { +Set deletes = new HashSet<>(); +Map leaders = new HashMap<>(); +Map followers = new HashMap<>(); + +for (TopicDelta delta : changedTopics.values()) { +LocalReplicaChanges changes = delta.localChanges(brokerId); + +deletes.addAll(changes.deletes()); +leaders.putAll(changes.leaders()); +followers.putAll(changes.followers()); +} + +// Add all of the deleted topic partitions to the map of locally removed partitions +deletedTopicIds().forEach(topicId -> { +TopicImage topicImage = image().getTopic(topicId); +topicImage.partitions().keySet().forEach(partitionId -> { +deletes.add(new TopicPartition(topicImage.name(), partitionId)); Review comment: Yes. Fixed and updated the test in `TopicsImageTest` to check for this case. 
## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ## @@ -3020,34 +2949,183 @@ class ReplicaManagerTest { TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } + @Test + def testDeltaFollowerToNotReplica(): Unit = { +val localId = 1 +val otherId = localId + 1 +val topicPartition = new TopicPartition("foo", 0) +val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId) + +try { + // Make the local replica the follower + val followerTopicsDelta = topicsCreateDelta(localId, false) + val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) + replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta) + + // Check the state of that partition and fetcher + val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) + assertFalse(followerPartition.isLeader) + assertEquals(0, followerPartition.getLeaderEpoch) + + val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition) + assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker)) + + // Apply changes that remove replica + val notReplicaTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), otherId, true) + val notReplicaMetadataImage = imageFromTopics(notReplicaTopicsDelta.apply()) + replicaManager.applyDelta(notReplicaMetadataImage, notReplicaTopicsDelta) + + // Check that the partition was removed + assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition)) + assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition)) + assertEquals(None, replicaManager.logManager.getLog(topicPartition)) +} finally { + replicaManager.shutdown() +} + +TestUtils.assertNoNonDaemonThreads(this.getClass.getName) + } + + @Test + def testDeltaFollowerRemovedTopic(): Unit = { +val localId = 1 +val otherId = localId + 1 +val topicPartition = new TopicPartition("foo", 0) +val replicaManager = 
setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId) + +try { + // Make the local replica the follower + val followerTopicsDelta = topicsCreateDelta(localId, false) + val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) + replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta) + + // Check the state of that partition and fetcher + val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) + assertFalse(followerPartition.isLeader) + assertEquals(0, followerPartition.getLeaderEpoch) + + val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition) + assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker)) + + // Apply changes that remove topic and replica + val removeTopicsDelta = topicsDeleteDelta(followerMetadataImage.topics()) + val removeMetadataImage =
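The `localChanges` Javadoc in jsancio's comment above says that partitions of deleted topics are added to the set of locally removed partitions. A deleted topic is no longer in the new image, so its partitions have to be read from the previous image. The Java sketch below models that step with a hypothetical `TopicInfo` class (a stand-in for `TopicImage`). As an assumption consistent with the Javadoc's "not a replica anymore" wording, it only reports partitions this broker previously hosted. The quoted diff itself adds every partition of the deleted topic.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DeletedTopicsSketch {

    // Hypothetical stand-in for TopicImage: topic name plus replica ids per partition id.
    static final class TopicInfo {
        final String name;
        final Map<Integer, int[]> replicasByPartition;

        TopicInfo(String name, Map<Integer, int[]> replicasByPartition) {
            this.name = name;
            this.replicasByPartition = replicasByPartition;
        }
    }

    static boolean contains(int[] replicas, int brokerId) {
        for (int replica : replicas) {
            if (replica == brokerId) {
                return true;
            }
        }
        return false;
    }

    // Deleted topics are gone from the new image, so look them up in the previous one.
    static Set<String> localDeletes(Map<String, TopicInfo> previousById, Set<String> deletedTopicIds, int brokerId) {
        Set<String> deletes = new HashSet<>();
        for (String topicId : deletedTopicIds) {
            TopicInfo topic = previousById.get(topicId);
            if (topic == null) {
                continue; // defensive: id unknown to the previous image
            }
            topic.replicasByPartition.forEach((partitionId, replicas) -> {
                // Assumption: only partitions this broker hosted need local removal.
                if (contains(replicas, brokerId)) {
                    deletes.add(topic.name + "-" + partitionId);
                }
            });
        }
        return deletes;
    }

    public static void main(String[] args) {
        Map<String, TopicInfo> previous = new HashMap<>();
        previous.put("id-foo", new TopicInfo("foo", Map.of(0, new int[] {0, 1, 3}, 1, new int[] {0, 1, 2})));
        previous.put("id-bar", new TopicInfo("bar", Map.of(0, new int[] {3, 4, 5})));

        // Broker 3 hosted foo-0 but not foo-1, so only foo-0 is reported for the deleted "foo".
        System.out.println(localDeletes(previous, Set.of("id-foo"), 3));
    }
}
```

Dropping the `contains` check would reproduce the quoted diff's behavior of reporting every partition of a deleted topic.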
[GitHub] [kafka] cmccabe merged pull request #11219: MINOR: clarify assertion in handleListPartitionReassignmentsRequest
cmccabe merged pull request #11219: URL: https://github.com/apache/kafka/pull/11219
[jira] [Comment Edited] (KAFKA-12713) Report "REAL" follower/consumer fetch latency
[ https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399940#comment-17399940 ] Kai Huang edited comment on KAFKA-12713 at 8/16/21, 6:48 PM: - [~ijuma] I would like to follow up on this ticket, and continue the discussion. I replied to the [discussion|https://lists.apache.org/thread.html/r7f82dde9133bf9d3a8b688ca7ae02ad761c52e7b79212c9247b276c5%40%3Cdev.kafka.apache.org%3E] thread and illustrate how the fetch latency metric should work in [KIP-736|https://cwiki.apache.org/confluence/display/KAFKA/KIP-736%3A+Report+the+true+end+to+end+fetch+latency]. Could you please take a look and see if that clarifies your question? was (Author: kaihuang): [~ijuma] I would like to follow up on this ticket, and continue the discussion. I replied to your [discussion|https://lists.apache.org/thread.html/r7f82dde9133bf9d3a8b688ca7ae02ad761c52e7b79212c9247b276c5%40%3Cdev.kafka.apache.org%3E] thread and illustrate how the fetch latency metric should work in [KIP-736|https://cwiki.apache.org/confluence/display/KAFKA/KIP-736%3A+Report+the+true+end+to+end+fetch+latency]. Could you please take a look and see if that clarifies your question? > Report "REAL" follower/consumer fetch latency > - > > Key: KAFKA-12713 > URL: https://issues.apache.org/jira/browse/KAFKA-12713 > Project: Kafka > Issue Type: Bug >Reporter: Ming Liu >Assignee: Kai Huang >Priority: Major > > The fetch latency is an important metrics to monitor for the cluster > performance. With ACK=ALL, the produce latency is affected primarily by > broker fetch latency. > However, currently the reported fetch latency didn't reflect the true fetch > latency because it sometimes need to stay in purgatory and wait for > replica.fetch.wait.max.ms when data is not available. This greatly affect the > real P50, P99 etc. > I like to propose a KIP to be able track the real fetch latency for both > broker follower and consumer. 
> -- This message was sent by Atlassian Jira (v8.3.4#803005)
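To make the distinction drawn in the issue concrete, here is a minimal sketch. It is not the metric KIP-736 actually defines; it assumes each fetch sample records its total time plus the portion spent parked in purgatory (the wait bounded by `replica.fetch.wait.max.ms`) and compares nearest-rank percentiles with and without that wait. `FetchLatency` and its methods are hypothetical names.

```java
import java.util.Arrays;

// Hypothetical model for illustration only; not KIP-736's definition of the metric.
final class FetchLatency {
    private FetchLatency() {}

    // Nearest-rank percentile (p in (0, 100]) over a sorted copy of the samples.
    static long percentile(long[] samples, double p) {
        if (samples.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        long[] sorted = samples.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.length);
        return sorted[Math.min(sorted.length, Math.max(1, rank)) - 1];
    }

    // Total fetch time minus the time the request sat in purgatory waiting for
    // data (the wait bounded by replica.fetch.wait.max.ms).
    static long[] excludingPurgatoryWait(long[] totalMs, long[] purgatoryWaitMs) {
        if (totalMs.length != purgatoryWaitMs.length) {
            throw new IllegalArgumentException("sample arrays must have equal length");
        }
        long[] adjusted = new long[totalMs.length];
        for (int i = 0; i < totalMs.length; i++) {
            adjusted[i] = totalMs[i] - purgatoryWaitMs[i];
        }
        return adjusted;
    }
}
```

For example, with total latencies of 500, 500, 3 and 5 ms where the first two fetches waited 498 and 497 ms in purgatory, the raw P99 is 500 ms while the adjusted P99 is 5 ms.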
[jira] [Commented] (KAFKA-12713) Report "REAL" follower/consumer fetch latency
[ https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399940#comment-17399940 ] Kai Huang commented on KAFKA-12713: --- [~ijuma] I would like to follow up on this ticket and continue the discussion. I replied to your [discussion|https://lists.apache.org/thread.html/r7f82dde9133bf9d3a8b688ca7ae02ad761c52e7b79212c9247b276c5%40%3Cdev.kafka.apache.org%3E] thread and illustrated how the fetch latency metric should work in [KIP-736|https://cwiki.apache.org/confluence/display/KAFKA/KIP-736%3A+Report+the+true+end+to+end+fetch+latency]. Could you please take a look and see whether that clarifies your question?
[GitHub] [kafka] dajac commented on a change in pull request #11221: KAFKA-13207: Don't partition state on fetch response with diverging epoch if partition removed from fetcher
dajac commented on a change in pull request #11221:
URL: https://github.com/apache/kafka/pull/11221#discussion_r689770112

## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
## @@ -229,9 +229,16 @@ abstract class AbstractFetcherThread(name: String,
     }
   }

-  protected def truncateOnFetchResponse(epochEndOffsets: Map[TopicPartition, EpochEndOffset]): Unit = {
+  // Visibility for unit tests
+  protected[server] def truncateOnFetchResponse(epochEndOffsets: Map[TopicPartition, EpochEndOffset]): Unit = {
     inLock(partitionMapLock) {
-      val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, Map.empty)
+      // Partitions may have been removed from the fetcher while the thread was waiting for fetch
+      // response. Filter out removed partitions while holding `partitionMapLock` to ensure that we
+      // don't update state for any partition that may have already been migrated to another thread.
+      val filteredEpochEndOffsets = epochEndOffsets.filter { case (tp, _) =>
+        partitionStates.contains(tp)
+      }

Review comment:
   Did you consider pushing this down into `maybeTruncateToEpochEndOffsets`? This would avoid creating an extra collection here.
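dajac's suggestion can be sketched as follows. This is a simplified, hypothetical Java model, not Kafka's Scala `AbstractFetcherThread`: the ownership check happens inside the truncation loop while `partitionMapLock` is held, so no filtered copy of the input map is built. Using plain strings as partition keys and treating "truncation" as lowering the fetch offset are assumptions made to keep it self-contained.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified fetcher: partition name -> current fetch offset.
final class FetcherSketch {
    private final ReentrantLock partitionMapLock = new ReentrantLock();
    private final Map<String, Long> partitionStates = new HashMap<>();

    void addPartition(String tp, long fetchOffset) {
        partitionMapLock.lock();
        try {
            partitionStates.put(tp, fetchOffset);
        } finally {
            partitionMapLock.unlock();
        }
    }

    // Called when a partition migrates to another fetcher thread.
    void removePartition(String tp) {
        partitionMapLock.lock();
        try {
            partitionStates.remove(tp);
        } finally {
            partitionMapLock.unlock();
        }
    }

    Long fetchOffset(String tp) {
        partitionMapLock.lock();
        try {
            return partitionStates.get(tp);
        } finally {
            partitionMapLock.unlock();
        }
    }

    // Applies diverging-epoch end offsets, skipping partitions this fetcher no
    // longer owns. The check sits inside the loop and under the lock, so no
    // filtered copy of epochEndOffsets is created.
    Map<String, Long> truncateOnFetchResponse(Map<String, Long> epochEndOffsets) {
        Map<String, Long> truncated = new LinkedHashMap<>();
        partitionMapLock.lock();
        try {
            for (Map.Entry<String, Long> entry : epochEndOffsets.entrySet()) {
                Long current = partitionStates.get(entry.getKey());
                if (current == null) {
                    continue; // removed while the fetch was in flight
                }
                long newOffset = Math.min(current, entry.getValue());
                partitionStates.put(entry.getKey(), newOffset);
                truncated.put(entry.getKey(), newOffset);
            }
        } finally {
            partitionMapLock.unlock();
        }
        return truncated;
    }
}
```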
[jira] [Commented] (KAFKA-13195) StateSerde don't honor DeserializationExceptionHandler
[ https://issues.apache.org/jira/browse/KAFKA-13195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399913#comment-17399913 ] Ludo commented on KAFKA-13195: -- Thanks for all the clarification, really appreciated. I will go with the Transformer approach for now (I saw that I have control there; I will look into it later today). I just need to apply it everywhere, since I have a lot of Transformers in my app. > StateSerde don't honor DeserializationExceptionHandler > -- > > Key: KAFKA-13195 > URL: https://issues.apache.org/jira/browse/KAFKA-13195 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.8.0 >Reporter: Ludo >Priority: Major > > Kafka Streams allows configuring a > [DeserializationExceptionHandler|https://docs.confluent.io/platform/current/streams/faq.html#failure-and-exception-handling] > > When you are using a StateStore, most messages will be a copy of the original > message in an internal topic and will mostly use the same serializer if the > message is of another type. > You can see > [here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java#L159-L161] > that StateSerde uses the raw Deserializer and does not honor > {{StreamsConfig.}}{{DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG}}, > which crashes the application (reaching the > {{setUncaughtExceptionHandler}} method). > I think the state store should have the same behavior as the > {{RecordDeserializer}} and honor the DeserializationExceptionHandler. > > Stacktrace (from Kafka Streams 2.6.1): > > {code:java} > Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing > !Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing > !org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. 
taskId=1_14, processor=workertaskjoined-repartition-source, > topic=kestra_executor-workertaskjoined-repartition, partition=14, > offset=167500, > stacktrace=org.apache.kafka.common.errors.SerializationException: > com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize > value of type `io.kestra.plugin.gcp.bigquery.ExtractToGcs$Format` from String > "txt": not one of the values accepted for Enum class: > [NEWLINE_DELIMITED_JSON, AVRO, PARQUET, CSV] at [Source: > (byte[])"{[truncated 1270 bytes]; line: 1, column: 1187] (through > reference chain: > io.kestra.core.models.flows.Flow["tasks"]->java.util.ArrayList[1]->org.kestra.task.gcp.bigquery.ExtractToGcs["format"]) > at > com.fasterxml.jackson.databind.exc.InvalidFormatException.from(InvalidFormatException.java:67) > at > com.fasterxml.jackson.databind.DeserializationContext.weirdStringException(DeserializationContext.java:1851) > at > com.fasterxml.jackson.databind.DeserializationContext.handleWeirdStringValue(DeserializationContext.java:1079) > at > com.fasterxml.jackson.databind.deser.std.EnumDeserializer._deserializeAltString(EnumDeserializer.java:327) > at > com.fasterxml.jackson.databind.deser.std.EnumDeserializer._fromString(EnumDeserializer.java:214) > at > com.fasterxml.jackson.databind.deser.std.EnumDeserializer.deserialize(EnumDeserializer.java:188) > at > com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138) > at > com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:324) > at > com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:225) > at > com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:197) > at > com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:137) > at > 
com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:107) > at > com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:263) > at > com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:357) > at > com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244) > at > com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:28) > at > com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:542) > at > com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:565) > at > com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:449) > at >
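For reference, the handler this issue refers to is configured per application; as the report describes, it is honored by `RecordDeserializer` for input records but not by `StateSerdes`. Below is a minimal sketch that sets it using the plain config key (the string value of `StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG` in the 2.x releases discussed here) and the built-in `LogAndContinueExceptionHandler`. Everything is written as strings so it compiles without the Kafka jar. `StreamsHandlerConfig` is a hypothetical helper name.

```java
import java.util.Properties;

// Hypothetical helper; a real app would use the StreamsConfig constant and the handler class directly.
final class StreamsHandlerConfig {
    private StreamsHandlerConfig() {}

    // String value of StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG.
    static final String HANDLER_KEY = "default.deserialization.exception.handler";

    // Built-in handler that logs the corrupt input record and keeps processing.
    static final String LOG_AND_CONTINUE =
            "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";

    // Returns a copy of base with the handler set; base itself is not modified.
    static Properties withLogAndContinue(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(HANDLER_KEY, LOG_AND_CONTINUE);
        return props;
    }
}
```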
[jira] [Commented] (KAFKA-13206) shutting down broker needs to stop fetching as a follower in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-13206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399908#comment-17399908 ] Jun Rao commented on KAFKA-13206: - [~cmccabe] mentioned the following. The broker being shut down in KRaft controlled shutdown usually does continue trying to fetch, but that's because we shut down its metadata listener (that might be something we should not do, to optimize further...). So, as long as it doesn't noticeably degrade rolling performance, this might not be an issue. > shutting down broker needs to stop fetching as a follower in KRaft mode > --- > > Key: KAFKA-13206 > URL: https://issues.apache.org/jira/browse/KAFKA-13206 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.0.0 >Reporter: Jun Rao >Priority: Major > Labels: kip-500 > > In ZK mode, the controller sends a StopReplica (with the deletion flag set to > false) request to the broker being shut down so that it stops the followers > from fetching. In KRaft mode, there is no corresponding logic. This results in > unnecessary rejected follower fetch requests during controlled shutdown.
[jira] [Commented] (KAFKA-13195) StateSerde don't honor DeserializationExceptionHandler
[ https://issues.apache.org/jira/browse/KAFKA-13195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399902#comment-17399902 ] Matthias J. Sax commented on KAFKA-13195: - Your transformer is in the stacktrace: {quote}io.kestra.runner.kafka.streams.FlowWithTriggerTransformer.transform(FlowWithTriggerTransformer.java:39) at {quote} Thus, you should be able to catch `StreamsException` when you call `store.get()/put()`, inspect whether it's a `SerializationException`, and react to it. If it's a different exception type, you would rethrow it. {quote} * Resetting the application is also not an option, since deserialization will fail in my case even with reprocessing (still no idea what the plugins have done).{quote} Why? For this case, you could use the deserialization handler to skip poison pills. {quote}In the real world, a stream application must never crash, and giving control over how to handle invalid data in the state store (skip, dead letter queue, delete from state store ...) is a real need in my opinion. {quote} I agree in general, but I (personally) don't think that applying a handler to the state store is the right solution. We can keep this ticket open, though, to see if there is demand for such a feature.
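The catch, inspect, and rethrow pattern Matthias describes can be sketched generically. This sketch rests on stated assumptions: it uses JDK exceptions as stand-ins for `StreamsException` and `SerializationException` so it runs without Kafka, and `SafeStoreAccess.getOrSkip` is a hypothetical helper wrapped around a store read such as `store.get(key)`. It walks the cause chain, so it matches whether the serialization error is thrown directly or wrapped.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper around a state-store read such as store.get(key).
final class SafeStoreAccess {
    private SafeStoreAccess() {}

    // True if t or any of its causes is an instance of type (depth-bounded).
    static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
        int depth = 0;
        for (Throwable cur = t; cur != null && depth < 32; cur = cur.getCause(), depth++) {
            if (type.isInstance(cur)) {
                return true;
            }
        }
        return false;
    }

    // Runs the read; if it fails because of poisonType, treats the record as a
    // poison pill and returns empty. Any other failure is rethrown unchanged.
    static <V> Optional<V> getOrSkip(Supplier<V> read, Class<? extends Throwable> poisonType) {
        try {
            return Optional.ofNullable(read.get());
        } catch (RuntimeException e) {
            if (hasCause(e, poisonType)) {
                return Optional.empty();
            }
            throw e;
        }
    }
}
```

Inside a `Transformer`, a call shaped like `getOrSkip(() -> store.get(key), SerializationException.class)` would then return an empty result for a poison record and propagate every other failure.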
[jira] [Updated] (KAFKA-13195) StateSerde don't honor DeserializationExceptionHandler
[ https://issues.apache.org/jira/browse/KAFKA-13195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13195: Issue Type: Improvement (was: Bug)
[GitHub] [kafka] rajinisivaram opened a new pull request #11221: KAFKA-13207: Don't partition state on fetch response with diverging epoch if partition removed from fetcher
rajinisivaram opened a new pull request #11221: URL: https://github.com/apache/kafka/pull/11221 `AbstractFetcherThread#truncateOnFetchResponse` is used with IBP 2.7 and above to truncate partitions based on the diverging epoch returned in fetch responses. Truncation should only be performed for partitions that are still owned by the fetcher, and this check should be done while holding `partitionMapLock` to ensure that any partitions removed from the fetcher thread are not truncated. The PR adds this check. Truncation will be performed by any new fetcher that owns the partition when it restarts fetching. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-13207) Replica fetcher should not update partition state on diverging epoch if partition removed from fetcher
Rajini Sivaram created KAFKA-13207: -- Summary: Replica fetcher should not update partition state on diverging epoch if partition removed from fetcher Key: KAFKA-13207 URL: https://issues.apache.org/jira/browse/KAFKA-13207 Project: Kafka Issue Type: New Feature Components: core Affects Versions: 2.8.0 Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 3.0.0, 2.8.1 {{AbstractFetcherThread#truncateOnFetchResponse}} is used with IBP 2.7 and above to truncate partitions based on the diverging epoch returned in fetch responses. Truncation should only be performed for partitions that are still owned by the fetcher, and this check should be done while holding {{partitionMapLock}} to ensure that any partitions removed from the fetcher thread are not truncated.
[jira] [Updated] (KAFKA-13206) shutting down broker needs to stop fetching as a follower in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-13206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-13206: Labels: kip-500 (was: )
[jira] [Created] (KAFKA-13206) shutting down broker needs to stop fetching as a follower in KRaft mode
Jun Rao created KAFKA-13206: --- Summary: shutting down broker needs to stop fetching as a follower in KRaft mode Key: KAFKA-13206 URL: https://issues.apache.org/jira/browse/KAFKA-13206 Project: Kafka Issue Type: Bug Components: core Affects Versions: 3.0.0 Reporter: Jun Rao In ZK mode, the controller sends a StopReplica (with the deletion flag set to false) request to the broker being shut down so that it stops the followers from fetching. In KRaft mode, there is no corresponding logic. This results in unnecessary rejected follower fetch requests during controlled shutdown.
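Here is a toy model of the behavior being requested. All names are hypothetical (`FollowerState`, `stopReplica`, `onControlledShutdown`) rather than Kafka's `ReplicaManager` API. A StopReplica with the deletion flag set to false stops fetching but keeps the local log, and a KRaft broker could apply that same step to all of its follower partitions when controlled shutdown begins.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of a broker's follower state; not Kafka's ReplicaManager.
final class FollowerState {
    private final Set<String> localLogs = new HashSet<>();
    private final Set<String> fetching = new HashSet<>();

    void becomeFollower(String tp) {
        localLogs.add(tp);
        fetching.add(tp);
    }

    // Analogous to handling StopReplica: always stop fetching, and only
    // delete the local log when deletePartition is true.
    void stopReplica(String tp, boolean deletePartition) {
        fetching.remove(tp);
        if (deletePartition) {
            localLogs.remove(tp);
        }
    }

    // The step the issue asks for in KRaft mode: at controlled shutdown, stop
    // every follower fetch but keep the logs (deletion flag false).
    void onControlledShutdown() {
        for (String tp : new HashSet<>(fetching)) { // copy: stopReplica mutates fetching
            stopReplica(tp, false);
        }
    }

    boolean isFetching(String tp) {
        return fetching.contains(tp);
    }

    boolean hasLog(String tp) {
        return localLogs.contains(tp);
    }
}
```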
[GitHub] [kafka] junrao commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned
junrao commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r689659905

## File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
## @@ -162,4 +163,39 @@ public boolean topicWasDeleted(String topicName) {
     public Set deletedTopicIds() {
         return deletedTopicIds;
     }
+
+    /**
+     * Find the topic partitions that have change base on the replica given.

Review comment:
   base => based

## File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
## @@ -162,4 +163,39 @@ public boolean topicWasDeleted(String topicName) {
     public Set deletedTopicIds() {
         return deletedTopicIds;
     }
+
+    /**
+     * Find the topic partitions that have change base on the replica given.
+     *
+     * The changes identified are:
+     * 1. topic partitions for which the broker is not a replica anymore
+     * 2. topic partitions for which the broker is now the leader
+     * 3. topic partitions for which the broker is now a follower
+     *
+     * @param brokerId the broker id
+     * @return the list of topic partitions which the broker should remove, become leader or become follower.
+     */
+    public LocalReplicaChanges localChanges(int brokerId) {
+        Set deletes = new HashSet<>();
+        Map leaders = new HashMap<>();
+        Map followers = new HashMap<>();
+
+        for (TopicDelta delta : changedTopics.values()) {
+            LocalReplicaChanges changes = delta.localChanges(brokerId);
+
+            deletes.addAll(changes.deletes());
+            leaders.putAll(changes.leaders());
+            followers.putAll(changes.followers());
+        }
+
+        // Add all of the deleted topic partitions to the map of locally removed partitions
+        deletedTopicIds().forEach(topicId -> {
+            TopicImage topicImage = image().getTopic(topicId);
+            topicImage.partitions().keySet().forEach(partitionId -> {
+                deletes.add(new TopicPartition(topicImage.name(), partitionId));

Review comment:
   Should we further check that the deleted partition has a replica on brokerId before adding it to deletes?
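junrao's question amounts to filtering deleted partitions by replica membership. A minimal sketch follows. It uses plain maps (topic name to partition id to replica broker ids) in place of `TopicImage`/`PartitionRegistration` and strings in place of `TopicPartition`. `DeletedTopicFilter.localDeletes` is a hypothetical name.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the deleted-topics loop in TopicsDelta.localChanges.
final class DeletedTopicFilter {
    private DeletedTopicFilter() {}

    // deletedTopics: topic name -> (partition id -> replica broker ids).
    // Returns "topic-partition" strings only for partitions hosted on brokerId.
    static Set<String> localDeletes(Map<String, Map<Integer, List<Integer>>> deletedTopics, int brokerId) {
        Set<String> deletes = new HashSet<>();
        for (Map.Entry<String, Map<Integer, List<Integer>>> topic : deletedTopics.entrySet()) {
            for (Map.Entry<Integer, List<Integer>> partition : topic.getValue().entrySet()) {
                // The extra check asked about in the review: skip partitions
                // that never had a replica on this broker.
                if (partition.getValue().contains(brokerId)) {
                    deletes.add(topic.getKey() + "-" + partition.getKey());
                }
            }
        }
        return deletes;
    }
}
```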
## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
## @@ -3020,34 +2949,183 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }

+  @Test
+  def testDeltaFollowerToNotReplica(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val topicPartition = new TopicPartition("foo", 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
+
+    try {
+      // Make the local replica the follower
+      val followerTopicsDelta = topicsCreateDelta(localId, false)
+      val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+      // Check the state of that partition and fetcher
+      val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
+      assertFalse(followerPartition.isLeader)
+      assertEquals(0, followerPartition.getLeaderEpoch)
+
+      val fetcher = replicaManager.replicaFetcherManager.getFetcher(topicPartition)
+      assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), fetcher.map(_.sourceBroker))
+
+      // Apply changes that remove replica
+      val notReplicaTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), otherId, true)
+      val notReplicaMetadataImage = imageFromTopics(notReplicaTopicsDelta.apply())
+      replicaManager.applyDelta(notReplicaMetadataImage, notReplicaTopicsDelta)
+
+      // Check that the partition was removed
+      assertEquals(HostedPartition.None, replicaManager.getPartition(topicPartition))
+      assertEquals(None, replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+      assertEquals(None, replicaManager.logManager.getLog(topicPartition))
+    } finally {
+      replicaManager.shutdown()
+    }
+
+    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testDeltaFollowerRemovedTopic(): Unit = {
+    val localId = 1
+    val otherId = localId + 1
+    val topicPartition = new TopicPartition("foo", 0)
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), localId)
+
+    try {
+      // Make the local replica the follower
+      val followerTopicsDelta = topicsCreateDelta(localId, false)
+      val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+      replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+      // Check the state of that partition and fetcher
+      val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition)
+      assertFalse(followerPartition.isLeader)
+      assertEquals(0,
[GitHub] [kafka] OmniaGM opened a new pull request #11220: KAFKA-10777: Add additional configuration to control MirrorMaker 2 internal topics naming convention
OmniaGM opened a new pull request #11220: URL: https://github.com/apache/kafka/pull/11220 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] rondagostino commented on pull request #11219: MINOR: Forward list partition reassignments
rondagostino commented on pull request #11219: URL: https://github.com/apache/kafka/pull/11219#issuecomment-899637052 Actually, I see now that this is simply fixing the error message. LGTM as-is.
[GitHub] [kafka] xdgrulez commented on pull request #10897: MINOR: Reduced severity for "skipping records" falling out of time windows
xdgrulez commented on pull request #10897: URL: https://github.com/apache/kafka/pull/10897#issuecomment-899621818 Hi, I've fixed the tests now - on my local machine, the complete :streams:test suite is now working perfectly fine again :) Best regards, Ralph
[GitHub] [kafka] rondagostino commented on pull request #11219: MINOR: Forward list partition reassignments
rondagostino commented on pull request #11219: URL: https://github.com/apache/kafka/pull/11219#issuecomment-899567795 Good catch. I wonder if this should be fixed for 3.0 -- i.e. should it be a blocker? Also, is there a way to close the testing gap that allowed this to exist -- maybe have `reassign_partitions_test.py` list the partition reassignments at least once?
[jira] [Commented] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason
[ https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399752#comment-17399752 ] Eran Levy commented on KAFKA-10643: --- Actually, I think we never solved it, but it decreased a lot for some reason. I can't put my finger on any specific reason that resolved it. We made some DNS changes that might have resolved a network issue, but I doubt that was the issue. > Static membership - repetitive PreparingRebalance with updating metadata for > member reason > -- > > Key: KAFKA-10643 > URL: https://issues.apache.org/jira/browse/KAFKA-10643 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Eran Levy >Priority: Major > Attachments: broker-4-11.csv, client-4-11.csv, > client-d-9-11-11-2020.csv > > > Kafka Streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, Kafka > Streams app is healthy. > Configured with static membership. > Every 10 minutes (I assume because of topic.metadata.refresh.interval.ms), I > see the following group coordinator log for different stream consumers: > INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in > state PreparingRebalance with old generation 12244 (__consumer_offsets-45) > (reason: Updating metadata for member > -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) > (kafka.coordinator.group.GroupCoordinator) > and right after that the following log: > INFO [GroupCoordinator 2]: Assignment received from leader for group > **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator) > > I looked a bit at the Kafka code and I'm not sure I understand why this is > happening - does this line describe the situation that happens here regarding the > "reason:"?[https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311] > I also don't see it happening that often in other Kafka Streams applications > that we have.
> The only thing suspicious that I see around every hour that different pods of > that kafka streams application throw this exception: > {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer > > clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer, > groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) > to node > 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException: > null\n"} > I came across this strange behaviour after stated to investigate a strange > stuck rebalancing state after one of the members left the group and caused > the rebalance to stuck - the only thing that I found is that maybe because > that too often preparing to rebalance states, the app might affected of this > bug - KAFKA-9752 ? > I dont understand why it happens, it wasn't before I applied static > membership to that kafka streams application (since around 2 weeks ago). > Will be happy if you can help me > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
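The report only says that the app is "configured with static membership". The sketch below shows what that setup usually involves. It uses plain consumer/Streams property keys, and the class name, method and example values are hypothetical. They are not the reporter's actual configuration.

```java
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;

/**
 * Sketch of static-membership settings (KIP-345). Plain string keys keep the
 * example free of a kafka-clients dependency; with the library available you
 * would normally use the StreamsConfig / ConsumerConfig constants instead.
 */
final class StaticMembershipConfig {

    private StaticMembershipConfig() {}

    static Properties build(String applicationId,
                            String bootstrapServers,
                            String stableInstanceId,
                            Duration sessionTimeout) {
        Objects.requireNonNull(stableInstanceId,
                "static membership needs an instance id that is stable across restarts");
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Setting group.instance.id is what makes the member static.
        props.put("group.instance.id", stableInstanceId);
        // A static member that comes back within this timeout is expected to
        // reclaim its previous assignment instead of forcing a rebalance.
        props.put("session.timeout.ms", Long.toString(sessionTimeout.toMillis()));
        return props;
    }
}
```

For example, `StaticMembershipConfig.build("my-streams-app", "broker:9092", System.getenv("HOSTNAME"), Duration.ofMinutes(5))`. Using the pod's `HOSTNAME` as the instance id is an assumption, and it only works if pod names are stable across restarts (as with a Kubernetes StatefulSet).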
[GitHub] [kafka] dengziming commented on pull request #11219: MINOR: Forward list partition reassignments
dengziming commented on pull request #11219: URL: https://github.com/apache/kafka/pull/11219#issuecomment-899490789 ping @rondagostino @cmccabe
[GitHub] [kafka] dengziming opened a new pull request #11219: MINOR: Forward list partition reassignments
dengziming opened a new pull request #11219: URL: https://github.com/apache/kafka/pull/11219 *More detailed description of your change* We already support listing partition reassignments in KRaft mode, so change `notYetSupported` to `shouldAlwaysForward`. *Summary of testing strategy (including rationale)* Unit test
[jira] [Created] (KAFKA-13205) Clarify API specification of Kafka Connect endpoint
Jonathan Kaleve created KAFKA-13205: --- Summary: Clarify API specification of Kafka Connect endpoint Key: KAFKA-13205 URL: https://issues.apache.org/jira/browse/KAFKA-13205 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 2.7.0 Reporter: Jonathan Kaleve Since version 2.5, Kafka Connect exposes a REST endpoint for getting all topics related to a connector (see [the original KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-558%3A+Track+the+set+of+actively+used+topics+by+connectors+in+Kafka+Connect]). The original KIP proposed a response payload of the following form: {code:java} { "some-source": { "topics": [ "foo", "bar", "baz" ] } } {code} The documentation by Confluent states the same: [https://docs.confluent.io/platform/current/connect/references/restapi.html#get--connectors-(string-name)-topics] The [actual code|https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java#L198], however, produces a result of the following form: {code:java} { "some-source": { "connector": "some-source", "topics": [ "foo", "bar", "baz" ] } } {code} This is a problem for some applications (like [Strimzi Operator|https://github.com/strimzi/strimzi-kafka-operator]), since they expect the response to be of type {{Map>>}} (see [here|https://github.com/strimzi/strimzi-kafka-operator/blob/61a2301390fb9ecc87feb1925d0c2d2f1b2f8107/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectApiImpl.java#L610]), but in Kafka Connect the return type is actually {{Map>}} (see [this test|https://github.com/apache/kafka/blob/2.7/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java#L884], for example). Which type should be expected here? Is the endpoint intended to return the type documented by Confluent, or the type that is actually present in the code?
Also: I might be overlooking something, but it seems there is no official documentation of the specific API endpoints of Kafka Connect. Is that correct?
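Until the intended response shape is settled, a client can avoid depending on it by reading only the `topics` field of each entry. This is a minimal sketch. It assumes that the body of `GET /connectors/{name}/topics` has already been parsed into plain `Map`/`List` values by a JSON library. `ActiveTopicsReader` is a hypothetical name and is not part of Kafka Connect or Strimzi.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Reads connector name -> topic names from an already-parsed response body,
 * accepting both payload shapes from the issue (with or without the extra
 * "connector" field). No JSON parsing is done here.
 */
final class ActiveTopicsReader {

    private ActiveTopicsReader() {}

    static Map<String, List<String>> topicsByConnector(Map<String, ?> body) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (Map.Entry<String, ?> entry : body.entrySet()) {
            if (!(entry.getValue() instanceof Map)) {
                throw new IllegalArgumentException("unexpected value for connector " + entry.getKey());
            }
            // Only "topics" is read, so an extra "connector" field is harmless.
            Object topics = ((Map<?, ?>) entry.getValue()).get("topics");
            List<String> names = new ArrayList<>();
            if (topics instanceof List) {
                for (Object topic : (List<?>) topics) {
                    names.add(String.valueOf(topic));
                }
            }
            result.put(entry.getKey(), Collections.unmodifiableList(names));
        }
        return result;
    }
}
```

Both payloads shown in the issue map to `{some-source=[foo, bar, baz]}` with this reader. The trade-off is that an entry without `topics` yields an empty list rather than an error.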
[jira] [Updated] (KAFKA-13205) Clarify API specification of Kafka Connect endpoint
[ https://issues.apache.org/jira/browse/KAFKA-13205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jonathan Kaleve updated KAFKA-13205: Description: Since version 2.5, Kafka Connect exposes a REST endpoint for getting all topics related to a connector (see [the original KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-558%3A+Track+the+set+of+actively+used+topics+by+connectors+in+Kafka+Connect]). The original KIP proposed a response payload of the following form: {code:java} { "some-source": { "topics": [ "foo", "bar", "baz" ] } } {code} The documentation by Confluent states the same: [https://docs.confluent.io/platform/current/connect/references/restapi.html#get--connectors-(string-name)-topics] The [actual code|https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java#L198], however, produces a result of the following form: {code:java} { "some-source": { "connector": "some-source", "topics": [ "foo", "bar", "baz" ] } } {code} This is a problem for some applications (like [Strimzi Operator|https://github.com/strimzi/strimzi-kafka-operator]), since they expect the response to be of type {{Map>>}} (see [here|https://github.com/strimzi/strimzi-kafka-operator/blob/61a2301390fb9ecc87feb1925d0c2d2f1b2f8107/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectApiImpl.java#L610]), but in Kafka Connect the return type is actually {{Map>}} (see [this test|https://github.com/apache/kafka/blob/2.7/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java#L884], for example). Which type should be expected here? Is the endpoint intended to return the type documented by Confluent, or the type that is actually present in the code? Also: I might be overlooking something, but it seems there is no official documentation of the specific API endpoints of Kafka Connect.
Is that correct?
[jira] [Comment Edited] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason
[ https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399346#comment-17399346 ] Maatari edited comment on KAFKA-10643 at 8/16/21, 11:44 AM: Hi [~cadonna] [~ableegoldman] [~eran-levy], I wonder if there is any news on this. I'm having exactly the same issue. I'm using Kafka 2.8 (Confluent 6.2.0, more specifically) with static membership. Although my session.timeout.ms and max.poll.interval.ms are both set to one hour, before I hit either of those timeouts I always see a rebalance kick in, caused either by a leader rejoining or by a metadata update. In each case, once the rebalance starts, every member is first removed and then rejoins, but the group never properly stabilizes. Somehow my Kafka Streams application still makes progress, but never at full capacity. I never see the group become stable again in the coordinator log; however, I can see members rejoining intermittently, and in the meantime the application progresses. Generally speaking, what is the status of this? Is it being worked on, and is there any workaround? [~eran-levy], how did you manage to work around or solve this?
[GitHub] [kafka] mimaison commented on pull request #11212: KAFKA-13200: Fix MirrorMaker2 connector version
mimaison commented on pull request #11212: URL: https://github.com/apache/kafka/pull/11212#issuecomment-899357757 @showuon Good point, updated!
[GitHub] [kafka] showuon commented on pull request #11218: MINOR: optimize performAssignment to skip unnecessary check
showuon commented on pull request #11218: URL: https://github.com/apache/kafka/pull/11218#issuecomment-899357307 @guozhangwang @ableegoldman , please take a look when available. Thank you.
[GitHub] [kafka] showuon commented on a change in pull request #11218: MINOR: optimize performAssignment to skip unnecessary check
showuon commented on a change in pull request #11218: URL: https://github.com/apache/kafka/pull/11218#discussion_r689370424 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -597,28 +603,32 @@ private void updateGroupSubscription(Set topics) { // own metadata with the newly added topics so that it will not trigger a subsequent rebalance // when these topics gets updated from metadata refresh. // +// We skip the check for in-product assignors since this will not happen in in-product assignors. +// // TODO: this is a hack and not something we want to support long-term unless we push regex into the protocol // we may need to modify the ConsumerPartitionAssignor API to better support this case. -Set assignedTopics = new HashSet<>(); -for (Assignment assigned : assignments.values()) { -for (TopicPartition tp : assigned.partitions()) -assignedTopics.add(tp.topic()); -} +if (!isInProductAssignor(assignor.name())) { +Set assignedTopics = new HashSet<>(); +for (Assignment assigned : assignments.values()) { +for (TopicPartition tp : assigned.partitions()) +assignedTopics.add(tp.topic()); +} -if (!assignedTopics.containsAll(allSubscribedTopics)) { -Set notAssignedTopics = new HashSet<>(allSubscribedTopics); -notAssignedTopics.removeAll(assignedTopics); -log.warn("The following subscribed topics are not assigned to any members: {} ", notAssignedTopics); -} +if (!assignedTopics.containsAll(allSubscribedTopics)) { +Set notAssignedTopics = new HashSet<>(allSubscribedTopics); +notAssignedTopics.removeAll(assignedTopics); +log.warn("The following subscribed topics are not assigned to any members: {} ", notAssignedTopics); +} -if (!allSubscribedTopics.containsAll(assignedTopics)) { -Set newlyAddedTopics = new HashSet<>(assignedTopics); -newlyAddedTopics.removeAll(allSubscribedTopics); -log.info("The following not-subscribed topics are assigned, and their metadata will be " + +if 
(!allSubscribedTopics.containsAll(assignedTopics)) { +Set newlyAddedTopics = new HashSet<>(assignedTopics); +newlyAddedTopics.removeAll(allSubscribedTopics); +log.info("The following not-subscribed topics are assigned, and their metadata will be " + "fetched from the brokers: {}", newlyAddedTopics); -allSubscribedTopics.addAll(assignedTopics); -updateGroupSubscription(allSubscribedTopics); +allSubscribedTopics.addAll(newlyAddedTopics); Review comment: side fix: We can just add `newlyAddedTopics` here, because we already computed it as `assignedTopics - allSubscribedTopics`.
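The side fix does not change the result. Adding `assignedTopics - allSubscribedTopics` to `allSubscribedTopics` produces the same union as adding all of `assignedTopics`, because elements already in the set are ignored. Below is a stand-alone sketch of the set logic. It uses plain strings such as `"orders-0"` in place of `TopicPartition`, and its class and method names are hypothetical. It is not the `ConsumerCoordinator` code.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Subscribed-vs-assigned topic check, with partitions written as "topic-N". */
final class AssignedTopicsCheck {

    private AssignedTopicsCheck() {}

    /** Strips the trailing "-N"; lastIndexOf keeps hyphens inside topic names. */
    static String topicOf(String topicPartition) {
        return topicPartition.substring(0, topicPartition.lastIndexOf('-'));
    }

    /** Collects the topic of every partition handed out to any member. */
    static Set<String> assignedTopics(Map<String, ? extends Collection<String>> assignments) {
        Set<String> topics = new HashSet<>();
        for (Collection<String> partitions : assignments.values()) {
            for (String tp : partitions) {
                topics.add(topicOf(tp));
            }
        }
        return topics;
    }

    /** Subscribed topics that no member received: subscribed - assigned. */
    static Set<String> notAssigned(Set<String> subscribed, Set<String> assigned) {
        Set<String> result = new HashSet<>(subscribed);
        result.removeAll(assigned);
        return result;
    }

    /** Assigned topics outside the subscription: assigned - subscribed. */
    static Set<String> newlyAdded(Set<String> subscribed, Set<String> assigned) {
        Set<String> result = new HashSet<>(assigned);
        result.removeAll(subscribed);
        return result;
    }
}
```

In set notation, the equivalence behind the side fix is S ∪ (A \ S) = S ∪ A.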
[GitHub] [kafka] showuon opened a new pull request #11218: MINOR: optimize performAssignment to skip unnecessary check
showuon opened a new pull request #11218: URL: https://github.com/apache/kafka/pull/11218 Found this while reading the code. After performing an assignment, we always run a somewhat heavy check: we compare the "assigned topics" set with the "subscribed topics" set to see whether either contains topics that are missing from the other. Building the "assigned topics" set also requires traversing all assigned partitions, which becomes expensive when the number of partitions is large. However, as the code comments describe, this check is a safeguard for user-customized assignors, which might produce assignments we don't expect. In most cases, users just use the in-product assignors, which guarantee that only subscribed topics are assigned, so the check is unnecessary for them. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
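The PR's diff calls `isInProductAssignor(assignor.name())`, but the body of that helper is not shown here. The following is a hypothetical sketch of such a guard. It assumes the guard matches on the protocol names of the built-in consumer assignors (`range`, `roundrobin`, `sticky`, `cooperative-sticky`). The PR's actual implementation may differ.

```java
import java.util.Set;

/** Hypothetical guard: run the assigned/subscribed safety check only for custom assignors. */
final class BuiltInAssignors {

    // name() values of RangeAssignor, RoundRobinAssignor, StickyAssignor
    // and CooperativeStickyAssignor.
    private static final Set<String> BUILT_IN_NAMES =
            Set.of("range", "roundrobin", "sticky", "cooperative-sticky");

    private BuiltInAssignors() {}

    static boolean isBuiltIn(String assignorName) {
        return BUILT_IN_NAMES.contains(assignorName);
    }

    /** The expensive check is kept only where the assignor is user-supplied. */
    static boolean shouldRunSafetyCheck(String assignorName) {
        return !isBuiltIn(assignorName);
    }
}
```

Matching on the name alone assumes that no custom assignor reuses one of these names, which is the kind of conflict KAFKA-13204 is about.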
[GitHub] [kafka] showuon commented on pull request #11217: KAFKA-13204: assignor name conflict check
showuon commented on pull request #11217: URL: https://github.com/apache/kafka/pull/11217#issuecomment-899322956 @ableegoldman , please take a look when available. Thank you.
[GitHub] [kafka] showuon opened a new pull request #11217: KAFKA-13204: assignor name conflict check
showuon opened a new pull request #11217: URL: https://github.com/apache/kafka/pull/11217 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-13081) Port sticky assignor fixes (KAFKA-12984) back to 2.8
[ https://issues.apache.org/jira/browse/KAFKA-13081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399588#comment-17399588 ] Luke Chen commented on KAFKA-13081: --- My PR is ready for [~ableegoldman]'s review. I think it should be quick to review, since it's just a backport PR. > Port sticky assignor fixes (KAFKA-12984) back to 2.8 > > > Key: KAFKA-13081 > URL: https://issues.apache.org/jira/browse/KAFKA-13081 > Project: Kafka > Issue Type: Bug > Components: consumer >Reporter: A. Sophie Blee-Goldman >Assignee: Luke Chen >Priority: Blocker > Fix For: 2.8.1 > > > We should make sure that fix #1 and #2 of > [#10985|https://github.com/apache/kafka/pull/10985] make it back to the 2.8 > sticky assignor, since it's pretty much impossible to smoothly cherrypick > that commit from 3.0 to 2.8 due to all the recent improvements and > refactoring in the AbstractStickyAssignor. Either we can just extract and > apply those two fixes to 2.8 directly, or go back and port all the commits > that made this cherrypick difficult over to 2.8 as well. If we do so then > cherrypicking the original commit should be easy. > See [this > comment|https://github.com/apache/kafka/pull/10985#issuecomment-879521196]
[jira] [Commented] (KAFKA-13081) Port sticky assignor fixes (KAFKA-12984) back to 2.8
[ https://issues.apache.org/jira/browse/KAFKA-13081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399566#comment-17399566 ] David Jacot commented on KAFKA-13081: - [~ableegoldman] [~showuon] This is the only remaining blocker for 2.8.1. Would you already have an ETA for it?
[jira] [Created] (KAFKA-13204) wrong assignor selected if the assignor name is identical
Luke Chen created KAFKA-13204: - Summary: wrong assignor selected if the assignor name is identical Key: KAFKA-13204 URL: https://issues.apache.org/jira/browse/KAFKA-13204 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.8.0 Reporter: Luke Chen Assignee: Luke Chen We use the partition assignor name to identify which assignor to use in the consumer coordinator, but we don't check for assignor name conflicts, so the wrong assignor can be selected.
[jira] [Updated] (KAFKA-13204) wrong assignor selected if the assignor name is identical
[ https://issues.apache.org/jira/browse/KAFKA-13204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-13204: -- Description: We use the partition assignor name to identify which assignor to use in the consumer coordinator, but we don't check for assignor name conflicts, so the wrong assignor can be selected when performing the assignment.
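One way to catch the conflict described in KAFKA-13204 is to index the configured assignors by name and fail fast on a duplicate. This is a minimal sketch. `NamedAssignor` is a hypothetical stand-in for `ConsumerPartitionAssignor`, reduced to `name()`. The exception type is also an assumption, and it is not necessarily what the fix in PR #11217 uses.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Indexes assignors by name and rejects duplicate names up front. */
final class AssignorRegistry {

    /** Hypothetical stand-in for ConsumerPartitionAssignor. */
    interface NamedAssignor {
        String name();
    }

    private AssignorRegistry() {}

    static Map<String, NamedAssignor> byName(List<? extends NamedAssignor> assignors) {
        Map<String, NamedAssignor> index = new LinkedHashMap<>();
        for (NamedAssignor assignor : assignors) {
            NamedAssignor previous = index.putIfAbsent(assignor.name(), assignor);
            if (previous != null) {
                // Two assignors share a name: selecting by name would be ambiguous.
                throw new IllegalStateException("The assignor name '" + assignor.name()
                        + "' is used by more than one assignor: "
                        + previous.getClass().getName() + ", " + assignor.getClass().getName());
            }
        }
        return index;
    }
}
```

Failing at configuration time surfaces the problem immediately. Otherwise, one of two assignors with the same name could be picked silently when the assignment is performed.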
[jira] [Commented] (KAFKA-13195) StateSerde don't honor DeserializationExceptionHandler
[ https://issues.apache.org/jira/browse/KAFKA-13195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399536#comment-17399536 ] Ludo commented on KAFKA-13195: -- Thanks [~mjsax] for all the feedback. I understand your point of view about the options you describe: * A custom deserializer can be an option for simple POJOs (meaning a simple application with simple POJOs embedded in it). In my case, it's a [plugin-based|https://kestra.io/docs/plugin-developer-guide/] solution, and I need to be resilient to external changes that I don't know about: if a plugin breaks one message, the app must continue processing all the others. * Using a Transformer: I don't really see how that helps here. Looking at the stack trace, I don't know how to catch this exception on my side. As far as I can see, the only way to catch it is with a [custom deserializer|https://github.com/kestra-io/kestra/blob/b55ac08ba9661b7803cfd7ac4116f4c93c9f29ec/runner-kafka/src/main/java/io/kestra/runner/kafka/serializers/JsonDeserializer.java#L32], but then I can only return a null value, and I'm not sure the underlying state store code will not crash. * Resetting the application is also not an option, since in my case deserialization will still fail even when reprocessing (I still have no idea what the plugins have done). I agree with your explanation that this is not a bug, but maybe there could be another option to handle it, like {{StreamsConfig.}}{{DEFAULT_STATE_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG}}, with the same behavior as for input topics? In the real world, a stream application must never crash, and giving control over how to handle invalid data in a state store (skip, dead letter queue, delete from the state store, ...) is a real need in my opinion. What do you think?
> StateSerde don't honor DeserializationExceptionHandler > -- > > Key: KAFKA-13195 > URL: https://issues.apache.org/jira/browse/KAFKA-13195 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Ludo >Priority: Major > > Kafka Streams allows you to configure a [DeserializationExceptionHandler|https://docs.confluent.io/platform/current/streams/faq.html#failure-and-exception-handling]. > When you use a StateStore, most messages will be a copy of the original message in an internal topic, and will mostly use the same serializer if the message is of another type. > You can see [here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java#L159-L161] that StateSerdes uses the raw Deserializer and does not honor {{StreamsConfig.}}{{DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG}}, which crashes the application (reaching the {{setUncaughtExceptionHandler}} method). > I think the state store should behave the same as the {{RecordDeserializer}} and honor the DeserializationExceptionHandler. > > Stacktrace (coming from kafka stream 2.6.1) : > > {code:java} > Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing > !Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing > !org.apache.kafka.streams.errors.StreamsException: Exception caught in > process.
taskId=1_14, processor=workertaskjoined-repartition-source, > topic=kestra_executor-workertaskjoined-repartition, partition=14, > offset=167500, > stacktrace=org.apache.kafka.common.errors.SerializationException: > com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize > value of type `io.kestra.plugin.gcp.bigquery.ExtractToGcs$Format` from String > "txt": not one of the values accepted for Enum class: > [NEWLINE_DELIMITED_JSON, AVRO, PARQUET, CSV] at [Source: > (byte[])"{[truncated 1270 bytes]; line: 1, column: 1187] (through > reference chain: > io.kestra.core.models.flows.Flow["tasks"]->java.util.ArrayList[1]->org.kestra.task.gcp.bigquery.ExtractToGcs["format"]) > at > com.fasterxml.jackson.databind.exc.InvalidFormatException.from(InvalidFormatException.java:67) > at > com.fasterxml.jackson.databind.DeserializationContext.weirdStringException(DeserializationContext.java:1851) > at > com.fasterxml.jackson.databind.DeserializationContext.handleWeirdStringValue(DeserializationContext.java:1079) > at > com.fasterxml.jackson.databind.deser.std.EnumDeserializer._deserializeAltString(EnumDeserializer.java:327) > at > com.fasterxml.jackson.databind.deser.std.EnumDeserializer._fromString(EnumDeserializer.java:214) > at > com.fasterxml.jackson.databind.deser.std.EnumDeserializer.deserialize(EnumDeserializer.java:188) > at > com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138) > at >
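The custom-deserializer workaround mentioned in the comment amounts to wrapping the real deserializer and deciding what to do when it throws. This is a minimal sketch that does not depend on kafka-clients. `Function<byte[], T>` stands in for `Deserializer<T>`. The `OnError` policy is hypothetical and is not an existing Kafka Streams setting.

```java
import java.util.function.Function;

/** Wraps a deserializing function and applies a simple policy on failure. */
final class LenientDeserializer<T> implements Function<byte[], T> {

    /** Hypothetical policy: rethrow (current behaviour) or swallow and return null. */
    enum OnError { FAIL, RETURN_NULL }

    private final Function<byte[], T> delegate;
    private final OnError onError;
    private long failures; // simple counter, not thread-safe

    LenientDeserializer(Function<byte[], T> delegate, OnError onError) {
        this.delegate = delegate;
        this.onError = onError;
    }

    @Override
    public T apply(byte[] data) {
        try {
            return delegate.apply(data);
        } catch (RuntimeException e) {
            failures++;
            if (onError == OnError.FAIL) {
                throw e;
            }
            // Whether a null value is safe for the calling state store is
            // exactly the open question raised in the comment.
            return null;
        }
    }

    long failures() {
        return failures;
    }
}
```

For example, a strict enum reader that rejects `"txt"` (as in the stack trace above) returns `null` under `RETURN_NULL` and rethrows under `FAIL`. This illustrates the workaround only. It does not resolve the issue, because the state store still has to cope with the `null`.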
[GitHub] [kafka] lkokhreidze commented on pull request #10851: KAFKA-6718 / Rack aware standby task assignor
lkokhreidze commented on pull request #10851: URL: https://github.com/apache/kafka/pull/10851#issuecomment-899245208 Hi @cadonna ! Thanks for the feedback. I will address your comments this week.