[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17059300#comment-17059300
 ] 

zhangchenghui commented on KAFKA-3919:
--------------------------------------

I made an analysis of this problem specifically:
https://mp.weixin.qq.com/s/zbwGLygjvO_ncgp7FH9QMA

> Broker fails to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> ------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3919
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3919
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.9.0.1
>            Reporter: Andy Coates
>            Assignee: Ben Stopford
>            Priority: Major
>              Labels: reliability
>             Fix For: 0.11.0.0
>
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on several 
> brokers halted on start up with the error:
> {noformat}
>       Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>       kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/00000000001239444214.index.
>       at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>       at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>       at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>       at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>       at kafka.log.LogSegment.recover(LogSegment.scala:188)
>       at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>       at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>       at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>       at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>       at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>       at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>       at kafka.log.Log.loadSegments(Log.scala:160)
>       at kafka.log.Log.<init>(Log.scala:90)
>       at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>       at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that 
> contained non-monotonically incrementing offsets.
> We've spent some time digging through the logs and I feel I may have worked 
> out the sequence of events leading to this issue, (though this is based on 
> some assumptions I've made about the way Kafka is working, which may be 
> wrong).
> First off, we have unclean leadership elections disabled (we did later enable 
> them to help get around some other issues we were having, but this was 
> several hours after this issue manifested), and we're producing to the topic 
> with gzip compression and acks=1.
> We looked through the data logs that were causing the brokers to not start. 
> What we found is that the initial part of the log has monotonically increasing 
> offsets, where each compressed batch normally contained one or two records. 
> Then there is a batch that contains many records, whose first records have an 
> offset below the previous batch and whose last record has an offset above the 
> previous batch. Following on from this there continues a period of large 
> batches, with monotonically increasing offsets, and then the log returns to 
> batches with one or two records.
> Our working assumption here is that the period before the offset dip, with 
> the small batches, is pre-outage normal operation. The period of larger 
> batches is from just after the outage, where producers have a backlog to 
> process when the partition becomes available, and then things return to 
> normal batch sizes again once the backlog clears.
> We also looked through Kafka's application logs to try and piece 
> together the series of events leading up to this. Here’s what we know 
> happened, with regard to one partition that has issues, from the logs:
> Prior to outage:
> * Replicas for the partition are brokers 2011, 2012, 2024, with 2024 being 
> the preferred leader.
> * Producers using acks=1, compression=gzip
> * Brokers configured with unclean.elections=false, zk.session-timeout=36s
> Post outage:
> * 2011 comes up first, (also as the Controller), recovers unflushed log 
> segment 1239444214, completes load with offset 1239740602, and becomes leader 
> of the partition.
> * 2012 comes up next, recovers unflushed log segment 
> 1239444214, truncates to offset 1239742830 (that's 2,228 records ahead of the 
> recovered offset of the current leader), and starts following.
> * 2024 comes up quickly after 2012, recovers unflushed log segment 
> 1239444214, truncates to offset 1239742250 (that's 1,648 records ahead of 
> the recovered offset of the current leader), and starts following.
> * The Controller adds 2024 to the replica set just before 2024 halts due to 
> another partition having an offset greater than the leader.
> * The Controller adds 2012 to the replica set just before 2012 halts due to 
> another partition having an offset greater than the leader.
> * When 2012 is next restarted, it fails to fully start as it's complaining of 
> invalid offsets in the log.
> You’ll notice that the offsets the brokers truncate to are different for each 
> of the three brokers. 
> We're assuming that when 2012 starts up and follows the leader it requests 
> records from its truncated offset, but that the logs have diverged on these 
> two brokers to the point that the requested offset corresponds within the 
> leader's log to the middle of a compressed record set, not at a record set 
> boundary.  The leader then returns the whole compressed set, which the 
> follower appends to its log - unknowingly introducing a dip in its otherwise 
> monotonically incrementing offsets.
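
The assumed sequence above can be modelled with a small sketch. This is not Kafka code: `Batch`, `fetch` and `first_dip` are hypothetical names, the offsets are made up, and the "leader returns the whole compressed set" behaviour is the reporter's assumption, not confirmed here.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Batch:
    """A compressed record set covering offsets first..last (inclusive)."""
    first: int
    last: int


def fetch(leader_log: List[Batch], from_offset: int) -> List[Batch]:
    """Return every batch that contains or follows from_offset.

    Assumption being modelled: a batch is returned whole, even when
    from_offset falls in the middle of it.
    """
    return [b for b in leader_log if b.last >= from_offset]


def first_dip(log: List[Batch]) -> Optional[int]:
    """Index of the first batch that starts at or below the previous batch's last offset."""
    for i in range(1, len(log)):
        if log[i].first <= log[i - 1].last:
            return i
    return None


# Hypothetical, made-up offsets: the follower's next offset is 104.
follower_log = [Batch(100, 101), Batch(102, 103)]
# The diverged leader holds one large batch that spans offset 104.
leader_log = [Batch(95, 110), Batch(111, 120)]

# The follower appends whatever the leader returns, without checking it.
follower_log.extend(fetch(leader_log, from_offset=104))
dip_index = first_dip(follower_log)
```

In this model the appended batch starts below and ends above the follower's previous batch, which matches the shape of the dip seen in the data logs.
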
> Several of our brokers were unlucky enough to have this dip at the 4K 
> boundary used by the offset indexer, causing a protracted outage.  We’ve 
> written a little utility that shows several more brokers have a dip outside 
> of the 4K boundary.
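
A rough sketch of why only some dips stop the broker from starting, under a simplified model of index rebuilding: an index entry is written only once more than 4096 bytes (the default `index.interval.bytes`) have been scanned since the last entry, and an entry is rejected if its offset is not larger than the last indexed offset. The function names and batch data are hypothetical, and the real recovery code differs in detail.

```python
INDEX_INTERVAL_BYTES = 4096  # Kafka's default index.interval.bytes


class InvalidOffsetError(Exception):
    """Raised when an index entry would not be larger than the previous one."""


def find_dips(batches):
    """Return indexes of batches whose first offset is not above the previous batch's last offset.

    batches: list of (first_offset, last_offset, size_bytes) tuples.
    """
    return [i for i in range(1, len(batches))
            if batches[i][0] <= batches[i - 1][1]]


def rebuild_index(batches, interval=INDEX_INTERVAL_BYTES):
    """Simplified model of rebuilding a sparse offset index during recovery."""
    index = []            # list of (offset, byte position) entries
    position = 0          # byte position of the current batch in the segment
    last_entry_pos = 0    # byte position at which the last entry was written
    for first, _last, size in batches:
        if position - last_entry_pos > interval:
            if index and first <= index[-1][0]:
                raise InvalidOffsetError(
                    f"offset {first} at position {position} is no larger "
                    f"than the last indexed offset {index[-1][0]}")
            index.append((first, position))
            last_entry_pos = position
        position += size
    return index


# Made-up data: the dip (batch starting at offset 5) is not picked for indexing.
harmless = [(0, 1, 1500), (2, 3, 1500), (4, 5, 1500), (6, 7, 1500),
            (5, 20, 1500), (21, 22, 1500), (23, 24, 1500)]
# Made-up data: the dip lands exactly where the next index entry is due.
fatal = [(0, 1, 1500), (2, 3, 1500), (4, 5, 1500), (6, 7, 1500),
         (8, 9, 1500), (10, 11, 1500), (5, 20, 1500)]
```

Both logs contain a dip according to `find_dips`, but in this model only `fatal` makes `rebuild_index` raise, which is consistent with some brokers carrying a dip yet still starting.
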
> There are some assumptions in there, which I’ve not got around to confirming 
> / denying. (A quick attempt to recreate this failed and I've not found the 
> time to invest more).
> Of course I'd really appreciate the community / experts stepping in and 
> commenting on whether our assumptions are right or wrong, or if there is 
> another explanation to the problem. 
> The fact that the broker got into this state and then won’t start is 
> obviously a bug, and one I’d like to fix.  A Kafka broker should not corrupt 
> its own log during normal operation to the point that it can’t restart! :D
> A secondary issue is whether we think the divergent logs are acceptable. This 
> may be deemed acceptable given the producers have chosen availability over 
> consistency when they produced with acks = 1.  Though personally, the system 
> having diverging replicas of an immutable commit log just doesn't sit right.
> I see us having a few options here:
> * Have the replicas detect the divergence of their logs, e.g. a follower 
> compares the checksum of its last record with the same offset on the leader. 
> The follower can then work out that its log has diverged from the leader, at 
> which point it could either halt, stop replicating that partition, or search 
> backwards to find the point of divergence, truncate and recover (possibly 
> saving the truncated part somewhere). This would be a protocol change for 
> Kafka.  This solution trades availability (you’ve got fewer ISRs during the 
> extended re-sync process) for consistency.
> * Leave the logs as they are and have the indexing of offsets in the log on 
> start up handle such a situation gracefully.  This leaves logs in a divergent 
> state between replicas (meaning replays would yield different messages if 
> the leader were to go down), but gives better availability (no time spent not 
> being an ISR while it repairs any divergence).
> * Support multiple options and allow it to be tuned, ideally by topic.
> * Something else...
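
The first option (a follower comparing checksums and searching backwards for the point of divergence) might look roughly like the sketch below. It is only an illustration: logs are plain in-memory lists, `divergence_point` is a hypothetical helper, and it assumes that once a matching record is found, everything before it also matches.

```python
import zlib


def divergence_point(follower, leader):
    """Return the offset the follower should truncate to.

    follower, leader: lists of (offset, payload_bytes) in offset order.
    Walks the follower's log backwards until it finds a record whose CRC32
    equals the leader's record at the same offset, and returns the offset
    just after it. Assumes the logs agree on everything before that record.
    """
    leader_crc = {off: zlib.crc32(payload) for off, payload in leader}
    for off, payload in reversed(follower):
        if leader_crc.get(off) == zlib.crc32(payload):
            return off + 1
    # No common record found: truncate the follower's whole log.
    return follower[0][0] if follower else 0


# Made-up logs that agree on offsets 0-1 and diverge at offset 2.
leader = [(0, b"a"), (1, b"b"), (2, b"x"), (3, b"y")]
follower = [(0, b"a"), (1, b"b"), (2, b"c")]

truncate_to = divergence_point(follower, leader)
repaired = [(off, p) for off, p in follower if off < truncate_to]
```

After truncating, the follower would refetch from `truncate_to`, so the re-sync cost grows with how far back the divergence lies.
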
> I’m happy/keen to contribute here. But I’d like to first discuss which option 
> should be investigated.
> Andy



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
