[jira] [Resolved] (KAFKA-12791) ConcurrentModificationException in KafkaProducer constructor
[ https://issues.apache.org/jira/browse/KAFKA-12791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Bradstreet resolved KAFKA-12791.
Resolution: Fixed

> ConcurrentModificationException in KafkaProducer constructor
>
> Key: KAFKA-12791
> URL: https://issues.apache.org/jira/browse/KAFKA-12791
> Project: Kafka
> Issue Type: Bug
> Reporter: Lucas Bradstreet
> Priority: Minor
> Fix For: 3.0.0
>
> Recently we have noticed multiple instances where KafkaProducers have failed to construct due to the following exception:
> {noformat}
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:440)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
>     at java.base/java.lang.Thread.run(Thread.java:832)
> Caused by: java.util.ConcurrentModificationException
>     at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1584)
>     at java.base/java.util.HashMap$KeyIterator.next(HashMap.java:1607)
>     at java.base/java.util.AbstractSet.removeAll(AbstractSet.java:171)
>     at org.apache.kafka.common.config.AbstractConfig.unused(AbstractConfig.java:221)
>     at org.apache.kafka.common.config.AbstractConfig.logUnused(AbstractConfig.java:379)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433)
>     ... 9 more
> exception.class:org.apache.kafka.common.KafkaException exception.message:Failed to construct kafka producer
> {noformat}
> It appears that this happens even though `used` below is a synchronized set:
> {code:java}
> public Set<String> unused() {
>     Set<String> keys = new HashSet<>(originals.keySet());
>     keys.removeAll(used);
>     return keys;
> }{code}
> It appears that `used` is being modified while removeAll is being called. This may be due to the way that keys are added to it when used:
> {code:java}
> protected Object get(String key) {
>     if (!values.containsKey(key))
>         throw new ConfigException(String.format("Unknown configuration '%s'", key));
>     used.add(key);
>     return values.get(key);
> }{code}

-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13342) LISR sent for topic queued for deletion in controller
Lucas Bradstreet created KAFKA-13342: Summary: LISR sent for topic queued for deletion in controller Key: KAFKA-13342 URL: https://issues.apache.org/jira/browse/KAFKA-13342 Project: Kafka Issue Type: Bug Reporter: Lucas Bradstreet Under certain conditions in some system tests, a broker is hard killed during a topic deletion, before its replica has moved to the OfflineReplica state. When the broker comes back up, the controller sends it a LeaderAndIsrRequest containing the partition, causing the broker to recreate the partition locally even though the topic is in a deleting state in the controller. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-13194) LogCleaner may clean past highwatermark
Lucas Bradstreet created KAFKA-13194: Summary: LogCleaner may clean past highwatermark Key: KAFKA-13194 URL: https://issues.apache.org/jira/browse/KAFKA-13194 Project: Kafka Issue Type: Bug Reporter: Lucas Bradstreet

Here we have the cleaning point being bounded to the active segment base offset and the first unstable offset, which makes sense:

{code:scala}
// find first segment that cannot be cleaned
// neither the active segment, nor segments with any messages closer to the head of the log than the minimum compaction lag time
// may be cleaned
val firstUncleanableDirtyOffset: Long = Seq(
  // we do not clean beyond the first unstable offset
  log.firstUnstableOffset,
  // the active segment is always uncleanable
  Option(log.activeSegment.baseOffset),
  // the first segment whose largest message timestamp is within a minimum time lag from now
  if (minCompactionLagMs > 0) {
    // dirty log segments
    val dirtyNonActiveSegments = log.localNonActiveLogSegmentsFrom(firstDirtyOffset)
    dirtyNonActiveSegments.find { s =>
      val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
      debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} " +
        s"segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs}; " +
        s"is uncleanable=$isUncleanable")
      isUncleanable
    }.map(_.baseOffset)
  } else None
).flatten.min
{code}

But the LSO starts out as None:

{code:scala}
@volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None
private[log] def firstUnstableOffset: Option[Long] = firstUnstableOffsetMetadata.map(_.messageOffset)
{code}

For most code depending on the LSO, fetchLastStableOffsetMetadata is used to default it to the hwm if it's not set:

{code:scala}
private def fetchLastStableOffsetMetadata: LogOffsetMetadata = {
  checkIfMemoryMappedBufferClosed()
  // cache the current high watermark to avoid a concurrent update invalidating the range check
  val highWatermarkMetadata = fetchHighWatermarkMetadata
  firstUnstableOffsetMetadata match {
    case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermarkMetadata.messageOffset =>
      if (offsetMetadata.messageOffsetOnly) {
        lock synchronized {
          val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
          if (firstUnstableOffsetMetadata.contains(offsetMetadata))
            firstUnstableOffsetMetadata = Some(fullOffset)
          fullOffset
        }
      } else {
        offsetMetadata
      }
    case _ => highWatermarkMetadata
  }
}
{code}

This means that in the case where the hwm is prior to the active segment base offset, the log cleaner may clean past the hwm. This is most likely to occur after a broker restart, when the log cleaner may start cleaning prior to replication becoming active.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12896) Group rebalance loop caused by repeated group leader JoinGroups
Lucas Bradstreet created KAFKA-12896: Summary: Group rebalance loop caused by repeated group leader JoinGroups Key: KAFKA-12896 URL: https://issues.apache.org/jira/browse/KAFKA-12896 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.6.0 Reporter: Lucas Bradstreet

We encountered a strange case of a rebalance loop with the "cooperative-sticky" assignor. The logs show the following for several hours:

{{Apr 7, 2021 @ 03:58:36.040 [GroupCoordinator 7]: Stabilized group mygroup generation 19830137 (__consumer_offsets-7)}}
{{Apr 7, 2021 @ 03:58:35.992 [GroupCoordinator 7]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 19830136 (__consumer_offsets-7) (reason: Updating metadata for member mygroup-1-7ad27e07-3784-4588-97e1-d796a74d4ecc during CompletingRebalance)}}
{{Apr 7, 2021 @ 03:58:35.988 [GroupCoordinator 7]: Stabilized group mygroup generation 19830136 (__consumer_offsets-7)}}
{{Apr 7, 2021 @ 03:58:35.972 [GroupCoordinator 7]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 19830135 (__consumer_offsets-7) (reason: Updating metadata for member mygroup during CompletingRebalance)}}
{{Apr 7, 2021 @ 03:58:35.965 [GroupCoordinator 7]: Stabilized group mygroup generation 19830135 (__consumer_offsets-7)}}
{{Apr 7, 2021 @ 03:58:35.953 [GroupCoordinator 7]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 19830134 (__consumer_offsets-7) (reason: Updating metadata for member mygroup-7ad27e07-3784-4588-97e1-d796a74d4ecc during CompletingRebalance)}}
{{Apr 7, 2021 @ 03:58:35.941 [GroupCoordinator 7]: Stabilized group mygroup generation 19830134 (__consumer_offsets-7)}}
{{Apr 7, 2021 @ 03:58:35.926 [GroupCoordinator 7]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 19830133 (__consumer_offsets-7) (reason: Updating metadata for member mygroup during CompletingRebalance)}}

Every single time, it was the same member that triggered the JoinGroup, and it was always the leader of the group. The leader has the privilege of being able to trigger a rebalance by sending `JoinGroup` even if its subscription metadata has not changed. But why would it do so? It is possible that this is due to the same issue or a similar bug to https://issues.apache.org/jira/browse/KAFKA-12890.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12791) ConcurrentModificationException in KafkaProducer constructor
Lucas Bradstreet created KAFKA-12791: Summary: ConcurrentModificationException in KafkaProducer constructor Key: KAFKA-12791 URL: https://issues.apache.org/jira/browse/KAFKA-12791 Project: Kafka Issue Type: Bug Reporter: Lucas Bradstreet

Recently we have noticed multiple instances where KafkaProducers have failed to construct due to the following exception:

{noformat}
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:440)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
    at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.util.ConcurrentModificationException
    at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1584)
    at java.base/java.util.HashMap$KeyIterator.next(HashMap.java:1607)
    at java.base/java.util.AbstractSet.removeAll(AbstractSet.java:171)
    at org.apache.kafka.common.config.AbstractConfig.unused(AbstractConfig.java:221)
    at org.apache.kafka.common.config.AbstractConfig.logUnused(AbstractConfig.java:379)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433)
    ... 9 more
exception.class:org.apache.kafka.common.KafkaException exception.message:Failed to construct kafka producer
{noformat}

It appears that this happens even though `used` below is a synchronized set:

{code:java}
public Set<String> unused() {
    Set<String> keys = new HashSet<>(originals.keySet());
    keys.removeAll(used);
    return keys;
}{code}

It appears that `used` is being modified while removeAll is being called: the synchronized-set wrapper only guards its own methods, and does not protect the iteration that AbstractSet.removeAll performs over `used` when `keys` is the larger of the two sets. This may be due to the way that keys are added to it when used:

{code:java}
protected Object get(String key) {
    if (!values.containsKey(key))
        throw new ConfigException(String.format("Unknown configuration '%s'", key));
    used.add(key);
    return values.get(key);
}{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
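For illustration, a minimal sketch of one way to close this race, assuming `used` is a Collections.synchronizedSet; this is a sketch of the technique, not the actual patch:

{code:java}
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class UnusedConfigSketch {
    // Hypothetical stand-ins for the AbstractConfig fields:
    private final Set<String> originalsKeys = new HashSet<>();
    private final Set<String> used = Collections.synchronizedSet(new HashSet<>());

    public Set<String> unused() {
        Set<String> keys = new HashSet<>(originalsKeys);
        // AbstractSet.removeAll iterates its argument when the receiver is the
        // larger set, so hold the monitor of `used` for the duration of the call.
        synchronized (used) {
            keys.removeAll(used);
        }
        return keys;
    }
}
{code}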
[jira] [Created] (KAFKA-12736) KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completed
Lucas Bradstreet created KAFKA-12736: Summary: KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completed Key: KAFKA-12736 URL: https://issues.apache.org/jira/browse/KAFKA-12736 Project: Kafka Issue Type: Improvement Reporter: Lucas Bradstreet

When flush is called, a copy of the incomplete batches is made. This means that the full ProducerBatch(s) are held in memory until the flush has completed. For batches using the existing memory pool this is not as wasteful, as the memory will already have been returned to the pool, but non-pool memory can only be GC'd after the flush has completed. Rather than using copyAll, we can build a new collection containing only the produceFuture(s) and await on those.

{code:java}
/**
 * Mark all partitions as ready to send and block until the send is complete
 */
public void awaitFlushCompletion() throws InterruptedException {
    try {
        for (ProducerBatch batch : this.incomplete.copyAll())
            batch.produceFuture.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
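A hedged sketch of the suggested shape of the change, mirroring the names in the snippet above (`ProduceRequestResult` is the type of `produceFuture` in the producer internals; java.util imports elided; treat this as an illustration rather than the committed fix):

{code:java}
public void awaitFlushCompletion() throws InterruptedException {
    try {
        // Snapshot only the futures; the ProducerBatch objects are then
        // reachable only from the accumulator and can be GC'd as soon as
        // they complete, instead of being pinned for the whole flush.
        List<ProduceRequestResult> results = new ArrayList<>();
        for (ProducerBatch batch : this.incomplete.copyAll())
            results.add(batch.produceFuture);
        for (ProduceRequestResult result : results)
            result.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}
{code}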
[jira] [Created] (KAFKA-12330) FetchSessionCache may cause starvation for partitions when FetchResponse is full
Lucas Bradstreet created KAFKA-12330: Summary: FetchSessionCache may cause starvation for partitions when FetchResponse is full Key: KAFKA-12330 URL: https://issues.apache.org/jira/browse/KAFKA-12330 Project: Kafka Issue Type: Bug Reporter: Lucas Bradstreet

The incremental FetchSessionCache deprioritizes partitions for which a response is returned. A response may be returned if log metadata such as the log start offset, hwm, etc. has changed, or if data for that partition is returned. When a fetch response fills to maxBytes, data may not be returned for partitions where it is available. However, the fetch response will still contain updates to metadata such as the hwm if that metadata has changed. This can lead to degenerate behavior where a partition's hwm or log start offset is updated, resulting in the next fetch being unnecessarily skipped for that partition.

At first this appeared to be worse, as hwm updates occur frequently; but under starvation, hwm movement should eventually become blocked, allowing a fetch to go through and the partition to become unstuck. Even so, it still requires one more fetch request than necessary to do so. I believe we should only reorder the partition fetch priority if data is actually returned for a partition.

{code:scala}
private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
                                val updateFetchContextAndRemoveUnselected: Boolean)
  extends FetchSession.RESP_MAP_ITER {
  var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null

  override def hasNext: Boolean = {
    while ((nextElement == null) && iter.hasNext) {
      val element = iter.next()
      val topicPart = element.getKey
      val respData = element.getValue
      val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
      val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected)
      if (mustRespond) {
        nextElement = element
        // Don't move the partition to the end of the queue if we didn't actually fetch data.
        // This should help avoid starvation even when we are filling the fetch response fully
        // while returning metadata for these partitions.
        if (updateFetchContextAndRemoveUnselected && respData.records != null && respData.records.sizeInBytes > 0) {
          session.partitionMap.remove(cachedPart)
          session.partitionMap.mustAdd(cachedPart)
        }
      } else {
        if (updateFetchContextAndRemoveUnselected) {
          iter.remove()
        }
      }
    }
    nextElement != null
  }
}
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12178) Improve guard rails for consumer commit when using EOS
Lucas Bradstreet created KAFKA-12178: Summary: Improve guard rails for consumer commit when using EOS Key: KAFKA-12178 URL: https://issues.apache.org/jira/browse/KAFKA-12178 Project: Kafka Issue Type: Improvement Reporter: Lucas Bradstreet When EOS is in use, offsets are committed via the producer using the sendOffsetsToTransaction API. This is what ensures that a transaction is committed atomically along with the consumer offsets. Unfortunately this does not prevent the consumer from committing, making it easy to get non-EOS behavior by accident. enable.auto.commit = true is the default setting for consumers. If this is not set to false, or if commitSync/commitAsync are called manually, offsets will no longer be committed correctly for EOS semantics. We need more guard rails to prevent consumers from being misused in this way. Currently the consumer has no knowledge that a producer is even committing offsets, which makes this difficult to achieve. -- This message was sent by Atlassian Jira (v8.3.4#803005)
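For context, the intended EOS commit path looks something like the following sketch (topic name and types are illustrative; assumes a transactional producer with initTransactions() already called, and a consumer configured with enable.auto.commit=false):

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

static void processBatch(KafkaProducer<String, String> producer,
                         KafkaConsumer<String, String> consumer,
                         ConsumerRecords<String, String> records) {
    producer.beginTransaction();
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
        // Track the next offset to consume for each input partition.
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
    }
    // Offsets are committed atomically with the transaction. Calling
    // consumer.commitSync()/commitAsync() here instead would silently break
    // EOS, which is exactly the misuse this issue wants guard rails against.
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
}
{code}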
[jira] [Created] (KAFKA-12177) Retention is not idempotent
Lucas Bradstreet created KAFKA-12177: Summary: Retention is not idempotent Key: KAFKA-12177 URL: https://issues.apache.org/jira/browse/KAFKA-12177 Project: Kafka Issue Type: Improvement Reporter: Lucas Bradstreet

Kafka today applies retention in the following order:
# Time
# Size
# Log start offset

Today it is possible for a segment with offsets less than the log start offset to contain data that is not deletable due to time retention. This means that log start offset retention can unblock further deletions by time based retention. Note that this does require a case where the max timestamp for each segment increases, decreases, and then increases again. Even so, it would be nice to make retention idempotent by applying log start offset retention first, followed by size and time. This would also be potentially cheaper to perform, as neither log start offset nor size retention requires the maxTimestamp for a segment to be loaded from disk after a broker restart.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10839) Improve consumer group coordinator unavailable message
Lucas Bradstreet created KAFKA-10839: Summary: Improve consumer group coordinator unavailable message Key: KAFKA-10839 URL: https://issues.apache.org/jira/browse/KAFKA-10839 Project: Kafka Issue Type: Improvement Reporter: Lucas Bradstreet

When a consumer encounters an issue that triggers marking a coordinator as unknown, the error message it prints gives no context about the error that triggered it:

{noformat}
log.info("Group coordinator {} is unavailable or invalid, will attempt rediscovery", this.coordinator);
{noformat}

These may be triggered by response errors or by the coordinator becoming disconnected. We should improve this error message to make the cause clear.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
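For illustration, the message could carry the triggering cause explicitly; a hedged sketch, where `cause` is a hypothetical string threaded through from whichever caller decided to mark the coordinator unknown:

{code:java}
// `cause` is hypothetical here: e.g. the response error name, or "disconnected".
log.info("Group coordinator {} is unavailable or invalid due to cause: {}. Rediscovery will be attempted.",
    this.coordinator, cause);
{code}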
[jira] [Created] (KAFKA-10432) LeaderEpochCache is incorrectly recovered on segment recovery for epoch 0
Lucas Bradstreet created KAFKA-10432: Summary: LeaderEpochCache is incorrectly recovered on segment recovery for epoch 0 Key: KAFKA-10432 URL: https://issues.apache.org/jira/browse/KAFKA-10432 Project: Kafka Issue Type: Bug Affects Versions: 2.6.0, 2.5.0, 2.4.0, 2.3.0 Reporter: Lucas Bradstreet

I added some functionality to the system tests to compare epoch cache lineages ([https://github.com/apache/kafka/pull/9213]), and I found a bug in leader epoch cache recovery. The test hard kills a broker before the cache has been flushed; the broker then starts up and goes through log recovery. After recovery there is divergence in the epoch caches for epoch 0:

{noformat}
AssertionError: leader epochs for output-topic-1 didn't match [{0: 9393L, 2: 9441L, 4: 42656L}, {0: 0L, 2: 9441L, 4: 42656L}, {0: 0L, 2: 9441L, 4: 42656L}]
{noformat}

The cache is supposed to include the offset for epoch 0, but recovery skips it ([https://github.com/apache/kafka/blob/487b3682ebe0eefde3445b37ee72956451a9d15e/core/src/main/scala/kafka/log/LogSegment.scala#L364]) due to [https://github.com/apache/kafka/commit/d152989f26f51b9004b881397db818ad6eaf0392]. It then stamps the epoch with a later offset when fetching from the leader.

I'm not sure why the recovery code includes the condition `batch.partitionLeaderEpoch > 0`. I discussed this with Jason Gustafson and he believes it may have been intended to avoid assigning negative epochs, but he is not sure why it was added. None of the tests fail with this check removed.

{noformat}
leaderEpochCache.foreach { cache =>
  if (batch.partitionLeaderEpoch > 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
    cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
}
{noformat}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10399) Producer and consumer clients could log IP addresses for brokers to ease debugging
Lucas Bradstreet created KAFKA-10399: Summary: Producer and consumer clients could log IP addresses for brokers to ease debugging Key: KAFKA-10399 URL: https://issues.apache.org/jira/browse/KAFKA-10399 Project: Kafka Issue Type: Improvement Components: consumer, producer Reporter: Lucas Bradstreet

Lag in DNS updates and resolution can cause connectivity problems in clients. client.dns.lookup = "use_all_dns_ips" helps reduce the incidence of such issues; however, it's still possible for DNS issues to cause real problems for clients. The ZK client helpfully logs IP addresses alongside DNS names. We could do the same thing in the Kafka clients, e.g.

{noformat}
Group coordinator broker3.my.kafka.cluster.com/52.32.14.201:9092 (id: 3738382 rack: null) is unavailable or invalid
{noformat}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
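The "hostname/ip" rendering shown above is essentially what java.net already provides once a name is resolved; a small illustrative sketch (the helper name is hypothetical):

{code:java}
import java.net.InetAddress;
import java.net.UnknownHostException;

static String hostWithIp(String host, int port) {
    try {
        // InetAddress.toString() already renders as "hostname/ip-address"
        InetAddress addr = InetAddress.getByName(host);
        return addr.toString() + ":" + port;
    } catch (UnknownHostException e) {
        return host + ":" + port + " (unresolved)";
    }
}
{code}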
[jira] [Created] (KAFKA-10390) kafka-server-stop lookup is not specific enough and may kill other processes
Lucas Bradstreet created KAFKA-10390: Summary: kafka-server-stop lookup is not specific enough and may kill other processes Key: KAFKA-10390 URL: https://issues.apache.org/jira/browse/KAFKA-10390 Project: Kafka Issue Type: Bug Components: core Reporter: Lucas Bradstreet

kafka-server-stop.sh picks out kafka processes with:

{noformat}
PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
{noformat}

This is not specific enough and may match unintended processes, e.g. a java process whose command line merely includes dependencies matching *.kafka.kafka.*, which the case-insensitive grep also matches. A better match would be:

{noformat}
PIDS=$(ps ax | grep ' kafka\.Kafka ' | grep java | grep -v grep | awk '{print $1}')
{noformat}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9946) KAFKA-9539/StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller
Lucas Bradstreet created KAFKA-9946: Summary: KAFKA-9539/StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller Key: KAFKA-9946 URL: https://issues.apache.org/jira/browse/KAFKA-9946 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.6.0 Reporter: Lucas Bradstreet

It seems that [https://github.com/apache/kafka/commit/7c7d55dbd8d42f6378d13ba02d62633366a7ede8] does not correctly handle StopReplicaRequest(s) where deletePartition is set to false, when another delete topic request is outstanding at the time the response is received. In the failing case it seems that two StopReplicaRequest(s) are sent, one with the delete flag set on partitions and one without. The request without the delete flag set on any partitions appears to prematurely trigger the controller into believing that the topic was deleted successfully.

We previously didn't set a callback if the StopReplicaRequest was not a delete request ([https://github.com/apache/kafka/commit/7c7d55dbd8d42f6378d13ba02d62633366a7ede8#diff-987fef43991384a3ebec5fb55e53b577L570]). Now we set it unconditionally, but the callback does not distinguish between the partition states where a delete was being performed and where it was not. This happens on all IBP versions.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9864) Avoid expensive QuotaViolationException usage
Lucas Bradstreet created KAFKA-9864: Summary: Avoid expensive QuotaViolationException usage Key: KAFKA-9864 URL: https://issues.apache.org/jira/browse/KAFKA-9864 Project: Kafka Issue Type: Improvement Components: core Reporter: Lucas Bradstreet QuotaViolationException captures a stack trace and uses String.format when it is constructed. It is used for control flow, and these costs add up even though the exception contents are usually ignored. -- This message was sent by Atlassian Jira (v8.3.4#803005)
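One standard way to make a control-flow exception cheap is to suppress stack-trace capture and defer message formatting. A sketch of the technique follows; the field names are illustrative and this is not necessarily the shape of the eventual patch:

{code:java}
public class QuotaViolationException extends RuntimeException {
    private final String metricName; // illustrative fields
    private final double value;
    private final double bound;

    public QuotaViolationException(String metricName, double value, double bound) {
        this.metricName = metricName;
        this.value = value;
        this.bound = bound;
    }

    // A control-flow exception never needs a stack trace; skipping the capture
    // removes the dominant construction cost.
    @Override
    public synchronized Throwable fillInStackTrace() {
        return this;
    }

    // Format the message lazily, only if someone actually asks for it.
    @Override
    public String getMessage() {
        return String.format("'%s' violated quota: actual value %f exceeds bound %f",
            metricName, value, bound);
    }
}
{code}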
[jira] [Created] (KAFKA-9820) validateMessagesAndAssignOffsetsCompressed allocates batch iterator which is not used
Lucas Bradstreet created KAFKA-9820: Summary: validateMessagesAndAssignOffsetsCompressed allocates batch iterator which is not used Key: KAFKA-9820 URL: https://issues.apache.org/jira/browse/KAFKA-9820 Project: Kafka Issue Type: Bug Components: core Reporter: Lucas Bradstreet KAFKA-8106 added a new skip key/value iterator that reduces allocations ([https://github.com/apache/kafka/commit/3e9d1c1411c5268de382f9dfcc95bdf66d0063a0]). Unfortunately LogValidator creates that iterator but never uses it, and the allocation is quite expensive. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-8963) Benchmark and optimize incremental fetch session handler
[ https://issues.apache.org/jira/browse/KAFKA-8963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Bradstreet resolved KAFKA-8963.
Fix Version/s: 2.5.0
Resolution: Fixed

> Benchmark and optimize incremental fetch session handler
>
> Key: KAFKA-8963
> URL: https://issues.apache.org/jira/browse/KAFKA-8963
> Project: Kafka
> Issue Type: Task
> Reporter: Lucas Bradstreet
> Assignee: Lucas Bradstreet
> Priority: Major
> Fix For: 2.5.0
>
> The FetchSessionHandler is a cause of high CPU usage in the replica fetcher for brokers with high partition counts. A jmh benchmark should be added and the incremental fetch session handling should be measured and optimized.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9577) Client encountering SASL_HANDSHAKE protocol version errors on 2.5 / trunk
Lucas Bradstreet created KAFKA-9577: Summary: Client encountering SASL_HANDSHAKE protocol version errors on 2.5 / trunk Key: KAFKA-9577 URL: https://issues.apache.org/jira/browse/KAFKA-9577 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.5.0 Reporter: Lucas Bradstreet

I am trying 2.5.0 with SASL turned on and my consumer clients receive:

{noformat}
org.apache.kafka.common.errors.UnsupportedVersionException: The SASL_HANDSHAKE protocol does not support version 2
{noformat}

I believe this is due to [https://github.com/apache/kafka/commit/0a2569e2b9907a1217dd50ccbc320f8ad0b42fd0], which added flexible version support and bumped the protocol version. It appears that the SaslClientAuthenticator takes the max version for SASL_HANDSHAKE returned by the broker's ApiVersions response and then uses that version, even though the client may not support it. See [https://github.com/apache/kafka/blob/eb09efa9ac79efa484307bdcf03ac8eb8a3a94e2/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java#L290]. This may make it hard to ever evolve this schema. In the short term I suggest we roll back the version bump and flexible schema until we figure out a path forward.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
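The usual negotiation rule is to take the minimum of what the broker advertises and what the client itself supports, rather than trusting the broker's maximum alone; a hedged sketch with an illustrative constant:

{code:java}
// What this client actually implements for SASL_HANDSHAKE (illustrative constant;
// in the real client this would come from the protocol metadata).
private static final short CLIENT_MAX_SASL_HANDSHAKE_VERSION = 1;

private static short pickHandshakeVersion(short brokerMaxVersion) {
    // Never send a request version newer than the client was compiled against.
    return (short) Math.min(brokerMaxVersion, CLIENT_MAX_SASL_HANDSHAKE_VERSION);
}
{code}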
[jira] [Resolved] (KAFKA-9137) Maintenance of FetchSession cache causing FETCH_SESSION_ID_NOT_FOUND in live sessions
[ https://issues.apache.org/jira/browse/KAFKA-9137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Bradstreet resolved KAFKA-9137.
Resolution: Fixed

Closed by [https://github.com/apache/kafka/pull/7640]

> Maintenance of FetchSession cache causing FETCH_SESSION_ID_NOT_FOUND in live sessions
>
> Key: KAFKA-9137
> URL: https://issues.apache.org/jira/browse/KAFKA-9137
> Project: Kafka
> Issue Type: Bug
> Components: core
> Reporter: Lucas Bradstreet
> Priority: Major
>
> We have recently seen cases where brokers end up in a bad state where fetch session evictions occur at a high rate (> 16 per second) after a roll. This increase in eviction rate included the following pattern in our logs:
> {noformat}
> broker 6: October 31st 2019, 17:52:45.496 Created a new incremental FetchContext for session id 2046264334, epoch 9790: added (), updated (), removed ()
> broker 6: October 31st 2019, 17:52:45.496 Created a new incremental FetchContext for session id 2046264334, epoch 9791: added (), updated (), removed ()
> broker 6: October 31st 2019, 17:52:45.500 Created a new incremental FetchContext for session id 2046264334, epoch 9792: added (), updated (lkc-7nv6o_tenant_soak_topic_144p-67), removed ()
> broker 6: October 31st 2019, 17:52:45.501 Created a new incremental FetchContext for session id 2046264334, epoch 9793: added (), updated (lkc-7nv6o_tenant_soak_topic_144p-59, lkc-7nv6o_tenant_soak_topic_144p-123, lkc-7nv6o_tenant_soak_topic_144p-11, lkc-7nv6o_tenant_soak_topic_144p-3, lkc-7nv6o_tenant_soak_topic_144p-67, lkc-7nv6o_tenant_soak_topic_144p-115), removed ()
> broker 6: October 31st 2019, 17:52:45.501 Evicting stale FetchSession 2046264334.
> broker 6: October 31st 2019, 17:52:45.502 Session error for 2046264334: no such session ID found.
> broker 4: October 31st 2019, 17:52:45.813 [ReplicaFetcher replicaId=4, leaderId=6, fetcherId=0] Node 6 was unable to process the fetch request with (sessionId=2046264334, epoch=9793): FETCH_SESSION_ID_NOT_FOUND.
> {noformat}
> This pattern appears to be problematic for two reasons. Firstly, the replica fetcher for broker 4 was clearly able to send multiple incremental fetch requests to broker 6, and receive replies, right up to the point where broker 6 evicted its fetch session within milliseconds of multiple fetch requests. The second problem is that replica fetchers are considered privileged for the fetch session cache, and should not be evicted by consumer fetch sessions. This cluster only has 12 brokers and 1000 fetch session cache slots (the default for max.incremental.fetch.session.cache.slots), and it is thus very unlikely that this session should have been evicted by another replica fetcher session.
> This cluster also appears to be going through cycles of fetch session evictions, never stabilizing into a state where fetch sessions are not evicted. The above logs are the best example I could find of a case where a session clearly should not have been evicted.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9513) Failed GroupMetadataManager loadGroupAndOffsets will consider groups as loaded
Lucas Bradstreet created KAFKA-9513: Summary: Failed GroupMetadataManager loadGroupAndOffsets will consider groups as loaded Key: KAFKA-9513 URL: https://issues.apache.org/jira/browse/KAFKA-9513 Project: Kafka Issue Type: Improvement Reporter: Lucas Bradstreet

Bugs in group loading such as https://issues.apache.org/jira/browse/KAFKA-8896 may cause errors when loading offsets. loadGroupsAndOffsets's finally block adds the offsets partition to ownedPartitions and removes it from loadingPartitions even when loading does not succeed:

{code:scala}
private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit): Unit = {
  try {
    val startMs = time.milliseconds()
    doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
    val endMs = time.milliseconds()
    val timeLapse = endMs - startMs
    partitionLoadSensor.record(timeLapse, endMs, false)
    info(s"Finished loading offsets and group metadata from $topicPartition in $timeLapse milliseconds.")
  } catch {
    case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
  } finally {
    inLock(partitionLock) {
      ownedPartitions.add(topicPartition.partition)
      loadingPartitions.remove(topicPartition.partition)
    }
  }
}
{code}

This means that the group is considered loaded by:

{code:scala}
def isGroupLoading(groupId: String): Boolean = isPartitionLoading(partitionFor(groupId))
{code}

which may result in consumers being able to load the wrong offsets. We should consider whether we should be more defensive and instead mark the partition as failed.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9401) High lock contention for kafka.server.FetchManager.newContext
Lucas Bradstreet created KAFKA-9401: Summary: High lock contention for kafka.server.FetchManager.newContext Key: KAFKA-9401 URL: https://issues.apache.org/jira/browse/KAFKA-9401 Project: Kafka Issue Type: Improvement Components: core Reporter: Lucas Bradstreet

kafka.server.FetchManager.newContext takes out what is essentially a global fetch lock on kafka.server.FetchSessionCache, not only for updates to the FetchSessionCache itself but also for updates to the fetch sessions stored within it. This causes a high amount of lock contention for fetches, as every fetch request must go through this lock. I have taken an async-profiler lock profile on a high throughput cluster, and I see around 25s of waiting on this lock in a sixty second profile:

{noformat}
25818577497 ns (20.84%), 5805 samples
  [ 0] kafka.server.FetchSessionCache
  [ 1] kafka.server.FetchManager.newContext
  [ 2] kafka.server.KafkaApis.handleFetchRequest
  [ 3] kafka.server.KafkaApis.handle
  [ 4] kafka.server.KafkaRequestHandler.run
  [ 5] java.lang.Thread.run
{noformat}

{code:scala}
cache.synchronized {
  cache.get(reqMetadata.sessionId) match {
    case None => {
      debug(s"Session error for ${reqMetadata.sessionId}: no such session ID found.")
      new SessionErrorContext(Errors.FETCH_SESSION_ID_NOT_FOUND, reqMetadata)
    }
    case Some(session) => session.synchronized {
      if (session.epoch != reqMetadata.epoch) {
        debug(s"Session error for ${reqMetadata.sessionId}: expected epoch " +
          s"${session.epoch}, but got ${reqMetadata.epoch} instead.")
        new SessionErrorContext(Errors.INVALID_FETCH_SESSION_EPOCH, reqMetadata)
      } else {
        val (added, updated, removed) = session.update(fetchData, toForget, reqMetadata)
        if (session.isEmpty) {
          debug(s"Created a new sessionless FetchContext and closing session id ${session.id}, " +
            s"epoch ${session.epoch}: after removing ${partitionsToLogString(removed)}, " +
            s"there are no more partitions left.")
          cache.remove(session)
          new SessionlessFetchContext(fetchData)
        } else {
          cache.touch(session, time.milliseconds())
          session.epoch = JFetchMetadata.nextEpoch(session.epoch)
          debug(s"Created a new incremental FetchContext for session id ${session.id}, " +
            s"epoch ${session.epoch}: added ${partitionsToLogString(added)}, " +
            s"updated ${partitionsToLogString(updated)}, " +
            s"removed ${partitionsToLogString(removed)}")
          new IncrementalFetchContext(time, reqMetadata, session)
        }
      }
    }
  }
}
{code}

Contention has been made worse by the fix for "KAFKA-9137: Fix incorrect FetchSessionCache eviction logic" ([https://github.com/apache/kafka/pull/7640]), as the cache is now correctly touched, whereas previously the touch was being skipped.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9393) DeleteRecords triggers extreme lock contention for large partition directories
Lucas Bradstreet created KAFKA-9393: Summary: DeleteRecords triggers extreme lock contention for large partition directories Key: KAFKA-9393 URL: https://issues.apache.org/jira/browse/KAFKA-9393 Project: Kafka Issue Type: Improvement Affects Versions: 2.4.0, 2.3.0, 2.2.0 Reporter: Lucas Bradstreet

DeleteRecords, frequently used by KStreams, triggers a Log.maybeIncrementLogStartOffset call, which calls kafka.log.ProducerStateManager.listSnapshotFiles, which calls java.io.File.listFiles on the partition directory. The time taken to list this directory can be extreme for partitions with many small segments (e.g 2), taking multiple seconds to finish. This causes lock contention on the log, and if produce requests are also occurring for the same log, a majority of request handler threads can become blocked waiting for the DeleteRecords call to finish.

I believe this is a problem going back to the initial implementation of the transactional producer, but I need to confirm how far back it goes. One possible solution is to maintain a producer state snapshot aligned to each log segment, and simply delete it whenever we delete the segment. This would ensure that we never have to perform a directory scan.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9359) Controller does not handle requests while broker is being shutdown
Lucas Bradstreet created KAFKA-9359: Summary: Controller does not handle requests while broker is being shutdown Key: KAFKA-9359 URL: https://issues.apache.org/jira/browse/KAFKA-9359 Project: Kafka Issue Type: Improvement Components: controller, core Reporter: Lucas Bradstreet When a broker is shut down it stops accepting requests, as the socket server and request handler pools are shut down immediately. It does so before shutting down the controller and/or closing the log manager, which may take some time to complete. During this time the broker will remain the controller, as the zkClient has not been closed. We should improve the shutdown process so that a broker does not remain the controller while it is unable to accept the requests expected of a controller. See also https://issues.apache.org/jira/browse/KAFKA-9358 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9358) Explicitly resign controller leadership and broker znode
Lucas Bradstreet created KAFKA-9358: Summary: Explicitly resign controller leadership and broker znode Key: KAFKA-9358 URL: https://issues.apache.org/jira/browse/KAFKA-9358 Project: Kafka Issue Type: Improvement Components: controller, core Reporter: Lucas Bradstreet Assignee: Lucas Bradstreet When shutting down, the broker shuts down the controller component and then closes the zookeeper connection. Closing the zookeeper connection results in ephemeral nodes being removed. It is currently critical that the zkClient is closed after the controller is shut down; otherwise a controller election will not occur if the broker being shut down is currently the controller. We should consider resigning leadership explicitly in the controller rather than relying on the zookeeper client being closed. This would ensure that no change in shutdown order can lead to a period where a broker's controller component is stopped while the broker still holds leadership until the zkClient is closed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9338) Incremental fetch sessions do not maintain or use leader epoch for fencing purposes
Lucas Bradstreet created KAFKA-9338: Summary: Incremental fetch sessions do not maintain or use leader epoch for fencing purposes Key: KAFKA-9338 URL: https://issues.apache.org/jira/browse/KAFKA-9338 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.4.0, 2.3.0, 2.2.0, 2.1.0 Reporter: Lucas Bradstreet KIP-320 added the ability to fence replicas by detecting stale leader epochs from followers, and to help consumers handle unclean truncation. Unfortunately the incremental fetch session handling does not maintain or use the leader epoch in the fetch session cache. As a result, it does not appear that the leader epoch is used for fencing a majority of the time. I'm not sure if this is only the case after incremental fetch sessions are established - it may be that the first "full" fetch session is safe. Optional.empty is returned for the FetchRequest.PartitionData here: [https://github.com/apache/kafka/blob/a4cbdc6a7b3140ccbcd0e2339e28c048b434974e/core/src/main/scala/kafka/server/FetchSession.scala#L111] I believe this affects brokers from 2.1.0, when fencing was improved on the replica fetcher side, and consumers from 2.3.0, when client-side truncation detection was added. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9312) KafkaProducer flush behavior does not guarantee send completion under record batch splitting
Lucas Bradstreet created KAFKA-9312: Summary: KafkaProducer flush behavior does not guarantee send completion under record batch splitting Key: KAFKA-9312 URL: https://issues.apache.org/jira/browse/KAFKA-9312 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.4.0, 2.3.0, 2.2.0, 2.1.0 Reporter: Lucas Bradstreet

The KafkaProducer flush call guarantees that all records sent at the time of the flush call will either be sent successfully or will result in an error. The KafkaProducer will split record batches upon receiving a MESSAGE_TOO_LARGE error from the broker. However, the flush behavior relies on the accumulator checking the incomplete sends that exist at the time of the flush call:

{code:java}
public void awaitFlushCompletion() throws InterruptedException {
    try {
        for (ProducerBatch batch : this.incomplete.copyAll())
            batch.produceFuture.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}
{code}

When a large record batch is split, its produceFuture is completed and the new batches are added to the incomplete list of record batches. This breaks the flush guarantee, as awaitFlushCompletion will finish without awaiting the replacement batches. This is demonstrated in a test case that can be found at [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9200) ListOffsetRequest missing error response for v5
Lucas Bradstreet created KAFKA-9200: Summary: ListOffsetRequest missing error response for v5 Key: KAFKA-9200 URL: https://issues.apache.org/jira/browse/KAFKA-9200 Project: Kafka Issue Type: Bug Affects Versions: 2.3.0 Reporter: Lucas Bradstreet Assignee: Lucas Bradstreet

It seems ListOffsetRequest.getErrorResponse is missing a case for version 5. I'd have hoped that this kind of case would be picked up by KafkaApisTest.

{noformat}
java.lang.IllegalArgumentException: Version 5 is not valid. Valid versions for ListOffsetRequest are 0 to 5
    at org.apache.kafka.common.requests.ListOffsetRequest.getErrorResponse(ListOffsetRequest.java:282)
    at kafka.server.KafkaApis.sendErrorOrCloseConnection(KafkaApis.scala:3062)
    at kafka.server.KafkaApis.sendErrorResponseMaybeThrottle(KafkaApis.scala:3045)
    at kafka.server.KafkaApis.handleError(KafkaApis.scala:3027)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:209)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:78)
    at java.lang.Thread.run(Thread.java:748)
{noformat}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
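A generic guard of the kind a test could provide is to sweep every valid version of the API and assert that an error response can be built. A hedged sketch against the AbstractRequest API (`builder` is assumed to be a ListOffsetRequest.Builder constructed by the test):

{code:java}
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;

// For each version the protocol declares valid, building an error response
// must not throw; a missing case like the v5 one surfaces immediately.
for (short version = ApiKeys.LIST_OFFSETS.oldestVersion();
     version <= ApiKeys.LIST_OFFSETS.latestVersion(); version++) {
    AbstractRequest request = builder.build(version);
    request.getErrorResponse(0, Errors.UNKNOWN_SERVER_ERROR.exception());
}
{code}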
[jira] [Created] (KAFKA-9193) org.apache.kafka.common.utils.Timer should use monotonic clock
Lucas Bradstreet created KAFKA-9193: Summary: org.apache.kafka.common.utils.Timer should use monotonic clock Key: KAFKA-9193 URL: https://issues.apache.org/jira/browse/KAFKA-9193 Project: Kafka Issue Type: Bug Reporter: Lucas Bradstreet utils.Timer uses System.currentTimeMillis to implement blocking methods with timeouts. We should not rely on a non-monotonic clock, and should instead switch this to Time.hiResClockMs() (which uses System.nanoTime). When we do so we should revert [https://github.com/apache/kafka/pull/7683], which worked around inaccuracies caused by the current approach (the test was good, the code is bad). -- This message was sent by Atlassian Jira (v8.3.4#803005)
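The principle, as a small self-contained sketch (an illustrative helper, not the utils.Timer API): deadlines for blocking waits should be computed from a monotonic source, so wall-clock jumps cannot shorten or lengthen the wait.

{code:java}
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class MonotonicWait {
    // Poll `done` until it returns true or timeoutMs elapses on the monotonic clock.
    static boolean awaitCondition(BooleanSupplier done, long timeoutMs) throws InterruptedException {
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!done.getAsBoolean()) {
            long remainingNs = deadlineNs - System.nanoTime(); // immune to clock-setting and NTP jumps
            if (remainingNs <= 0)
                return false; // timed out
            Thread.sleep(Math.min(TimeUnit.NANOSECONDS.toMillis(remainingNs) + 1, 10L));
        }
        return true;
    }
}
{code}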
[jira] [Created] (KAFKA-9137) Maintenance of FetchSession cache causing FETCH_SESSION_ID_NOT_FOUND in live sessions
Lucas Bradstreet created KAFKA-9137: Summary: Maintenance of FetchSession cache causing FETCH_SESSION_ID_NOT_FOUND in live sessions Key: KAFKA-9137 URL: https://issues.apache.org/jira/browse/KAFKA-9137 Project: Kafka Issue Type: Bug Components: core Reporter: Lucas Bradstreet

We have recently seen cases where brokers end up in a bad state where fetch session evictions occur at a high rate (> 16 per second) after a roll. This increase in eviction rate included the following pattern in our logs:

{noformat}
broker 6: October 31st 2019, 17:52:45.496 Created a new incremental FetchContext for session id 2046264334, epoch 9790: added (), updated (), removed ()
broker 6: October 31st 2019, 17:52:45.496 Created a new incremental FetchContext for session id 2046264334, epoch 9791: added (), updated (), removed ()
broker 6: October 31st 2019, 17:52:45.500 Created a new incremental FetchContext for session id 2046264334, epoch 9792: added (), updated (lkc-7nv6o_tenant_soak_topic_144p-67), removed ()
broker 6: October 31st 2019, 17:52:45.501 Created a new incremental FetchContext for session id 2046264334, epoch 9793: added (), updated (lkc-7nv6o_tenant_soak_topic_144p-59, lkc-7nv6o_tenant_soak_topic_144p-123, lkc-7nv6o_tenant_soak_topic_144p-11, lkc-7nv6o_tenant_soak_topic_144p-3, lkc-7nv6o_tenant_soak_topic_144p-67, lkc-7nv6o_tenant_soak_topic_144p-115), removed ()
broker 6: October 31st 2019, 17:52:45.501 Evicting stale FetchSession 2046264334.
broker 6: October 31st 2019, 17:52:45.502 Session error for 2046264334: no such session ID found.
broker 4: October 31st 2019, 17:52:45.813 [ReplicaFetcher replicaId=4, leaderId=6, fetcherId=0] Node 6 was unable to process the fetch request with (sessionId=2046264334, epoch=9793): FETCH_SESSION_ID_NOT_FOUND.
{noformat}

This pattern appears to be problematic for two reasons. Firstly, the replica fetcher for broker 4 was clearly able to send multiple incremental fetch requests to broker 6, and receive replies, right up to the point where broker 6 evicted its fetch session within milliseconds of multiple fetch requests. Secondly, replica fetchers are considered privileged for the fetch session cache, and should not be evicted by consumer fetch sessions. This cluster only has 12 brokers and 1000 fetch session cache slots (the default for max.incremental.fetch.session.cache.slots), and it is thus very unlikely that this session should have been evicted by another replica fetcher session.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9048) Improve partition scalability in replica fetcher
Lucas Bradstreet created KAFKA-9048: Summary: Improve partition scalability in replica fetcher Key: KAFKA-9048 URL: https://issues.apache.org/jira/browse/KAFKA-9048 Project: Kafka Issue Type: Task Components: core Reporter: Lucas Bradstreet

https://issues.apache.org/jira/browse/KAFKA-9039 ([https://github.com/apache/kafka/pull/7443]) improves the performance of the replica fetcher (at both small and large numbers of partitions), but it does not improve its complexity or scalability in the number of partitions. I took a profile using async-profiler for the 1000 partition JMH replica fetcher benchmark. The big remaining culprits are:
* ~18% looking up logStartOffset
* ~45% FetchSessionHandler$Builder.add
* ~19% FetchSessionHandler$Builder.build

*Suggestions*
# The logStartOffset is looked up for every partition on each doWork pass. This requires a hashmap lookup even though the logStartOffset changes rarely. If the replica fetcher could be notified of updates to the logStartOffset, we could reduce the overhead to a function of the number of updates to the logStartOffset instead of O(n) on each pass.
# The use of FetchSessionHandler means that we maintain a partitionStates hashmap in the replica fetcher, and a sessionPartitions hashmap in the FetchSessionHandler. On each incremental fetch session pass, we need to reconcile these two hashmaps to determine which partitions were added/updated and which were removed. This reconciliation process is especially expensive, requiring multiple passes over the fetching partitions, and hashmap removes and puts for most partitions. The replica fetcher could be smarter by directly maintaining an *updated* hashmap of FetchRequest.PartitionData(s) for the fetch session, as well as a *removed* partitions list, so that these do not need to be generated by reconciliation on each fetch pass.
# maybeTruncate requires an O(n) pass over the elements in partitionStates even if there are no partitions in truncating state. If we maintained some additional state about whether truncating partitions exist in partitionStates, or separated these states into a different data structure, we would not need to iterate across all partitions on every doWork pass. I've seen clusters where this work takes about 0.5%-1% of CPU, which is minor but will become more substantial as the number of partitions increases.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-8963) Benchmark and optimize incremental fetch session handler
Lucas Bradstreet created KAFKA-8963: --- Summary: Benchmark and optimize incremental fetch session handler Key: KAFKA-8963 URL: https://issues.apache.org/jira/browse/KAFKA-8963 Project: Kafka Issue Type: Task Reporter: Lucas Bradstreet The FetchSessionHandler is a cause of high CPU usage in the replica fetcher for brokers with high partition counts. We should add a jmh benchmark and optimize the incremental fetch session building. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-8899) Optimize Partition.maybeIncrementLeaderHW
[ https://issues.apache.org/jira/browse/KAFKA-8899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Bradstreet resolved KAFKA-8899.
Resolution: Duplicate

Duplicate of https://issues.apache.org/jira/browse/KAFKA-8841

> Optimize Partition.maybeIncrementLeaderHW
>
> Key: KAFKA-8899
> URL: https://issues.apache.org/jira/browse/KAFKA-8899
> Project: Kafka
> Issue Type: Task
> Components: core
> Affects Versions: 2.3.0, 2.2.1
> Reporter: Lucas Bradstreet
> Priority: Major
>
> Partition.maybeIncrementLeaderHW is in the hot path for ReplicaManager.updateFollowerFetchState. When replicating between brokers with high partition counts, maybeIncrementLeaderHW becomes expensive, with much of the time going to calling Partition.remoteReplicas, which performs a toSet conversion. maybeIncrementLeaderHW should avoid generating any intermediate collections when calculating the new HWM.

-- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (KAFKA-8899) Optimize Partition.maybeIncrementLeaderHW
Lucas Bradstreet created KAFKA-8899: --- Summary: Optimize Partition.maybeIncrementLeaderHW Key: KAFKA-8899 URL: https://issues.apache.org/jira/browse/KAFKA-8899 Project: Kafka Issue Type: Task Components: core Affects Versions: 2.2.1, 2.3.0 Reporter: Lucas Bradstreet Partition.maybeIncrementLeaderHW is in the hot path for ReplicaManager.updateFollowerFetchState. When replicating between brokers with high partition counts, maybeIncrementLeaderHW becomes expensive, with much of the time going to calling Partition.remoteReplicas which performs a toSet conversion. maybeIncrementLeaderHW should avoid generating any intermediate collections when calculating the new HWM. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (KAFKA-8872) Improvements to controller "deleting" state / topic Identifiers
Lucas Bradstreet created KAFKA-8872: --- Summary: Improvements to controller "deleting" state / topic Identifiers Key: KAFKA-8872 URL: https://issues.apache.org/jira/browse/KAFKA-8872 Project: Kafka Issue Type: Improvement Reporter: Lucas Bradstreet Kafka currently uniquely identifies a topic by its name. This is generally sufficient, but there are flaws in this scheme if a topic is deleted and recreated with the same name. As a result, Kafka attempts to prevent these classes of issues by ensuring a topic is deleted from all replicas before completing a deletion. This solution is not perfect, as it is possible for partitions to be reassigned from brokers while they are down, and there are no guarantees that this state will ever be cleaned up and will not cause issues in the future. As the controller must wait for all replicas to delete their local partitions, deletes can also become blocked, preventing topics from being created with the same name until the deletion is complete on all replicas. This can mean that downtime for a single broker can effectively cause a complete outage for everyone producing/consuming to that topic name, as the topic cannot be recreated without manual intervention. Unique topic IDs could help address this issue by associating a unique ID with each topic, ensuring a newly created topic with a previously used name cannot be confused with a previous topic with that name. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (KAFKA-8499) Ducker missing java commands in path for ducker user on openjdk docker images
Lucas Bradstreet created KAFKA-8499: Summary: Ducker missing java commands in path for ducker user on openjdk docker images Key: KAFKA-8499 URL: https://issues.apache.org/jira/browse/KAFKA-8499 Project: Kafka Issue Type: Improvement Components: system tests Affects Versions: 2.2.0, 2.3.0 Reporter: Lucas Bradstreet openjdk:8/openjdk:11 used to include java and the other java programs in /usr/bin. They have since moved to {{/usr/local/openjdk-VERSION/bin}}, which will cause problems when the system tests invoke java or any java-related utility if the user is using a later image with the same tag. The openjdk images have been updated under the same tags, so this can happen suddenly without any other code changes once the new version is pulled. We need to ensure that the PATH for the ducker user created in the Dockerfile includes the new location where java is installed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8125) Check for topic existence in CreateTopicsRequest prior to creating replica assignment
Lucas Bradstreet created KAFKA-8125: Summary: Check for topic existence in CreateTopicsRequest prior to creating replica assignment Key: KAFKA-8125 URL: https://issues.apache.org/jira/browse/KAFKA-8125 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 2.1.1 Reporter: Lucas Bradstreet

Imagine the following pattern to ensure topic creation in an application (sketched in code after this list):
# Attempt to create a topic with P partitions and replication factor R.
# If topic creation fails with TopicExistsException, continue. If topic creation succeeds, continue; the topic now exists.

This normally works fine. However, if the topic has already been created but the number of live brokers is less than R, then topic creation fails with an org.apache.kafka.common.errors.InvalidReplicationFactorException, even though the topic already exists. This could be avoided if we checked whether the topic exists prior to calling AdminUtils.assignReplicasToBrokers.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
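The client-side pattern in question, sketched with the Java AdminClient (topic name and sizing are illustrative):

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

static void ensureTopic(Properties adminProps) throws Exception {
    try (AdminClient admin = AdminClient.create(adminProps)) {
        NewTopic topic = new NewTopic("my-topic", 12, (short) 3); // P=12, R=3
        try {
            admin.createTopics(Collections.singleton(topic)).all().get();
        } catch (ExecutionException e) {
            if (e.getCause() instanceof TopicExistsException) {
                return; // topic already exists: fine
            }
            // With fewer than R live brokers this surfaces as
            // InvalidReplicationFactorException even when the topic already
            // exists, which is the gap this issue describes.
            throw e;
        }
    }
}
{code}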
[jira] [Created] (KAFKA-7410) Rack aware partitions assignment create unbalanced broker assignments on unbalanced racks
Lucas Bradstreet created KAFKA-7410: Summary: Rack aware partitions assignment create unbalanced broker assignments on unbalanced racks Key: KAFKA-7410 URL: https://issues.apache.org/jira/browse/KAFKA-7410 Project: Kafka Issue Type: Bug Components: admin Affects Versions: 1.1.1 Reporter: Lucas Bradstreet Attachments: AdminUtilsTest.scala

AdminUtils creates a bad partition assignment when the number of brokers on each rack is unbalanced, e.g. 80 brokers on rack A, 20 brokers on rack B, 15 brokers on rack C. Under such a scenario, a single broker from rack C may be assigned over and over again, even though more balanced allocations exist.

kafka.admin.AdminUtils.getRackAlternatedBrokerList is supposed to create a list of brokers alternating by rack; however, once it runs out of brokers on the racks with fewer brokers, it ends up placing a run of brokers from the same rack together, as rackIterator.hasNext returns false for the other racks:

{code:scala}
while (result.size < brokerRackMap.size) {
  val rackIterator = brokersIteratorByRack(racks(rackIndex))
  if (rackIterator.hasNext)
    result += rackIterator.next()
  rackIndex = (rackIndex + 1) % racks.length
}
{code}

Once assignReplicasToBrokersRackAware hits the run of brokers from the same rack, when choosing the replicas to go along with a leader on the rack with the most brokers (e.g. C), it will skip all of the C brokers until it wraps around to the first broker in the alternated list, and choose that broker:

{code:scala}
if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
    && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
  replicaBuffer += broker
  racksWithReplicas += rack
  brokersWithReplicas += broker
  done = true
}
k += 1
{code}

It does so for each of the remaining brokers on that rack, choosing the first broker in the alternated list each time, until it has allocated all of the partitions. See the attached sample code for more details.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
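To make the failure mode concrete, here is the alternation loop transliterated to Java with illustrative rack sizes (4/2/1): once the smaller racks' iterators are exhausted, the output ends with a run of same-rack brokers. This is a demonstration sketch only:

{code:java}
import java.util.*;

public class AlternatedListDemo {
    public static void main(String[] args) {
        Map<String, List<Integer>> brokersByRack = new LinkedHashMap<>();
        brokersByRack.put("A", Arrays.asList(0, 1, 2, 3));
        brokersByRack.put("B", Arrays.asList(4, 5));
        brokersByRack.put("C", Arrays.asList(6));

        List<String> racks = new ArrayList<>(brokersByRack.keySet());
        Map<String, Iterator<Integer>> iterators = new HashMap<>();
        brokersByRack.forEach((rack, brokers) -> iterators.put(rack, brokers.iterator()));

        // Same logic as getRackAlternatedBrokerList: once racks B and C are
        // exhausted, hasNext is false for them and rack A's brokers run together.
        int totalBrokers = 7;
        List<Integer> result = new ArrayList<>();
        int rackIndex = 0;
        while (result.size() < totalBrokers) {
            Iterator<Integer> it = iterators.get(racks.get(rackIndex));
            if (it.hasNext())
                result.add(it.next());
            rackIndex = (rackIndex + 1) % racks.size();
        }
        System.out.println(result); // [0, 4, 6, 1, 5, 2, 3] -- ends with consecutive rack-A brokers
    }
}
{code}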