[jira] [Resolved] (KAFKA-12791) ConcurrentModificationException in KafkaProducer constructor

2021-11-26 Thread Lucas Bradstreet (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Bradstreet resolved KAFKA-12791.
--
Resolution: Fixed

> ConcurrentModificationException in KafkaProducer constructor
> 
>
> Key: KAFKA-12791
> URL: https://issues.apache.org/jira/browse/KAFKA-12791
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Bradstreet
>Priority: Minor
> Fix For: 3.0.0
>
>
> Recently we have noticed multiple instances where KafkaProducers have failed 
> to construct due to the following exception:
> {noformat}
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:440)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
>     at java.base/java.lang.Thread.run(Thread.java:832)
> Caused by: java.util.ConcurrentModificationException
>     at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1584)
>     at java.base/java.util.HashMap$KeyIterator.next(HashMap.java:1607)
>     at java.base/java.util.AbstractSet.removeAll(AbstractSet.java:171)
>     at org.apache.kafka.common.config.AbstractConfig.unused(AbstractConfig.java:221)
>     at org.apache.kafka.common.config.AbstractConfig.logUnused(AbstractConfig.java:379)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433)
>     ... 9 more
> exception.class:org.apache.kafka.common.KafkaException
> exception.message:Failed to construct kafka producer
> {noformat}
> It appears that this is because `used` below is a synchronized set, which does 
> not guard iteration (such as the iteration performed inside removeAll) unless 
> the caller holds its monitor:
>  
> {code:java}
> public Set<String> unused() {
>     Set<String> keys = new HashSet<>(originals.keySet());
>     keys.removeAll(used);
>     return keys;
> }{code}
> It appears that `used` is being modified while removeAll is being called. 
> This may be due to the way that keys are added to it when used:
> {code:java}
> protected Object get(String key) {
>     if (!values.containsKey(key))
>         throw new ConfigException(String.format("Unknown configuration '%s'", key));
>     used.add(key);
>     return values.get(key);
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13342) LISR sent for topic queued for deletion in controller

2021-10-03 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-13342:


 Summary: LISR sent for topic queued for deletion in controller
 Key: KAFKA-13342
 URL: https://issues.apache.org/jira/browse/KAFKA-13342
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Bradstreet


Under certain conditions in some system tests a broker will be hard killed 
during a topic deletion, before its replica has moved to the OfflineReplica 
state. When the broker comes back up, the controller will send it a 
LeaderAndIsrRequest containing the partition, causing it to recreate the 
partition locally even though the topic is in the deleting state in the 
controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13194) LogCleaner may clean past highwatermark

2021-08-11 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-13194:


 Summary: LogCleaner may clean past highwatermark
 Key: KAFKA-13194
 URL: https://issues.apache.org/jira/browse/KAFKA-13194
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Bradstreet


Here the cleaning point is bounded by the active segment base offset and the 
first unstable offset, which makes sense:
{code:java}
// find first segment that cannot be cleaned
// neither the active segment, nor segments with any messages closer to the head of the log
// than the minimum compaction lag time may be cleaned
val firstUncleanableDirtyOffset: Long = Seq(

  // we do not clean beyond the first unstable offset
  log.firstUnstableOffset,

  // the active segment is always uncleanable
  Option(log.activeSegment.baseOffset),

  // the first segment whose largest message timestamp is within a minimum time lag from now
  if (minCompactionLagMs > 0) {
    // dirty log segments
    val dirtyNonActiveSegments = log.localNonActiveLogSegmentsFrom(firstDirtyOffset)
    dirtyNonActiveSegments.find { s =>
      val isUncleanable = s.largestTimestamp > now - minCompactionLagMs
      debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} " +
        s"segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - minCompactionLagMs}; " +
        s"is uncleanable=$isUncleanable")
      isUncleanable
    }.map(_.baseOffset)
  } else None
).flatten.min
{code}
But the LSO starts out as None:
{code:java}
@volatile private var firstUnstableOffsetMetadata: Option[LogOffsetMetadata] = None

private[log] def firstUnstableOffset: Option[Long] = firstUnstableOffsetMetadata.map(_.messageOffset)
{code}
For most code depending on the LSO, fetchLastStableOffsetMetadata is used to 
default it to the hwm if it's not set:
{code:java}
private def fetchLastStableOffsetMetadata: LogOffsetMetadata = {
  checkIfMemoryMappedBufferClosed()

  // cache the current high watermark to avoid a concurrent update invalidating the range check
  val highWatermarkMetadata = fetchHighWatermarkMetadata

  firstUnstableOffsetMetadata match {
    case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermarkMetadata.messageOffset =>
      if (offsetMetadata.messageOffsetOnly) {
        lock synchronized {
          val fullOffset = convertToOffsetMetadataOrThrow(offsetMetadata.messageOffset)
          if (firstUnstableOffsetMetadata.contains(offsetMetadata))
            firstUnstableOffsetMetadata = Some(fullOffset)
          fullOffset
        }
      } else {
        offsetMetadata
      }
    case _ => highWatermarkMetadata
  }
}
{code}
This means that in the case where the hwm is prior to the active segment base, 
the log cleaner may clean past the hwm. This is most likely to occur after a 
broker restart when the log cleaner may start cleaning prior to replication 
becoming active.
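
A hypothetical sketch (plain Java rather than the LogCleanerManager Scala, with 
illustrative names) of the kind of bound this suggests: when the first unstable 
offset is not yet known, fall back to the high watermark instead of dropping 
that bound entirely, mirroring how fetchLastStableOffsetMetadata defaults the 
LSO to the hwm. The minCompactionLagMs bound is omitted for brevity.
{code:java}
import java.util.OptionalLong;

// Hypothetical sketch only: bound cleaning by the hwm when the LSO is unknown.
final class CleaningBoundSketch {
    static long firstUncleanableDirtyOffset(OptionalLong firstUnstableOffset,
                                            long highWatermark,
                                            long activeSegmentBaseOffset) {
        // default the LSO to the hwm, as fetchLastStableOffsetMetadata does elsewhere
        long unstableBound = firstUnstableOffset.orElse(highWatermark);
        // the active segment is always uncleanable
        return Math.min(unstableBound, activeSegmentBaseOffset);
    }
}
{code}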



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12896) Group rebalance loop caused by repeated group leader JoinGroups

2021-06-04 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-12896:


 Summary: Group rebalance loop caused by repeated group leader 
JoinGroups
 Key: KAFKA-12896
 URL: https://issues.apache.org/jira/browse/KAFKA-12896
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.6.0
Reporter: Lucas Bradstreet


We encountered a strange case of a rebalance loop with the "cooperative-sticky" 
assignor. The logs show the following for several hours:

 

{{Apr 7, 2021 @ 03:58:36.040[GroupCoordinator 7]: Stabilized group mygroup 
generation 19830137 (__consumer_offsets-7)}}

{{Apr 7, 2021 @ 03:58:35.992[GroupCoordinator 7]: Preparing to rebalance 
group mygroup in state PreparingRebalance with old generation 19830136 
(__consumer_offsets-7) (reason: Updating metadata for member 
mygroup-1-7ad27e07-3784-4588-97e1-d796a74d4ecc during CompletingRebalance)}}

{{Apr 7, 2021 @ 03:58:35.988[GroupCoordinator 7]: Stabilized group mygroup 
generation 19830136 (__consumer_offsets-7)}}

{{Apr 7, 2021 @ 03:58:35.972[GroupCoordinator 7]: Preparing to rebalance 
group mygroup in state PreparingRebalance with old generation 19830135 
(__consumer_offsets-7) (reason: Updating metadata for member mygroup during 
CompletingRebalance)}}

{{Apr 7, 2021 @ 03:58:35.965[GroupCoordinator 7]: Stabilized group mygroup 
generation 19830135 (__consumer_offsets-7)}}

{{Apr 7, 2021 @ 03:58:35.953[GroupCoordinator 7]: Preparing to rebalance 
group mygroup in state PreparingRebalance with old generation 19830134 
(__consumer_offsets-7) (reason: Updating metadata for member 
mygroup-7ad27e07-3784-4588-97e1-d796a74d4ecc during CompletingRebalance)}}

{{Apr 7, 2021 @ 03:58:35.941[GroupCoordinator 7]: Stabilized group mygroup 
generation 19830134 (__consumer_offsets-7)}}

{{Apr 7, 2021 @ 03:58:35.926[GroupCoordinator 7]: Preparing to rebalance 
group mygroup in state PreparingRebalance with old generation 19830133 
(__consumer_offsets-7) (reason: Updating metadata for member mygroup during 
CompletingRebalance)}}

Every single time, it was the same member that triggered the JoinGroup, and it 
was always the leader of the group.

The leader has the privilege of being able to trigger a rebalance by sending 
`JoinGroup` even if its subscription metadata has not changed. But why would it 
do so?

It is possible that this is due to the same issue or a similar bug to 
https://issues.apache.org/jira/browse/KAFKA-12890.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12791) ConcurrentModificationException in KafkaProducer constructor

2021-05-15 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-12791:


 Summary: ConcurrentModificationException in KafkaProducer 
constructor
 Key: KAFKA-12791
 URL: https://issues.apache.org/jira/browse/KAFKA-12791
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Bradstreet


Recently we have noticed multiple instances where KafkaProducers have failed to 
construct due to the following exception:
{noformat}
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:440)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
    at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.util.ConcurrentModificationException
    at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1584)
    at java.base/java.util.HashMap$KeyIterator.next(HashMap.java:1607)
    at java.base/java.util.AbstractSet.removeAll(AbstractSet.java:171)
    at org.apache.kafka.common.config.AbstractConfig.unused(AbstractConfig.java:221)
    at org.apache.kafka.common.config.AbstractConfig.logUnused(AbstractConfig.java:379)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433)
    ... 9 more
exception.class:org.apache.kafka.common.KafkaException
exception.message:Failed to construct kafka producer
{noformat}

It appears that this is because `used` below is a synchronized set, which does 
not guard iteration (such as the iteration performed inside removeAll) unless 
the caller holds its monitor:
{code:java}
public Set<String> unused() {
    Set<String> keys = new HashSet<>(originals.keySet());
    keys.removeAll(used);
    return keys;
}{code}
It appears that `used` is being modified while removeAll is being called. This 
may be due to the way that keys are added to it when used:


{code:java}
protected Object get(String key) {
    if (!values.containsKey(key))
        throw new ConfigException(String.format("Unknown configuration '%s'", key));
    used.add(key);
    return values.get(key);
}{code}
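
A minimal, self-contained sketch (hypothetical class, not Kafka's AbstractConfig) 
of why this fails and one way to make the computation safe: when keys.size() > 
used.size(), AbstractSet.removeAll iterates over the argument set, and iterating 
a Collections.synchronizedSet without holding its monitor while another thread 
adds to it can throw ConcurrentModificationException.
{code:java}
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Sketch: guard the iteration performed inside removeAll by holding the
// synchronized set's monitor for the duration of the call.
class ConfigSketch {
    private final Set<String> originals = new HashSet<>();
    private final Set<String> used = Collections.synchronizedSet(new HashSet<>());

    public Set<String> unused() {
        Set<String> keys = new HashSet<>(originals);
        synchronized (used) {  // without this, a concurrent used.add(...) can throw CME
            keys.removeAll(used);
        }
        return keys;
    }
}
{code}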
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12736) KafkaProducer.flush holds onto completed ProducerBatch(s) until flush completed

2021-04-30 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-12736:


 Summary: KafkaProducer.flush holds onto completed ProducerBatch(s) 
until flush completed
 Key: KAFKA-12736
 URL: https://issues.apache.org/jira/browse/KAFKA-12736
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Bradstreet


When flush is called, a copy of the incomplete batches is made. This means that 
the full ProducerBatch(s) are held in memory until the flush has completed. For 
batches that use the existing memory pool this is not as wasteful, as the memory 
will already have been returned to the pool, but non-pooled memory can only be 
GC'd after the flush has completed. Rather than using copyAll, we can build a new 
collection containing only the produceFuture(s) and await on those.

 
{code:java}
/**
 * Mark all partitions as ready to send and block until the send is complete
 */
public void awaitFlushCompletion() throws InterruptedException {
    try {
        for (ProducerBatch batch : this.incomplete.copyAll())
            batch.produceFuture.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}

{code}
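
A hedged sketch of the proposal, mirroring the method above: copy only the 
per-batch produce futures rather than the batches themselves, so the 
(potentially non-pooled) batch buffers are not retained for the duration of the 
flush. The requestResults() accessor on the incomplete-batch tracker is an 
assumption for illustration, not an existing API.
{code:java}
/**
 * Sketch only: await on copied futures instead of copied batches.
 */
public void awaitFlushCompletion() throws InterruptedException {
    try {
        // requestResults() is a hypothetical helper returning Iterable<ProduceRequestResult>
        for (ProduceRequestResult result : this.incomplete.requestResults())
            result.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}
{code}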
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12330) FetchSessionCache may cause starvation for partitions when FetchResponse is full

2021-02-16 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-12330:


 Summary: FetchSessionCache may cause starvation for partitions 
when FetchResponse is full
 Key: KAFKA-12330
 URL: https://issues.apache.org/jira/browse/KAFKA-12330
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Bradstreet


Incremental FetchSessionCache sessions deprioritize partitions for which a 
response is returned. This may happen if log metadata such as the log start 
offset, hwm, etc. is returned, or if data for that partition is returned.

When a fetch response fills to maxBytes, data may not be returned for 
partitions where it's available. However, the fetch response will still contain 
updates to metadata such as the hwm if that metadata has changed. This can lead 
to degenerate behavior where a partition's hwm or log start offset is updated, 
resulting in that partition being unnecessarily skipped in the next fetch. At 
first this appeared to be worse than it is, as hwm updates occur frequently, but 
starvation should result in hwm movement becoming blocked, allowing a fetch to 
go through and the partition to become unstuck. However, it will still require 
one more fetch request than necessary to do so.

I believe we should only reorder the partition fetch priority if data is 
actually returned for a partition.

 
{noformat}
private class PartitionIterator(val iter: FetchSession.RESP_MAP_ITER,
                                val updateFetchContextAndRemoveUnselected: Boolean)
  extends FetchSession.RESP_MAP_ITER {
  var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null

  override def hasNext: Boolean = {
    while ((nextElement == null) && iter.hasNext) {
      val element = iter.next()
      val topicPart = element.getKey
      val respData = element.getValue
      val cachedPart = session.partitionMap.find(new CachedPartition(topicPart))
      val mustRespond = cachedPart.maybeUpdateResponseData(respData, updateFetchContextAndRemoveUnselected)
      if (mustRespond) {
        nextElement = element
        // Don't move partition to end of queue if we didn't actually fetch data
        // This should help avoid starvation even when we are filling the fetch
        // response fully while returning metadata for these partitions
        if (updateFetchContextAndRemoveUnselected && respData.records != null && respData.records.sizeInBytes > 0) {
          session.partitionMap.remove(cachedPart)
          session.partitionMap.mustAdd(cachedPart)
        }
      } else {
        if (updateFetchContextAndRemoveUnselected) {
          iter.remove()
        }
      }
    }
    nextElement != null
  }{noformat}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12178) Improve guard rails for consumer commit when using EOS

2021-01-11 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-12178:


 Summary: Improve guard rails for consumer commit when using EOS
 Key: KAFKA-12178
 URL: https://issues.apache.org/jira/browse/KAFKA-12178
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Bradstreet


When EOS is in use, offsets are committed via the producer using the 
sendOffsetsToTransaction API. This is what ensures that a transaction is 
committed atomically along with the consumer offsets. Unfortunately this does 
not prevent the consumer from committing on its own, making it easy to end up 
with non-EOS characteristics by accident. enable.auto.commit = true is the 
default setting for consumers. If this is not set to false, or if 
commitSync/commitAsync are called manually, offsets will no longer be committed 
correctly for EOS semantics.

We need more guard rails to prevent consumers from being incorrectly used in 
this way. Currently the consumer has no knowledge that a producer is committing 
offsets on its behalf, which makes this difficult to achieve.
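
For reference, the intended usage looks roughly like the following sketch: the 
consumer never commits on its own (enable.auto.commit=false and no 
commitSync/commitAsync calls), and offsets are committed only through the 
transactional producer. Topic names, the transactional id, and serializer 
settings are placeholders; imports and error handling are omitted for brevity.
{code:java}
Properties consumerProps = new Properties();
// ... bootstrap servers, group id, deserializers ...
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // required for EOS

Properties producerProps = new Properties();
// ... bootstrap servers, serializers ...
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
     KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
    consumer.subscribe(Collections.singleton("input-topic"));
    producer.initTransactions();
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        if (records.isEmpty())
            continue;
        producer.beginTransaction();
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            producer.send(new ProducerRecord<>("output-topic", record.key(), record.value()));
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
        }
        // commit offsets through the producer, never via consumer.commitSync/commitAsync
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
    }
}
{code}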



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12177) Retention is not idempotent

2021-01-11 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-12177:


 Summary: Retention is not idempotent
 Key: KAFKA-12177
 URL: https://issues.apache.org/jira/browse/KAFKA-12177
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Bradstreet


Kafka today applies retention in the following order:
 # Time
 # Size
 # Log start offset


Today it is possible for a segment with offsets less than the log start offset 
to contain data that is not deletable due to time retention. This means that 
it's possible for log start offset retention to unblock further deletions as a 
result of time-based retention. Note that this does require a case where the 
max timestamp for each segment increases, decreases, and then increases again. 
Even so, it would be nice to make retention idempotent by applying log start 
offset retention first, followed by size and time. This would also be 
potentially cheaper to perform, as neither log start offset nor size retention 
requires the maxTimestamp for a segment to be loaded from disk after a broker 
restart.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10839) Improve consumer group coordinator unavailable message

2020-12-10 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-10839:


 Summary: Improve consumer group coordinator unavailable message
 Key: KAFKA-10839
 URL: https://issues.apache.org/jira/browse/KAFKA-10839
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Bradstreet


When a consumer encounters an issue that triggers marking a coordinator as 
unknown, the error message it prints does not give context about the error that 
triggered it.
{noformat}
log.info("Group coordinator {} is unavailable or invalid, will attempt 
rediscovery", this.coordinator);{noformat}
These may be triggered by response errors or the coordinator becoming 
disconnected. We should improve this error message to make the cause clear.
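
A hypothetical shape for the improved message, threading the triggering cause 
through to the log line (the cause parameter here is an assumption for 
illustration, not the current method signature):
{code:java}
// Hypothetical sketch: include the reason the coordinator was marked unknown.
private void markCoordinatorUnknown(String cause) {
    log.info("Group coordinator {} is unavailable or invalid due to cause: {}, will attempt rediscovery",
        this.coordinator, cause);
    // ... existing rediscovery handling ...
}
{code}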



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10432) LeaderEpochCache is incorrectly recovered on segment recovery for epoch 0

2020-08-25 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-10432:


 Summary: LeaderEpochCache is incorrectly recovered on segment 
recovery for epoch 0
 Key: KAFKA-10432
 URL: https://issues.apache.org/jira/browse/KAFKA-10432
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.6.0, 2.5.0, 2.4.0, 2.3.0
Reporter: Lucas Bradstreet


I added some functionality to the system tests to compare epoch cache lineages 
([https://github.com/apache/kafka/pull/9213]), and I found a bug in leader 
epoch cache recovery.

The test hard kills a broker before the epoch cache has been flushed; the 
broker then starts up and goes through log recovery. After recovery there is 
divergence in the epoch caches for epoch 0:
{noformat}
AssertionError: leader epochs for output-topic-1 didn't match
 [{0: 9393L, 2: 9441L, 4: 42656L},
  {0: 0L, 2: 9441L, 4: 42656L},
  {0: 0L, 2: 9441L, 4: 42656L}]
{noformat}
The cache is supposed to include the offset for epoch 0 but in recovery it 
skips it 
[https://github.com/apache/kafka/blob/487b3682ebe0eefde3445b37ee72956451a9d15e/core/src/main/scala/kafka/log/LogSegment.scala#L364]
 due to 
[https://github.com/apache/kafka/commit/d152989f26f51b9004b881397db818ad6eaf0392].
 Then it stamps the epoch with a later offset when fetching from the leader.

I'm not sure why the recovery code includes the condition 
`batch.partitionLeaderEpoch > 0`. I discussed this with Jason Gustafson and he 
believes it may have been intended to avoid assigning negative epochs but is 
not sure why it was added. None of the tests fail with this check removed.
{noformat}
  leaderEpochCache.foreach { cache =>
    if (batch.partitionLeaderEpoch > 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
      cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
  }
{noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10399) Producer and consumer clients could log IP addresses for brokers to ease debugging

2020-08-13 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-10399:


 Summary: Producer and consumer clients could log IP addresses for 
brokers to ease debugging
 Key: KAFKA-10399
 URL: https://issues.apache.org/jira/browse/KAFKA-10399
 Project: Kafka
  Issue Type: Improvement
  Components: consumer, producer 
Reporter: Lucas Bradstreet


Lag in DNS updates and resolution can cause connectivity problems in clients. 
Setting client.dns.lookup = "use_all_dns_ips" helps reduce the incidence of such 
issues; however, it's still possible for DNS issues to cause real problems with 
clients.

The ZK client helpfully logs IP addresses with DNS addresses. We could do the 
same thing in the Kafka clients, e.g.
{noformat}
Group coordinator broker3.my.kafka.cluster.com/52.32.14.201:9092 (id: 3738382 
rack: null) is unavailable or invalid{noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10390) kafka-server-stop lookup is not specific enough and may kill other processes

2020-08-11 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-10390:


 Summary: kafka-server-stop lookup is not specific enough and may 
kill other processes
 Key: KAFKA-10390
 URL: https://issues.apache.org/jira/browse/KAFKA-10390
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Lucas Bradstreet


kafka-server-stop.sh picks out kafka processes by:


 
{noformat}
PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}'){noformat}
 

This is not specific enough and may match unintended processes, e.g. a java 
process whose command line merely includes a dependency matching *.kafka.kafka.*

A better match would be:
{noformat}
PIDS=$(ps ax | grep ' kafka\.Kafka ' | grep java | grep -v grep | awk '{print $1}')
{noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9946) KAFKA-9539/StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller

2020-05-01 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9946:
---

 Summary: KAFKA-9539/StopReplicaRequest deletePartition changes may 
cause premature topic deletion handling in controller
 Key: KAFKA-9946
 URL: https://issues.apache.org/jira/browse/KAFKA-9946
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.6.0
Reporter: Lucas Bradstreet


It seems like 
[https://github.com/apache/kafka/commit/7c7d55dbd8d42f6378d13ba02d62633366a7ede8]
 does not correctly handle StopReplicaRequest(s) where deletePartition is set to 
false when another topic deletion request is outstanding at the time the 
response is received.

In the failing code it seems like two StopReplicaRequest(s) are sent, one with 
the delete flag set on partitions, and one without. It seems like the request 
without the delete flag set on any partitions is prematurely triggering the 
controller to believe that the topic was deleted successfully.

We previously didn't set a callback if the StopReplicaRequest was not a delete 
request 
([https://github.com/apache/kafka/commit/7c7d55dbd8d42f6378d13ba02d62633366a7ede8#diff-987fef43991384a3ebec5fb55e53b577L570]).
 Now we set it unconditionally 
([https://github.com/apache/kafka/commit/7c7d55dbd8d42f6378d13ba02d62633366a7ede8#diff-987fef43991384a3ebec5fb55e53b577L570]),
 but the callback does not distinguish between the partition states where a 
delete was being performed and where it was not. This happens on all IBP 
versions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9864) Avoid expensive QuotaViolationException usage

2020-04-14 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9864:
---

 Summary: Avoid expensive QuotaViolationException usage
 Key: KAFKA-9864
 URL: https://issues.apache.org/jira/browse/KAFKA-9864
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Lucas Bradstreet


QuotaViolationException generates stack traces and uses String.format in 
exception generation. QuotaViolationException is used for control flow and 
these costs add up even though the exception contents are ignored.
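
One common way to make such an exception cheap enough for control flow, shown as 
a hedged sketch rather than the change actually made here: skip stack trace 
capture via the four-argument Throwable constructor and defer the String.format 
until getMessage() is actually called.
{code:java}
// Sketch: an exception intended purely for control flow.
public class CheapQuotaViolationException extends RuntimeException {
    private final String metricName;
    private final double value;
    private final double bound;

    public CheapQuotaViolationException(String metricName, double value, double bound) {
        // no cause, no suppression, and writableStackTrace=false skips fillInStackTrace
        super(null, null, false, false);
        this.metricName = metricName;
        this.value = value;
        this.bound = bound;
    }

    @Override
    public String getMessage() {
        // pay for String.format only if the message is ever rendered
        return String.format("'%s' violated quota. Actual: %f, Threshold: %f", metricName, value, bound);
    }
}
{code}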



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9820) validateMessagesAndAssignOffsetsCompressed allocates batch iterator which is not used

2020-04-03 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9820:
---

 Summary: validateMessagesAndAssignOffsetsCompressed allocates 
batch iterator which is not used
 Key: KAFKA-9820
 URL: https://issues.apache.org/jira/browse/KAFKA-9820
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Lucas Bradstreet


KAFKA-8106 added a new skip key/value iterator that reduces allocations 
[https://github.com/apache/kafka/commit/3e9d1c1411c5268de382f9dfcc95bdf66d0063a0].

Unfortunately, LogValidator creates that iterator but never uses it, and this 
is quite expensive in terms of allocations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8963) Benchmark and optimize incremental fetch session handler

2020-02-28 Thread Lucas Bradstreet (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Bradstreet resolved KAFKA-8963.
-
Fix Version/s: 2.5.0
   Resolution: Fixed

> Benchmark and optimize incremental fetch session handler
> 
>
> Key: KAFKA-8963
> URL: https://issues.apache.org/jira/browse/KAFKA-8963
> Project: Kafka
>  Issue Type: Task
>Reporter: Lucas Bradstreet
>Assignee: Lucas Bradstreet
>Priority: Major
> Fix For: 2.5.0
>
>
> The FetchSessionHandler is a cause of high CPU usage in the replica fetcher 
> for brokers with high partition counts. A jmh benchmark should be added and 
> the incremental fetch session handling should be measured and optimized.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9577) Client encountering SASL_HANDSHAKE protocol version errors on 2.5 / trunk

2020-02-19 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9577:
---

 Summary: Client encountering SASL_HANDSHAKE protocol version 
errors on 2.5 / trunk
 Key: KAFKA-9577
 URL: https://issues.apache.org/jira/browse/KAFKA-9577
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.5.0
Reporter: Lucas Bradstreet


I am trying 2.5.0 with sasl turned on and my consumer clients receive:
{noformat}
org.apache.kafka.common.errors.UnsupportedVersionException: The SASL_HANDSHAKE 
protocol does not support version 2
{noformat}
I believe this is due to 
[https://github.com/apache/kafka/commit/0a2569e2b9907a1217dd50ccbc320f8ad0b42fd0]
 which added flexible version support and bumped the protocol version.

It appears that the SaslClientAuthenticator uses the max version for 
SASL_HANDSHAKE returned by the broker's ApiVersions response, and then uses 
that version even though the client itself may not support it. See 
[https://github.com/apache/kafka/blob/eb09efa9ac79efa484307bdcf03ac8eb8a3a94e2/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java#L290].
 

This may make it hard to ever evolve this schema. In the short term I suggest 
we roll back the version bump and flexible schema until we figure out a path 
forward.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9137) Maintenance of FetchSession cache causing FETCH_SESSION_ID_NOT_FOUND in live sessions

2020-02-15 Thread Lucas Bradstreet (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Bradstreet resolved KAFKA-9137.
-
Resolution: Fixed

Closed by [https://github.com/apache/kafka/pull/7640]

> Maintenance of FetchSession cache causing FETCH_SESSION_ID_NOT_FOUND in live 
> sessions
> -
>
> Key: KAFKA-9137
> URL: https://issues.apache.org/jira/browse/KAFKA-9137
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Lucas Bradstreet
>Priority: Major
>
> We have recently seen cases where brokers end up in a bad state where fetch 
> session evictions occur at a high rate (> 16 per second) after a roll. This 
> increase in eviction rate included the following pattern in our logs:
>  
> {noformat}
> broker 6: October 31st 2019, 17:52:45.496 Created a new incremental FetchContext for session id 2046264334, epoch 9790: added (), updated (), removed ()
> broker 6: October 31st 2019, 17:52:45.496 Created a new incremental FetchContext for session id 2046264334, epoch 9791: added (), updated (), removed ()
> broker 6: October 31st 2019, 17:52:45.500 Created a new incremental FetchContext for session id 2046264334, epoch 9792: added (), updated (lkc-7nv6o_tenant_soak_topic_144p-67), removed ()
> broker 6: October 31st 2019, 17:52:45.501 Created a new incremental FetchContext for session id 2046264334, epoch 9793: added (), updated (lkc-7nv6o_tenant_soak_topic_144p-59, lkc-7nv6o_tenant_soak_topic_144p-123, lkc-7nv6o_tenant_soak_topic_144p-11, lkc-7nv6o_tenant_soak_topic_144p-3, lkc-7nv6o_tenant_soak_topic_144p-67, lkc-7nv6o_tenant_soak_topic_144p-115), removed ()
> broker 6: October 31st 2019, 17:52:45.501 Evicting stale FetchSession 2046264334.
> broker 6: October 31st 2019, 17:52:45.502 Session error for 2046264334: no such session ID found.
> broker 4: October 31st 2019, 17:52:45.813 [ReplicaFetcher replicaId=4, leaderId=6, fetcherId=0] Node 6 was unable to process the fetch request with (sessionId=2046264334, epoch=9793): FETCH_SESSION_ID_NOT_FOUND.
> {noformat}
> This pattern appears to be problematic for two reasons. Firstly, the replica 
> fetcher for broker 4 was clearly able to send multiple incremental fetch 
> requests to broker 6, and receive replies, and did so right up to the point 
> where broker 6 evicted its fetch session within milliseconds of multiple 
> fetch requests. The second problem is that replica fetchers are considered 
> privileged for the fetch session cache, and should not be evicted by consumer 
> fetch sessions. This cluster only has 12 brokers and 1000 fetch session cache 
> slots (the default for max.incremental.fetch.session.cache.slots), and it is 
> thus very unlikely that this session should have been evicted by another 
> replica fetcher session.
> This cluster also appears to be causing cycles of fetch session evictions 
> where the cluster never stabilizes into a state where fetch sessions are not 
> evicted. The above logs are the best example I could find of a case where a 
> session clearly should not have been evicted.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9513) Failed GroupMetadataManager loadGroupAndOffsets will consider groups as loaded

2020-02-05 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9513:
---

 Summary: Failed GroupMetadataManager loadGroupAndOffsets will 
consider groups as loaded
 Key: KAFKA-9513
 URL: https://issues.apache.org/jira/browse/KAFKA-9513
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Bradstreet


 

Bugs in group loading such as https://issues.apache.org/jira/browse/KAFKA-8896 
may cause errors loading offsets. loadGroupsAndOffsets's finally block adds the 
offsets partition to ownedPartitions and removes it from loadingPartitions even 
if this process does not succeed.
{code:java}
private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit): Unit = {
  try {
    val startMs = time.milliseconds()
    doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
    val endMs = time.milliseconds()
    val timeLapse = endMs - startMs
    partitionLoadSensor.record(timeLapse, endMs, false)
    info(s"Finished loading offsets and group metadata from $topicPartition in $timeLapse milliseconds.")
  } catch {
    case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
  } finally {
    inLock(partitionLock) {
      ownedPartitions.add(topicPartition.partition)
      loadingPartitions.remove(topicPartition.partition)
    }
  }
}
{code}
This means that the group is considered loaded by:
{code:java}
def isGroupLoading(groupId: String): Boolean = isPartitionLoading(partitionFor(groupId))
{code}
 

This may result in consumers loading the wrong offsets.

We should consider whether we should be more defensive and instead mark the 
partition as failed.
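
A hedged sketch of the more defensive structure (plain Java rather than the 
Scala above, with an assumed failedLoadingPartitions set): only mark the 
partition owned when loading succeeded, and track failures separately so the 
group is not treated as loaded.
{code:java}
// Sketch only: field and method names mirror the Scala above where possible;
// failedLoadingPartitions is hypothetical.
void loadGroupsAndOffsets(int offsetsPartition) {
    boolean success = false;
    try {
        doLoadGroupsAndOffsets(offsetsPartition);   // may throw
        success = true;
    } catch (Throwable t) {
        log.error("Error loading offsets from partition {}", offsetsPartition, t);
    } finally {
        synchronized (partitionLock) {
            loadingPartitions.remove(offsetsPartition);
            if (success)
                ownedPartitions.add(offsetsPartition);
            else
                failedLoadingPartitions.add(offsetsPartition);
        }
    }
}
{code}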



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9401) High lock contention for kafka.server.FetchManager.newContext

2020-01-11 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9401:
---

 Summary: High lock contention for 
kafka.server.FetchManager.newContext
 Key: KAFKA-9401
 URL: https://issues.apache.org/jira/browse/KAFKA-9401
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Lucas Bradstreet


kafka.server.FetchManager.newContext takes out what is essentially a global 
fetch lock on kafka.server.FetchSessionCache, for updates not only to the 
FetchSessionCache but also to the fetch sessions stored within it. This causes 
a high amount of lock contention for fetches, as every fetch request must go 
through this lock.

I have taken an async-profiler lock profile on a high throughput cluster, and I 
see around 25s of waiting on this lock for a sixty second profile.
{noformat}
--- 25818577497 ns (20.84%), 5805 samples
  [ 0] kafka.server.FetchSessionCache
  [ 1] kafka.server.FetchManager.newContext
  [ 2] kafka.server.KafkaApis.handleFetchRequest
  [ 3] kafka.server.KafkaApis.handle
  [ 4] kafka.server.KafkaRequestHandler.run
  [ 5] java.lang.Thread.run
{noformat}


 
{code:java}

cache.synchronized {
  cache.get(reqMetadata.sessionId) match {
    case None => {
      debug(s"Session error for ${reqMetadata.sessionId}: no such session ID found.")
      new SessionErrorContext(Errors.FETCH_SESSION_ID_NOT_FOUND, reqMetadata)
    }
    case Some(session) => session.synchronized {
      if (session.epoch != reqMetadata.epoch) {
        debug(s"Session error for ${reqMetadata.sessionId}: expected epoch " +
          s"${session.epoch}, but got ${reqMetadata.epoch} instead.");
        new SessionErrorContext(Errors.INVALID_FETCH_SESSION_EPOCH, reqMetadata)
      } else {
        val (added, updated, removed) = session.update(fetchData, toForget, reqMetadata)
        if (session.isEmpty) {
          debug(s"Created a new sessionless FetchContext and closing session id ${session.id}, " +
            s"epoch ${session.epoch}: after removing ${partitionsToLogString(removed)}, " +
            s"there are no more partitions left.")
          cache.remove(session)
          new SessionlessFetchContext(fetchData)
        } else {
          cache.touch(session, time.milliseconds())
          session.epoch = JFetchMetadata.nextEpoch(session.epoch)
          debug(s"Created a new incremental FetchContext for session id ${session.id}, " +
            s"epoch ${session.epoch}: added ${partitionsToLogString(added)}, " +
            s"updated ${partitionsToLogString(updated)}, " +
            s"removed ${partitionsToLogString(removed)}")
          new IncrementalFetchContext(time, reqMetadata, session)
        }
      }
    }
  }
}

{code}
Contention has been made worse by the solution for "KAFKA-9137: Fix incorrect 
FetchSessionCache eviction logic" ([https://github.com/apache/kafka/pull/7640]), 
as the cache is correctly touched now, whereas previously the touch was being 
skipped.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9393) DeleteRecords triggers extreme lock contention for large partition directories

2020-01-09 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9393:
---

 Summary: DeleteRecords triggers extreme lock contention for large 
partition directories
 Key: KAFKA-9393
 URL: https://issues.apache.org/jira/browse/KAFKA-9393
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 2.4.0, 2.3.0, 2.2.0
Reporter: Lucas Bradstreet


DeleteRecords, frequently used by KStreams, triggers a 
Log.maybeIncrementLogStartOffset call, which calls 
kafka.log.ProducerStateManager.listSnapshotFiles, which in turn calls 
java.io.File.listFiles on the partition dir. The time taken to list this 
directory can be extreme for partitions with many small segments (e.g 2), 
taking multiple seconds to finish. This causes lock contention for the log, and 
if produce requests are also occurring for the same log, it can cause a 
majority of request handler threads to become blocked waiting for the 
DeleteRecords call to finish.

I believe this is a problem going back to the initial implementation of the 
transactional producer, but I need to confirm how far back it goes.

One possible solution is to maintain a producer state snapshot aligned to the 
log segment, and simply delete it whenever we delete a segment. This would 
ensure that we never have to perform a directory scan.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9359) Controller does not handle requests while broker is being shutdown

2020-01-02 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9359:
---

 Summary: Controller does not handle requests while broker is being 
shutdown
 Key: KAFKA-9359
 URL: https://issues.apache.org/jira/browse/KAFKA-9359
 Project: Kafka
  Issue Type: Improvement
  Components: controller, core
Reporter: Lucas Bradstreet


When a broker is shut down it stops accepting requests, as the socket server 
and handler pools are shut down immediately. It does so before shutting down 
the controller and/or closing the log manager, and this may take some time to 
complete. During this time it will remain the controller, as the zkClient has 
not been closed. We should improve the shutdown process so that a broker does 
not remain the controller while it is unable to accept the requests expected of 
a controller.

See also https://issues.apache.org/jira/browse/KAFKA-9358



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9358) Explicitly resign controller leadership and broker znode

2020-01-02 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9358:
---

 Summary: Explicitly resign controller leadership and broker znode
 Key: KAFKA-9358
 URL: https://issues.apache.org/jira/browse/KAFKA-9358
 Project: Kafka
  Issue Type: Improvement
  Components: controller, core
Reporter: Lucas Bradstreet
Assignee: Lucas Bradstreet


When shutting down, the broker shuts down the controller component and then 
closes the zookeeper connection. Closing the zookeeper connection results in 
ephemeral nodes being removed. It is currently critical that the zkClient is 
closed after the controller is shut down, otherwise a controller election will 
not occur if the broker being shut down is currently the controller.

We should consider resigning leadership explicitly in the controller rather 
than relying on the zookeeper client being closed. This would ensure that any 
changes in shutdown order cannot lead to periods where a broker's controller 
component is stopped while also maintaining leadership until the zkClient is 
closed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9338) Incremental fetch sessions do not maintain or use leader epoch for fencing purposes

2019-12-27 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9338:
---

 Summary: Incremental fetch sessions do not maintain or use leader 
epoch for fencing purposes
 Key: KAFKA-9338
 URL: https://issues.apache.org/jira/browse/KAFKA-9338
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.4.0, 2.3.0, 2.2.0, 2.1.0
Reporter: Lucas Bradstreet


KIP-320 adds the ability to fence replicas by detecting stale leader epochs 
from followers, and helping consumers handle unclean truncation.

Unfortunately the incremental fetch session handling does not maintain or use 
the leader epoch in the fetch session cache. As a result, it does not appear 
that the leader epoch is used for fencing a majority of the time. I'm not sure 
if this is only the case after incremental fetch sessions are established - it 
may be the case that the first "full" fetch session is safe.

Optional.empty is returned for the FetchRequest.PartitionData here:

[https://github.com/apache/kafka/blob/a4cbdc6a7b3140ccbcd0e2339e28c048b434974e/core/src/main/scala/kafka/server/FetchSession.scala#L111]

I believe this affects brokers from 2.1.0, when fencing was improved on the 
replica fetcher side, and consumers from 2.3.0 and above, which is when 
client-side truncation detection was added.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9312) KafkaProducer flush behavior does not guarantee send completion under record batch splitting

2019-12-17 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9312:
---

 Summary: KafkaProducer flush behavior does not guarantee send 
completion under record batch splitting
 Key: KAFKA-9312
 URL: https://issues.apache.org/jira/browse/KAFKA-9312
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.4.0, 2.3.0, 2.2.0, 2.1.0
Reporter: Lucas Bradstreet


The KafkaProducer flush call guarantees that all records that have been sent at 
time of the flush call will be either sent successfully or will result in an 
error.

The KafkaProducer will split record batches upon receiving a MESSAGE_TOO_LARGE 
error from the broker. However the flush behavior relies on the accumulator 
checking incomplete sends that exist at the time of the flush call.
{code:java}
public void awaitFlushCompletion() throws InterruptedException {
    try {
        for (ProducerBatch batch : this.incomplete.copyAll())
            batch.produceFuture.await();
    } finally {
        this.flushesInProgress.decrementAndGet();
    }
}{code}
When large record batches are split, the produceFuture of the batch in question 
is completed and new batches are added to the incomplete list of record batches. 
This breaks the flush guarantee, as awaitFlushCompletion will finish without 
awaiting the new batches.

This is demonstrated in a test case that can be found at 
[https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9200) ListOffsetRequest missing error response for v5

2019-11-17 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9200:
---

 Summary: ListOffsetRequest missing error response for v5
 Key: KAFKA-9200
 URL: https://issues.apache.org/jira/browse/KAFKA-9200
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.3.0
Reporter: Lucas Bradstreet
Assignee: Lucas Bradstreet


It seems ListOffsetRequest.getErrorResponse is missing a case for version 5. 
I'd have hoped that this kind of case would be picked up by KafkaApisTest.
{noformat}
java.lang.IllegalArgumentException: Version 5 is not valid. Valid versions for ListOffsetRequest are 0 to 5
    at org.apache.kafka.common.requests.ListOffsetRequest.getErrorResponse(ListOffsetRequest.java:282)
    at kafka.server.KafkaApis.sendErrorOrCloseConnection(KafkaApis.scala:3062)
    at kafka.server.KafkaApis.sendErrorResponseMaybeThrottle(KafkaApis.scala:3045)
    at kafka.server.KafkaApis.handleError(KafkaApis.scala:3027)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:209)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:78)
    at java.lang.Thread.run(Thread.java:748)
{noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9193) org.apache.kafka.common.utils.Timer should use monotonic clock

2019-11-14 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9193:
---

 Summary: org.apache.kafka.common.utils.Timer should use monotonic 
clock
 Key: KAFKA-9193
 URL: https://issues.apache.org/jira/browse/KAFKA-9193
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Bradstreet


utils.Timer uses System.currentTimeMillis to implement blocking methods with 
timeouts. We should not rely on a non-monotonic clock and should instead switch 
this to Time.hiResClockMs() (which uses System.nanoTime).

When we do so we should revert [https://github.com/apache/kafka/pull/7683] 
which was caused by inaccuracies in our current approach (the test was good, 
the code is bad).
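
A minimal sketch of the kind of timer this suggests, computing the deadline from 
a hiResClockMs-style helper over System.nanoTime rather than 
System.currentTimeMillis, so wall-clock adjustments cannot shorten or extend the 
timeout:
{code:java}
import java.util.concurrent.TimeUnit;

// Sketch: deadlines based on the monotonic clock are immune to NTP or manual
// wall-clock changes, unlike System.currentTimeMillis.
final class MonotonicTimerSketch {
    private final long deadlineMs;

    MonotonicTimerSketch(long timeoutMs) {
        this.deadlineMs = hiResClockMs() + timeoutMs;
    }

    static long hiResClockMs() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    }

    boolean isExpired() {
        return hiResClockMs() >= deadlineMs;
    }

    long remainingMs() {
        return Math.max(0, deadlineMs - hiResClockMs());
    }
}
{code}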



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9137) Maintenance of FetchSession cache causing FETCH_SESSION_ID_NOT_FOUND in live sessions

2019-11-04 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9137:
---

 Summary: Maintenance of FetchSession cache causing 
FETCH_SESSION_ID_NOT_FOUND in live sessions
 Key: KAFKA-9137
 URL: https://issues.apache.org/jira/browse/KAFKA-9137
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Lucas Bradstreet


We have recently seen cases where brokers end up in a bad state where fetch 
session evictions occur at a high rate (> 16 per second) after a roll. This 
increase in eviction rate included the following pattern in our logs:

 
{noformat}
broker 6: October 31st 2019, 17:52:45.496 Created a new incremental FetchContext for session id 2046264334, epoch 9790: added (), updated (), removed ()

broker 6: October 31st 2019, 17:52:45.496 Created a new incremental FetchContext for session id 2046264334, epoch 9791: added (), updated (), removed ()

broker 6: October 31st 2019, 17:52:45.500 Created a new incremental FetchContext for session id 2046264334, epoch 9792: added (), updated (lkc-7nv6o_tenant_soak_topic_144p-67), removed ()

broker 6: October 31st 2019, 17:52:45.501 Created a new incremental FetchContext for session id 2046264334, epoch 9793: added (), updated (lkc-7nv6o_tenant_soak_topic_144p-59, lkc-7nv6o_tenant_soak_topic_144p-123, lkc-7nv6o_tenant_soak_topic_144p-11, lkc-7nv6o_tenant_soak_topic_144p-3, lkc-7nv6o_tenant_soak_topic_144p-67, lkc-7nv6o_tenant_soak_topic_144p-115), removed ()

broker 6: October 31st 2019, 17:52:45.501 Evicting stale FetchSession 2046264334.

broker 6: October 31st 2019, 17:52:45.502 Session error for 2046264334: no such session ID found.

broker 4: October 31st 2019, 17:52:45.813 [ReplicaFetcher replicaId=4, leaderId=6, fetcherId=0] Node 6 was unable to process the fetch request with (sessionId=2046264334, epoch=9793): FETCH_SESSION_ID_NOT_FOUND.
{noformat}
This pattern appears to be problematic for two reasons. Firstly, the replica 
fetcher for broker 4 was clearly able to send multiple incremental fetch 
requests to broker 6, and receive replies, and did so right up to the point 
where broker 6 evicted its fetch session within milliseconds of multiple fetch 
requests. The second problem is that replica fetchers are considered privileged 
for the fetch session cache, and should not be evicted by consumer fetch 
sessions. This cluster only has 12 brokers and 1000 fetch session cache slots 
(the default for max.incremental.fetch.session.cache.slots), and it is thus very 
unlikely that this session should have been evicted by another replica fetcher 
session.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9048) Improve partition scalability in replica fetcher

2019-10-15 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-9048:
---

 Summary: Improve partition scalability in replica fetcher
 Key: KAFKA-9048
 URL: https://issues.apache.org/jira/browse/KAFKA-9048
 Project: Kafka
  Issue Type: Task
  Components: core
Reporter: Lucas Bradstreet


https://issues.apache.org/jira/browse/KAFKA-9039 
([https://github.com/apache/kafka/pull/7443]) improves the performance of the 
replica fetcher (at both small and large numbers of partitions), but it does 
not improve its complexity or scalability in the number of partitions.

I took a profile using async-profiler for the 1000 partition JMH replica 
fetcher benchmark. The big remaining culprits are:
 * ~18% looking up logStartOffset
 * ~45% FetchSessionHandler$Builder.add
 * ~19% FetchSessionHandler$Builder.build

*Suggestions*
 #  The logStartOffset is looked up for every partition on each doWork pass. 
This requires a hashmap lookup even though the logStartOffset changes rarely. 
If the replica fetcher could be notified of updates to the logStartOffset, then 
we could reduce the overhead to a function of the number of updates to the 
logStartOffset instead of O(n) on each pass.
 #  The use of FetchSessionHandler means that we maintain a partitionStates 
hashmap in the replica fetcher, and a sessionPartitions hashmap in the 
FetchSessionHandler. On each incremental fetch session pass, we need to 
reconcile these two hashmaps to determine which partitions were added/updated 
and which partitions were removed. This reconciliation process is especially 
expensive, requiring multiple passes over the fetching partitions, and hashmap 
removes and puts for most partitions. The replica fetcher could be smarter by 
maintaining the fetch session *updated* hashmap containing 
FetchRequest.PartitionData(s) directly, as well as a *removed* partitions list, 
so that these do not need to be generated by reconciliation on each fetch pass 
(see the sketch after this list).
 #  maybeTruncate requires an O(n) pass over the elements in partitionStates 
even if there are no partitions in truncating state. If we can maintain some 
additional state about whether truncating partitions exist in partitionStates, 
or if we could separate these states into a separate data structure, we would 
not need to iterate across all partitions on every doWork pass. I’ve seen 
clusters where this work takes about 0.5%-1% of CPU, which is minor but will 
become more substantial as the number of partitions increases.
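
A rough, self-contained sketch of suggestion 2 above: record added/updated and 
removed partitions at the point of change so the next incremental fetch request 
can be built in O(changes), rather than reconciling two hashmaps in 
O(partitions) on every pass. All names here are illustrative.
{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch: maintain deltas directly instead of diffing full maps each doWork pass.
final class IncrementalFetchStateSketch<P, D> {
    private final Map<P, D> updated = new HashMap<>(); // added or changed since the last request
    private final Set<P> removed = new HashSet<>();    // removed since the last request

    void onPartitionUpdated(P partition, D data) {
        removed.remove(partition);
        updated.put(partition, data);
    }

    void onPartitionRemoved(P partition) {
        updated.remove(partition);
        removed.add(partition);
    }

    // Drain the deltas into the next fetch request and reset for the next pass.
    void drainTo(Map<P, D> toSend, Set<P> toForget) {
        toSend.putAll(updated);
        toForget.addAll(removed);
        updated.clear();
        removed.clear();
    }
}
{code}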



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-8963) Benchmark and optimize incremental fetch session handler

2019-09-30 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-8963:
---

 Summary: Benchmark and optimize incremental fetch session handler
 Key: KAFKA-8963
 URL: https://issues.apache.org/jira/browse/KAFKA-8963
 Project: Kafka
  Issue Type: Task
Reporter: Lucas Bradstreet


The FetchSessionHandler is a cause of high CPU usage in the replica fetcher for 
brokers with high partition counts. We should add a jmh benchmark and optimize 
the incremental fetch session building.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8899) Optimize Partition.maybeIncrementLeaderHW

2019-09-12 Thread Lucas Bradstreet (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Bradstreet resolved KAFKA-8899.
-
Resolution: Duplicate

Duplicate of https://issues.apache.org/jira/browse/KAFKA-8841

> Optimize Partition.maybeIncrementLeaderHW
> -
>
> Key: KAFKA-8899
> URL: https://issues.apache.org/jira/browse/KAFKA-8899
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 2.3.0, 2.2.1
>Reporter: Lucas Bradstreet
>Priority: Major
>
> Partition.maybeIncrementLeaderHW is in the hot path for 
> ReplicaManager.updateFollowerFetchState. When replicating between brokers 
> with high partition counts, maybeIncrementLeaderHW becomes expensive, with 
> much of the time going to calling Partition.remoteReplicas which performs a 
> toSet conversion. maybeIncrementLeaderHW should avoid generating any 
> intermediate collections when calculating the new HWM.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (KAFKA-8899) Optimize Partition.maybeIncrementLeaderHW

2019-09-11 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-8899:
---

 Summary: Optimize Partition.maybeIncrementLeaderHW
 Key: KAFKA-8899
 URL: https://issues.apache.org/jira/browse/KAFKA-8899
 Project: Kafka
  Issue Type: Task
  Components: core
Affects Versions: 2.2.1, 2.3.0
Reporter: Lucas Bradstreet


Partition.maybeIncrementLeaderHW is in the hot path for 
ReplicaManager.updateFollowerFetchState. When replicating between brokers with 
high partition counts, maybeIncrementLeaderHW becomes expensive, with much of 
the time going to calling Partition.remoteReplicas which performs a toSet 
conversion. maybeIncrementLeaderHW should avoid generating any intermediate 
collections when calculating the new HWM.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (KAFKA-8872) Improvements to controller "deleting" state / topic Identifiers

2019-09-04 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-8872:
---

 Summary: Improvements to controller "deleting" state /  topic 
Identifiers
 Key: KAFKA-8872
 URL: https://issues.apache.org/jira/browse/KAFKA-8872
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Bradstreet


Kafka currently uniquely identifies a topic by its name. This is generally 
sufficient, but there are flaws in this scheme if a topic is deleted and 
recreated with the same name. As a result, Kafka attempts to prevent these 
classes of issues by ensuring a topic is deleted from all replicas before 
completing a deletion. This solution is not perfect, as it is possible for 
partitions to be reassigned from brokers while they are down, and there are no 
guarantees that this state will ever be cleaned up and will not cause issues in 
the future.

As the controller must wait for all replicas to delete their local partitions, 
deletes can also become blocked, preventing topics from being created with the 
same name until the deletion is complete on all replicas. This can mean that 
downtime for a single broker can effectively cause a complete outage for 
everyone producing/consuming to that topic name, as the topic cannot be 
recreated without manual intervention.

Unique topic IDs could help address this issue by associating a unique ID with 
each topic, ensuring a newly created topic with a previously used name cannot 
be confused with a previous topic with that name.

 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (KAFKA-8499) Ducker missing java commands in path for ducker user on openjdk docker images

2019-06-06 Thread Lucas Bradstreet (JIRA)
Lucas Bradstreet created KAFKA-8499:
---

 Summary: Ducker missing java commands in path for ducker user on 
openjdk docker images
 Key: KAFKA-8499
 URL: https://issues.apache.org/jira/browse/KAFKA-8499
 Project: Kafka
  Issue Type: Improvement
  Components: system tests
Affects Versions: 2.2.0, 2.3.0
Reporter: Lucas Bradstreet


openjdk:8/openjdk:11 used to include java and the other java programs in 
/usr/bin. These have since been moved to /usr/local/openjdk-VERSION/bin, which 
will cause problems when the system tests invoke any java related utility, as 
well as java itself, if the user is using a later image with the same tag. The 
openjdk images have been updated in place under the same tag, so this can 
happen suddenly, without any other code changes, once the new version is pulled.

We need to ensure that the PATH of the ducker user created in the Dockerfile 
includes the new location where java is installed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8125) Check for topic existence in CreateTopicsRequest prior to creating replica assignment

2019-03-18 Thread Lucas Bradstreet (JIRA)
Lucas Bradstreet created KAFKA-8125:
---

 Summary: Check for topic existence in CreateTopicsRequest prior to 
creating replica assignment
 Key: KAFKA-8125
 URL: https://issues.apache.org/jira/browse/KAFKA-8125
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.1.1
Reporter: Lucas Bradstreet


Imagine the following pattern to ensure topic creation in an application:
 # Attempt to create a topic with P partitions and replication factor R.
 # If topic creation fails with TopicExistsException, continue; if topic 
creation succeeds, continue. Either way the topic now exists.

This normally works fine. However, if the topic has already been created but 
the number of live brokers is < R, then topic creation will fail with an 
org.apache.kafka.common.errors.InvalidReplicationFactorException, even though 
the topic already exists.

This could be avoided if we check whether the topic exists prior to calling 
AdminUtils.assignReplicasToBrokers.
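
For reference, the create-and-tolerate-TopicExistsException pattern described 
above typically looks like the following sketch (topic name, partition count, 
and replication factor are placeholders). With the current behavior, the 
createTopics call can instead surface InvalidReplicationFactorException when 
fewer than R brokers are alive, even though the topic already exists.
{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

public class EnsureTopicSketch {
    public static void ensureTopic(Properties adminProps, String topic, int partitions, short replicationFactor)
            throws InterruptedException, ExecutionException {
        try (AdminClient admin = AdminClient.create(adminProps)) {
            try {
                admin.createTopics(Collections.singleton(new NewTopic(topic, partitions, replicationFactor)))
                     .all().get();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof TopicExistsException)
                    return; // topic already exists: treat as success
                throw e;
            }
        }
    }
}
{code}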

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7410) Rack aware partitions assignment create unbalanced broker assignments on unbalanced racks

2018-09-13 Thread Lucas Bradstreet (JIRA)
Lucas Bradstreet created KAFKA-7410:
---

 Summary: Rack aware partitions assignment create unbalanced broker 
assignments on unbalanced racks
 Key: KAFKA-7410
 URL: https://issues.apache.org/jira/browse/KAFKA-7410
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 1.1.1
Reporter: Lucas Bradstreet
 Attachments: AdminUtilsTest.scala

AdminUtils creates a bad partition assignment when the number of brokers on 
each rack is unbalanced, e.g. 80 brokers rack A, 20 brokers rack B, 15 brokers 
rack C. Under such a scenario, a single broker from rack C may be assigned over 
and over again, when more balanced allocations exist.

kafka.admin.AdminUtils.getRackAlternatedBrokerList is supposed to create a list 
of brokers alternating by rack, however once it runs out of brokers on the 
racks with fewer brokers, it ends up placing a run of brokers from the same 
rack together as rackIterator.hasNext will return false for the other racks.
{code:java}
while (result.size < brokerRackMap.size) {
  val rackIterator = brokersIteratorByRack(racks(rackIndex))
  if (rackIterator.hasNext)
result += rackIterator.next()
  rackIndex = (rackIndex + 1) % racks.length
}{code}
Once assignReplicasToBrokersRackAware hits the run of brokers from the same 
rack, when choosing the replicas to go along with the leader on the rack with 
the most brokers, e.g. C, it will skip all of the C brokers until it wraps 
around to the first broker in the alternated list, and then choose that broker.

 
{code:java}
if ((!racksWithReplicas.contains(rack) || racksWithReplicas.size == numRacks)
    && (!brokersWithReplicas.contains(broker) || brokersWithReplicas.size == numBrokers)) {
  replicaBuffer += broker
  racksWithReplicas += rack
  brokersWithReplicas += broker
  done = true
}
k += 1
It does so for each of the remaining brokers for C, choosing the first broker 
in the alternated list until it's allocated all of the partitions.

See the attached sample code for more details.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)