[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414654696

## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java ##
@@ -111,8 +115,9 @@
private final String storeName = "store";
private AtomicBoolean errorInjected;
-private AtomicBoolean gcInjected;
-private volatile boolean doGC = true;
+private AtomicBoolean stallInjected;

Review comment:
This is another case where I've gotten tripped up by the same thing twice, and decided to fix it this time.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414655837

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java ##
@@ -41,8 +42,8 @@
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
public final class AssignorConfiguration {
-public static final String HIGH_AVAILABILITY_ENABLED_CONFIG = "internal.high.availability.enabled";
-private final boolean highAvailabilityEnabled;
+public static final String INTERNAL_TASK_ASSIGNOR_CLASS = "internal.task.assignor.class";

Review comment:
Oh, yeah, good idea.
[GitHub] [kafka] hachikuji commented on pull request #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch
hachikuji commented on pull request #8509:
URL: https://github.com/apache/kafka/pull/8509#issuecomment-619100794

retest this please
[GitHub] [kafka] hachikuji commented on pull request #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch
hachikuji commented on pull request #8509:
URL: https://github.com/apache/kafka/pull/8509#issuecomment-619100923

ok to test
[GitHub] [kafka] junrao commented on pull request #8543: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…
junrao commented on pull request #8543:
URL: https://github.com/apache/kafka/pull/8543#issuecomment-619129858

@steverod: There seem to be compilation errors in the JDK 8 tests?
[GitHub] [kafka] ijuma commented on pull request #8545: MINOR: smoke test for jmh benchmark functionality
ijuma commented on pull request #8545:
URL: https://github.com/apache/kafka/pull/8545#issuecomment-619055845

ok to test
[GitHub] [kafka] chia7712 commented on pull request #8543: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…
chia7712 commented on pull request #8543:
URL: https://github.com/apache/kafka/pull/8543#issuecomment-619070458

is this duplicate to #8542?
[GitHub] [kafka] junrao commented on a change in pull request #8103: KAFKA-7061: KIP-280 Enhanced log compaction
junrao commented on a change in pull request #8103:
URL: https://github.com/apache/kafka/pull/8103#discussion_r414175405

## File path: core/src/main/scala/kafka/log/OffsetMap.scala ##
@@ -20,28 +20,80 @@
package kafka.log
import java.util.Arrays
import java.security.MessageDigest
import java.nio.ByteBuffer
+
+import kafka.log.CompactionStrategy.CompactionStrategy
import kafka.utils._
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.record.Record
+import org.apache.kafka.common.utils.{ByteUtils, Utils}
trait OffsetMap {
+ /* The maximum number of entries this map can contain */
def slots: Int
- def put(key: ByteBuffer, offset: Long): Unit
- def get(key: ByteBuffer): Long
+
+ /* Initialize the map with the topic compact strategy */
+ def init(strategy: String, headerKey: String, cleanerThreadId: Int, topicPartitionName: String)

Review comment:
Since this is called in every round of cleaning, perhaps it should be called reinitialize()?

## File path: core/src/main/scala/kafka/log/OffsetMap.scala ##
@@ -68,90 +120,164 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
/**
* The number of bytes of space each entry uses (the number of bytes in the hash plus an 8 byte offset)
+ * This evaluates to the number of bytes in the hash plus 8 bytes for the offset

Review comment:
We probably don't need the comment on the line above now?
## File path: core/src/main/scala/kafka/log/OffsetMap.scala ##
@@ -20,28 +20,80 @@
package kafka.log
import java.util.Arrays
import java.security.MessageDigest
import java.nio.ByteBuffer
+
+import kafka.log.CompactionStrategy.CompactionStrategy
import kafka.utils._
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.record.Record
+import org.apache.kafka.common.utils.{ByteUtils, Utils}
trait OffsetMap {
+ /* The maximum number of entries this map can contain */
def slots: Int
- def put(key: ByteBuffer, offset: Long): Unit
- def get(key: ByteBuffer): Long
+
+ /* Initialize the map with the topic compact strategy */
+ def init(strategy: String, headerKey: String, cleanerThreadId: Int, topicPartitionName: String)
+
+ /**
+ * Associate this offset to the given key.
+ * @param record The record
+ * @return success flag
+ */
+ def put(record: Record): Boolean
+
+ /**
+ * Checks to see whether to retain the record or not
+ * @param record The record
+ * @return true to retain; false not to
+ */
+ def shouldRetainRecord(record: Record): Boolean
+
+ /**
+ * Get the offset associated with this key.
+ * @param key The key
+ * @return The offset associated with this key or -1 if the key is not found
+ */
+ def getOffset(key: ByteBuffer): Long
+
+ /**
+ * Get the version associated with this key for non-offset based strategy.
+ * @param key The key
+ * @return The version associated with this key or -1 if the key is not found
+ */
+ def getVersion(key: ByteBuffer): Long
+
+ /**
+ * Sets the passed value as the latest offset.
+ * @param offset teh latest offset
+ */
def updateLatestOffset(offset: Long): Unit
- def clear(): Unit
+
+ /* The number of entries put into the map (note that not all may remain) */

Review comment:
Hmm, not sure that I understand "note that not all may remain".
## File path: core/src/main/scala/kafka/log/OffsetMap.scala ##
@@ -68,90 +120,164 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
/**
* The number of bytes of space each entry uses (the number of bytes in the hash plus an 8 byte offset)
+ * This evaluates to the number of bytes in the hash plus 8 bytes for the offset
+ * and, if applicable, another 8 bytes for non-offset compact strategy (set in the init method).
*/
- val bytesPerEntry = hashSize + 8
-
+ var bytesPerEntry = hashSize + longByteSize
+
/**
* The maximum number of entries this map can contain
*/
- val slots: Int = memory / bytesPerEntry
+ var slots: Int = memory / bytesPerEntry
+
+ /* compact strategy */
+ private var compactionStrategy: CompactionStrategy = null
+
+ /* header key for the Strategy header to look for */
+ private var headerKey: String = ""
+
+ /**
+ * Initialize the map with the topic compact strategy
+ * @param strategy The compaction strategy
+ * @param headerKey The header key if the compaction strategy is set to header
+ * @param cleanerThreadId The cleaner thread id
+ * @param topicPartitionName The topic partition name
+ */
+ override def init(strategy: String = Defaults.CompactionStrategyOffset, headerKey: String = "", cleanerThreadId: Int = -1, topicPartitionName: String = "") {
+// set the log indent for the topic partition
+this.logIdent = s"[OffsetMap-$cleanerThreadId $topicPartitionName]: "
+
+// Change the salt used for key hashing making all existing
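The diff comments describe the per-entry cost of `SkimpyOffsetMap` as the hash size plus 8 bytes for the offset, plus another 8 bytes when a non-offset compaction strategy is configured, with `slots = memory / bytesPerEntry`. A minimal Java sketch of that arithmetic, assuming a 16-byte MD5 digest (the default `hashAlgorithm` in the hunk header) and assuming the strategy only toggles the extra 8-byte version field; `OffsetMapSizing` is a hypothetical helper, not Kafka code.

```java
/**
 * Hypothetical helper reproducing the entry-size arithmetic described in the
 * SkimpyOffsetMap diff; not part of Kafka.
 */
final class OffsetMapSizing {
    // 8 bytes for a long: used for the offset, and for the version under a non-offset strategy
    static final int LONG_BYTES = Long.BYTES;

    private OffsetMapSizing() { }

    /** Hash bytes + 8-byte offset, plus an 8-byte version when a non-offset strategy is used (assumption). */
    static int bytesPerEntry(int hashSize, boolean nonOffsetStrategy) {
        return hashSize + LONG_BYTES + (nonOffsetStrategy ? LONG_BYTES : 0);
    }

    /** Number of entries that fit in the buffer; integer division, like memory / bytesPerEntry. */
    static int slots(int memory, int hashSize, boolean nonOffsetStrategy) {
        return memory / bytesPerEntry(hashSize, nonOffsetStrategy);
    }
}
```

With a 16-byte hash, a 1,000,000-byte buffer holds 41,666 entries under the offset strategy but 31,250 once the version is stored as well, a 25% drop in capacity; that dependence on the strategy is presumably why `bytesPerEntry` and `slots` become `var`s in the diff.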
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414651483

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ##
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set allSourceTopics,
allTasks, clientStates, numStandbyReplicas());
final TaskAssignor taskAssignor;
-if (highAvailabilityEnabled) {
-if (lagComputationSuccessful) {
-taskAssignor = new HighAvailabilityTaskAssignor(
-clientStates,
-allTasks,
-statefulTasks,
-assignmentConfigs);
-} else {
-log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and "
- + "trigger another rebalance to retry.");
-setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, true);
-}
+if (!lagComputationSuccessful) {

Review comment:
This is a good thought. I think it mitigates the downside that we do still assign all the tasks when we fail to fetch lags, so it's not like we make no progress while waiting for the next rebalance.

The "endless cycle" is a concern, but I'm not sure how it could happen in practice. I.e., what would make brokers consistently fail to report end offsets, but _not_ fail on any other APIs that Streams needs, especially since Streams needs to query the end-offset API during restoration anyway? It seems like the failure would either be transient or permanent(ish).

If transient, then Streams will make progress during the probing.rebalance.interval, and succeed in balancing the assignment later. Even if we get further transient exceptions _during_ the sequence of HATA probing rebalances, the fact that we just return all tasks to their prior owners and that the HATA is stable means that we just delay convergence by a single probing.rebalance.interval, not start all over again.
If permanent, then Streams will fail anyway _after_ the assignment completes, since it also tends to query the end offsets immediately after getting the assignment. Even if it gets all prior tasks returned, which would make it skip the restoration phase, it seems implausible that we'd see a permanent failure on _only_ the end-offset API and Streams would happily be able to poll, commit, manage transactions, etc.

Our big alternative is just to immediately raise the exception, and leave it to KIP-572 to deal with the situation holistically. But I'm concerned that the impact of bombing out of assignment is greater than that of handling other failures during processing. It seems like an exception in assignment dooms the current Join/SyncGroup phase for everyone, which means that they have to wait for a timeout and then redo the rebalance. So KIP-572 can still recover gracefully, by reconstructing the consumer, but it can't help the extra downtime of waiting for the failed rebalance to time out and trying again.
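The fallback being defended here can be read off the branch removed in the diff: when lags cannot be fetched, the assignor logs that it "will return previous assignment to clients and trigger another rebalance to retry", uses a `StickyTaskAssignor`, and sets `REBALANCE_NEEDED`. A minimal Java model of that per-rebalance decision, assuming the configured assignor is identified by name; `AssignorChoice` and its fields are hypothetical and say nothing about the code added after `if (!lagComputationSuccessful) {`.

```java
/**
 * Hypothetical model of the per-rebalance assignor decision, based on the branch
 * removed in the diff; not the StreamsPartitionAssignor implementation.
 */
final class AssignorChoice {
    final String assignor;
    final boolean returnPreviousAssignment;
    final boolean requestFollowUpRebalance;

    private AssignorChoice(String assignor, boolean returnPreviousAssignment, boolean requestFollowUpRebalance) {
        this.assignor = assignor;
        this.returnPreviousAssignment = returnPreviousAssignment;
        this.requestFollowUpRebalance = requestFollowUpRebalance;
    }

    static AssignorChoice choose(boolean lagComputationSuccessful, String configuredAssignor) {
        if (!lagComputationSuccessful) {
            // No lag information: every task is still assigned (to its previous owner),
            // so processing continues while another rebalance is requested to retry.
            return new AssignorChoice("StickyTaskAssignor", true, true);
        }
        // Lags are known: use whichever assignor the config selects.
        return new AssignorChoice(configuredAssignor, false, false);
    }
}
```

Because a failed lag fetch still yields a complete assignment plus a retry request, a transient failure costs roughly one extra rebalance rather than a stalled group, which is the trade-off the comment weighs against raising an exception during assignment.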
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414662116

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java ##
@@ -41,8 +42,8 @@
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
public final class AssignorConfiguration {
-public static final String HIGH_AVAILABILITY_ENABLED_CONFIG = "internal.high.availability.enabled";
-private final boolean highAvailabilityEnabled;
+public static final String INTERNAL_TASK_ASSIGNOR_CLASS = "internal.task.assignor.class";

Review comment:
Ok, I moved it to `org.apache.kafka.streams.StreamsConfig.InternalConfig#INTERNAL_TASK_ASSIGNOR_CLASS`. I made an ad-hoc decision not to add the underscores, though, because this config is different than the other internal configs. I added comments to InternalConfig to explain the difference.
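Replacing the boolean `internal.high.availability.enabled` flag with `internal.task.assignor.class` means the assignor is chosen by class rather than by a switch. A minimal sketch of how such a class-valued config could be resolved with plain Java reflection, assuming the value may be either a `Class` object or a fully qualified class name and that a default applies when the key is absent; `TaskAssignorSketch`, the two stubs, and `AssignorLoader` are hypothetical stand-ins, not the Streams classes.

```java
import java.util.Map;

/** Hypothetical stand-in for a pluggable task assignor. */
interface TaskAssignorSketch {
    String name();
}

/** Hypothetical default implementation. */
final class StickyAssignorStub implements TaskAssignorSketch {
    public StickyAssignorStub() { }

    @Override
    public String name() { return "sticky"; }
}

/** Hypothetical alternative selected through the config. */
final class HighAvailabilityAssignorStub implements TaskAssignorSketch {
    public HighAvailabilityAssignorStub() { }

    @Override
    public String name() { return "high-availability"; }
}

final class AssignorLoader {
    // Same key string as the diff; the loading logic below is an assumption
    static final String INTERNAL_TASK_ASSIGNOR_CLASS = "internal.task.assignor.class";

    private AssignorLoader() { }

    static TaskAssignorSketch load(Map<String, ?> configs, Class<? extends TaskAssignorSketch> defaultClass) {
        Object value = configs.get(INTERNAL_TASK_ASSIGNOR_CLASS);
        try {
            // Accept a Class object or a fully qualified class name; fall back to the default
            Class<?> clazz = value == null ? defaultClass
                    : value instanceof Class ? (Class<?>) value
                    : Class.forName(value.toString());
            if (!TaskAssignorSketch.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(clazz.getName() + " is not a TaskAssignorSketch");
            }
            // Requires a public no-arg constructor
            return (TaskAssignorSketch) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Could not instantiate task assignor " + value, e);
        }
    }
}
```

Checking `isAssignableFrom` before instantiating turns a misconfigured class name into a clear `IllegalArgumentException` instead of a `ClassCastException` later in the rebalance.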
[GitHub] [kafka] mjsax commented on a change in pull request #8532: HOTFIX: Fix broker bounce system tests
mjsax commented on a change in pull request #8532:
URL: https://github.com/apache/kafka/pull/8532#discussion_r414679179

## File path: tests/kafkatest/tests/streams/streams_broker_bounce_test.py ##
@@ -164,7 +164,7 @@ def setup_system(self, start_processor=True, num_threads=3):
# Start test harness
self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, num_threads)
+self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once", num_threads)

Review comment:
We can try to add both eos cases, too. For `Streams[Complex]EosTestJobRunnerService`, they use a different Java class compared to the "smoke test" runner. Not sure if we could unify them?
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414653962

## File path: build.gradle ##
@@ -236,8 +236,10 @@ subprojects {
def logStreams = new HashMap()
beforeTest { TestDescriptor td ->
def tid = testId(td)
+ // truncate the file name if it's too long
def logFile = new File(
- "${projectDir}/build/reports/testOutput/${tid}.test.stdout")
+ "${projectDir}/build/reports/testOutput/${tid.substring(0, Math.min(tid.size(),240))}.test.stdout"

Review comment:
The only alternative I can think of is to parameterize the "short name" of the TaskAssignor, which seems kind of wacky. Also, it's worth noting that truncation has no impact if the file name is still unique. If the name is shared between two tests, there is still no impact if both tests pass. The only observable effect is that if one or both tests fail, their logs would get combined. It seems like we can afford just to defer this problem until it happens, if ever.
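The argument above hinges on when two test IDs map to the same log file. A small Java mirror of the Groovy expression `tid.substring(0, Math.min(tid.size(), 240))` makes that condition explicit: two tests share a file only if their IDs agree on the first 240 characters. `TestLogNames` and its constant are hypothetical names used for illustration.

```java
/** Hypothetical Java mirror of the build.gradle change that caps test log file names. */
final class TestLogNames {
    static final int MAX_ID_LENGTH = 240;

    private TestLogNames() { }

    static String logFileName(String testId) {
        // Same expression as the Gradle snippet: keep at most the first 240 characters
        String truncated = testId.substring(0, Math.min(testId.length(), MAX_ID_LENGTH));
        return truncated + ".test.stdout";
    }

    /** Two tests write to the same log file only when their IDs share the first 240 characters. */
    static boolean shareLogFile(String testIdA, String testIdB) {
        return logFileName(testIdA).equals(logFileName(testIdB));
    }
}
```

For example, two parameterized variants whose IDs differ only after character 240 (such as a long prefix followed by `[assignor=A]` vs. `[assignor=B]`) collide, while short IDs are left untouched.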
[GitHub] [kafka] cadonna commented on pull request #8529: KAFKA-9901:Fix streams_broker_bounce_test error
cadonna commented on pull request #8529:
URL: https://github.com/apache/kafka/pull/8529#issuecomment-619080823

In order to avoid duplicate work, I just want to let you know that I also have a PR that fixes this issue the same way: https://github.com/apache/kafka/pull/8532. I have already run the system tests on that PR to verify the fix. On my PR, there is also a discussion about which value to set the new constructor parameter to. You can merge this or the other PR; I do not care. I just wanted to let you know.
[GitHub] [kafka] vvcephei commented on a change in pull request #8504: KAFKA-9298: reuse mapped stream error in joins
vvcephei commented on a change in pull request #8504:
URL: https://github.com/apache/kafka/pull/8504#discussion_r414731001

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ##
@@ -989,16 +994,18 @@ private void to(final TopicNameExtractor topicExtractor,
null, optimizableRepartitionNodeBuilder);
-final OptimizableRepartitionNode optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build();
-builder.addGraphNode(streamsGraphNode, optimizableRepartitionNode);
+if (repartitionNode == null || !name.equals(repartitionName)) {

Review comment:
Since you made the mistake of asking my opinion, here it is :) :

> bumping the index

It's true that users can't currently reuse the KStream, so there's no compatibility issue there, but we can't bump the index for the _first_ repartition topic, or we would break every topology that uses generated repartition topic names already.

So, either way, we have to cache something to tell us to do something different on the "first reuse" (i.e., the second use of the KStream). Since we have to do that anyway, maybe it's fine to just cache the repartition node itself instead of a flag that says "bump the index next time".

> leaking optimizations into the DSL

I'm on the fence about whether this is an "optimization" or "reasonable behavior". It sort of feels like the latter, and the only reason we needed to introduce the "repartition-collapsing" optimization is that we failed to introduce reasonable behavior from the beginning. Also, my read is that the DSL builder and the optimizer are not cleanly separated right now anyway, and if we ever want to build more optimizations, we'll most likely need to make another pass on both anyway. We're also starting to think about topology evolution (cc @cadonna ), which makes this a less scary prospect, as we can then implement a mechanism to _compatibly_ introduce new optimizations.
In other words, I'm not taking a hard stance, but I'm leaning toward doing the more efficient thing rather than the more pure thing, since we're not currently super pure anyway.

> Other repartition topics

I think we'd better leave it alone for now, implement topology evolution, then migrate to a completely pure and consistent approach.
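The "cache the repartition node itself" option can be sketched as a per-stream field plus the condition shown in the diff, `repartitionNode == null || !name.equals(repartitionName)`: a node is built on first use and reused on the "first reuse" when the name matches. This is a minimal sketch, assuming the stream remembers the last node and the name it was built under; `RepartitionNodeCache`, `Node`, and the `-repartition` suffix are illustrative stand-ins, not the Streams internals.

```java
/**
 * Hypothetical per-stream cache of the repartition node built for a join, mirroring
 * the condition in the KStreamImpl diff; names and types are stand-ins.
 */
final class RepartitionNodeCache {
    /** Stand-in for OptimizableRepartitionNode. */
    static final class Node {
        final String topicName;

        Node(String topicName) { this.topicName = topicName; }
    }

    private Node repartitionNode;     // node built on the first use of this stream
    private String repartitionName;   // name that node was built under
    private int nodesBuilt;

    Node getOrBuild(String name) {
        // Same shape as the diff: build a new node only if none is cached or the name differs
        if (repartitionNode == null || !name.equals(repartitionName)) {
            repartitionNode = new Node(name + "-repartition");
            repartitionName = name;
            nodesBuilt++;
        }
        return repartitionNode;
    }

    int nodesBuilt() { return nodesBuilt; }
}
```

Reusing the cached node leaves the first repartition topic's generated name untouched, which is the compatibility constraint raised under "bumping the index".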
[GitHub] [kafka] cadonna commented on pull request #8529: KAFKA-9901:Fix streams_broker_bounce_test error
cadonna commented on pull request #8529:
URL: https://github.com/apache/kafka/pull/8529#issuecomment-619140731

@jiameixie My PR was merged.
[GitHub] [kafka] junrao commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR
junrao commented on a change in pull request #8524:
URL: https://github.com/apache/kafka/pull/8524#discussion_r414740034

## File path: core/src/main/scala/kafka/controller/KafkaController.scala ##
@@ -1068,12 +1068,23 @@ class KafkaController(val config: KafkaConfig,
val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
controllerContext.partitionsBeingReassigned.isEmpty &&
!topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
- controllerContext.allTopics.contains(tp.topic))
+ controllerContext.allTopics.contains(tp.topic) &&
+ isPreferredLeaderInSync(tp)
+ )
onReplicaElection(candidatePartitions.toSet, ElectionType.PREFERRED, AutoTriggered)
}
}
}
+ private def isPreferredLeaderInSync(tp: TopicPartition): Boolean = {

Review comment:
Perhaps a more accurate name is canPreferredReplicaBeLeader()?
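In Kafka the preferred replica is the first broker in a partition's replica assignment, and per the PR title the new filter skips automatic preferred-leader election when that replica is not in the ISR. A minimal sketch of the predicate under junrao's suggested name, assuming the assignment and ISR are available as plain collections of broker IDs; liveness is left out because the existing filter already checks `isReplicaOnline`. This is not the controller's actual implementation, whose body is not shown in the hunk.

```java
import java.util.List;
import java.util.Set;

final class PreferredLeaderCheck {
    private PreferredLeaderCheck() { }

    /**
     * Hypothetical predicate: the preferred replica (first in the assignment) is only a
     * valid election target if it is currently in the ISR.
     */
    static boolean canPreferredReplicaBeLeader(List<Integer> assignment, Set<Integer> isr) {
        if (assignment.isEmpty()) {
            return false; // no replicas assigned, nothing to elect
        }
        int preferredReplica = assignment.get(0);
        return isr.contains(preferredReplica);
    }
}
```

Skipping such partitions avoids repeatedly triggering an election that cannot move leadership to an out-of-sync replica.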
[GitHub] [kafka] ableegoldman commented on pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
ableegoldman commented on pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#issuecomment-619148319

> 29 system tests failed

Expected, but still upsetting.
[GitHub] [kafka] junrao commented on pull request #8543: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…
junrao commented on pull request #8543:
URL: https://github.com/apache/kafka/pull/8543#issuecomment-619128544

@chia7712: It's the same patch, but for a different branch.
[GitHub] [kafka] mjsax commented on a change in pull request #8483: KAFKA-9865: Expose output topic names from TopologyTestDriver
mjsax commented on a change in pull request #8483:
URL: https://github.com/apache/kafka/pull/8483#discussion_r414732356

## File path: streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java ##
@@ -855,6 +856,20 @@ public void advanceWallClockTime(final Duration advance) {
return new TestOutputTopic<>(this, topicName, keyDeserializer, valueDeserializer);
}
+/**
+ * Get all the names of all the topics to which records have been output.
+ *
+ * Call this method after piping the input into the test driver to retrieve the full set of topics the topology
+ * produced records to.
+ *
+ * The returned set of topic names includes changelog, repartition and sink topic names.
+ *
+ * @return the set of output topic names.
+ */
+public final Set getOutputTopicNames() {

Review comment:
@big-andy-coates Can you update the method name according to the KIP discussion?
[GitHub] [kafka] abbccdda commented on a change in pull request #8532: HOTFIX: Fix broker bounce system tests
abbccdda commented on a change in pull request #8532:
URL: https://github.com/apache/kafka/pull/8532#discussion_r414636938

## File path: tests/kafkatest/tests/streams/streams_broker_bounce_test.py ##
@@ -164,7 +164,7 @@ def setup_system(self, start_processor=True, num_threads=3):
# Start test harness
self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, num_threads)
+self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once", num_threads)

Review comment:
I feel neutral about it; if we are adding 2X time to system tests, that's probably not good.
[GitHub] [kafka] junrao commented on pull request #8542: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…
junrao commented on pull request #8542:
URL: https://github.com/apache/kafka/pull/8542#issuecomment-619127117

ok to test
[GitHub] [kafka] hachikuji commented on pull request #8525: KAFKA-9885; Evict last members of a group when the maximum allowed is reached
hachikuji commented on pull request #8525:
URL: https://github.com/apache/kafka/pull/8525#issuecomment-619142257

ok to test
[GitHub] [kafka] zhaohaidao commented on a change in pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…
zhaohaidao commented on a change in pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#discussion_r415334757

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ##
@@ -633,6 +634,45 @@ public final void copartitionSources(final Collection sourceNodes) {
copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
}
+public void validateCoPartition() {
+final List> copartitionGroups =
+copartitionSourceGroups
+.stream()
+.map(sourceGroup -> sourceGroup
+.stream()
+.flatMap(node -> nodeToSourceTopics.get(node).stream())
+.collect(Collectors.toSet())
+).collect(Collectors.toList());
+for (final Set coPartition : copartitionGroups) {
+final Map coPartitionProperties = new HashMap<>();
+internalTopicNamesWithProperties.forEach((topic, prop) -> {
+if (coPartition.contains(topic) && prop.getNumberOfPartitions().isPresent()) {
+coPartitionProperties.put(topic, prop);
+}
+});
+if (coPartition.size() == coPartitionProperties.size()) {

Review comment:
It's my pleasure. If coPartition.size() != coPartitionProperties.size(), it means that not all input topics have a corresponding internal topic, and in that case we can just skip this validation. You can see the original validation in CopartitionedTopicsEnforcer#enforce:

```
if (copartitionGroup.equals(repartitionTopicConfigs.keySet())) {
... validateAndGetNumOfPartitions ...
}
```

If some of the input topics don't have a repartition operation, their internal topic partition number can be deduced from the others that do. See KStreamRepartitionIntegrationTest#shouldDeductNumberOfPartitionsFromRepartitionOperation for more details. So we can skip this validation if coPartition.size() != coPartitionProperties.size().
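The skip rule described above can be sketched in isolation: validate a co-partition group only when every topic in it has an explicitly known partition count, and otherwise defer, since the missing counts can be inferred later. A minimal sketch, assuming the known counts are passed as a `Map<String, Integer>` and that a mismatch should raise an exception; `CoPartitionValidator` and the exception type are hypothetical, not the `InternalTopologyBuilder` code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class CoPartitionValidator {
    private CoPartitionValidator() { }

    /**
     * Hypothetical sketch: checks one co-partition group only when every topic has an
     * explicitly known partition count, then requires all counts to be equal.
     */
    static void validate(Set<String> coPartitionGroup, Map<String, Integer> knownPartitionCounts) {
        Map<String, Integer> counts = new HashMap<>();
        for (String topic : coPartitionGroup) {
            Integer partitions = knownPartitionCounts.get(topic);
            if (partitions != null) {
                counts.put(topic, partitions);
            }
        }
        // Mirrors the size comparison in the diff: some counts are unknown, so skip for now
        if (counts.size() != coPartitionGroup.size()) {
            return;
        }
        if (new HashSet<>(counts.values()).size() > 1) {
            throw new IllegalStateException("Co-partitioned topics have different partition counts: " + counts);
        }
    }
}
```

Deferring partially specified groups keeps the early check from rejecting topologies whose remaining partition counts are deduced later, which is the case the integration test cited above exercises.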
[GitHub] [kafka] big-andy-coates commented on pull request #8483: KAFKA-9865: Expose output topic names from TopologyTestDriver
big-andy-coates commented on pull request #8483:
URL: https://github.com/apache/kafka/pull/8483#issuecomment-619531323

@mjsax updated.
[GitHub] [kafka] zhaohaidao commented on a change in pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…
zhaohaidao commented on a change in pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#discussion_r415335411

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java ##
@@ -312,6 +312,7 @@ public void buildAndOptimizeTopology(final Properties props) {
graphNodePriorityQueue.offer(graphNode);
}
}
+internalTopologyBuilder.validateCoPartition();

Review comment:
I'm not sure if we can remove the later-stage validation code. @mjsax @lkokhreidze Can you give some advice?
[GitHub] [kafka] hachikuji commented on a change in pull request #8525: KAFKA-9885; Evict last members of a group when the maximum allowed is reached
hachikuji commented on a change in pull request #8525:
URL: https://github.com/apache/kafka/pull/8525#discussion_r414737521

## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ##
@@ -124,6 +124,35 @@ class GroupCoordinator(val brokerId: Int,
info("Shutdown complete.")
}
+ /**
+ * Verify if the group has space to accept the joining member. The various
+ * criteria are explained bellow.

Review comment:
typo: `bellow`
[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414822406

## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ##
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
internalTopologyBuilder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
+final int numStreamThreads;
+if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+log.warn("Overriding number of StreamThreads to zero for global-only topology");
+numStreamThreads = 0;
+} else {
+numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+}
+
// create the stream thread, global update thread, and cleanup thread
-threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+threads = new StreamThread[numStreamThreads];
+
+final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+final boolean hasGlobalTopology = globalTaskTopology != null;
+
+if (numStreamThreads == 0 && !hasGlobalTopology) {
+log.error("Must subscribe to at least one source topic or global table");
+throw new IllegalArgumentException("Topology has no stream threads and no global threads");

Review comment:
I guess I can't personally imagine any reason to ever want an app running with an empty topology, and would prefer to be notified immediately, since I presumably did something wrong. But if you feel strongly about allowing this, I can demote this to a warning.
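The thread-count logic in this hunk reduces to a small decision: a global-only topology gets zero `StreamThread`s, and having neither stream threads nor a global topology is rejected. A minimal sketch of that decision, assuming the two booleans stand in for `!hasNoNonGlobalTopology()` and `buildGlobalStateTopology() != null`; `StreamThreadCount` is a hypothetical name, and whether the last case should throw or only warn is exactly what the comment above debates.

```java
final class StreamThreadCount {
    private StreamThreadCount() { }

    /**
     * Hypothetical restatement of the diff: zero threads for a global-only topology,
     * and an error when there would be neither stream threads nor a global thread.
     */
    static int resolve(boolean hasNonGlobalTopology, boolean hasGlobalTopology, int configuredThreads) {
        // Global-only topologies override num.stream.threads to zero
        int numStreamThreads = hasNonGlobalTopology ? configuredThreads : 0;
        if (numStreamThreads == 0 && !hasGlobalTopology) {
            throw new IllegalArgumentException("Topology has no stream threads and no global threads");
        }
        return numStreamThreads;
    }
}
```

Failing fast in the empty case surfaces a misbuilt topology at construction time instead of leaving an application running that consumes nothing.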
[GitHub] [kafka] steverod commented on pull request #8543: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…
steverod commented on pull request #8543: URL: https://github.com/apache/kafka/pull/8543#issuecomment-619214390
> is this duplicate to #8542?
Yes, this is aimed at 2.4 rather than 2.5. Took a best guess for propagation.
[GitHub] [kafka] vvcephei commented on a change in pull request #8504: KAFKA-9298: reuse mapped stream error in joins
vvcephei commented on a change in pull request #8504: URL: https://github.com/apache/kafka/pull/8504#discussion_r414731001
## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
## @@ -989,16 +994,18 @@ private void to(final TopicNameExtractor topicExtractor,
null,
optimizableRepartitionNodeBuilder);
-final OptimizableRepartitionNode optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build();
-builder.addGraphNode(streamsGraphNode, optimizableRepartitionNode);
+if (repartitionNode == null || !name.equals(repartitionName)) {
Review comment: Since you made the mistake of asking my opinion, here it is :)
> bumping the index
It's true that users can't currently reuse the KStream, so there's no compatibility issue there, but we can't bump the index for the _first_ repartition topic, or we would break every topology that already uses generated repartition topic names. So, either way, we have to cache something to tell us to do something different on the "first reuse" (i.e., the second use of the KStream). Since we have to do that anyway, maybe it's fine to just cache the repartition node itself instead of a flag that says "bump the index next time".
> leaking optimizations into the DSL
I'm on the fence about whether this is an "optimization" or "reasonable behavior". It sort of feels like the latter, and the only reason we needed to introduce the "repartition-collapsing" optimization is that we failed to introduce reasonable behavior from the beginning. Also, my read is that the DSL builder and the optimizer are not cleanly separated right now anyway, and if we ever want to build more optimizations, we'll most likely need to make another pass on both anyway. We're also starting to think about topology evolution (cc @cadonna), which makes this a less scary prospect, as we can then implement a mechanism to _compatibly_ introduce new optimizations.
In other words, I'm not taking a hard stance, but I'm leaning toward doing the more efficient thing rather than the more pure thing, since we're not currently super pure anyway.
> Other repartition topics
I think we'd better leave it alone for now, implement topology evolution, then migrate to a completely pure and consistent approach.
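The trade-off above involves caching the repartition node itself rather than a "bump the index next time" flag. A small Java sketch can illustrate it. It assumes a stream object that remembers the first repartition node it built and the name it was built for. A second join under the same name reuses that node, so the first generated repartition topic keeps its existing name. `RepartitionCacheSketch`, its nested `RepartitionNode`, and the `-repartition` naming are hypothetical illustrations, not the `KStreamImpl` internals.

```java
// Hypothetical sketch of caching a repartition node on first use.
final class RepartitionCacheSketch {
    // Illustrative stand-in for a repartition graph node.
    static final class RepartitionNode {
        final String topicName;

        RepartitionNode(final String topicName) {
            this.topicName = topicName;
        }
    }

    private RepartitionNode repartitionNode; // cached on first use
    private String repartitionName;          // name the cached node was built for
    private int nodesBuilt = 0;

    // Same shape as `repartitionNode == null || !name.equals(repartitionName)` in the diff:
    // build (and cache) a node only when there is none yet or the name differs.
    RepartitionNode repartitionFor(final String name) {
        if (repartitionNode == null || !name.equals(repartitionName)) {
            repartitionName = name;
            repartitionNode = new RepartitionNode(name + "-repartition");
            nodesBuilt++;
        }
        return repartitionNode;
    }

    int nodesBuilt() {
        return nodesBuilt;
    }
}
```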
[GitHub] [kafka] vvcephei commented on pull request #8254: KIP-557: Add Emit On Change Support
vvcephei commented on pull request #8254: URL: https://github.com/apache/kafka/pull/8254#issuecomment-619195143 FYI, a recent change is that only committers can kick off the tests. It's mighty inconvenient, but better for security.
[GitHub] [kafka] vvcephei commented on pull request #8254: KIP-557: Add Emit On Change Support
vvcephei commented on pull request #8254: URL: https://github.com/apache/kafka/pull/8254#issuecomment-619194786 retest this please
[GitHub] [kafka] ConcurrencyPractitioner commented on pull request #8254: KIP-557: Add Emit On Change Support
ConcurrencyPractitioner commented on pull request #8254: URL: https://github.com/apache/kafka/pull/8254#issuecomment-619213065 @vvcephei Can you retrigger tests?
[GitHub] [kafka] ConcurrencyPractitioner commented on pull request #8254: KIP-557: Add Emit On Change Support
ConcurrencyPractitioner commented on pull request #8254: URL: https://github.com/apache/kafka/pull/8254#issuecomment-619212877 @vvcephei Oh yeah, I was aware of that. Just recently, though, I was added to the Jenkins whitelist (my handle is listed in `.asf.yaml`). Supposedly that should let me trigger tests, but it doesn't appear to have taken effect.
[GitHub] [kafka] vvcephei commented on pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on pull request #8541: URL: https://github.com/apache/kafka/pull/8541#issuecomment-619162446
> > 29 system tests failed
>
> Expected, but still upsetting :scream_cat:
I should have said "130 system tests passed"
[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
ableegoldman commented on a change in pull request #8540: URL: https://github.com/apache/kafka/pull/8540#discussion_r414819765
## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
## @@ -883,6 +887,50 @@ public void statefulTopologyShouldCreateStateDirectory() throws Exception {
startStreamsAndCheckDirExists(topology, true);
}
+@Test
+public void shouldThrowIllegalArgumentExceptionOnEmptyTopology() {
+assertThrows(
+IllegalArgumentException.class,
+() -> new KafkaStreams(new StreamsBuilder().build(), props, supplier, time)
+);
+}
+
+@Test
+public void shouldNotCreateStreamThreadsForGlobalOnlyTopology() {
+final StreamsBuilder builder = new StreamsBuilder();
+builder.globalTable("anyTopic");
+final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+
+assertThat(streams.threads.length, equalTo(0));
+}
+
+@Test
+public void shouldNotTransitToErrorStateWithGlobalOnlyTopology() throws InterruptedException {
+final StreamsBuilder builder = new StreamsBuilder();
+builder.globalTable("anyTopic");
+final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+streams.setStateListener((newState, oldState) -> {
+if (newState.equals(State.ERROR)) {
+throw new AssertionError("Should not have transitioned to ERROR state with no stream threads");
Review comment: I guess we don't need to throw here; it would just cause KafkaStreams to transition to ERROR and fail below. But I realized this doesn't even do that, because we're mocking pretty much everything in this test class, including the stream threads. I'll try to look for a better way and place to do this test.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
ableegoldman commented on a change in pull request #8540: URL: https://github.com/apache/kafka/pull/8540#discussion_r414825387
## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
## @@ -883,6 +887,50 @@ public void statefulTopologyShouldCreateStateDirectory() throws Exception {
startStreamsAndCheckDirExists(topology, true);
}
+@Test
+public void shouldThrowIllegalArgumentExceptionOnEmptyTopology() {
+assertThrows(
Review comment: Ack
[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
ableegoldman commented on a change in pull request #8540: URL: https://github.com/apache/kafka/pull/8540#discussion_r414832679
## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
## @@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
@Test
public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
Review comment: Would a class-level `final StreamsBuilder builder = new StreamsBuilder()` that we add a source to in the setUp be any better, in your opinion?
[GitHub] [kafka] d8tltanc commented on a change in pull request #8527: Remove dependencies on deprecated --zookeeper command flags in junit tests
d8tltanc commented on a change in pull request #8527: URL: https://github.com/apache/kafka/pull/8527#discussion_r414844095
## File path: core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
## @@ -1408,13 +1408,23 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
sslStoreProps.put(KafkaConfig.PasswordEncoderSecretProp, kafkaConfig.passwordEncoderSecret.map(_.value).orNull)
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
-val args = Array("--zookeeper", kafkaConfig.zkConnect,
- "--alter", "--add-config", sslStoreProps.asScala.map { case (k, v) => s"$k=$v" }.mkString(","),
- "--entity-type", "brokers",
- "--entity-name", kafkaConfig.brokerId.toString)
-ConfigCommand.main(args)
+val entityType = ConfigType.Broker
+val entityName = kafkaConfig.brokerId.toString
+val passwordConfigs = sslStoreProps.asScala.keySet.filter(DynamicBrokerConfig.isPasswordConfig)
+val passwordEncoderConfigs = new Properties
+passwordEncoderConfigs ++= sslStoreProps.asScala.filter { case (key, _) => key.startsWith("password.encoder.") }
Review comment: Good catch. Deleted.
[GitHub] [kafka] guozhangwang commented on pull request #8535: KAFKA-9903
guozhangwang commented on pull request #8535: URL: https://github.com/apache/kafka/pull/8535#issuecomment-619168284 test this please
[GitHub] [kafka] vvcephei commented on pull request #8254: KIP-557: Add Emit On Change Support
vvcephei commented on pull request #8254: URL: https://github.com/apache/kafka/pull/8254#issuecomment-619194659 test this please
[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
ableegoldman commented on a change in pull request #8540: URL: https://github.com/apache/kafka/pull/8540#discussion_r414823010
## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
## @@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
@Test
public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
Review comment: Is `getBuilderWithSource` really that much uglier than `new StreamsBuilder`? :P
[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology
ableegoldman commented on a change in pull request #8540: URL: https://github.com/apache/kafka/pull/8540#discussion_r414822929
## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
## @@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
internalTopologyBuilder,
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
+final int numStreamThreads;
+if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+log.warn("Overriding number of StreamThreads to zero for global-only topology");
+numStreamThreads = 0;
+} else {
+numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+}
+
// create the stream thread, global update thread, and cleanup thread
-threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+threads = new StreamThread[numStreamThreads];
+
+final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+final boolean hasGlobalTopology = globalTaskTopology != null;
+
+if (numStreamThreads == 0 && !hasGlobalTopology) {
+log.error("Must subscribe to at least one source topic or global table");
+throw new IllegalArgumentException("Topology has no stream threads and no global threads");
Review comment: Also, do we already have an `InvalidTopologyException` or similar exception? Or were you proposing to add a new type?
[GitHub] [kafka] junrao commented on pull request #8542: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…
junrao commented on pull request #8542: URL: https://github.com/apache/kafka/pull/8542#issuecomment-619216927 retest this please
[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem
lbradstreet commented on a change in pull request #8517: URL: https://github.com/apache/kafka/pull/8517#discussion_r413086046
## File path: core/src/main/scala/kafka/utils/DelayedItem.scala
## @@ -21,24 +21,19 @@ import java.util.concurrent._
import org.apache.kafka.common.utils.Time
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+ def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
Review comment: Thanks, yes, I was just coming back to say your suggestion was better for testability reasons.
[GitHub] [kafka] belugabehr opened a new pull request #8531: KAFAKA-9904: Use ThreadLocalConcurrent to Replace Random
belugabehr opened a new pull request #8531: URL: https://github.com/apache/kafka/pull/8531
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] cadonna opened a new pull request #8532: HOTFIX: Fix broker bounce system tests
cadonna opened a new pull request #8532: URL: https://github.com/apache/kafka/pull/8532
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] cadonna commented on issue #8532: HOTFIX: Fix broker bounce system tests
cadonna commented on issue #8532: URL: https://github.com/apache/kafka/pull/8532#issuecomment-617883911 Streams system tests run: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3914
[GitHub] [kafka] leonardge opened a new pull request #8533: Fixed bug in log validator tests.
leonardge opened a new pull request #8533: URL: https://github.com/apache/kafka/pull/8533
*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*
*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] ijuma commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem
ijuma commented on a change in pull request #8517: URL: https://github.com/apache/kafka/pull/8517#discussion_r413077069
## File path: core/src/main/scala/kafka/utils/DelayedItem.scala
## @@ -21,24 +21,19 @@ import java.util.concurrent._
import org.apache.kafka.common.utils.Time
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
Review comment: Does `time` need to be a `val`?
## File path: core/src/main/scala/kafka/utils/DelayedItem.scala
## @@ -21,24 +21,19 @@ import java.util.concurrent._
import org.apache.kafka.common.utils.Time
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
Review comment: Does `time` need to be a public `val`?
[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem
lbradstreet commented on a change in pull request #8517: URL: https://github.com/apache/kafka/pull/8517#discussion_r413085641
## File path: core/src/main/scala/kafka/utils/DelayedItem.scala
## @@ -21,24 +21,19 @@ import java.util.concurrent._
import org.apache.kafka.common.utils.Time
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
Review comment: Good point. I'll make it private.
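Some context for the `DelayedItem` thread: a delay measured against the wall clock can jump when the system time is adjusted, while `System.nanoTime()` is monotonic. The Java sketch below swaps in an injectable `LongSupplier` nanosecond clock for Kafka's `Time`, defaulting to `System::nanoTime`. Injecting the clock keeps the class testable, and the clock field stays private, which matches both review points. `DelayedItemSketch` is a hypothetical name, not the Scala class under review.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical sketch of a delay tracked against a monotonic clock.
final class DelayedItemSketch {
    private final LongSupplier nanoClock; // private: callers never need the clock
    private final long dueNs;

    DelayedItemSketch(final long delayMs) {
        this(delayMs, System::nanoTime);
    }

    // Test-friendly constructor: a fake clock can be injected.
    DelayedItemSketch(final long delayMs, final LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
        this.dueNs = nanoClock.getAsLong() + TimeUnit.MILLISECONDS.toNanos(delayMs);
    }

    // Remaining delay in milliseconds, never negative. nanoTime values are only
    // meaningful as differences, so compare by subtraction, not absolute value.
    long remainingMs() {
        final long remainingNs = dueNs - nanoClock.getAsLong();
        return Math.max(TimeUnit.NANOSECONDS.toMillis(remainingNs), 0L);
    }
}
```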
[GitHub] [kafka] hachikuji commented on a change in pull request #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch
hachikuji commented on a change in pull request #8509: URL: https://github.com/apache/kafka/pull/8509#discussion_r413091037
## File path: core/src/main/scala/kafka/server/KafkaApis.scala
## @@ -3084,12 +3084,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// Broker epoch in LeaderAndIsr/UpdateMetadata/StopReplica request is unknown
// if the controller hasn't been upgraded to use KIP-380
if (brokerEpochInRequest == AbstractControlRequest.UNKNOWN_BROKER_EPOCH) false
-else {
- val curBrokerEpoch = controller.brokerEpoch
- if (brokerEpochInRequest < curBrokerEpoch) true
- else if (brokerEpochInRequest == curBrokerEpoch) false
- else throw new IllegalStateException(s"Epoch $brokerEpochInRequest larger than current broker epoch $curBrokerEpoch")
-}
+else brokerEpochInRequest < controller.brokerEpoch
Review comment: A short comment here about the case where the controller sees the epoch bump first may be helpful.
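The simplified check in the diff above treats only an older epoch as stale and no longer throws on a newer one. Below is a minimal Java sketch of that rule, including the kind of comment the reviewer asks for. `BrokerEpochCheckSketch` is hypothetical. The value `-1` for an unknown epoch is an assumption standing in for `AbstractControlRequest.UNKNOWN_BROKER_EPOCH`.

```java
// Hypothetical sketch of the stale-epoch check; not the KafkaApis code.
final class BrokerEpochCheckSketch {
    // Assumed value, standing in for AbstractControlRequest.UNKNOWN_BROKER_EPOCH.
    static final long UNKNOWN_BROKER_EPOCH = -1L;

    private BrokerEpochCheckSketch() { }

    static boolean isBrokerEpochStale(final long brokerEpochInRequest, final long currentBrokerEpoch) {
        // Controllers that predate KIP-380 send no epoch, so nothing can be judged stale.
        if (brokerEpochInRequest == UNKNOWN_BROKER_EPOCH) {
            return false;
        }
        // Only an older epoch is stale. A newer epoch is accepted rather than rejected,
        // because the controller may see the broker's epoch bump before the broker
        // has updated its own cached value.
        return brokerEpochInRequest < currentBrokerEpoch;
    }
}
```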
[GitHub] [kafka] dajac commented on issue #8454: KAFKA-9844; Maximum number of members within a group is not always enforced due to a race condition in join group
dajac commented on issue #8454: URL: https://github.com/apache/kafka/pull/8454#issuecomment-617907578 @hachikuji Could we get this one merged?
[GitHub] [kafka] cadonna commented on a change in pull request #8532: HOTFIX: Fix broker bounce system tests
cadonna commented on a change in pull request #8532: URL: https://github.com/apache/kafka/pull/8532#discussion_r413110487
## File path: tests/kafkatest/tests/streams/streams_broker_bounce_test.py
## @@ -164,7 +164,7 @@ def setup_system(self, start_processor=True, num_threads=3):
# Start test harness
self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, num_threads)
+self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once", num_threads)
Review comment: Is it enough to specify the processing guarantee as `at_least_once` here, or do you also want this test to include all processing guarantees in the test matrix?
[GitHub] [kafka] cadonna commented on issue #8532: HOTFIX: Fix broker bounce system tests
cadonna commented on issue #8532: URL: https://github.com/apache/kafka/pull/8532#issuecomment-617869532 Call for review: @mjsax @abbccdda
[GitHub] [kafka] cadonna commented on a change in pull request #8532: HOTFIX: Fix broker bounce system tests
cadonna commented on a change in pull request #8532: URL: https://github.com/apache/kafka/pull/8532#discussion_r413116285
## File path: tests/kafkatest/tests/streams/streams_broker_bounce_test.py
## @@ -164,7 +164,7 @@ def setup_system(self, start_processor=True, num_threads=3):
# Start test harness
self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, num_threads)
+self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once", num_threads)
Review comment: On a different note, now that the processing guarantee can be passed to the service, do we still need `StreamsEosTestJobRunnerService` and `StreamsComplexEosTestJobRunnerService`?
[GitHub] [kafka] vvcephei commented on issue #8520: Add explicit grace period to tumbling window example
vvcephei commented on issue #8520: URL: https://github.com/apache/kafka/pull/8520#issuecomment-617376010 Gah, I forgot to add the reviewers to the merge commit.
[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
vvcephei commented on a change in pull request #8497: URL: https://github.com/apache/kafka/pull/8497#discussion_r412422082
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
## @@ -16,128 +16,103 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist;
+
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TaskMovement {
-private static final Logger log = LoggerFactory.getLogger(TaskMovement.class);
-final TaskId task;
-final UUID source;
-final UUID destination;
+class TaskMovement {
+private final TaskId task;
+private final UUID destination;
+private final SortedSet caughtUpClients;
-TaskMovement(final TaskId task, final UUID source, final UUID destination) {
+private TaskMovement(final TaskId task, final UUID destination, final SortedSet caughtUpClients) {
this.task = task;
-this.source = source;
this.destination = destination;
-}
+this.caughtUpClients = caughtUpClients;
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
+if (caughtUpClients == null || caughtUpClients.isEmpty()) {
+throw new IllegalStateException("Should not attempt to move a task if no caught up clients exist");
}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
- Objects.equals(source, movement.source) &&
- Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
}
/**
- * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with
- * a few exceptional cases:
- *
- * 1. Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained assignment
- * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
*/
-static List getMovements(final Map> statefulActiveTaskAssignment,
- final Map> balancedStatefulActiveTaskAssignment,
- final Map> tasksToCaughtUpClients,
- final Map clientStates,
- final Map tasksToRemainingStandbys,
- final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment,
+ final Map> tasksToCaughtUpClients,
+ final Map clientStates,
+ final Map tasksToRemainingStandbys,
+ final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad = new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients)
+);
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry :
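The new `assignTaskMovements` builds a `ValidClientsByTaskLoadQueue` from the client states and a validity predicate. A client counts as valid when it is caught up on the task, or when no caught-up client exists. The class body isn't shown here, so the following Java sketch is only a guess at the general idea under stated assumptions. It orders clients by current task load and returns the least-loaded one that passes the predicate. Clients and tasks are plain `String`s rather than `UUID`s and `TaskId`s, and `LeastLoadedValidClientSketch` is a hypothetical name.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.BiPredicate;

// Hypothetical sketch of "poll the least-loaded client that is valid for this task".
final class LeastLoadedValidClientSketch {
    private LeastLoadedValidClientSketch() { }

    static String leastLoadedValidClient(final Map<String, Integer> taskLoadByClient,
                                         final String task,
                                         final BiPredicate<String, String> isValidClientForTask) {
        // Order by current task load, breaking ties by client id for determinism.
        final Comparator<String> byLoadThenId =
            Comparator.comparing((String client) -> taskLoadByClient.get(client))
                      .thenComparing(Comparator.<String>naturalOrder());
        final PriorityQueue<String> queue = new PriorityQueue<>(byLoadThenId);
        queue.addAll(taskLoadByClient.keySet());
        // Skip clients the predicate rejects (e.g. not caught up on this task).
        while (!queue.isEmpty()) {
            final String client = queue.poll();
            if (isValidClientForTask.test(client, task)) {
                return client;
            }
        }
        return null; // no valid client for this task
    }
}
```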
[GitHub] [kafka] vvcephei commented on issue #8254: KIP-557: Add Emit On Change Support
vvcephei commented on issue #8254: URL: https://github.com/apache/kafka/pull/8254#issuecomment-617392150

I also took a look at the foreign-key join test, which is actually telling us something awesome: your feature allows us to drop _unnecessary_ tombstones that we'd otherwise send under some conditions. Anyway, it's complicated, so here's a fix for the test:

```diff
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -48,6 +48,7 @@ import java.util.Properties;
 import java.util.function.Function;
 
 import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
@@ -371,12 +372,16 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             // Deleting a non-joining record produces an unnecessary tombstone for inner joins, because
             // it's not possible to know whether a result was previously emitted.
+            // HOWEVER, when the final join result is materialized (either explicitly or
+            // implicitly by a subsequent join), we _can_ detect that the tombstone is unnecessary and drop it.
             // For the left join, the tombstone is necessary.
             left.pipeInput("lhs1", (String) null);
             {
                 assertThat(
                     outputTopic.readKeyValuesToMap(),
-                    is(mkMap(mkEntry("lhs1", null)))
+                    is(leftJoin || !(materialized || rejoin)
+                           ? mkMap(mkEntry("lhs1", null))
+                           : emptyMap())
                 );
                 if (materialized) {
                     assertThat(
@@ -452,12 +457,15 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             }
             // "moving" our subscription to another non-existent FK results in an unnecessary tombstone for inner join,
             // since it impossible to know whether the prior FK existed or not (and thus whether any results have
-            // previously been emitted)
+            // previously been emitted). HOWEVER, when the final join result is materialized (either explicitly or
+            // implicitly by a subsequent join), we _can_ detect that the tombstone is unnecessary and drop it.
             // The left join emits a _necessary_ update (since the lhs record has actually changed)
             left.pipeInput("lhs1", "lhsValue1|rhs2");
             assertThat(
                 outputTopic.readKeyValuesToMap(),
-                is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null)))
+                is(leftJoin
+                       ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)"))
+                       : (materialized || rejoin) ? emptyMap() : singletonMap("lhs1", null))
             );
             if (materialized) {
                 assertThat(
@@ -469,7 +477,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
             left.pipeInput("lhs1", "lhsValue1|rhs3");
             assertThat(
                 outputTopic.readKeyValuesToMap(),
-                is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" : null)))
+                is(leftJoin
+                       ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)"))
+                       : (materialized || rejoin) ? emptyMap() : singletonMap("lhs1", null))
             );
             if (materialized) {
                 assertThat(
```
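The test change above hinges on one observation: once the join result is materialized, a tombstone can be checked against the stored result before it is forwarded. Below is a minimal sketch of that idea, not the Kafka Streams implementation; `MaterializedJoinResults` and its `apply` method are hypothetical names, and a plain `HashMap` stands in for the state store.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical sketch (not Kafka Streams code): a materialized join-result
 * table that drops tombstones for keys that never produced a result.
 */
final class MaterializedJoinResults {
    // Stand-in for the materialized state store: key -> last emitted join result.
    private final Map<String, String> store = new HashMap<>();

    /**
     * Applies a new join result (null means "no result", i.e. a delete) and returns
     * the record to forward downstream, or Optional.empty() if it can be dropped.
     */
    Optional<Map.Entry<String, String>> apply(final String key, final String joinResult) {
        if (joinResult == null) {
            final String previous = store.remove(key);
            if (previous == null) {
                // Nothing was ever emitted for this key, so the tombstone is unnecessary.
                return Optional.empty();
            }
            // A result was emitted earlier, so downstream must see the delete.
            return Optional.of(new AbstractMap.SimpleEntry<String, String>(key, null));
        }
        store.put(key, joinResult);
        return Optional.of(new AbstractMap.SimpleEntry<String, String>(key, joinResult));
    }
}
```

Without such a store (the non-materialized inner-join case) there is no previous result to consult, which is why the test still expects the tombstone in that configuration.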
[GitHub] [kafka] vvcephei commented on issue #8254: KIP-557: Add Emit On Change Support
vvcephei commented on issue #8254: URL: https://github.com/apache/kafka/pull/8254#issuecomment-617423139

Ok, last one. The TransformValuesTest is just another case where the test input data is now considered idempotent, which is fine:

```diff
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -398,8 +398,8 @@ public class KTableTransformValuesTest {
             driver.createInputTopic(INPUT_TOPIC, new StringSerializer(), new StringSerializer());
 
         inputTopic.pipeInput("A", "ignored", 5L);
-        inputTopic.pipeInput("A", "ignored", 15L);
-        inputTopic.pipeInput("A", "ignored", 10L);
+        inputTopic.pipeInput("A", "ignored1", 15L);
+        inputTopic.pipeInput("A", "ignored2", 10L);
 
         assertThat(output(), hasItems(new KeyValueTimestamp<>("A", "1", 5),
                 new KeyValueTimestamp<>("A", "0", 15),
```
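The rule behind this diff: with KIP-557, an update whose value equals the value already stored for that key is treated as idempotent and is not forwarded, so a test that re-sends an identical value no longer sees it downstream. A minimal sketch of that rule, assuming a plain value-equality check on an in-memory map; `EmitOnChangeTable` is a hypothetical name, and the real implementation's comparison (for example, on serialized bytes, or with timestamp handling) may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical sketch of "emit on change": an update is forwarded only if its
 * value differs from the value already stored for the key. Timestamps are ignored.
 */
final class EmitOnChangeTable {
    private final Map<String, String> store = new HashMap<>();

    /** Stores the update and returns true if it should be forwarded downstream. */
    boolean put(final String key, final String value) {
        if (store.containsKey(key) && Objects.equals(store.get(key), value)) {
            // Same value as before: an idempotent update, so drop it.
            return false;
        }
        store.put(key, value);
        return true;
    }
}
```

Mirroring the test: `put("A", "ignored")` returns true, a second `put("A", "ignored")` returns false, and `put("A", "ignored1")` returns true again, which is why the fixed test pipes distinct values.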
[GitHub] [kafka] vvcephei commented on issue #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
vvcephei commented on issue #8497: URL: https://github.com/apache/kafka/pull/8497#issuecomment-617439325

Unrelated Java 11 failures:
kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores
[GitHub] [kafka] vvcephei commented on issue #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
vvcephei commented on issue #8497: URL: https://github.com/apache/kafka/pull/8497#issuecomment-617364180 test this please
[GitHub] [kafka] vvcephei commented on issue #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
vvcephei commented on issue #8497: URL: https://github.com/apache/kafka/pull/8497#issuecomment-617364803 test this please
[GitHub] [kafka] vvcephei commented on issue #8254: KIP-557: Add Emit On Change Support
vvcephei commented on issue #8254: URL: https://github.com/apache/kafka/pull/8254#issuecomment-617423885 I hope I didn't step on your toes, @ConcurrencyPractitioner, but I just wanted to make sure that you're unblocked to finish up this PR. Figuring out exactly what's wrong with those tests, and whether the new behavior is OK, can be a bit subtle.
[GitHub] [kafka] surabhidixit opened a new pull request #8526: KAFKA-6867: corrected the typos in upgrade.html
surabhidixit opened a new pull request #8526: URL: https://github.com/apache/kafka/pull/8526

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] vvcephei commented on issue #8254: KIP-557: Add Emit On Change Support
vvcephei commented on issue #8254: URL: https://github.com/apache/kafka/pull/8254#issuecomment-617418517

The source topic restart integration test was actually just failing because the tests were polluting each other's topics. This is one way to fix it:

```diff
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index 3ec239fab9..b42a5852a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -100,6 +100,7 @@ public class KTableSourceTopicRestartIntegrationTest {
     @After
     public void after() throws Exception {
         IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
+        CLUSTER.deleteAllTopicsAndWait(60_000L);
     }
 
     @Test
```
[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
vvcephei commented on a change in pull request #8497: URL: https://github.com/apache/kafka/pull/8497#discussion_r412423011 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ValidClientsByTaskLoadQueue.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.UUID; +import java.util.function.BiFunction; +import org.apache.kafka.streams.processor.TaskId; + +/** + * Wraps a priority queue of clients and returns the next valid candidate(s) based on the current task assignment + */ +class ValidClientsByTaskLoadQueue { +private final PriorityQueue clientsByTaskLoad; +private final BiFunction validClientCriteria; + +ValidClientsByTaskLoadQueue(final Map clientStates, +final BiFunction validClientCriteria) { Review comment: Ah, sorry about that @ableegoldman ; I wasn't able (or was too lazy) to follow the `git praise` trail through the class movement. Well, kudos to you, then. 
:)
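The class Javadoc in this diff describes the idea: keep clients in a priority queue ordered by task load and hand out the least-loaded client that passes a validity check. A minimal sketch of that pattern, assuming loads are plain integer task counts and task ids are `String`s rather than `TaskId`; `LeastLoadedValidClientQueue` and `assign` are hypothetical names, not the class in the PR.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.UUID;
import java.util.function.BiFunction;

/**
 * Hypothetical sketch: returns the least-loaded client that satisfies a
 * validity predicate for a given task, then bumps that client's load.
 */
final class LeastLoadedValidClientQueue {
    private final Map<UUID, Integer> loads;
    private final PriorityQueue<UUID> queue;
    private final BiFunction<UUID, String, Boolean> validClientCriteria;

    LeastLoadedValidClientQueue(final Map<UUID, Integer> initialLoads,
                                final BiFunction<UUID, String, Boolean> validClientCriteria) {
        this.loads = new HashMap<>(initialLoads);
        // Order by current load, breaking ties by UUID so the order is deterministic.
        this.queue = new PriorityQueue<>(
            Comparator.comparingInt((UUID client) -> loads.get(client))
                      .thenComparing(Comparator.naturalOrder()));
        this.queue.addAll(loads.keySet());
        this.validClientCriteria = validClientCriteria;
    }

    /** Returns the least-loaded valid client for the task, or null if no client is valid. */
    UUID assign(final String task) {
        final List<UUID> skipped = new ArrayList<>();
        UUID chosen = null;
        while (!queue.isEmpty()) {
            final UUID candidate = queue.poll();
            if (validClientCriteria.apply(candidate, task)) {
                chosen = candidate;
                break;
            }
            skipped.add(candidate);
        }
        // Skipped clients keep their load, so they can go straight back in.
        queue.addAll(skipped);
        if (chosen != null) {
            // Change the load only while the client is outside the queue, then re-insert it.
            loads.merge(chosen, 1, Integer::sum);
            queue.offer(chosen);
        }
        return chosen;
    }
}
```

`PriorityQueue` does not reorder an element whose priority changes in place, which is why the sketch bumps the chosen client's load only after polling it and re-offers it afterwards.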
[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
vvcephei commented on a change in pull request #8497: URL: https://github.com/apache/kafka/pull/8497#discussion_r412419973 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java ## @@ -16,128 +16,94 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient; + import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import java.util.UUID; import org.apache.kafka.streams.processor.TaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class TaskMovement { -private static final Logger log = LoggerFactory.getLogger(TaskMovement.class); - final TaskId task; -final UUID source; -final UUID destination; +private final UUID destination; -TaskMovement(final TaskId task, final UUID source, final UUID destination) { +TaskMovement(final TaskId task, final UUID destination) { this.task = task; -this.source = source; this.destination = destination; } -@Override -public boolean equals(final Object o) { -if (this == o) { -return true; -} -if (o == null || getClass() != o.getClass()) { -return false; -} -final TaskMovement movement = (TaskMovement) o; -return Objects.equals(task, movement.task) && - Objects.equals(source, movement.source) && - Objects.equals(destination, movement.destination); -} - -@Override -public int hashCode() { -return Objects.hash(task, source, destination); -} - /** - * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured - * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with - * a few exceptional cases: - * - * 1. 
Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved - * immediately from the source to the destination in the state constrained assignment - * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total - * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}. - * - * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients - * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients - * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment + * @return whether any warmup replicas were assigned */ -static List getMovements(final Map> statefulActiveTaskAssignment, - final Map> balancedStatefulActiveTaskAssignment, - final Map> tasksToCaughtUpClients, - final Map clientStates, - final Map tasksToRemainingStandbys, - final int maxWarmupReplicas) { -if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) { -throw new IllegalStateException("Tried to compute movements but assignments differ in size."); -} +static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment, + final Map> tasksToCaughtUpClients, + final Map clientStates, + final Map tasksToRemainingStandbys, + final int maxWarmupReplicas) { +boolean warmupReplicasAssigned = false; + +final ValidClientsByTaskLoadQueue clientsByTaskLoad = +new ValidClientsByTaskLoadQueue( +clientStates, +(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients) +); -final Map taskToDestinationClient = new HashMap<>(); -for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) { -final UUID destination = clientEntry.getKey(); -for (final TaskId task : clientEntry.getValue()) { -taskToDestinationClient.put(task, destination); +final 
SortedSet taskMovements = new TreeSet<>( +(movement, other) -> { +final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size(); +final int
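The truncated comparator above orders `TaskMovement`s by the number of caught-up clients for each task. A minimal sketch of such an ordering, assuming ascending order (tasks with fewer caught-up clients first) and a tie-break on task id; `Movement` and `orderedMovements` are hypothetical, and task ids are `String`s here.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;

final class MovementOrdering {
    /** Hypothetical stand-in for TaskMovement: a task id and its destination client. */
    static final class Movement {
        final String task;
        final UUID destination;

        Movement(final String task, final UUID destination) {
            this.task = task;
            this.destination = destination;
        }
    }

    /**
     * Returns an empty set that keeps movements ordered by how many clients are
     * already caught up on the task (fewest first), breaking ties by task id.
     * Assumes every task has an entry (possibly empty) in tasksToCaughtUpClients.
     */
    static SortedSet<Movement> orderedMovements(final Map<String, Set<UUID>> tasksToCaughtUpClients) {
        final Comparator<Movement> order = Comparator
            .comparingInt((Movement m) -> tasksToCaughtUpClients.get(m.task).size())
            .thenComparing((Movement m) -> m.task);
        return new TreeSet<>(order);
    }
}
```

The tie-break matters: a `TreeSet` treats two elements that compare as 0 as duplicates, so without it two different tasks with the same number of caught-up clients would silently collapse into one entry.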
[GitHub] [kafka] vvcephei commented on issue #8254: KIP-557: Add Emit On Change Support
vvcephei commented on issue #8254: URL: https://github.com/apache/kafka/pull/8254#issuecomment-617372772

Hey @ConcurrencyPractitioner, sorry it took so long. It's the same cause again: the test happened to expect idempotent updates to flow through regularly, but not for anything important. Just changing the value of the "tick" record the second time fixes it without breaking anything about the test. Here's my diff:

```diff
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -615,11 +615,11 @@ public class SuppressScenarioTest {
             );
 
 
-            inputTopicRight.pipeInput("tick", "tick", 21L);
+            inputTopicRight.pipeInput("tick", "tick1", 21L);
             verify(
                 drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                 asList(
-                    new KeyValueTimestamp<>("tick", "(null,tick)", 21), // just a testing artifact
+                    new KeyValueTimestamp<>("tick", "(null,tick1)", 21), // just a testing artifact
                     new KeyValueTimestamp<>("A", "(b,2)", 13L)
                 )
             );
@@ -703,11 +703,11 @@ public class SuppressScenarioTest {
             );
 
 
-            inputTopicLeft.pipeInput("tick", "tick", 21L);
+            inputTopicLeft.pipeInput("tick", "tick1", 21L);
             verify(
                 drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                 asList(
-                    new KeyValueTimestamp<>("tick", "(tick,null)", 21), // just a testing artifact
+                    new KeyValueTimestamp<>("tick", "(tick1,null)", 21), // just a testing artifact
                     new KeyValueTimestamp<>("A", "(2,b)", 13L)
                 )
             );
```
[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable
abbccdda commented on a change in pull request #8486: URL: https://github.com/apache/kafka/pull/8486#discussion_r413442881

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
## @@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() {
         validateOffsetsAsync(partitionsToValidate);
     }
+    /**
+     * For each partition which needs validation, make an asynchronous request to get the end-offsets for the partition
+     * with the epoch less than or equal to the epoch the partition last saw.
+     *
+     * Requests are grouped by Node for efficiency.
+     */
+    private void validateOffsetsAsync(Map partitionsToValidate) {
+        final Map> regrouped =
+                regroupFetchPositionsByLeader(partitionsToValidate);
+
+        regrouped.forEach((node, fetchPositions) -> {
+            if (node.isEmpty()) {
+                metadata.requestUpdate();
+                return;
+            }
+
+            NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
+            if (nodeApiVersions == null) {
+                client.tryConnect(node);
+                return;
+            }
+
+            if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+                log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not " +
+                              "support the required protocol version (introduced in Kafka 2.3)",
+                        fetchPositions.keySet());
+                completeAllValidations(fetchPositions);
+                return;
+            }
+
+            // We need to get the client epoch state before sending out the leader epoch request, and use it to
+            // decide whether we need to validate offsets.
+            if (!metadata.hasReliableLeaderEpochs()) {

Review comment:
Np, will do
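The per-node checks in `validateOffsetsAsync` run in a fixed order, and the new leader-epoch reliability check slots in after the protocol-version check. A minimal sketch of that decision order as a pure function, assuming each check can be reduced to a boolean; `ValidationAction` and `OffsetValidationDecision.decide` are hypothetical names. The diff is truncated at the reliability check, so treating an unreliable epoch as "skip validation" is an assumption based on the PR title.

```java
/** Hypothetical outcomes mirroring the branches in the validateOffsetsAsync diff. */
enum ValidationAction {
    REQUEST_METADATA_UPDATE, // no known leader node for these partitions
    TRY_CONNECT,             // API versions for the node are not known yet
    SKIP_VALIDATION,         // broker too old, or (assumed) leader epochs not reliable
    SEND_VALIDATION_REQUEST  // send the OffsetsForLeaderEpoch request
}

final class OffsetValidationDecision {
    /** Evaluates the checks in the same order as the diff; the first failing check wins. */
    static ValidationAction decide(final boolean leaderKnown,
                                   final boolean apiVersionsKnown,
                                   final boolean supportsOffsetForLeaderEpoch,
                                   final boolean leaderEpochsReliable) {
        if (!leaderKnown) {
            return ValidationAction.REQUEST_METADATA_UPDATE;
        }
        if (!apiVersionsKnown) {
            return ValidationAction.TRY_CONNECT;
        }
        if (!supportsOffsetForLeaderEpoch || !leaderEpochsReliable) {
            return ValidationAction.SKIP_VALIDATION;
        }
        return ValidationAction.SEND_VALIDATION_REQUEST;
    }
}
```

Keeping the order explicit makes it easy to see that a missing leader or unknown API versions are handled before the epoch-reliability question is ever asked.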
[GitHub] [kafka] jiameixie commented on issue #8529: KAFKA-9901:Fix streams_broker_bounce_test error
jiameixie commented on issue #8529: URL: https://github.com/apache/kafka/pull/8529#issuecomment-618169833 @guozhangwang @ijuma @junrao PTAL, thanks
[GitHub] [kafka] junrao commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR
junrao commented on a change in pull request #8524: URL: https://github.com/apache/kafka/pull/8524#discussion_r413436495

## File path: core/src/main/scala/kafka/controller/KafkaController.scala
## @@ -1068,7 +1068,9 @@ class KafkaController(val config: KafkaConfig,
       val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
         controllerContext.partitionsBeingReassigned.isEmpty &&
         !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
-        controllerContext.allTopics.contains(tp.topic))
+        controllerContext.allTopics.contains(tp.topic) &&
+        controllerContext.partitionLeadershipInfo.get(tp).forall(l => l.leaderAndIsr.isr.contains(leaderBroker))

Review comment:
The preferred leader election also checks for live brokers. So, perhaps we could just call PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection() here.
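The suggestion is to reuse the preferred-replica election rule instead of re-deriving part of it in the filter. As we understand it, that rule picks the first replica in the partition's assignment and accepts it only if it is both live and in the ISR; the Java sketch below encodes that understanding as an assumption. `PreferredLeaderSketch.electPreferred` is a hypothetical helper, not the controller's Scala code.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

final class PreferredLeaderSketch {
    /**
     * Returns the preferred leader (the first assigned replica) if it is live and
     * in the ISR; otherwise returns empty, meaning no preferred election should happen.
     */
    static Optional<Integer> electPreferred(final List<Integer> assignment,
                                            final Set<Integer> liveReplicas,
                                            final Set<Integer> isr) {
        if (assignment.isEmpty()) {
            return Optional.empty();
        }
        final Integer preferred = assignment.get(0);
        if (liveReplicas.contains(preferred) && isr.contains(preferred)) {
            return Optional.of(preferred);
        }
        return Optional.empty();
    }
}
```

Under this rule, a partition whose preferred leader has fallen out of the ISR yields an empty result and would not be a candidate, and a dead preferred leader is excluded by the same check.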
[GitHub] [kafka] hachikuji commented on issue #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch
hachikuji commented on issue #8509: URL: https://github.com/apache/kafka/pull/8509#issuecomment-618070328
[GitHub] [kafka] hachikuji commented on issue #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch
hachikuji commented on issue #8509: URL: https://github.com/apache/kafka/pull/8509#issuecomment-618070266 ok to test
[GitHub] [kafka] guozhangwang commented on a change in pull request #8326: KAFKA-8639: Replace AddPartitionsToTxn with Automated Protocol
guozhangwang commented on a change in pull request #8326: URL: https://github.com/apache/kafka/pull/8326#discussion_r413374355 ## File path: clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java ## @@ -17,157 +17,109 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData; +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic; +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.utils.CollectionUtils; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH; -import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID; -import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; -import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID; -import static org.apache.kafka.common.protocol.types.Type.INT32; - public class AddPartitionsToTxnRequest extends AbstractRequest { -private static final String TOPICS_KEY_NAME = "topics"; -private static final String PARTITIONS_KEY_NAME = "partitions"; - -private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema( -TRANSACTIONAL_ID, -PRODUCER_ID, -PRODUCER_EPOCH, -new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema( -TOPIC_NAME, -new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32, -"The partitions to add to the transaction.")); - -/** - * The version number is bumped 
to indicate that on quota violation brokers send out responses before throttling. - */ -private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V1 = ADD_PARTITIONS_TO_TXN_REQUEST_V0; - -public static Schema[] schemaVersions() { -return new Schema[]{ADD_PARTITIONS_TO_TXN_REQUEST_V0, ADD_PARTITIONS_TO_TXN_REQUEST_V1}; -} + +public final AddPartitionsToTxnRequestData data; public static class Builder extends AbstractRequest.Builder { -private final String transactionalId; -private final long producerId; -private final short producerEpoch; -private final List partitions; +public final AddPartitionsToTxnRequestData data; -public Builder(String transactionalId, long producerId, short producerEpoch, List partitions) { +public Builder(final AddPartitionsToTxnRequestData data) { super(ApiKeys.ADD_PARTITIONS_TO_TXN); -this.transactionalId = transactionalId; -this.producerId = producerId; -this.producerEpoch = producerEpoch; -this.partitions = partitions; +this.data = data; +} + +public Builder(final String transactionalId, + final long producerId, + final short producerEpoch, + final List partitions) { +super(ApiKeys.ADD_PARTITIONS_TO_TXN); + +Map> partitionMap = new HashMap<>(); +for (TopicPartition topicPartition : partitions) { +String topicName = topicPartition.topic(); + +List subPartitions = partitionMap.getOrDefault(topicName, +new ArrayList<>()); +subPartitions.add(topicPartition.partition()); +partitionMap.put(topicName, subPartitions); +} + +AddPartitionsToTxnTopicCollection topics = new AddPartitionsToTxnTopicCollection(); +for (Map.Entry> partitionEntry : partitionMap.entrySet()) { +topics.add(new AddPartitionsToTxnTopic() + .setName(partitionEntry.getKey()) + .setPartitions(partitionEntry.getValue())); +} + +this.data = new AddPartitionsToTxnRequestData() +.setTransactionalId(transactionalId) +.setProducerId(producerId) +.setProducerEpoch(producerEpoch) +.setTopics(topics); } @Override public AddPartitionsToTxnRequest build(short version) { -return new 
AddPartitionsToTxnRequest(version, transactionalId, producerId, producerEpoch, partitions); +return new AddPartitionsToTxnRequest(data, version);
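The new `Builder` constructor groups the flat partition list by topic before building `AddPartitionsToTxnTopic` entries. A minimal sketch of the same grouping using `Map.computeIfAbsent`, which does one lookup per partition and, unlike `getOrDefault(topic, new ArrayList<>())`, allocates a list only the first time a topic is seen. `TopicPartitionStub` is a hypothetical stand-in for `org.apache.kafka.common.TopicPartition` so the example compiles without the Kafka client jar.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PartitionGrouping {
    /** Hypothetical stand-in for org.apache.kafka.common.TopicPartition. */
    static final class TopicPartitionStub {
        final String topic;
        final int partition;

        TopicPartitionStub(final String topic, final int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    /** Groups partition indices by topic name, keeping topics in first-seen order. */
    static Map<String, List<Integer>> groupByTopic(final List<TopicPartitionStub> partitions) {
        final Map<String, List<Integer>> partitionMap = new LinkedHashMap<>();
        for (final TopicPartitionStub tp : partitions) {
            // computeIfAbsent creates the list only for a topic's first partition.
            partitionMap.computeIfAbsent(tp.topic, topic -> new ArrayList<>()).add(tp.partition);
        }
        return partitionMap;
    }
}
```

`LinkedHashMap` is a choice made for this sketch so the output order is predictable; the diff uses a `HashMap`, which is fine when topic order does not matter.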
[GitHub] [kafka] abbccdda commented on a change in pull request #8326: KAFKA-8639: Replace AddPartitionsToTxn with Automated Protocol
abbccdda commented on a change in pull request #8326: URL: https://github.com/apache/kafka/pull/8326#discussion_r413471840 ## File path: clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java ## @@ -17,157 +17,109 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData; +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic; +import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.utils.CollectionUtils; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH; -import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID; -import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; -import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID; -import static org.apache.kafka.common.protocol.types.Type.INT32; - public class AddPartitionsToTxnRequest extends AbstractRequest { -private static final String TOPICS_KEY_NAME = "topics"; -private static final String PARTITIONS_KEY_NAME = "partitions"; - -private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema( -TRANSACTIONAL_ID, -PRODUCER_ID, -PRODUCER_EPOCH, -new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema( -TOPIC_NAME, -new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32, -"The partitions to add to the transaction.")); - -/** - * The version number is bumped to 
indicate that on quota violation brokers send out responses before throttling. - */ -private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V1 = ADD_PARTITIONS_TO_TXN_REQUEST_V0; - -public static Schema[] schemaVersions() { -return new Schema[]{ADD_PARTITIONS_TO_TXN_REQUEST_V0, ADD_PARTITIONS_TO_TXN_REQUEST_V1}; -} + +public final AddPartitionsToTxnRequestData data; public static class Builder extends AbstractRequest.Builder { -private final String transactionalId; -private final long producerId; -private final short producerEpoch; -private final List partitions; +public final AddPartitionsToTxnRequestData data; -public Builder(String transactionalId, long producerId, short producerEpoch, List partitions) { +public Builder(final AddPartitionsToTxnRequestData data) { super(ApiKeys.ADD_PARTITIONS_TO_TXN); -this.transactionalId = transactionalId; -this.producerId = producerId; -this.producerEpoch = producerEpoch; -this.partitions = partitions; +this.data = data; +} + +public Builder(final String transactionalId, + final long producerId, + final short producerEpoch, + final List partitions) { +super(ApiKeys.ADD_PARTITIONS_TO_TXN); + +Map> partitionMap = new HashMap<>(); +for (TopicPartition topicPartition : partitions) { +String topicName = topicPartition.topic(); + +List subPartitions = partitionMap.getOrDefault(topicName, +new ArrayList<>()); +subPartitions.add(topicPartition.partition()); +partitionMap.put(topicName, subPartitions); +} + +AddPartitionsToTxnTopicCollection topics = new AddPartitionsToTxnTopicCollection(); +for (Map.Entry> partitionEntry : partitionMap.entrySet()) { +topics.add(new AddPartitionsToTxnTopic() + .setName(partitionEntry.getKey()) + .setPartitions(partitionEntry.getValue())); +} + +this.data = new AddPartitionsToTxnRequestData() +.setTransactionalId(transactionalId) +.setProducerId(producerId) +.setProducerEpoch(producerEpoch) +.setTopics(topics); } @Override public AddPartitionsToTxnRequest build(short version) { -return new 
AddPartitionsToTxnRequest(version, transactionalId, producerId, producerEpoch, partitions); +return new AddPartitionsToTxnRequest(data, version); }
[GitHub] [kafka] hachikuji commented on issue #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch
hachikuji commented on issue #8509: URL: https://github.com/apache/kafka/pull/8509#issuecomment-618147106 retest this please
[GitHub] [kafka] ableegoldman commented on a change in pull request #8248: KAFKA-9501: convert between active and standby without closing stores
ableegoldman commented on a change in pull request #8248: URL: https://github.com/apache/kafka/pull/8248#discussion_r391363596 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java ## @@ -132,47 +130,74 @@ private static String getTaskProducerClientId(final String threadClientId, final partitions ); -if (threadProducer == null) { -final String taskProducerClientId = getTaskProducerClientId(threadId, taskId); -final Map producerConfigs = config.getProducerConfigs(taskProducerClientId); -producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + taskId); -log.info("Creating producer client for task {}", taskId); -taskProducers.put(taskId, clientSupplier.getProducer(producerConfigs)); -} - -final RecordCollector recordCollector = new RecordCollectorImpl( -logContext, -taskId, -consumer, -threadProducer != null ? -new StreamsProducer(threadProducer, false, logContext, applicationId) : -new StreamsProducer(taskProducers.get(taskId), true, logContext, applicationId), -config.defaultProductionExceptionHandler(), - EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)), -streamsMetrics -); - -final Task task = new StreamTask( +createdTasks.add(createStreamTask( taskId, partitions, -topology, consumer, -config, -streamsMetrics, -stateDirectory, -cache, -time, +logContext, stateManager, -recordCollector -); - -log.trace("Created task {} with assigned partitions {}", taskId, partitions); -createdTasks.add(task); -createTaskSensor.record(); +topology)); } return createdTasks; } +private StreamTask createStreamTask(final TaskId taskId, +final Set partitions, +final Consumer consumer, +final LogContext logContext, +final ProcessorStateManager stateManager, +final ProcessorTopology topology) { +if (threadProducer == null) { +final String taskProducerClientId = getTaskProducerClientId(threadId, taskId); +final Map producerConfigs = config.getProducerConfigs(taskProducerClientId); 
+producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + taskId); +log.info("Creating producer client for task {}", taskId); +taskProducers.put(taskId, clientSupplier.getProducer(producerConfigs)); +} + +final RecordCollector recordCollector = new RecordCollectorImpl( +logContext, +taskId, +consumer, +threadProducer != null ? +new StreamsProducer(threadProducer, false, logContext, applicationId) : + new StreamsProducer(taskProducers.get(taskId), true, logContext, applicationId), +config.defaultProductionExceptionHandler(), + EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)), +streamsMetrics +); + +final StreamTask task = new StreamTask( +taskId, +partitions, +topology, +consumer, +config, +streamsMetrics, +stateDirectory, +cache, +time, +stateManager, +recordCollector +); + +log.trace("Created task {} with assigned partitions {}", taskId, partitions); +createTaskSensor.record(); +return task; +} + +StreamTask convertStandbyToActive(final StandbyTask standbyTask, + final Set partitions, + final Consumer consumer) { +return createStreamTask( +standbyTask.id, +partitions, +consumer, +getLogContext(standbyTask.id), +standbyTask.stateMgr, +standbyTask.topology); Review comment: The `topology` is created but never initialized for a standby, so we don't need to worry about closing it and can reuse it here.
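The refactoring routes both new-task creation and standby conversion through one factory method. As a result, the converted task inherits the standby's state manager and its never-initialized topology instead of closing and reopening them. Below is a minimal, self-contained sketch of that pattern. All class names (`StateManager`, `TaskTopology`, `StandbyTask`, `ActiveTask`, `TaskFactory`) are hypothetical stand-ins, not the real Kafka Streams internals.

```java
import java.util.Set;

// Hypothetical stand-in for ProcessorStateManager: owns the task's open stores.
final class StateManager {
    private boolean closed = false;

    void close() { closed = true; }

    boolean isClosed() { return closed; }
}

// Hypothetical stand-in for ProcessorTopology; in this sketch only active tasks initialize it.
final class TaskTopology {
    private boolean initialized = false;

    void init() { initialized = true; }

    boolean isInitialized() { return initialized; }
}

final class StandbyTask {
    final String id;
    final StateManager stateMgr;
    final TaskTopology topology; // created for the standby but never initialized

    StandbyTask(String id, StateManager stateMgr, TaskTopology topology) {
        this.id = id;
        this.stateMgr = stateMgr;
        this.topology = topology;
    }
}

final class ActiveTask {
    final String id;
    final Set<String> partitions;
    final StateManager stateMgr;
    final TaskTopology topology;

    ActiveTask(String id, Set<String> partitions, StateManager stateMgr, TaskTopology topology) {
        this.id = id;
        this.partitions = partitions;
        this.stateMgr = stateMgr;
        this.topology = topology;
    }
}

final class TaskFactory {
    // One creation path shared by brand-new tasks and converted standbys.
    ActiveTask createActiveTask(String id, Set<String> partitions,
                                StateManager stateMgr, TaskTopology topology) {
        topology.init();
        return new ActiveTask(id, partitions, stateMgr, topology);
    }

    // Hands the standby's state manager and topology to the shared path.
    // Nothing is closed, so the local stores stay open across the conversion.
    ActiveTask convertStandbyToActive(StandbyTask standby, Set<String> partitions) {
        return createActiveTask(standby.id, partitions, standby.stateMgr, standby.topology);
    }
}
```

Because the standby never initialized its topology, nothing has to be torn down before the active path initializes it.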
[GitHub] [kafka] chia7712 commented on issue #5935: KAFKA-7665: Replace BaseConsumerRecord with ConsumerRecord in MM
chia7712 commented on issue #5935: URL: https://github.com/apache/kafka/pull/5935#issuecomment-618162318 @huxihx Are you still working on this? I'd like to complete both the KIP and this PR :)
[GitHub] [kafka] mjsax commented on a change in pull request #8504: KAFKA-9298: reuse mapped stream error in joins
mjsax commented on a change in pull request #8504: URL: https://github.com/apache/kafka/pull/8504#discussion_r413380852 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ## @@ -989,16 +994,18 @@ private void to(final TopicNameExtractor topicExtractor, null, optimizableRepartitionNodeBuilder); -final OptimizableRepartitionNode optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build(); -builder.addGraphNode(streamsGraphNode, optimizableRepartitionNode); +if (repartitionNode == null || !name.equals(repartitionName)) { Review comment: Hmmm... I am wondering if just bumping the index would be sufficient, and the optimizer would merge the nodes automatically? I am also not sure about the code structure: so far, the DSL layer does not know much about optimizations (even if we "leak" a little bit into it, as we build up the `StreamsGraphNode` graph). Would we be pushing some optimization decisions into the DSL layer, thus spreading out "optimization code"? On the other hand, inserting just one `OptimizableRepartitionNode` is much more efficient than inserting multiple and letting the optimizer remove them later? I am also wondering if we could do the same for other repartition topics? Last question: this method is also used for stream-table joins, so if one joins a stream with two tables, would this change be backward incompatible? Or would two stream-table joins fail with the same `InvalidTopologyException`?
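The condition in the diff (`repartitionNode == null || !name.equals(repartitionName)`) caches the repartition node on the mapped stream. A second join that asks for the same name gets the existing node, so the graph never holds two nodes with the same topic name. Below is a minimal sketch of that idea. `RepartitionNode` and `MappedStream` are hypothetical stand-ins, not the actual `KStreamImpl` or `OptimizableRepartitionNode` classes, and the `-repartition` suffix is only illustrative.

```java
import java.util.ArrayList;
import java.util.List;

final class RepartitionNode {
    final String topicName;

    RepartitionNode(String topicName) { this.topicName = topicName; }
}

final class MappedStream {
    // Nodes added to the (hypothetical) topology graph, in insertion order.
    final List<RepartitionNode> graph = new ArrayList<>();
    private RepartitionNode repartitionNode; // null until the first join
    private String repartitionName;

    RepartitionNode repartitionForJoin(String name) {
        // Same shape as the diff: build a new node only when none exists yet
        // or when a different name is requested; otherwise reuse the cached one.
        if (repartitionNode == null || !name.equals(repartitionName)) {
            repartitionNode = new RepartitionNode(name + "-repartition");
            repartitionName = name;
            graph.add(repartitionNode);
        }
        return repartitionNode;
    }
}
```

Like the single pair of fields in the diff, this caches only the most recent name. Alternating between two names would therefore still create new nodes, which is one of the trade-offs raised in the review.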
[GitHub] [kafka] C0urante commented on a change in pull request #8069: KAFKA-9374: Make connector interactions asynchronous
C0urante commented on a change in pull request #8069: URL: https://github.com/apache/kafka/pull/8069#discussion_r413499718 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ## @@ -263,17 +267,20 @@ public boolean startConnector( Plugins.compareAndSwapLoaders(savedLoader); workerMetricsGroup.recordConnectorStartupFailure(); statusListener.onFailure(connName, t); -return false; +onConnectorStateChange.onCompletion(t, null); +return; } +workerConnector.transitionTo(initialState, onConnectorStateChange); Review comment: This part still needs some work. It's in an inconsistent state because I modified `Worker::startConnector` to have no return value and instead report the success or failure of connector startup through the callback. I haven't yet taken care of several issues: possibly invoking the callback twice (once in this method and once in the `WorkerConnector` instance), making sure to swap plugin classloaders at the right times, and preventing a possible race with the check for whether the connector already exists (which is based on whether its name is present as a key in the `connectors` map).
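One common guard against invoking a completion callback twice is to wrap it so that only the first completion is forwarded. This is sketched here as an assumption, not as what the PR ends up doing. The `Callback` interface below mirrors the `onCompletion(Throwable error, V result)` shape used in the diff, but it is redeclared so the sketch compiles on its own. `OnceCallback` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Redeclared locally; mirrors the onCompletion(error, result) shape seen in the diff.
interface Callback<V> {
    void onCompletion(Throwable error, V result);
}

// Hypothetical wrapper that forwards at most one completion to the delegate.
final class OnceCallback<V> implements Callback<V> {
    private final Callback<V> delegate;
    private final AtomicBoolean completed = new AtomicBoolean(false);

    OnceCallback(Callback<V> delegate) { this.delegate = delegate; }

    @Override
    public void onCompletion(Throwable error, V result) {
        // compareAndSet keeps the guard correct even if two threads race to complete.
        if (completed.compareAndSet(false, true)) {
            delegate.onCompletion(error, result);
        }
        // Later completions (e.g. a second report from WorkerConnector) are dropped.
    }
}
```

Dropping duplicates silently is a design choice. A stricter variant could log the ignored completion or throw, so that the double invocation is surfaced during testing.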
[GitHub] [kafka] jiameixie commented on issue #8446: KAFKA-9804:Extract consumer configs out of PerfConfig
jiameixie commented on issue #8446: URL: https://github.com/apache/kafka/pull/8446#issuecomment-618169937 @guozhangwang @ijuma @junrao PTAL, thanks
[GitHub] [kafka] mjsax commented on issue #8504: KAFKA-9298: reuse mapped stream error in joins
mjsax commented on issue #8504: URL: https://github.com/apache/kafka/pull/8504#issuecomment-618073161 > Ideally, the fix should be to generate a repartition topic name each time to avoid such issues. But IMHO that ship has already sailed, because introducing new name generation will cause compatibility issues for existing topologies. Why is that? Any such topology would hit the bug, so it could never be deployed, and thus nobody can actually be running such a topology? In fact, shouldn't we "burn" an index even if a name is provided (IIRC, we do this in some cases)? I agree, though, that merging repartition topics (as proposed in (1)) should be done if possible (it's a historic artifact that we did not merge them in the past, and IMHO we should not make the same mistake again?). For (2), it's a tricky question because the different names are used for different stores and changelog topics (i.e., their main purpose?) -- it seems to be a "nasty side effect" if we would end up with two repartition topics for this case? Of course, given the new `repartition()` operator, a user can work around it by using it after `map()` and before calling `join()`. Just brainstorming here about what the impact could be and what tradeoff we want to pick.
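Here is a concrete version of the "burn an index" suggestion. Generated names carry an increasing, zero-padded counter. If that counter is consumed even when the user supplies an explicit name, the names generated afterwards are identical whether or not the name was provided, so adding a name does not shift later topics. `NameGenerator` is hypothetical. The `%010d` padding is an assumption modeled on generated names such as `KSTREAM-MAP-0000000001`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical name generator; not a Kafka Streams class.
final class NameGenerator {
    private final AtomicInteger index = new AtomicInteger(0);

    // Returns the user-provided name if present, otherwise a generated one.
    // The counter advances in both cases ("burning" an index), so naming one
    // operator does not change the names generated for later operators.
    String nameFor(String prefix, String userProvidedName) {
        int next = index.getAndIncrement();
        if (userProvidedName != null) {
            return userProvidedName;
        }
        return prefix + String.format("%010d", next);
    }
}
```

If the counter were advanced only on the generated branch, naming the first operator would turn the second operator's suffix from `...0001` into `...0000`. That is exactly the kind of compatibility shift discussed above.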
[GitHub] [kafka] guozhangwang commented on issue #8534: KAFKA-9512: Backport Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration
guozhangwang commented on issue #8534: URL: https://github.com/apache/kafka/pull/8534#issuecomment-618104185 Thanks @vinothchandar !!
[GitHub] [kafka] guozhangwang commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable
guozhangwang commented on a change in pull request #8486: URL: https://github.com/apache/kafka/pull/8486#discussion_r413417021 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() { validateOffsetsAsync(partitionsToValidate); } +/** + * For each partition which needs validation, make an asynchronous request to get the end-offsets for the partition + * with the epoch less than or equal to the epoch the partition last saw. + * + * Requests are grouped by Node for efficiency. + */ +private void validateOffsetsAsync(Map partitionsToValidate) { +final Map> regrouped = +regroupFetchPositionsByLeader(partitionsToValidate); + +regrouped.forEach((node, fetchPositions) -> { +if (node.isEmpty()) { +metadata.requestUpdate(); +return; +} + +NodeApiVersions nodeApiVersions = apiVersions.get(node.idString()); +if (nodeApiVersions == null) { +client.tryConnect(node); +return; +} + +if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) { +log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not " + + "support the required protocol version (introduced in Kafka 2.3)", +fetchPositions.keySet()); +completeAllValidations(fetchPositions); +return; +} + +// We need to get the client epoch state before sending out the leader epoch request, and use it to +// decide whether we need to validate offsets. 
+if (!metadata.hasReliableLeaderEpochs()) { +log.debug("Skipping validation of fetch offsets for partitions {} since the provided leader broker " + + "is not reliable", fetchPositions.keySet()); +completeAllValidations(fetchPositions); +return; +} + +subscriptions.setNextAllowedRetry(fetchPositions.keySet(), time.milliseconds() + requestTimeoutMs); + +RequestFuture future = +offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPositions); + +future.addListener(new RequestFutureListener() { +@Override +public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) { +Map truncationWithoutResetPolicy = new HashMap<>(); +if (!offsetsResult.partitionsToRetry().isEmpty()) { + subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), time.milliseconds() + retryBackoffMs); +metadata.requestUpdate(); +} + +// For each OffsetsForLeader response, check if the end-offset is lower than our current offset +// for the partition. If so, it means we have experienced log truncation and need to reposition +// that partition's offset. +// +// In addition, check whether the returned offset and epoch are valid. If not, then we should treat +// it as out of range and update metadata for rediscovery. +offsetsResult.endOffsets().forEach((respTopicPartition, respEndOffset) -> { +if (respEndOffset.hasUndefinedEpochOrOffset()) { +// Should attempt to find the new leader in the next try. +log.debug("Requesting metadata update for partition {} due to undefined epoch or offset {}", Review comment: nit: `... 
or offset {} from OffsetsForLeaderEpoch response` ## File path: clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java ## @@ -86,4 +84,9 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(error, leaderEpoch, endOffset); } + +public boolean hasUndefinedEpochOrOffset() { +return this.endOffset == UNDEFINED_EPOCH_OFFSET || Review comment: For my own understanding: if endOffset is UNDEFINED the epoch should always be UNDEFINED too? If that's the case we can just rely on `leaderEpoch` alone? ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() { validateOffsetsAsync(partitionsToValidate); } +/** + * For each partition which needs validation, make an asynchronous request to get the end-offsets for the partition + * with the epoch less than or equal to the epoch the partition last saw. + * + * Requests are grouped by Node for efficiency. +
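The Fetcher comments describe two checks per partition in the OffsetsForLeaderEpoch response. First, an undefined epoch or offset triggers a metadata update so the leader can be rediscovered. Second, an end offset below the consumer's current position signals log truncation. Below is a standalone sketch of that decision. It assumes both "undefined" sentinels are `-1`, and `EndOffsetCheck` and `Outcome` are hypothetical names. Checking both fields, as `hasUndefinedEpochOrOffset()` does, stays correct even if the two are not always undefined together, which is the open question in the review.

```java
// Hypothetical classifier for one partition's OffsetsForLeaderEpoch result.
final class EndOffsetCheck {
    // Assumed sentinel values for "undefined" (both -1 in this sketch).
    static final int UNDEFINED_EPOCH = -1;
    static final long UNDEFINED_EPOCH_OFFSET = -1L;

    enum Outcome { REFRESH_METADATA, TRUNCATED, VALID }

    static Outcome classify(int leaderEpoch, long endOffset, long currentPosition) {
        // Either field undefined: the response can't be trusted, so request a
        // metadata update and retry against the (possibly new) leader.
        if (leaderEpoch == UNDEFINED_EPOCH || endOffset == UNDEFINED_EPOCH_OFFSET) {
            return Outcome.REFRESH_METADATA;
        }
        // The leader's log for that epoch ends before our position: records we
        // already consumed were truncated, so the position must be reset.
        if (endOffset < currentPosition) {
            return Outcome.TRUNCATED;
        }
        return Outcome.VALID;
    }
}
```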
[GitHub] [kafka] jiameixie commented on issue #8489: KAFKA-9857:Failed to build image ducker-ak-openjdk-8 on arm
jiameixie commented on issue #8489: URL: https://github.com/apache/kafka/pull/8489#issuecomment-618132157 @guozhangwang @ijuma @junrao PTAL, thanks
[GitHub] [kafka] abbccdda commented on a change in pull request #8326: KAFKA-8639: Replace AddPartitionsToTxn with Automated Protocol
abbccdda commented on a change in pull request #8326: URL: https://github.com/apache/kafka/pull/8326#discussion_r413473580 ## File path: core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala ## @@ -27,9 +27,9 @@ import org.junit.{Before, Test} import scala.jdk.CollectionConverters._ -class AddPartitionsToTxnRequestTest extends BaseRequestTest { - private val topic1 = "foobartopic" - val numPartitions = 3 +class AddPartitionsToTxnRequestServerTest extends BaseRequestTest { Review comment: Renamed because the old name conflicts with the `AddPartitionsToTxnRequestTest` we added; just want to clarify that.
[GitHub] [kafka] vinothchandar commented on a change in pull request #8462: KAFKA-9846: Filter active tasks for running state in KafkaStreams#allLocalStorePartitionLags()
vinothchandar commented on a change in pull request #8462: URL: https://github.com/apache/kafka/pull/8462#discussion_r412690139 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java ## @@ -295,9 +295,10 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store } }); -restartedStreams.start(); +// Wait till the restarted instance reaches running, after restoring + startApplicationAndWaitUntilRunning(Collections.singletonList(restartedStreams), Duration.ofSeconds(60)); Review comment: This is the actual fix for the flaky test.
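The fix replaces a bare `start()` with a wait for the RUNNING state, so that lags are only inspected after restoration has finished. The stdlib-only sketch below shows the general poll-until-condition-or-timeout shape behind such a wait. `Wait.waitUntil` is a hypothetical helper, not Kafka's `startApplicationAndWaitUntilRunning`, and the 10 ms poll interval is arbitrary.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class Wait {
    // Hypothetical helper: polls `condition` every 10 ms until it returns true,
    // failing with an AssertionError once `timeout` has elapsed.
    static void waitUntil(BooleanSupplier condition, Duration timeout, String description) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError("Timed out after " + timeout + " waiting for " + description);
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                throw new AssertionError("Interrupted while waiting for " + description, e);
            }
        }
    }
}
```

In a Streams test, the condition could be `() -> streams.state() == KafkaStreams.State.RUNNING`, polled after `streams.start()`.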
[GitHub] [kafka] dajac commented on a change in pull request #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs
dajac commented on a change in pull request #8311: URL: https://github.com/apache/kafka/pull/8311#discussion_r412287554 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -2207,21 +2212,33 @@ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map()); -Map> replicaAssignmentByBroker = new HashMap<>(); +Map replicaAssignmentByBroker = new HashMap<>(); for (Map.Entry entry: replicaAssignment.entrySet()) { TopicPartitionReplica replica = entry.getKey(); String logDir = entry.getValue(); int brokerId = replica.brokerId(); TopicPartition topicPartition = new TopicPartition(replica.topic(), replica.partition()); Review comment: `topicPartition` is not used except for getting the topic and the partition above. `replica.topic()` and `replica.partition()` could be directly used instead. ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -2207,21 +2212,33 @@ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map()); -Map> replicaAssignmentByBroker = new HashMap<>(); +Map replicaAssignmentByBroker = new HashMap<>(); for (Map.Entry entry: replicaAssignment.entrySet()) { TopicPartitionReplica replica = entry.getKey(); String logDir = entry.getValue(); int brokerId = replica.brokerId(); TopicPartition topicPartition = new TopicPartition(replica.topic(), replica.partition()); -if (!replicaAssignmentByBroker.containsKey(brokerId)) -replicaAssignmentByBroker.put(brokerId, new HashMap<>()); -replicaAssignmentByBroker.get(brokerId).put(topicPartition, logDir); +AlterReplicaLogDirsRequestData value = replicaAssignmentByBroker.computeIfAbsent(brokerId, Review comment: nit: Could we rename `value` to something like `alterReplicaLogDirs`? 
## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -2207,21 +2212,33 @@ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map()); -Map> replicaAssignmentByBroker = new HashMap<>(); +Map replicaAssignmentByBroker = new HashMap<>(); for (Map.Entry entry: replicaAssignment.entrySet()) { TopicPartitionReplica replica = entry.getKey(); String logDir = entry.getValue(); int brokerId = replica.brokerId(); TopicPartition topicPartition = new TopicPartition(replica.topic(), replica.partition()); -if (!replicaAssignmentByBroker.containsKey(brokerId)) -replicaAssignmentByBroker.put(brokerId, new HashMap<>()); -replicaAssignmentByBroker.get(brokerId).put(topicPartition, logDir); +AlterReplicaLogDirsRequestData value = replicaAssignmentByBroker.computeIfAbsent(brokerId, +key -> new AlterReplicaLogDirsRequestData()); +AlterReplicaLogDir alterReplicaLogDir = value.dirs().find(logDir); +if (alterReplicaLogDir == null) { +alterReplicaLogDir = new AlterReplicaLogDir(); +alterReplicaLogDir.setPath(logDir); +value.dirs().add(alterReplicaLogDir); +} +AlterReplicaLogDirTopic alterReplicaLogDirTopic = alterReplicaLogDir.topics().find(topicPartition.topic()); +if (alterReplicaLogDirTopic == null) { +alterReplicaLogDirTopic = new AlterReplicaLogDirTopic(); +alterReplicaLogDir.topics().add(alterReplicaLogDirTopic); +} +alterReplicaLogDirTopic.setName(topicPartition.topic()) Review comment: `setName` could be done only once within the if statement. 
## File path: clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java ## @@ -17,122 +17,53 @@ package org.apache.kafka.common.requests; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.utils.CollectionUtils; - import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; -import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; -import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; -import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; +import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData; +import
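dajac's suggestions are to use `computeIfAbsent`, to find or create each nested level once, and to take the topic and partition straight from the replica. They can be illustrated without the generated message classes. The stdlib-only sketch below groups replica assignments as broker → log dir → topic → partitions. `Replica` is a hypothetical stand-in for `TopicPartitionReplica`, and the nested maps stand in for `AlterReplicaLogDirsRequestData`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for TopicPartitionReplica.
final class Replica {
    final String topic;
    final int partition;
    final int brokerId;

    Replica(String topic, int partition, int brokerId) {
        this.topic = topic;
        this.partition = partition;
        this.brokerId = brokerId;
    }
}

final class ReplicaGrouping {
    // Groups assignments as broker -> log dir -> topic -> partitions.
    static Map<Integer, Map<String, Map<String, List<Integer>>>> groupByBroker(Map<Replica, String> assignment) {
        Map<Integer, Map<String, Map<String, List<Integer>>>> byBroker = new TreeMap<>();
        for (Map.Entry<Replica, String> entry : assignment.entrySet()) {
            Replica replica = entry.getKey();
            String logDir = entry.getValue();
            // Each level is created only on first use, replacing containsKey/put/get.
            // The topic key is set once, when its entry is created, and the
            // topic/partition come straight from the replica (no TopicPartition).
            byBroker.computeIfAbsent(replica.brokerId, id -> new TreeMap<>())
                    .computeIfAbsent(logDir, dir -> new TreeMap<>())
                    .computeIfAbsent(replica.topic, topic -> new ArrayList<>())
                    .add(replica.partition);
        }
        return byBroker;
    }
}
```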
[GitHub] [kafka] dajac commented on a change in pull request #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch
dajac commented on a change in pull request #8509: URL: https://github.com/apache/kafka/pull/8509#discussion_r412807377 ## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ## @@ -1553,6 +1552,179 @@ class KafkaApisTest { assertEquals(Errors.INVALID_REQUEST, response.error) } + @Test + def testUpdateMetadataRequestWithCurrentBrokerEpoch(): Unit = { +val currentBrokerEpoch = 1239875L +testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE) + } + + @Test + def testUpdateMetadataRequestWithNewerBrokerEpochIsValid(): Unit = { +val currentBrokerEpoch = 1239875L +testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE) + } + + @Test + def testUpdateMetadataRequestWithStaleBrokerEpochIsRejected(): Unit = { +val currentBrokerEpoch = 1239875L +testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH) + } + + def testUpdateMetadataRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = { +val updateMetadataRequest = createBasicMetadataRequest("topicA", 1, brokerEpochInRequest) +val request = buildRequest(updateMetadataRequest) + +val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture() + +EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch) +EasyMock.expect(replicaManager.maybeUpdateMetadataCache( + EasyMock.eq(request.context.correlationId), + EasyMock.anyObject() +)).andStubReturn( + Seq() +) + + EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) +EasyMock.replay(replicaManager, controller, requestChannel) + +createKafkaApis().handleUpdateMetadataRequest(request) +val updateMetadataResponse = readResponse(ApiKeys.UPDATE_METADATA, updateMetadataRequest, capturedResponse) + .asInstanceOf[UpdateMetadataResponse] +assertEquals(expectedError, updateMetadataResponse.error()) +EasyMock.verify(replicaManager) + } + + @Test + def 
testLeaderAndIsrRequestWithCurrentBrokerEpoch(): Unit = { +val currentBrokerEpoch = 1239875L +testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE) + } + + @Test + def testLeaderAndIsrRequestWithNewerBrokerEpochIsValid(): Unit = { +val currentBrokerEpoch = 1239875L +testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE) + } + + @Test + def testLeaderAndIsrRequestWithStaleBrokerEpochIsRejected(): Unit = { +val currentBrokerEpoch = 1239875L +testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH) + } + + def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = { +val controllerId = 2 +val controllerEpoch = 6 +val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture() +val partitionStates = Seq( + new LeaderAndIsrRequestData.LeaderAndIsrPartitionState() +.setTopicName("topicW") +.setPartitionIndex(1) +.setControllerEpoch(1) +.setLeader(0) +.setLeaderEpoch(1) +.setIsr(asList(0, 1)) +.setZkVersion(2) +.setReplicas(asList(0, 1, 2)) +.setIsNew(false) +).asJava +val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder( + ApiKeys.LEADER_AND_ISR.latestVersion, + controllerId, + controllerEpoch, + brokerEpochInRequest, + partitionStates, + asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091)) +).build() +val request = buildRequest(leaderAndIsrRequest) +val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData() + .setErrorCode(Errors.NONE.code) + .setPartitionErrors(asList())) + +EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch) +EasyMock.expect(replicaManager.becomeLeaderOrFollower( + EasyMock.eq(request.context.correlationId), + EasyMock.anyObject(), + EasyMock.anyObject() +)).andStubReturn( + response +) + + EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) +EasyMock.replay(replicaManager, controller, requestChannel) + 
+createKafkaApis().handleLeaderAndIsrRequest(request) +val leaderAndIsrResponse = readResponse(ApiKeys.LEADER_AND_ISR, leaderAndIsrRequest, capturedResponse) + .asInstanceOf[LeaderAndIsrResponse] +assertEquals(expectedError, leaderAndIsrResponse.error()) +EasyMock.verify(replicaManager) + } + + @Test + def testStopReplicaRequestWithCurrentBrokerEpoch(): Unit = { +val currentBrokerEpoch = 1239875L +testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE) Review comment: This and the two above should use `testStopReplicaRequest` instead of `testUpdateMetadataRequest`. ## File path:
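The three tests per request type encode one rule. A control request is accepted when its broker epoch equals or is newer than the broker's current epoch, and it is rejected with `STALE_BROKER_EPOCH` only when it is older. Below is a minimal sketch of that predicate. The `UNKNOWN_BROKER_EPOCH = -1` escape hatch, for requests that carry no epoch, is an assumption, as is the `BrokerEpochCheck` name.

```java
// Hypothetical staleness check for control requests (LeaderAndIsr, UpdateMetadata, StopReplica).
final class BrokerEpochCheck {
    // Assumed sentinel for requests that carry no broker epoch.
    static final long UNKNOWN_BROKER_EPOCH = -1L;

    static boolean isStale(long brokerEpochInRequest, long currentBrokerEpoch) {
        if (brokerEpochInRequest == UNKNOWN_BROKER_EPOCH) {
            return false;
        }
        // Only an older epoch is stale. A newer one is accepted, because the
        // controller may learn of the broker's new epoch before the broker does.
        return brokerEpochInRequest < currentBrokerEpoch;
    }
}
```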
[GitHub] [kafka] iceChen8123 commented on issue #233: KAFKA-2419; Garbage collect unused sensors
iceChen8123 commented on issue #233: URL: https://github.com/apache/kafka/pull/233#issuecomment-617512892 Excuse me, is this bug fixed now? I only see an `ExpireSensorTask` in `Metrics`, but I can't find anywhere that uses it. I see more than 800,000 sensors in my application; they are in the old generation and can't be garbage collected. I'm using kafka-clients version 2.1.1.
[GitHub] [kafka] tombentley commented on a change in pull request #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy
tombentley commented on a change in pull request #8417: URL: https://github.com/apache/kafka/pull/8417#discussion_r412754269 ## File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java ## @@ -57,6 +59,10 @@ public ByteBuffer serialize(ApiKeys apiKey, short version, int correlationId) { return Collections.singletonMap(error, 1); } +protected Map errorCounts(Stream errors) { +return errors.collect(Collectors.groupingBy(e -> e, Collectors.summingInt(e -> 1))); +} + protected Map errorCounts(Collection errors) { Review comment: @chia7712 you're right, but the only two remaining callers of this method are for RPCs which haven't been converted to the message generator. There's little benefit to changing them when there are already PRs for converting those RPCs, and I'm planning to remove this method entirely when those PRs have been merged. I guess I could mark this method as `@Deprecated`. Relatedly, there's only a single caller of the `apiErrorCounts(Map errors)` method, which is also for a not-yet-converted RPC with a PR. If this gets merged first, I'll be able to remove `apiErrorCounts(Map errors)` in that PR. ## File path: clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java ## @@ -52,10 +51,9 @@ protected Struct toStruct(short version) { @Override public Map errorCounts() { Map counts = new HashMap<>(); Review comment: @chia7712, what I've tried to do in this PR so far is: * Change `for` statements + `updateErrorCounts` to use `forEach` consistently * Change calls to `errorCounts(Collection)` to `errorCounts(Stream)` I've not tried to change all code to use either `forEach` or `errorCounts(Stream)`. Obviously we could do that, but @ijuma seems happy enough with continuing to have these two ways to do it.
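The new `errorCounts(Stream)` helper is a plain `groupingBy`/`summingInt` collection. Below is a standalone sketch of the same collector. It uses a hypothetical `Errors` enum with a few sample values in place of `org.apache.kafka.common.protocol.Errors`, and `Function.identity()` in place of the equivalent `e -> e` from the diff.

```java
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for org.apache.kafka.common.protocol.Errors.
enum Errors { NONE, UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_FOR_PARTITION }

final class ErrorCounting {
    // Counts how many times each error occurs; absent errors get no entry.
    static Map<Errors, Integer> errorCounts(Stream<Errors> errors) {
        return errors.collect(Collectors.groupingBy(Function.identity(), Collectors.summingInt(e -> 1)));
    }
}
```

Unlike a pre-populated counts map, errors that never occur simply have no key. Callers that read counts should therefore treat a missing key as zero.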
[GitHub] [kafka] zshuo commented on issue #8224: KAFKA-9704: Fix the issue z/OS won't let us resize file when mmap.
zshuo commented on issue #8224: URL: https://github.com/apache/kafka/pull/8224#issuecomment-618311862 ok to test
[GitHub] [kafka] cmccabe commented on a change in pull request #8527: Remove dependencies on deprecated --zookeeper command flags in junit tests
cmccabe commented on a change in pull request #8527: URL: https://github.com/apache/kafka/pull/8527#discussion_r413720950 ## File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala ## @@ -293,13 +293,6 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin } } - @Test - def testCreateWithUnspecifiedReplicationFactorAndPartitionsWithZkClient(): Unit = { Review comment: It's weird that this is even in this file. I think it should be moved to `TopicCommandWithZKClientTest.scala`.
[GitHub] [kafka] mimaison commented on issue #8224: KAFKA-9704: Fix the issue z/OS won't let us resize file when mmap.
mimaison commented on issue #8224: URL: https://github.com/apache/kafka/pull/8224#issuecomment-618329701 ok to test
[GitHub] [kafka] cmccabe commented on issue #8527: Remove dependencies on deprecated --zookeeper command flags in junit tests
cmccabe commented on issue #8527: URL: https://github.com/apache/kafka/pull/8527#issuecomment-618337731 ok to test
[GitHub] [kafka] C0urante commented on issue #8069: KAFKA-9374: Make connector interactions asynchronous
C0urante commented on issue #8069: URL: https://github.com/apache/kafka/pull/8069#issuecomment-618097440 @ncliang I've made some updates to the PR and rebased on the latest trunk; would you be willing to do another pass?
[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable
abbccdda commented on a change in pull request #8486: URL: https://github.com/apache/kafka/pull/8486#discussion_r413468323 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java ## @@ -85,10 +85,6 @@ protected OffsetForEpochResult handleResponse( case KAFKA_STORAGE_ERROR: case OFFSET_NOT_AVAILABLE: case LEADER_NOT_AVAILABLE: -logger().debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.", Review comment: It is exactly the same as the handling for the subsequent cases.
[GitHub] [kafka] ijuma commented on issue #8533: KAFKA-9589: Fixed bug in V2 log validator tests
ijuma commented on issue #8533: URL: https://github.com/apache/kafka/pull/8533#issuecomment-618192475 ok to test
[GitHub] [kafka] mimaison commented on a change in pull request #8238: KAFKA-9130: KIP-518 Allow listing consumer groups per state
mimaison commented on a change in pull request #8238: URL: https://github.com/apache/kafka/pull/8238#discussion_r413717848 ## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ## @@ -1553,6 +1554,50 @@ class KafkaApisTest { assertEquals(Errors.INVALID_REQUEST, response.error) } + @Test + def testListGroupsRequest(): Unit = { +val overviews = List( + GroupOverview("group1", "protocol1", "Stable"), + GroupOverview("goupp2", "qwerty", "Empty") +) +val response = listGroupRequest(Option.empty, overviews) +assertEquals(2, response.data.groups.size) +assertEquals("", response.data.groups.get(0).groupState) +assertEquals("", response.data.groups.get(1).groupState) Review comment: Yes, that's a good idea.
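KIP-518 lets a `ListGroups` request filter groups by state. Below is a minimal sketch of that filtering rule. It assumes that an empty filter means "list every group" and that states are matched exactly. `GroupOverview` here is a hypothetical Java stand-in for the Scala case class in the test, and `GroupListing` is likewise hypothetical.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical Java stand-in for the Scala GroupOverview case class.
final class GroupOverview {
    final String groupId;
    final String protocolType;
    final String state;

    GroupOverview(String groupId, String protocolType, String state) {
        this.groupId = groupId;
        this.protocolType = protocolType;
        this.state = state;
    }
}

final class GroupListing {
    // An empty state filter means "no filter": every group is listed.
    static List<GroupOverview> listGroups(Set<String> statesFilter, List<GroupOverview> groups) {
        if (statesFilter.isEmpty()) {
            return groups;
        }
        return groups.stream()
                .filter(g -> statesFilter.contains(g.state))
                .collect(Collectors.toList());
    }
}
```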
[GitHub] [kafka] cmccabe commented on a change in pull request #8527: Remove dependencies on deprecated --zookeeper command flags in junit tests
cmccabe commented on a change in pull request #8527: URL: https://github.com/apache/kafka/pull/8527#discussion_r413719892 ## File path: core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala ## @@ -255,7 +225,7 @@ class ReassignPartitionsCommandArgsTest { @Test def shouldNotAllowBrokersListWithVerifyOption(): Unit = { val args = Array( - "--zookeeper", "localhost:1234", + "--bootstrap-server", "localhost:1234", Review comment: This is a good change, since the test is not specifically testing the legacy mode (so it should use the new mode). Keep this one. ## File path: core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala ## @@ -275,7 +245,7 @@ class ReassignPartitionsCommandArgsTest { @Test def shouldNotAllowTopicsOptionWithVerify(): Unit = { val args = Array( - "--zookeeper", "localhost:1234", + "--bootstrap-server", "localhost:1234", Review comment: Keep this change. ## File path: core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala ## @@ -265,7 +235,7 @@ class ReassignPartitionsCommandArgsTest { @Test def shouldNotAllowThrottleWithVerifyOption(): Unit = { val args = Array( - "--zookeeper", "localhost:1234", + "--bootstrap-server", "localhost:1234", Review comment: Keep this change.