[jira] [Comment Edited] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2020-03-19 Thread zhangchenghui (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17062331#comment-17062331
 ] 

zhangchenghui edited comment on KAFKA-3919 at 3/19/20, 6:59 AM:


Hi, [~junrao]
When a Kafka partition is unavailable and the leader replica is damaged, I try 
to minimize data loss by doing the following:
1. Reassign the unavailable partition;
2. Modify the zk node /brokers/topics/test-topic/partitions/0/state;
3. Restart the broker.
When a partition is unavailable, could an option be provided for users to 
manually designate any replica of the partition as the leader?
Once a cluster has set unclean.leader.election.enable=false, a replica outside 
the ISR cannot be elected leader. In the extreme case, only the leader replica 
is left in the ISR, the broker hosting that leader is down, and its data may be 
corrupted. In that case, could the user be allowed to choose the leader replica 
themselves? This would still result in data loss, but far less than losing the 
entire partition.
This is my summary:
https://mp.weixin.qq.com/s/b1etPGC97xNjmgdQbycnSg
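
To make the procedure above concrete, here is a minimal sketch (not a 
recommendation) of inspecting and rewriting the partition state znode with the 
plain ZooKeeper client. It assumes a ZooKeeper ensemble at localhost:2181, the 
pre-KRaft znode layout used by these Kafka versions, and a hypothetical 
surviving replica with broker id 2; forcing a leader this way bypasses the 
controller and can itself lose data.

{code:scala}
import java.nio.charset.StandardCharsets
import org.apache.zookeeper.ZooKeeper

object PartitionStateZnode {
  def main(args: Array[String]): Unit = {
    // Assumption: ZooKeeper is reachable at localhost:2181 and the znode exists.
    val zk = new ZooKeeper("localhost:2181", 30000, null)
    val path = "/brokers/topics/test-topic/partitions/0/state"
    try {
      val stat = zk.exists(path, false)
      val state = new String(zk.getData(path, false, stat), StandardCharsets.UTF_8)
      // The state znode holds JSON of the form:
      //   {"controller_epoch":N,"leader":B,"version":1,"leader_epoch":E,"isr":[...]}
      println(s"current state: $state")
      // The manual step described above rewrites "leader" to a surviving replica's
      // broker id (2 is purely illustrative) before restarting the broker.
      val forced = state.replaceAll("\"leader\":-?\\d+", "\"leader\":2")
      zk.setData(path, forced.getBytes(StandardCharsets.UTF_8), stat.getVersion)
    } finally zk.close()
  }
}
{code}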


was (Author: zhangchenghui):
Hi, [~junrao]
When a Kafka partition is unavailable and the leader replica is damaged, I try 
to minimize data loss by doing the following:
1. Reassign the unavailable partition;
2. Modify the zk node /brokers/topics/test-topic/partitions/0/state;
3. Restart the broker.
When a partition is unavailable, could an option be provided for users to 
manually designate any replica of the partition as the leader?
Once a cluster has set unclean.leader.election.enable=false, a replica outside 
the ISR cannot be elected leader. In the extreme case, only the leader replica 
is left in the ISR, the broker hosting that leader is down, and its data may be 
corrupted. In that case, could the user be allowed to choose the leader replica 
themselves? This would still result in data loss, but far less than losing the 
entire partition.

> Broker faills to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> --
>
> Key: KAFKA-3919
> URL: https://issues.apache.org/jira/browse/KAFKA-3919
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andy Coates
>Assignee: Ben Stopford
>Priority: Major
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on several 
> brokers halted on start up with the error:
> {noformat}
>   Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>   kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.<init>(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at 

[jira] [Commented] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2020-03-19 Thread zhangchenghui (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17062331#comment-17062331
 ] 

zhangchenghui commented on KAFKA-3919:
--

Hi, [~junrao]
When a Kafka partition is unavailable and the leader replica is damaged, I try 
to minimize data loss by doing the following:
1. Reassign the unavailable partition;
2. Modify the zk node /brokers/topics/test-topic/partitions/0/state;
3. Restart the broker.
When a partition is unavailable, could an option be provided for users to 
manually designate any replica of the partition as the leader?
Once a cluster has set unclean.leader.election.enable=false, a replica outside 
the ISR cannot be elected leader. In the extreme case, only the leader replica 
is left in the ISR, the broker hosting that leader is down, and its data may be 
corrupted. In that case, could the user be allowed to choose the leader replica 
themselves? This would still result in data loss, but far less than losing the 
entire partition.

> Broker faills to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> --
>
> Key: KAFKA-3919
> URL: https://issues.apache.org/jira/browse/KAFKA-3919
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andy Coates
>Assignee: Ben Stopford
>Priority: Major
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on several 
> brokers halted on start up with the error:
> {noformat}
>   Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>   kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.<init>(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that 
> contained non-monotonically incrementing offsets.
> We've spent some time digging through the logs and I feel I may have worked 
> out the sequence of events leading to this issue (though this is based on 
> some assumptions I've made about the way Kafka is working, which may be 
> wrong).
> First off, we have unclean leadership elections disabled. (We did later 
> enable them to help get around some other issues we were having, but this 
> was several hours after this issue manifested), and we're producing to the 
> topic with gzip compression and acks=1.
> We looked through the data logs that were causing the brokers to not start. 
> What we found was that the initial part of the log has monotonically 
> increasing offsets, where each compressed batch normally contained one or 
> two records. Then there is a batch that contains many records, whose first 
> record has an offset below the previous batch and whose last record has an 
> offset above the previous batch. 
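
For reference, a minimal sketch of a producer configured the way the report 
above describes (gzip compression, acks=1); the bootstrap address and topic 
name are placeholders rather than values from the incident.

{code:scala}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object GzipAcksOneProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // placeholder
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")  // compressed batches, as in the report
    props.put(ProducerConfig.ACKS_CONFIG, "1")                 // leader-only acknowledgement
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    try producer.send(new ProducerRecord[String, String]("some-topic", "event-payload")).get()
    finally producer.close()
  }
}
{code}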

[jira] [Commented] (KAFKA-4972) Kafka 0.10.0 Found a corrupted index file during Kafka broker startup

2020-03-14 Thread zhangchenghui (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059305#comment-17059305
 ] 

zhangchenghui commented on KAFKA-4972:
--

I wrote a detailed analysis of this problem:
https://objcoding.com/2020/03/14/kafka-invalid-offset-exception/

> Kafka 0.10.0  Found a corrupted index file during Kafka broker startup
> --
>
> Key: KAFKA-4972
> URL: https://issues.apache.org/jira/browse/KAFKA-4972
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0
> Environment: JDK: HotSpot  x64  1.7.0_80
> Tag: 0.10.0
>Reporter: fangjinuo
>Priority: Critical
>  Labels: reliability
> Attachments: Snap3.png
>
>
> -deleted text-After force-shutting down all Kafka brokers one by one and 
> restarting them one by one, one broker failed to start up.
> The following WARN level log was found in the log file:
> found a corrupted index file,  .index , delete it  ...
> You can view the details in the attachment.
> ~I looked through some code in the core module and found that:
> the non-thread-safe method LogSegment.append(offset, messages) has two callers:
> 1) Log.append(messages)  // called under a synchronized 
> lock 
> 2) LogCleaner.cleanInto(topicAndPartition, source, dest, map, retainDeletes, 
> messageFormatVersion)   // not under a lock 
> So I guess this may be the reason for the repeated offsets in the 0xx.log 
> file (the log segment's .log)~
> Although this is just my inference, I hope this problem can be fixed 
> quickly.
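
Purely to make the (struck-through) reasoning above concrete, here is an 
illustrative sketch of the calling pattern it describes: a non-thread-safe 
append with one caller that takes a lock and one that does not. None of these 
classes are Kafka's; the names only mirror the ones mentioned.

{code:scala}
import scala.collection.mutable.ArrayBuffer

final class SegmentStub {                        // stand-in for LogSegment (not Kafka code)
  private val offsets = ArrayBuffer.empty[Long]
  def append(offset: Long): Unit = offsets += offset  // non-thread-safe, as noted above
  def last: Option[Long] = offsets.lastOption
}

final class LogStub(segment: SegmentStub) {      // stand-in for Log.append
  private val lock = new Object
  def append(offset: Long): Unit = lock.synchronized(segment.append(offset))
}

final class CleanerStub(segment: SegmentStub) {  // stand-in for LogCleaner.cleanInto
  def cleanInto(offset: Long): Unit = segment.append(offset)  // no lock here: the suspected race
}
{code}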





[jira] [Commented] (KAFKA-9347) Detect deleted log directory before becoming leader

2020-03-14 Thread zhangchenghui (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059306#comment-17059306
 ] 

zhangchenghui commented on KAFKA-9347:
--

I wrote a detailed analysis of this problem:
https://objcoding.com/2020/03/14/kafka-invalid-offset-exception/

> Detect deleted log directory before becoming leader
> ---
>
> Key: KAFKA-9347
> URL: https://issues.apache.org/jira/browse/KAFKA-9347
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>  Labels: needs-discussion
>
> There is no protection currently if a broker has had its log directory 
> deleted to prevent it from becoming the leader of a partition that it still 
> remains in the ISR of. This scenario can happen when the last remaining 
> replica in the ISR is shutdown. It will remain in the ISR and be eligible for 
> leadership as soon as it starts up. It would be useful to detect this 
> situation dynamically in order to force the user to either do an unclean 
> election or recover another broker. One option might be just to pass a flag 
> on startup to specify that a broker should not be eligible for leadership. 
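
As a rough illustration of the "detect this dynamically" option above, a 
broker-side guard could simply refuse leadership eligibility while a configured 
log directory is missing. This is a toy sketch; LeadershipGuard, 
eligibleForLeadership and logDirs are invented names, not Kafka configuration 
or API.

{code:scala}
import java.nio.file.{Files, Paths}

object LeadershipGuard {
  // Illustrative only: report whether every configured log directory still exists on disk.
  def eligibleForLeadership(logDirs: Seq[String]): Boolean =
    logDirs.forall(dir => Files.isDirectory(Paths.get(dir)))
}

// e.g. eligibleForLeadership(Seq("/data3/kafka")) turns false once the directory is deleted,
// so such a broker would not volunteer as leader for partitions it can no longer serve.
{code}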





[jira] [Issue Comment Deleted] (KAFKA-3410) Unclean leader election and "Halting because log truncation is not allowed"

2020-03-14 Thread zhangchenghui (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangchenghui updated KAFKA-3410:
-
Comment: was deleted

(was: I wrote a detailed analysis of this problem:
https://objcoding.com/2020/03/14/kafka-invalid-offset-exception/)

> Unclean leader election and "Halting because log truncation is not allowed"
> ---
>
> Key: KAFKA-3410
> URL: https://issues.apache.org/jira/browse/KAFKA-3410
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Reporter: James Cheng
>Priority: Major
>  Labels: reliability
>
> I ran into a scenario where one of my brokers would continually shutdown, 
> with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I managed to reproduce it with the following scenario:
> 1. Start broker1, with unclean.leader.election.enable=false
> 2. Start broker2, with unclean.leader.election.enable=false
> 3. Create topic, single partition, with replication-factor 2.
> 4. Write data to the topic.
> 5. At this point, both brokers are in the ISR. Broker1 is the partition 
> leader.
> 6. Ctrl-Z on broker2. (Simulates a GC pause or a slow network) Broker2 gets 
> dropped out of ISR. Broker1 is still the leader. I can still write data to 
> the partition.
> 7. Shutdown Broker1. Hard or controlled, doesn't matter.
> 8. rm -rf the log directory of broker1. (This simulates a disk replacement or 
> full hardware replacement)
> 9. Resume broker2. It attempts to connect to broker1, but doesn't succeed 
> because broker1 is down. At this point, the partition is offline. Can't write 
> to it.
> 10. Resume broker1. Broker1 resumes leadership of the topic. Broker2 attempts 
> to join ISR, and immediately halts with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I am able to recover by setting unclean.leader.election.enable=true on my 
> brokers.
> I'm trying to understand a couple things:
> * In step 10, why is broker1 allowed to resume leadership even though it has 
> no data?
> * In step 10, why is it necessary to stop the entire broker due to one 
> partition that is in this state? Wouldn't it be possible for the broker to 
> continue to serve traffic for all the other topics, and just mark this one as 
> unavailable?
> * Would it make sense to allow an operator to manually specify which broker 
> they want to become the new master? This would give me more control over how 
> much data loss I am willing to handle. In this case, I would want broker2 to 
> become the new master. Or, is that possible and I just don't know how to do 
> it?
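
A minimal sketch of steps 3 and 4 of the reproduction above, using the Java 
clients (the AdminClient API shown here post-dates the original report and is 
used only for convenience; the bootstrap address is a placeholder).

{code:scala}
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object ReproSetup {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // placeholder

    // Step 3: create the topic with a single partition and replication factor 2.
    val admin = AdminClient.create(props)
    try admin.createTopics(Collections.singleton(new NewTopic("test", 1, 2.toShort))).all().get()
    finally admin.close()

    // Step 4: write some data to the topic.
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    try (1 to 151).foreach(i => producer.send(new ProducerRecord[String, String]("test", s"record-$i")).get())
    finally producer.close()
  }
}
{code}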





[jira] [Commented] (KAFKA-3410) Unclean leader election and "Halting because log truncation is not allowed"

2020-03-14 Thread zhangchenghui (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059304#comment-17059304
 ] 

zhangchenghui commented on KAFKA-3410:
--

I wrote a detailed analysis of this problem:
https://objcoding.com/2020/03/14/kafka-invalid-offset-exception/

> Unclean leader election and "Halting because log truncation is not allowed"
> ---
>
> Key: KAFKA-3410
> URL: https://issues.apache.org/jira/browse/KAFKA-3410
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Reporter: James Cheng
>Priority: Major
>  Labels: reliability
>
> I ran into a scenario where one of my brokers would continually shutdown, 
> with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I managed to reproduce it with the following scenario:
> 1. Start broker1, with unclean.leader.election.enable=false
> 2. Start broker2, with unclean.leader.election.enable=false
> 3. Create topic, single partition, with replication-factor 2.
> 4. Write data to the topic.
> 5. At this point, both brokers are in the ISR. Broker1 is the partition 
> leader.
> 6. Ctrl-Z on broker2. (Simulates a GC pause or a slow network) Broker2 gets 
> dropped out of ISR. Broker1 is still the leader. I can still write data to 
> the partition.
> 7. Shutdown Broker1. Hard or controlled, doesn't matter.
> 8. rm -rf the log directory of broker1. (This simulates a disk replacement or 
> full hardware replacement)
> 9. Resume broker2. It attempts to connect to broker1, but doesn't succeed 
> because broker1 is down. At this point, the partition is offline. Can't write 
> to it.
> 10. Resume broker1. Broker1 resumes leadership of the topic. Broker2 attempts 
> to join ISR, and immediately halts with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I am able to recover by setting unclean.leader.election.enable=true on my 
> brokers.
> I'm trying to understand a couple things:
> * In step 10, why is broker1 allowed to resume leadership even though it has 
> no data?
> * In step 10, why is it necessary to stop the entire broker due to one 
> partition that is in this state? Wouldn't it be possible for the broker to 
> continue to serve traffic for all the other topics, and just mark this one as 
> unavailable?
> * Would it make sense to allow an operator to manually specify which broker 
> they want to become the new master? This would give me more control over how 
> much data loss I am willing to handle. In this case, I would want broker2 to 
> become the new master. Or, is that possible and I just don't know how to do 
> it?





[jira] [Commented] (KAFKA-3410) Unclean leader election and "Halting because log truncation is not allowed"

2020-03-14 Thread zhangchenghui (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059303#comment-17059303
 ] 

zhangchenghui commented on KAFKA-3410:
--

I wrote a detailed analysis of this problem:
https://objcoding.com/2020/03/14/kafka-invalid-offset-exception/

> Unclean leader election and "Halting because log truncation is not allowed"
> ---
>
> Key: KAFKA-3410
> URL: https://issues.apache.org/jira/browse/KAFKA-3410
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Reporter: James Cheng
>Priority: Major
>  Labels: reliability
>
> I ran into a scenario where one of my brokers would continually shutdown, 
> with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I managed to reproduce it with the following scenario:
> 1. Start broker1, with unclean.leader.election.enable=false
> 2. Start broker2, with unclean.leader.election.enable=false
> 3. Create topic, single partition, with replication-factor 2.
> 4. Write data to the topic.
> 5. At this point, both brokers are in the ISR. Broker1 is the partition 
> leader.
> 6. Ctrl-Z on broker2. (Simulates a GC pause or a slow network) Broker2 gets 
> dropped out of ISR. Broker1 is still the leader. I can still write data to 
> the partition.
> 7. Shutdown Broker1. Hard or controlled, doesn't matter.
> 8. rm -rf the log directory of broker1. (This simulates a disk replacement or 
> full hardware replacement)
> 9. Resume broker2. It attempts to connect to broker1, but doesn't succeed 
> because broker1 is down. At this point, the partition is offline. Can't write 
> to it.
> 10. Resume broker1. Broker1 resumes leadership of the topic. Broker2 attempts 
> to join ISR, and immediately halts with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I am able to recover by setting unclean.leader.election.enable=true on my 
> brokers.
> I'm trying to understand a couple things:
> * In step 10, why is broker1 allowed to resume leadership even though it has 
> no data?
> * In step 10, why is it necessary to stop the entire broker due to one 
> partition that is in this state? Wouldn't it be possible for the broker to 
> continue to serve traffic for all the other topics, and just mark this one as 
> unavailable?
> * Would it make sense to allow an operator to manually specify which broker 
> they want to become the new master? This would give me more control over how 
> much data loss I am willing to handle. In this case, I would want broker2 to 
> become the new master. Or, is that possible and I just don't know how to do 
> it?





[jira] [Comment Edited] (KAFKA-3955) Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to failed broker boot

2020-03-14 Thread zhangchenghui (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059302#comment-17059302
 ] 

zhangchenghui edited comment on KAFKA-3955 at 3/14/20, 11:21 AM:
-

I wrote a detailed analysis of this problem:
https://objcoding.com/2020/03/14/kafka-invalid-offset-exception/


was (Author: zhangchenghui):
I wrote a detailed analysis of this problem:
https://objcoding.com/2020/03/14/kafka-startup-fail-code-review/

> Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to 
> failed broker boot
> 
>
> Key: KAFKA-3955
> URL: https://issues.apache.org/jira/browse/KAFKA-3955
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0, 0.10.1.1, 0.11.0.0, 1.0.0
>Reporter: Tom Crayford
>Assignee: Dhruvil Shah
>Priority: Critical
>  Labels: reliability
>
> Hi,
> I've found a bug impacting kafka brokers on startup after an unclean 
> shutdown. If a log segment is corrupt and has non-monotonic offsets (see the 
> appendix of this bug for a sample output from {{DumpLogSegments}}), then 
> {{LogSegment.recover}} throws an {{InvalidOffsetException}} error: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/OffsetIndex.scala#L218
> That code is called by {{LogSegment.recover}}: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L191
> Which is called in several places in {{Log.scala}}. Notably it's called four 
> times during recovery:
> Thrice in Log.loadSegments
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L199
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L204
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L226
> and once in Log.recoverLog
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L268
> Of these, only the very last one has a {{catch}} for 
> {{InvalidOffsetException}}. When that catches the issue, it truncates the 
> whole log (not just this segment): 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L274
>  to the start segment of the bad log segment.
> However, this code can't be hit on recovery, because of the code paths in 
> {{loadSegments}} - they mean we'll never hit truncation here, as we always 
> throw this exception and that goes all the way to the toplevel exception 
> handler and crashes the JVM.
> As {{Log.recoverLog}} is always called during recovery, I *think* a fix for 
> this is to move this crash recovery/truncate code inside a new method in 
> {{Log.scala}}, and call that instead of {{LogSegment.recover}} in each place. 
> That code should return the number of {{truncatedBytes}} like we do in 
> {{Log.recoverLog}} and then truncate the log. The callers will have to be 
> notified "stop iterating over files in the directory", likely via a return 
> value of {{truncatedBytes}} like {{Log.recoverLog}} does right now.
> I'm happy working on a patch for this. I'm aware this recovery code is tricky 
> and important to get right.
> I'm also curious (and currently don't have good theories as of yet) as to how 
> this log segment got into this state with non-monotonic offsets. This segment 
> is using gzip compression, and is under 0.9.0.1. The same bug with respect to 
> recovery exists in trunk, but I'm unsure if the new handling around 
> compressed messages (KIP-31) means the bug where non-monotonic offsets get 
> appended is still present in trunk.
> As a production workaround, one can manually truncate that log folder 
> yourself (delete all .index/.log files including and after the one with the 
> bad offset). However, kafka should (and can) handle this case well - with 
> replication we can truncate in broker startup.
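
To make the proposed refactor above slightly more concrete, here is a 
deliberately simplified, hypothetical sketch of a single recovery entry point 
that catches the corrupt-index exception and reports truncated bytes back to 
its caller. The types are stand-ins, not Kafka's real Log/LogSegment API.

{code:scala}
// Hypothetical sketch only; SegmentLike and the truncate callback are not Kafka's API.
final class InvalidOffsetException(msg: String) extends RuntimeException(msg)

trait SegmentLike {
  def baseOffset: Long
  def recover(): Unit  // may throw InvalidOffsetException on a corrupt index
}

object RecoverOrTruncate {
  /** Recover one segment; on a corrupt index, truncate from its base offset onward.
    * Returns the number of truncated bytes so callers know to stop iterating. */
  def recoverSegment(segment: SegmentLike, truncateFrom: Long => Int): Int =
    try { segment.recover(); 0 }
    catch {
      case _: InvalidOffsetException =>
        // Mirror the Log.recoverLog behaviour described above instead of letting the
        // exception escape to the top-level handler and crash the JVM.
        truncateFrom(segment.baseOffset)
    }
}
{code}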
> stacktrace and error message:
> {code}
> pri=WARN  t=pool-3-thread-4 at=Log Found a corrupted index file, 
> /$DIRECTORY/$TOPIC-22/14306536.index, deleting and rebuilding 
> index...
> pri=ERROR t=main at=LogManager There was an error in one of the threads 
> during logs loading: kafka.common.InvalidOffsetException: Attempt to append 
> an offset (15000337) to position 111719 no larger than the last offset 
> appended (15000337) to /$DIRECTORY/$TOPIC-8/14008931.index.
> pri=FATAL t=main at=KafkaServer Fatal error during KafkaServer startup. 
> Prepare to shutdown kafka.common.InvalidOffsetException: Attempt to append an 
> offset (15000337) to position 111719 no larger than the last offset appended 
> (15000337) to /$DIRECTORY/$TOPIC-8/14008931.index.
> at 
> 

[jira] [Comment Edited] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2020-03-14 Thread zhangchenghui (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059300#comment-17059300
 ] 

zhangchenghui edited comment on KAFKA-3919 at 3/14/20, 11:21 AM:
-

I wrote a detailed analysis of this problem:
https://objcoding.com/2020/03/14/kafka-invalid-offset-exception/


was (Author: zhangchenghui):
I wrote a detailed analysis of this problem:
https://objcoding.com/2020/03/14/kafka-startup-fail-code-review/

> Broker faills to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> --
>
> Key: KAFKA-3919
> URL: https://issues.apache.org/jira/browse/KAFKA-3919
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andy Coates
>Assignee: Ben Stopford
>Priority: Major
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on several 
> brokers halted on start up with the error:
> {noformat}
>   Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>   kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.<init>(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that 
> contained non-monotonically incrementing offsets.
> We've spent some time digging through the logs and I feel I may have worked 
> out the sequence of events leading to this issue (though this is based on 
> some assumptions I've made about the way Kafka is working, which may be 
> wrong).
> First off, we have unclean leadership elections disabled. (We did later 
> enable them to help get around some other issues we were having, but this 
> was several hours after this issue manifested), and we're producing to the 
> topic with gzip compression and acks=1.
> We looked through the data logs that were causing the brokers to not start. 
> What we found was that the initial part of the log has monotonically 
> increasing offsets, where each compressed batch normally contained one or 
> two records. Then there is a batch that contains many records, whose first 
> record has an offset below the previous batch and whose last record has an 
> offset above the previous batch. Following on from this there continues a 
> period of large batches, with monotonically increasing offsets, and then 
> the log returns to batches with one or two records.
> Our working assumption here is that the period before the offset dip, with 
> the small batches, is pre-outage normal operation. The period of larger 
> batches is from just after the outage, where producers have a backlog to 
> process when the partition becomes available, and then things return to 
> normal batch sizes again once the backlog clears.
> We did also look through Kafka's application logs to try and piece 

[jira] [Comment Edited] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2020-03-14 Thread zhangchenghui (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059300#comment-17059300
 ] 

zhangchenghui edited comment on KAFKA-3919 at 3/14/20, 11:18 AM:
-

I wrote a detailed analysis of this problem:
https://objcoding.com/2020/03/14/kafka-startup-fail-code-review/


was (Author: zhangchenghui):
I wrote a detailed analysis of this problem:
https://mp.weixin.qq.com/s/zbwGLygjvO_ncgp7FH9QMA

> Broker faills to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> --
>
> Key: KAFKA-3919
> URL: https://issues.apache.org/jira/browse/KAFKA-3919
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andy Coates
>Assignee: Ben Stopford
>Priority: Major
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on several 
> brokers halted on start up with the error:
> {noformat}
>   Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>   kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.<init>(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that 
> contained non-monotonically incrementing offsets.
> We've spent some time digging through the logs and I feel I may have worked 
> out the sequence of events leading to this issue (though this is based on 
> some assumptions I've made about the way Kafka is working, which may be 
> wrong).
> First off, we have unclean leadership elections disabled. (We did later 
> enable them to help get around some other issues we were having, but this 
> was several hours after this issue manifested), and we're producing to the 
> topic with gzip compression and acks=1.
> We looked through the data logs that were causing the brokers to not start. 
> What we found was that the initial part of the log has monotonically 
> increasing offsets, where each compressed batch normally contained one or 
> two records. Then there is a batch that contains many records, whose first 
> record has an offset below the previous batch and whose last record has an 
> offset above the previous batch. Following on from this there continues a 
> period of large batches, with monotonically increasing offsets, and then 
> the log returns to batches with one or two records.
> Our working assumption here is that the period before the offset dip, with 
> the small batches, is pre-outage normal operation. The period of larger 
> batches is from just after the outage, where producers have a backlog to 
> process when the partition becomes available, and then things return to 
> normal batch sizes again once the backlog clears.
> We did also look through Kafka's application logs to try and piece 
> together the 

[jira] [Commented] (KAFKA-3955) Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to failed broker boot

2020-03-14 Thread zhangchenghui (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059302#comment-17059302
 ] 

zhangchenghui commented on KAFKA-3955:
--

I wrote a detailed analysis of this problem:
https://objcoding.com/2020/03/14/kafka-startup-fail-code-review/

> Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to 
> failed broker boot
> 
>
> Key: KAFKA-3955
> URL: https://issues.apache.org/jira/browse/KAFKA-3955
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0, 0.10.1.1, 0.11.0.0, 1.0.0
>Reporter: Tom Crayford
>Assignee: Dhruvil Shah
>Priority: Critical
>  Labels: reliability
>
> Hi,
> I've found a bug impacting kafka brokers on startup after an unclean 
> shutdown. If a log segment is corrupt and has non-monotonic offsets (see the 
> appendix of this bug for a sample output from {{DumpLogSegments}}), then 
> {{LogSegment.recover}} throws an {{InvalidOffsetException}} error: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/OffsetIndex.scala#L218
> That code is called by {{LogSegment.recover}}: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L191
> Which is called in several places in {{Log.scala}}. Notably it's called four 
> times during recovery:
> Thrice in Log.loadSegments
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L199
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L204
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L226
> and once in Log.recoverLog
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L268
> Of these, only the very last one has a {{catch}} for 
> {{InvalidOffsetException}}. When that catches the issue, it truncates the 
> whole log (not just this segment): 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L274
>  to the start segment of the bad log segment.
> However, this code can't be hit on recovery, because of the code paths in 
> {{loadSegments}} - they mean we'll never hit truncation here, as we always 
> throw this exception and that goes all the way to the toplevel exception 
> handler and crashes the JVM.
> As {{Log.recoverLog}} is always called during recovery, I *think* a fix for 
> this is to move this crash recovery/truncate code inside a new method in 
> {{Log.scala}}, and call that instead of {{LogSegment.recover}} in each place. 
> That code should return the number of {{truncatedBytes}} like we do in 
> {{Log.recoverLog}} and then truncate the log. The callers will have to be 
> notified "stop iterating over files in the directory", likely via a return 
> value of {{truncatedBytes}} like {{Log.recoverLog}} does right now.
> I'm happy working on a patch for this. I'm aware this recovery code is tricky 
> and important to get right.
> I'm also curious (and currently don't have good theories as of yet) as to how 
> this log segment got into this state with non-monotonic offsets. This segment 
> is using gzip compression, and is under 0.9.0.1. The same bug with respect to 
> recovery exists in trunk, but I'm unsure if the new handling around 
> compressed messages (KIP-31) means the bug where non-monotonic offsets get 
> appended is still present in trunk.
> As a production workaround, one can manually truncate that log folder 
> yourself (delete all .index/.log files including and after the one with the 
> bad offset). However, kafka should (and can) handle this case well - with 
> replication we can truncate in broker startup.
> stacktrace and error message:
> {code}
> pri=WARN  t=pool-3-thread-4 at=Log Found a corrupted index file, 
> /$DIRECTORY/$TOPIC-22/14306536.index, deleting and rebuilding 
> index...
> pri=ERROR t=main at=LogManager There was an error in one of the threads 
> during logs loading: kafka.common.InvalidOffsetException: Attempt to append 
> an offset (15000337) to position 111719 no larger than the last offset 
> appended (15000337) to /$DIRECTORY/$TOPIC-8/14008931.index.
> pri=FATAL t=main at=KafkaServer Fatal error during KafkaServer startup. 
> Prepare to shutdown kafka.common.InvalidOffsetException: Attempt to append an 
> offset (15000337) to position 111719 no larger than the last offset appended 
> (15000337) to /$DIRECTORY/$TOPIC-8/14008931.index.
> at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
> at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
> at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
> at 

[jira] [Commented] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2020-03-14 Thread zhangchenghui (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059300#comment-17059300
 ] 

zhangchenghui commented on KAFKA-3919:
--

I wrote a detailed analysis of this problem:
https://mp.weixin.qq.com/s/zbwGLygjvO_ncgp7FH9QMA

> Broker faills to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> --
>
> Key: KAFKA-3919
> URL: https://issues.apache.org/jira/browse/KAFKA-3919
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andy Coates
>Assignee: Ben Stopford
>Priority: Major
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on several 
> brokers halted on start up with the error:
> {noformat}
>   Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>   kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.<init>(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that 
> contained non-monotonically incrementing offsets.
> We've spent some time digging through the logs and I feel I may have worked 
> out the sequence of events leading to this issue (though this is based on 
> some assumptions I've made about the way Kafka is working, which may be 
> wrong).
> First off, we have unclean leadership elections disabled. (We did later 
> enable them to help get around some other issues we were having, but this 
> was several hours after this issue manifested), and we're producing to the 
> topic with gzip compression and acks=1.
> We looked through the data logs that were causing the brokers to not start. 
> What we found was that the initial part of the log has monotonically 
> increasing offsets, where each compressed batch normally contained one or 
> two records. Then there is a batch that contains many records, whose first 
> record has an offset below the previous batch and whose last record has an 
> offset above the previous batch. Following on from this there continues a 
> period of large batches, with monotonically increasing offsets, and then 
> the log returns to batches with one or two records.
> Our working assumption here is that the period before the offset dip, with 
> the small batches, is pre-outage normal operation. The period of larger 
> batches is from just after the outage, where producers have a backlog to 
> process when the partition becomes available, and then things return to 
> normal batch sizes again once the backlog clears.
> We did also look through Kafka's application logs to try and piece together 
> the series of events leading up to this. Here’s what we know happened, with 
> regard to one partition that has issues, from the logs:
> Prior to outage:
> * Replicas for the partition are brokers