[jira] [Created] (KAFKA-8921) Avoid excessive info logs in the client side for incremental fetch

2019-09-18 Thread Zhanxiang (Patrick) Huang (Jira)
Zhanxiang (Patrick) Huang created KAFKA-8921:


 Summary: Avoid excessive info logs in the client side for 
incremental fetch
 Key: KAFKA-8921
 URL: https://issues.apache.org/jira/browse/KAFKA-8921
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Zhanxiang (Patrick) Huang
Assignee: Zhanxiang (Patrick) Huang


 

Currently in FetchSessionHandler::handleResponse, the following info log gets 
printed out excessively when the session is evicted from the server-side cache, 
even though there is nothing wrong with the fetch request and the client cannot 
do much about it.
{noformat}
Node xxx was unable to process the fetch request with (sessionId=xxx, 
epoch=xxx): FETCH_SESSION_ID_NOT_FOUND.
 
{noformat}
 

Moreover, when the fetch request gets throttled, the following info logs will 
also get printed out, which are very misleading.
{noformat}
Node xxx sent an invalid full fetch response with ... 
Node xxx sent an invalid incremental fetch response with ...
{noformat}
 

We should avoid logging these at INFO level and print more informative logs 
for throttling.
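
Not a change to the actual client code, just a minimal sketch of the proposed 
logging behavior (written in Scala for consistency with the other snippets; the 
real handler is the Java class FetchSessionHandler, and the outcome/message 
names below are made up):
{code:java}
import org.slf4j.LoggerFactory

object FetchResponseLogging {
  private val log = LoggerFactory.getLogger("FetchSessionHandler")

  // Simplified stand-ins for the real error codes.
  sealed trait FetchResponseOutcome
  case object SessionIdNotFound extends FetchResponseOutcome // server evicted the session
  case object Throttled extends FetchResponseOutcome         // quota violation, empty response
  final case class OtherError(name: String) extends FetchResponseOutcome

  def logOutcome(node: String, sessionId: Int, epoch: Int, outcome: FetchResponseOutcome): Unit =
    outcome match {
      case SessionIdNotFound =>
        // Eviction from the server-side session cache is expected and self-healing
        // (the client falls back to a full fetch), so DEBUG is enough.
        log.debug("Node {} evicted fetch session (sessionId={}, epoch={}); sending a full fetch next.",
          node, sessionId, epoch)
      case Throttled =>
        // Call out throttling explicitly instead of claiming the response is "invalid".
        log.info("Node {} throttled the fetch request (sessionId={}, epoch={}).", node, sessionId, epoch)
      case OtherError(name) =>
        log.info("Node {} was unable to process the fetch request (sessionId={}, epoch={}): {}.",
          node, sessionId, epoch, name)
    }
}
{code}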

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-8668) Improve broker shutdown time

2019-07-15 Thread Zhanxiang (Patrick) Huang (JIRA)
Zhanxiang (Patrick) Huang created KAFKA-8668:


 Summary: Improve broker shutdown time
 Key: KAFKA-8668
 URL: https://issues.apache.org/jira/browse/KAFKA-8668
 Project: Kafka
  Issue Type: Improvement
Reporter: Zhanxiang (Patrick) Huang
Assignee: Zhanxiang (Patrick) Huang


 During LogManager shutdown, we call {{timeIndex.maybeAppend()}} when closing 
each log segment. Since in 2.0 we completely skip the sanity check and lazily 
mmap the indexes of segments below the recovery point, 
{{timeIndex.maybeAppend()}} may need to mmap the time index file of an inactive 
segment that was never mmapped before, which is not necessary because the time 
index hasn't changed at all.
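
A self-contained sketch of the idea (TimeIndexLike, LazySegmentTimeIndex and 
closeOnShutdown are hypothetical names, not the real Kafka classes): only call 
maybeAppend() on shutdown if the time index was ever materialized.
{code:java}
import java.io.File

// Hypothetical stand-in for the real time index API.
trait TimeIndexLike {
  def maybeAppend(timestamp: Long, offset: Long): Unit
  def close(): Unit
}

final class LazySegmentTimeIndex(file: File, load: File => TimeIndexLike) {
  @volatile private var loaded: Option[TimeIndexLike] = None

  // The first real read/append materializes (mmaps) the index.
  def get: TimeIndexLike = synchronized {
    loaded.getOrElse {
      val idx = load(file)
      loaded = Some(idx)
      idx
    }
  }

  // On LogManager shutdown: if the index was never materialized, the time index
  // cannot have changed, so skip maybeAppend() instead of mmapping the file just
  // to append nothing.
  def closeOnShutdown(largestTimestamp: Long, largestOffset: Long): Unit =
    loaded.foreach { idx =>
      idx.maybeAppend(largestTimestamp, largestOffset)
      idx.close()
    }
}
{code}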



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8667) Improve leadership transition time

2019-07-15 Thread Zhanxiang (Patrick) Huang (JIRA)
Zhanxiang (Patrick) Huang created KAFKA-8667:


 Summary: Improve leadership transition time
 Key: KAFKA-8667
 URL: https://issues.apache.org/jira/browse/KAFKA-8667
 Project: Kafka
  Issue Type: Improvement
Reporter: Zhanxiang (Patrick) Huang
Assignee: Zhanxiang (Patrick) Huang


When a replica fetcher thread processes a fetch response, it holds the 
{{partitionMapLock}}. If a LeaderAndIsr request comes in at the same time, it 
gets blocked at the end of its processing when calling 
{{shutdownIdleFetcherThread}}, because it needs to acquire the 
{{partitionMapLock}} of each replica fetcher thread to check whether any 
partition is still assigned to that fetcher, and the request handler thread 
performs this check sequentially across the fetcher threads.

For example, in a cluster with 20 brokers and num.replica.fetcher.threads set 
to 32, if each fetcher thread holds the lock a little bit longer, the total 
time for the request handler thread to finish shutdownIdleFetcherThread can 
grow substantially because it waits longer on the partitionMapLock of every 
fetcher thread. If the LeaderAndIsr request gets blocked in the broker for more 
than request.timeout.ms (30s by default), the request send thread on the 
controller side will time out while waiting for the response, establish a new 
connection to the broker, and re-send the request, which breaks in-order 
delivery because there is now more than one channel talking to the broker. 
Moreover, this may make the lock contention problem worse or saturate the 
request handler threads because duplicate control requests are sent to the 
broker multiple times. In our own testing, we saw up to *8 duplicate 
LeaderAndIsrRequests* being sent to the broker during a bounce, and the 99th 
percentile LeaderAndIsr local time went up to ~500s.
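
To make the contention concrete, here is a self-contained sketch of the pattern 
described above (all class and method names are hypothetical, not the actual 
Kafka code):
{code:java}
import java.util.concurrent.locks.ReentrantLock

// Hypothetical model of a replica fetcher thread's state.
final class FetcherState(val id: Int) {
  val partitionMapLock = new ReentrantLock()
  @volatile var assignedPartitions: Set[String] = Set.empty

  // A fetcher holds partitionMapLock for the whole time it processes a fetch response.
  def processFetchResponse(work: => Unit): Unit = {
    partitionMapLock.lock()
    try work finally partitionMapLock.unlock()
  }
}

object IdleFetcherCheck {
  // What the request handler effectively does at the end of LeaderAndIsr handling:
  // visit every fetcher sequentially and take its lock just to see if it is idle.
  // With many fetchers occasionally holding the lock for a while, the waits add up
  // on the single request handler thread serving the LeaderAndIsr request.
  def findIdleFetchers(fetchers: Seq[FetcherState]): Seq[Int] =
    fetchers.filter { f =>
      f.partitionMapLock.lock() // may block behind a long response-processing pass
      try f.assignedPartitions.isEmpty
      finally f.partitionMapLock.unlock()
    }.map(_.id)
}
{code}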



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8571) Not complete delayed produce requests when processing StopReplicaRequest causing high produce latency for acks=all

2019-06-19 Thread Zhanxiang (Patrick) Huang (JIRA)
Zhanxiang (Patrick) Huang created KAFKA-8571:


 Summary: Not complete delayed produce requests when processing 
StopReplicaRequest causing high produce latency for acks=all
 Key: KAFKA-8571
 URL: https://issues.apache.org/jira/browse/KAFKA-8571
 Project: Kafka
  Issue Type: Bug
Reporter: Zhanxiang (Patrick) Huang
Assignee: Zhanxiang (Patrick) Huang


Currently a broker only attempts to complete delayed requests upon 
high-watermark changes and when receiving a LeaderAndIsrRequest. When a broker 
receives a StopReplicaRequest, it does not try to complete delayed operations, 
including delayed produce for acks=all, which can cause the producer to time 
out even though it could have talked to the new leader sooner if a 
NotLeaderForPartition error had been returned.

This can happen during partition reassignment when the controller is trying to 
kick the previous leader out of the replica set. In this case, the controller 
only sends a StopReplicaRequest (not a LeaderAndIsrRequest) to the previous 
leader in the replica-set shrink phase. Here is an example:
{noformat}
Reassigning the replica set of partition A from [B1, B2] to [B2, B3]:

t0: Controller expands the replica set to [B1, B2, B3].

t1: B1 receives produce request PR on partition A with acks=all and timeout T.
B1 puts PR into the DelayedProducePurgatory with timeout T.

t2: Controller elects B2 as the new leader and shrinks the replica set to
[B2, B3]. LeaderAndIsrRequests are sent to B2 and B3. StopReplicaRequest is
sent to B1.

t3: B1 receives StopReplicaRequest but doesn't try to complete PR.

If PR cannot be fulfilled by t3, and t1 + T > t3, PR will eventually time out
in the purgatory and the producer will eventually time out the produce
request.{noformat}
Since it is possible for the leader to receive only a StopReplicaRequest 
(without any LeaderAndIsrRequest) when it leaves the replica set, a fix for 
this issue is to also try to complete delayed operations when processing the 
StopReplicaRequest.
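
A sketch of the proposed fix with hypothetical types (the real 
ReplicaManager/purgatory APIs are not reproduced here): when handling the 
StopReplicaRequest, attempt completion of delayed produce operations keyed on 
the stopped partitions.
{code:java}
// Hypothetical stand-ins for the purgatory interaction described above.
final case class TopicPartition(topic: String, partition: Int)

trait DelayedProducePurgatory {
  // Re-evaluates and, if possible, completes delayed operations watching this key.
  def checkAndComplete(key: TopicPartition): Int
}

object StopReplicaHandling {
  // After removing a replica named in a StopReplicaRequest, also poke the delayed
  // produce purgatory for that partition so pending acks=all requests complete
  // immediately with NotLeaderForPartition instead of waiting out their timeout.
  def stopReplicas(partitions: Set[TopicPartition],
                   removeReplica: TopicPartition => Unit,
                   purgatory: DelayedProducePurgatory): Unit =
    partitions.foreach { tp =>
      removeReplica(tp)
      purgatory.checkAndComplete(tp)
    }
}
{code}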

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8069) Committed offsets get cleaned up right after the coordinator loading them back from __consumer_offsets in broker with old inter-broker protocol version (< 2.2)

2019-03-07 Thread Zhanxiang (Patrick) Huang (JIRA)
Zhanxiang (Patrick) Huang created KAFKA-8069:


 Summary: Committed offsets get cleaned up right after the 
coordinator loading them back from __consumer_offsets in broker with old 
inter-broker protocol version (< 2.2)
 Key: KAFKA-8069
 URL: https://issues.apache.org/jira/browse/KAFKA-8069
 Project: Kafka
  Issue Type: Bug
Reporter: Zhanxiang (Patrick) Huang
Assignee: Zhanxiang (Patrick) Huang


As of the 2.1 release, if the broker hasn't been upgraded to the latest 
inter-broker protocol version, the committed offsets stored in the 
__consumer_offsets topic get cleaned up much earlier than they should be once 
the offsets are loaded back from the __consumer_offsets topic in the 
GroupCoordinator, which happens during leadership transition or after a broker 
bounce.

TL;DR:
For the V1 on-disk format of __consumer_offsets, we have the *expireTimestamp* 
field. If the inter-broker protocol (IBP) version configured on a Kafka 2.1 
broker is prior to 2.1 (i.e. prior to 
[KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets]),
 the logic for finding expired offsets looks like:
{code:java}
def getExpiredOffsets(baseTimestamp: CommitRecordMetadataAndOffset => Long): Map[TopicPartition, OffsetAndMetadata] = {
  offsets.filter {
    case (topicPartition, commitRecordMetadataAndOffset) =>
      ... && {
        commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp match {
          case None =>
            // current version with no per partition retention
            currentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs
          case Some(expireTimestamp) =>
            // older versions with explicit expire_timestamp field => old expiration semantics is used
            currentTimestamp >= expireTimestamp
        }
      }
  }
}
{code}
The expireTimestamp in the on-disk offset record can only be set when storing 
the committed offset in the __consumer_offsets topic. But the GroupCoordinator 
also has to keep an in-memory representation of the expireTimestamp (see the 
code above), which can be set in the following two cases:
 # Upon receiving an OffsetCommitRequest, the GroupCoordinator sets the 
expireTimestamp using the following logic:
{code:java}
expireTimestamp = offsetCommitRequest.retentionTime match {
 case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
 case retentionTime => Some(currentTimestamp + retentionTime)
}
{code}
All the latest client versions send the OffsetCommitRequest with 
DEFAULT_RETENTION_TIME, so the expireTimestamp will always be None in this 
case. *This means any committed offset set this way will always hit the "case 
None" branch in "getExpiredOffsets(...)" when the coordinator does the cleanup, 
which is correct.*

 # Upon loading the committed offsets stored in the __consumer_offsets topic 
from disk, the GroupCoordinator sets the expireTimestamp using the following 
logic if IBP < 2.1:
{code:java}
val expireTimestamp = 
value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
{code}
and the logic to persist the expireTimestamp is:
{code:java}
// OffsetCommitRequest.DEFAULT_TIMESTAMP = -1
value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, 
offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
{code}
Since the in-memory expireTimestamp will always be None in our case, as 
mentioned in 1), we always store -1 on disk. Therefore, when the offset is 
loaded from the __consumer_offsets topic, the in-memory expireTimestamp will 
always be set to -1. *This means any committed offset loaded this way will 
always hit the "case Some(expireTimestamp)" branch in "getExpiredOffsets(...)" 
when the coordinator does the cleanup, which means we will always expire the 
committed offsets on the first expiration check (shortly after they are loaded 
from the __consumer_offsets topic)*.

I am able to reproduce this bug on my local box with one broker using 2.x, 
1.x, and 0.11.x consumers. The consumer sees a null committed offset after the 
broker is bounced.

This bug was introduced by [PR-5690|https://github.com/apache/kafka/pull/5690] 
in the Kafka 2.1 release, and the fix is very straightforward: set the 
expireTimestamp to None if the on-disk value is -1.
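
A minimal sketch of that fix on the load path (the constant mirrors 
OffsetCommitRequest.DEFAULT_TIMESTAMP from the snippets above; the surrounding 
record-parsing code is elided):
{code:java}
object OffsetExpireTimestampFix {
  // The sentinel persisted when the in-memory expireTimestamp was None at commit
  // time (OffsetCommitRequest.DEFAULT_TIMESTAMP).
  val DefaultTimestamp: Long = -1L

  // When reading the V1 value back from __consumer_offsets, translate the sentinel
  // back into None so getExpiredOffsets() takes the "case None" branch again,
  // matching the semantics the offset had when it was committed.
  def expireTimestampFromDisk(raw: Long): Option[Long] =
    if (raw == DefaultTimestamp) None else Some(raw)
}
{code}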



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7854) Behavior change in controller picking up partition reassignment tasks since 1.1.0

2019-01-22 Thread Zhanxiang (Patrick) Huang (JIRA)
Zhanxiang (Patrick) Huang created KAFKA-7854:


 Summary: Behavior change in controller picking up partition 
reassignment tasks since 1.1.0
 Key: KAFKA-7854
 URL: https://issues.apache.org/jira/browse/KAFKA-7854
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Reporter: Zhanxiang (Patrick) Huang


After [https://github.com/apache/kafka/pull/4143], the controller no longer 
subscribes to data changes on /admin/reassign_partitions (in order to avoid 
unnecessarily reloading the reassignment data after the controller updates the 
znode), as opposed to previous Kafka versions. However, there are systems built 
around Kafka that rely on the previous behavior to incrementally update the 
list of partition reassignments, since Kafka does not natively support that.

 

For example, [cruise control|https://github.com/linkedin/cruise-control] 
relies on the previous behavior (the controller listening to data changes) to 
maintain the reassignment concurrency by dynamically updating the data in the 
reassignment znode instead of waiting for the current batch to finish and 
reassigning batch by batch, which can significantly reduce the rebalance time 
in production clusters. Although directly updating the znode can be viewed as 
an anti-pattern in the long term, it is necessary since Kafka does not natively 
support incrementally submitting more reassignment tasks. However, after our 
Kafka clusters migrated from 0.11 to 2.0, cruise control no longer works 
because the controller behavior has changed. This reveals the following 
problems:
 * These behavior changes may be viewed as internal changes whose compatibility 
is not guaranteed, but I think by convention people do treat these znodes as 
public interfaces and rely on their compatibility. In this case, I think we 
should clearly document the data contract for the partition reassignment task 
to avoid misuse and to avoid controller changes that break the defined data 
contract. There may be other cases (e.g. topic deletion) whose data contracts 
need to be clearly defined, and we should keep that in mind when making 
controller changes.
 * Kafka does not natively support incrementally submitting more reassignment 
tasks. If we do want to support that nicely, we should consider changing how we 
store the reassignment data so that each task is stored in a child znode and 
the controller listens for child node changes, similar to what we do for 
/admin/delete_topics (see the sketch below).
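
One possible shape of that approach, sketched with the plain ZooKeeper client 
(the /admin/reassignment_requests path and the child naming are hypothetical, 
chosen only to mirror /admin/delete_topics; the real controller goes through 
its own ZooKeeper client layer):
{code:java}
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import scala.jdk.CollectionConverters._

object ReassignmentChildWatcher {
  // Hypothetical parent znode: each submitted reassignment batch would be its own
  // child znode (e.g. /admin/reassignment_requests/request-0000000042), mirroring
  // how /admin/delete_topics holds one child per topic to delete.
  val ParentPath = "/admin/reassignment_requests"

  def watchChildren(zk: ZooKeeper)(onChange: Seq[String] => Unit): Unit = {
    val watcher = new Watcher {
      override def process(event: WatchedEvent): Unit =
        if (event.getType == Watcher.Event.EventType.NodeChildrenChanged)
          watchChildren(zk)(onChange) // ZooKeeper watches fire once, so re-register
    }
    val children = zk.getChildren(ParentPath, watcher).asScala.toSeq
    onChange(children)
  }
}
{code}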



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7452) Deleting snapshot files after check-pointing log recovery offsets can slow down replication when truncation happens

2018-11-12 Thread Zhanxiang (Patrick) Huang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhanxiang (Patrick) Huang resolved KAFKA-7452.
--
Resolution: Duplicate

KAFKA-7557 fixed this.

> Deleting snapshot files after check-pointing log recovery offsets can slow 
> down replication when truncation happens
> ---
>
> Key: KAFKA-7452
> URL: https://issues.apache.org/jira/browse/KAFKA-7452
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0, 1.0.1, 1.1.0, 1.1.1, 2.0.0
>Reporter: Zhanxiang (Patrick) Huang
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7537) Only include live brokers in the UpdateMetadataRequest sent to existing brokers if there is no change in the partition states

2018-10-24 Thread Zhanxiang (Patrick) Huang (JIRA)
Zhanxiang (Patrick) Huang created KAFKA-7537:


 Summary: Only include live brokers in the UpdateMetadataRequest 
sent to existing brokers if there is no change in the partition states
 Key: KAFKA-7537
 URL: https://issues.apache.org/jira/browse/KAFKA-7537
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Reporter: Zhanxiang (Patrick) Huang
Assignee: Zhanxiang (Patrick) Huang


Currently, when brokers join/leave the cluster without any partition state 
changes, the controller sends out UpdateMetadataRequests containing the states 
of all partitions to all brokers. But for existing brokers in the cluster, the 
metadata diff between the controller and the broker should only be the 
"live_brokers" info. Only the brokers with an empty metadata cache need the 
full UpdateMetadataRequest. Sending the full UpdateMetadataRequest to all 
brokers can place non-negligible memory pressure on the controller side.

Let's say we have N brokers and M partitions in the cluster in total, and we 
want to add 1 brand-new broker to the cluster. With RF=2, the memory footprint 
per partition in the UpdateMetadataRequest is ~200 bytes. In the current 
controller implementation, if each of the N+1 RequestSendThreads serializes and 
sends out the UpdateMetadataRequest at roughly the same time (which is very 
likely the case), we end up using (N+1) * M * 200B. In a large Kafka cluster, 
we can have:
{noformat}
N=99
M=100k

Memory usage to send out UpdateMetadataRequest to all brokers:
100 * 100K * 200B = 2G

However, we only need to send the full UpdateMetadataRequest to the newly added
broker. We only need to include the live broker ids (4B * 100 brokers) in the
UpdateMetadataRequest sent to the existing 99 brokers. So the amount of data
that is actually needed is:
1 * 100K * 200B + 99 * (100 * 4B) = ~21M

We can potentially reduce the memory footprint by 2G / 21M = ~95x, as well as
the data transferred over the network.{noformat}
 

This issue hurts the scalability of a Kafka cluster. KIP-380 and KAFKA-7186 
also help to further reduce the controller memory footprint.

 

In terms of implementation, we can keep some in-memory state on the controller 
side to differentiate existing brokers from uninitialized brokers (e.g. 
brand-new brokers), so that if there is no change in partition states, we only 
send the live-broker info to existing brokers.
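
A sketch of that decision with hypothetical types (the real 
UpdateMetadataRequest carries much more per-partition state): existing brokers 
get only the live-broker list when partition states are unchanged, while 
uninitialized brokers still get the full request.
{code:java}
object UpdateMetadataPlanning {
  sealed trait BrokerState
  case object Initialized extends BrokerState   // already holds a full metadata cache
  case object Uninitialized extends BrokerState // brand new / empty metadata cache

  // Simplified view of what goes on the wire to a single broker.
  final case class UpdateMetadataPayload(liveBrokerIds: Set[Int],
                                         partitionStates: Map[String, Int]) // partition -> leader

  def payloadFor(state: BrokerState,
                 liveBrokerIds: Set[Int],
                 fullPartitionStates: Map[String, Int],
                 partitionStatesChanged: Boolean): UpdateMetadataPayload =
    if (partitionStatesChanged || state == Uninitialized)
      UpdateMetadataPayload(liveBrokerIds, fullPartitionStates) // full request is required
    else
      UpdateMetadataPayload(liveBrokerIds, Map.empty)           // only the live-broker diff
}
{code}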

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7464) Fail to shutdown ReplicaManager during broker cleaned shutdown

2018-10-01 Thread Zhanxiang (Patrick) Huang (JIRA)
Zhanxiang (Patrick) Huang created KAFKA-7464:


 Summary: Fail to shutdown ReplicaManager during broker cleaned 
shutdown
 Key: KAFKA-7464
 URL: https://issues.apache.org/jira/browse/KAFKA-7464
 Project: Kafka
  Issue Type: Bug
Reporter: Zhanxiang (Patrick) Huang
Assignee: Zhanxiang (Patrick) Huang


In a 2.0 deployment, we saw the following log when shutting down the 
ReplicaManager during a clean broker shutdown:
{noformat}
2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null
java.lang.IllegalArgumentException: null
at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121]
at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121]
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) 
~[?:1.8.0_121]
at 
org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214)
 ~[kafka-clients-2.0.0.22.jar:?]
at 
org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164)
 ~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) 
~[kafka-clients-2.0.0.22.jar:?]
at 
org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) 
~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.Selector.doClose(Selector.java:751) 
~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.Selector.close(Selector.java:739) 
~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.Selector.close(Selector.java:701) 
~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.common.network.Selector.close(Selector.java:315) 
~[kafka-clients-2.0.0.22.jar:?]
at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) 
~[kafka-clients-2.0.0.22.jar:?]
at 
kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
 ~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108)
 ~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183)
 ~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182)
 ~[kafka_2.11-2.0.0.22.jar:?]
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
 ~[scala-library-2.11.12.jar:?]
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) 
~[scala-library-2.11.12.jar:?]
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) 
~[scala-library-2.11.12.jar:?]
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) 
~[scala-library-2.11.12.jar:?]
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) 
~[scala-library-2.11.12.jar:?]
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) 
~[scala-library-2.11.12.jar:?]
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) 
~[scala-library-2.11.12.jar:?]
at 
kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182)
 ~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) 
~[kafka_2.11-2.0.0.22.jar:?]
at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) 
~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616)
 ~[kafka_2.11-2.0.0.22.jar:?]
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) 
~[kafka_2.11-2.0.0.22.jar:?]
at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) 
~[kafka_2.11-2.0.0.22.jar:?]
{noformat}
After that, we noticed that some of the replica fetcher threads failed to shut 
down:
{noformat}
2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] 
[ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log 
segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches
java.nio.channels.ClosedChannelException: null
at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) 
~[?:1.8.0_121]
at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) 
~[?:1.8.0_121]
at 
org.apache.kafka.common.record.FileRecords.truncateTo(FileRecords.java:244) 
~[kafka-clients-2.0.0.22.jar:?]
at 
org.apache.kafka.common.record.FileRecords.trim(FileRecords.java:206) 
~[kafka-clients-2.0.0.22.jar:?]
at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:512) 
~[kafka_2.11-2.0.0.22.jar:?]
at 
kafka.log.Log$$anonfun$roll$2$$anonfun$apply$30.apply(Log.scala:1493) 
~[kafka_2.11-2.0.0.22.jar:?]
at ...
{noformat}

[jira] [Created] (KAFKA-7459) Concurrency bug in updating RequestsPerSec metric

2018-09-29 Thread Zhanxiang (Patrick) Huang (JIRA)
Zhanxiang (Patrick) Huang created KAFKA-7459:


 Summary: Concurrency bug in updating RequestsPerSec metric 
 Key: KAFKA-7459
 URL: https://issues.apache.org/jira/browse/KAFKA-7459
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.0.0, 2.0.1
Reporter: Zhanxiang (Patrick) Huang
Assignee: Zhanxiang (Patrick) Huang


After KAFKA-6514, we added the API version as a tag for the RequestsPerSec 
metric, but in the implementation we use the non-threadsafe mutable.HashMap to 
store the version -> metric mapping without any protection 
([https://github.com/apache/kafka/pull/4506/files#diff-d0332a0ff31df50afce3809d90505b25R357]).
 This can mess up the data structure and cause unexpected behavior 
([https://github.com/scala/bug/issues/10436]). We should use ConcurrentHashMap 
instead.
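
A sketch of the fix (the counter type and naming here are simplified; the real 
code creates a metrics Meter inside RequestMetrics.requestRate): replace the 
unsynchronized mutable.HashMap with a ConcurrentHashMap and use computeIfAbsent 
for the get-or-create path.
{code:java}
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.LongAdder

object RequestRateByVersion {
  // version -> counter. ConcurrentHashMap makes the lazy get-or-create safe when
  // several network threads record the first request for an API version at once.
  private val metersByVersion = new ConcurrentHashMap[Short, LongAdder]()

  def mark(apiVersion: Short): Unit =
    metersByVersion.computeIfAbsent(apiVersion, _ => new LongAdder()).increment()
}
{code}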

 

In our case, cleanly shutting down a 2.0 broker takes forever because this 
concurrency bug leads to an infinite loop in HashMap resize.
 Thread-1 is doing the clean shutdown but is stuck waiting for one of the 
network threads to shut down:
{noformat}
"Thread-1" #25 prio=5 os_prio=0 tid=0x7f05c401 nid=0x79f4 waiting on 
condition [0x7f0597cfb000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0001ffad1500> (a 
java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:282)
at kafka.network.Processor.shutdown(SocketServer.scala:873)
at 
kafka.network.Acceptor$$anonfun$shutdown$3.apply(SocketServer.scala:368)
at 
kafka.network.Acceptor$$anonfun$shutdown$3.apply(SocketServer.scala:368)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at kafka.network.Acceptor.shutdown(SocketServer.scala:368)
- locked <0x0001fdcf1000> (a kafka.network.Acceptor)
at 
kafka.network.SocketServer$$anonfun$stopProcessingRequests$2.apply(SocketServer.scala:178)
at 
kafka.network.SocketServer$$anonfun$stopProcessingRequests$2.apply(SocketServer.scala:178)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at 
scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
at 
kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:178)
- locked <0x0001fba0e610> (a kafka.network.SocketServer)
at 
kafka.server.KafkaServer$$anonfun$shutdown$3.apply$mcV$sp(KafkaServer.scala:595)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86)
at kafka.server.KafkaServer.shutdown(KafkaServer.scala:595){noformat}
The network thread is always in HashTable.resize and never finishes 
updateRequestMetrics:
{noformat}
"kafka-network-thread-13673-ListenerName(SSL)-SSL-2" #201 prio=5 os_prio=0 
tid=0x7f441dae4000 nid=0x4cdc runnable [0x7f2404189000]
   java.lang.Thread.State: RUNNABLE
at scala.collection.mutable.HashTable$class.resize(HashTable.scala:268)
at 
scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$addEntry0(HashTable.scala:157)
at 
scala.collection.mutable.HashTable$class.addEntry(HashTable.scala:148)
at scala.collection.mutable.HashMap.addEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.addEntry(HashMap.scala:93)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:79)
at kafka.network.RequestMetrics.requestRate(RequestChannel.scala:438)
at 
kafka.network.RequestChannel$Request$$anonfun$updateRequestMetrics$1.apply(RequestChannel.scala:161)
at 
kafka.network.RequestChannel$Request$$anonfun$updateRequestMetrics$1.apply(RequestChannel.scala:159)
at scala.collection.immutable.List.foreach(List.scala:392)
at 
kafka.network.RequestChannel$Request.updateRequestMetrics(RequestChannel.scala:159)
at 
kafka.network.Processor.kafka$network$Processor$$updateRequestMetrics(SocketServer.scala:740)
at ...
{noformat}

[jira] [Created] (KAFKA-7452) Deleting snapshot files after check-pointing log recovery offsets can slow down replication when truncation happens

2018-09-28 Thread Zhanxiang (Patrick) Huang (JIRA)
Zhanxiang (Patrick) Huang created KAFKA-7452:


 Summary: Deleting snapshot files after check-pointing log recovery 
offsets can slow down replication when truncation happens
 Key: KAFKA-7452
 URL: https://issues.apache.org/jira/browse/KAFKA-7452
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.0.0, 1.1.1, 1.1.0, 1.0.1, 1.0.0
Reporter: Zhanxiang (Patrick) Huang


After KAFKA-5829, Kafka iterates through all the partition dirs to delete 
useless snapshot files in "checkpointLogRecoveryOffsetsInDir". Currently, 
"checkpointLogRecoveryOffsetsInDir" is used in the following places:
 # Truncation
 # Log dir deletion and movement
 # Background thread checkpointing recovery offsets

In a 2.0 deployment on a cluster with 10k partitions per broker, we found that 
deleting useless snapshot files in the critical path of log truncation can 
significantly slow down followers catching up with the leader during a rolling 
bounce (~2x slower than 0.11). The reason is that we basically do an "ls -R" of 
the whole data directory only to potentially delete the snapshot files of one 
partition directory, because the way we identify snapshot files is to list the 
directories and check the filename suffix.

In our case, "listSnapshotFiles" takes ~1ms per partition directory so it takes 
~1ms * 10k = ~10s to just delete snapshot files in one partition after the 
truncation, which delays future fetches in the fetcher thread.

Here are the related code snippets:

 LogManager.scala

 
{code:java}
private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
  for {
partitionToLog <- logsByDir.get(dir.getAbsolutePath)
checkpoint <- recoveryPointCheckpoints.get(dir)
  } {
try {
  checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
  allLogs.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
} catch {
  case e: IOException =>
logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk 
error while writing to recovery point " +
  s"file in directory $dir", e)
}
  }
}
{code}
 

 ProducerStateChangeManager.scala

 
{code:java}
private[log] def listSnapshotFiles(dir: File): Seq[File] = {
  if (dir.exists && dir.isDirectory) {
Option(dir.listFiles).map { files =>
  files.filter(f => f.isFile && isSnapshotFile(f)).toSeq
}.getOrElse(Seq.empty)
  } else Seq.empty
}


private def deleteSnapshotFiles(dir: File, predicate: Long => Boolean = _ => 
true) {
  listSnapshotFiles(dir).filter(file => 
predicate(offsetFromFile(file))).foreach { file =>
Files.deleteIfExists(file.toPath)
  }
}

{code}
 

There are a few things that can be optimized here:
 # We can have an in-memory cache for the snapshot file metadata (filenames) in 
ProducerStateManager to avoid calling dir.listFiles in "deleteSnapshotFiles", 
"latestSnapshotFile" and "oldestSnapshotFile" (see the sketch after this list).
 # After truncation, we can try to delete snapshot files only for the truncated 
partitions (in the replica fetcher thread, we truncate one partition at a time) 
instead of all partitions. Or maybe we don't even need to delete snapshot files 
in the critical path of truncation, because the background 
log-recovery-offset-checkpoint thread will do it periodically. This also 
applies to log deletion/movement.
 # If we want to further optimize the actual snapshot file deletion, we can 
make it async. But I am not sure whether it is needed after we have 1) and 2).
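
For optimization 1) above, a sketch (with hypothetical names) of what an 
in-memory snapshot-file cache in ProducerStateManager could look like:
{code:java}
import java.io.File
import java.nio.file.Files
import java.util.concurrent.ConcurrentSkipListMap
import scala.jdk.CollectionConverters._

// Hypothetical in-memory cache of producer-state snapshot files, keyed by base offset.
// With this, latest/oldest lookups and predicate-based deletion never call dir.listFiles.
final class SnapshotFileCache {
  private val byOffset = new ConcurrentSkipListMap[Long, File]()

  def add(offset: Long, file: File): Unit = byOffset.put(offset, file)

  def latest: Option[File] = Option(byOffset.lastEntry()).map(_.getValue)
  def oldest: Option[File] = Option(byOffset.firstEntry()).map(_.getValue)

  // Delete matching snapshots using only cached metadata, no directory listing.
  def deleteIf(predicate: Long => Boolean): Unit =
    byOffset.entrySet().asScala.toList.foreach { entry =>
      if (predicate(entry.getKey)) {
        Files.deleteIfExists(entry.getValue.toPath)
        byOffset.remove(entry.getKey)
      }
    }
}
{code}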

Also, we notice that there is no way to disable transaction/exactly-once 
support on the broker side, even though it brings in some extra overhead when 
no clients use this feature. Not sure whether this is a common use case, but it 
would be useful to have a switch to avoid the extra performance overhead.

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7283) mmap indexes lazily and skip sanity check for segments below recovery point

2018-08-13 Thread Zhanxiang (Patrick) Huang (JIRA)
Zhanxiang (Patrick) Huang created KAFKA-7283:


 Summary: mmap indexes lazily and skip sanity check for segments 
below recovery point
 Key: KAFKA-7283
 URL: https://issues.apache.org/jira/browse/KAFKA-7283
 Project: Kafka
  Issue Type: New Feature
Reporter: Zhanxiang (Patrick) Huang
Assignee: Zhanxiang (Patrick) Huang


This is a follow-up ticket for KIP-263.

Currently the broker mmaps the index files, reads the length as well as the 
last entry of each file, and sanity checks the index files of all log segments 
in the log directory after it is started. These operations can be slow because 
the broker needs to open the index files and read data into the page cache. As 
a result, the time to restart a broker increases proportionally to the number 
of segments in the log directory.

Per the KIP discussion, we think we can skip the sanity check for segments 
below the recovery point, since Kafka does not provide guarantees for segments 
already flushed to disk, and sanity checking only the index file is of little 
benefit when the segment itself may also be corrupted due to disk failure. 
Therefore, we can make the following changes to improve broker startup time:
 # Mmap the index file and populate its fields on demand rather than performing 
costly disk operations when creating the index object on broker startup.
 # Skip sanity checks on the indexes of segments below the recovery point.

With these changes, the broker startup time after a clean shutdown will 
increase only proportionally to the number of partitions in the log directory, 
because only active segments are mmapped and sanity checked.
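
A sketch of the two changes combined, using a plain lazy val and hypothetical 
names (the real implementation wraps Kafka's own index classes):
{code:java}
import java.io.{File, RandomAccessFile}
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel

// Hypothetical lazily mmapped index: constructing this object on broker startup does
// no I/O; the mmap and the length / last-entry reads happen on first access only.
final class LazyMmappedIndex(file: File) {
  lazy val buffer: MappedByteBuffer = {
    val raf = new RandomAccessFile(file, "rw")
    try raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, raf.length())
    finally raf.close() // the mapping stays valid after the channel is closed
  }

  // Change #2: only segments above the recovery point get sanity checked, so an
  // index below the recovery point is never even mmapped during startup.
  def sanityCheckIfNeeded(aboveRecoveryPoint: Boolean)(check: MappedByteBuffer => Unit): Unit =
    if (aboveRecoveryPoint) check(buffer)
}
{code}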

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7186) Controller uses too much memory when sending out UpdateMetadataRequest that can cause OutOfMemoryError

2018-07-19 Thread Zhanxiang (Patrick) Huang (JIRA)
Zhanxiang (Patrick) Huang created KAFKA-7186:


 Summary: Controller uses too much memory when sending out 
UpdateMetadataRequest that can cause OutOfMemoryError
 Key: KAFKA-7186
 URL: https://issues.apache.org/jira/browse/KAFKA-7186
 Project: Kafka
  Issue Type: Bug
  Components: controller
Reporter: Zhanxiang (Patrick) Huang
Assignee: Zhanxiang (Patrick) Huang


During controller failover and broker changes, the controller sends an 
UpdateMetadataRequest containing the states of all partitions and live brokers 
to every broker in the cluster. The current implementation instantiates the 
UpdateMetadataRequest object and its serialized form (Struct) once per broker 
(<# of brokers> times in total), which causes an OOM if the total exceeds the 
configured JVM heap size. We have seen this issue multiple times in our 
production environment.
 

For example, if we have 100 brokers in the cluster and each broker is the 
leader of 2k partitions, the extra memory usage introduced by the controller 
trying to send out the UpdateMetadataRequest is around:

<per-partition footprint> * <# of brokers> * <# of partitions>
= 250B * 100 * 200k = 5GB
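
Spelled out as a quick back-of-envelope calculation (same numbers as above):
{code:java}
object UpdateMetadataMemoryEstimate {
  val bytesPerPartitionState = 250L // ~250 B of request + serialized Struct per partition
  val brokers = 100L                // one UpdateMetadataRequest instance per broker
  val partitions = 100L * 2000L     // 100 brokers x 2k led partitions = 200k partitions

  // Each per-broker request materializes the full partition state independently.
  val totalBytes: Long = bytesPerPartitionState * brokers * partitions // 5,000,000,000 B

  def main(args: Array[String]): Unit =
    println(s"estimated transient controller memory: ${totalBytes / 1000000000.0} GB") // ~5 GB
}
{code}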



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)