[ https://issues.apache.org/jira/browse/KAFKA-8036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stanislav Kozlovski updated KAFKA-8036:
---------------------------------------
    Description: 
When changing a partition's log directory on a follower broker, we move all 
the data related to that partition to the other log dir (as per 
[KIP-113|https://cwiki.apache.org/confluence/display/KAFKA/KIP-113:+Support+replicas+movement+between+log+directories]).
On a successful move, we rename the original directory by appending a suffix 
consisting of a UUID and `-delete` (e.g. `test_log_dir-0` would be renamed to 
`test_log_dir-0.32e77c96939140f9a56a49b75ad8ec8d-delete`).
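A minimal sketch of how such a deletion-marker name could be derived (the helper name and exact scheme are assumptions inferred from the example above, not the verbatim broker code):
{code:scala}
import java.util.UUID

// Hypothetical helper: builds a "<dir>.<uuid>-delete" name matching the
// example above, i.e. the UUID with its dashes stripped.
def logDeleteDirName(topicPartitionDir: String): String = {
  val uuid = UUID.randomUUID.toString.replaceAll("-", "")
  s"$topicPartitionDir.$uuid-delete" // e.g. test_log_dir-0.32e77c...-delete
}
{code}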

We copy every log file and [initialize a new leader epoch file 
cache|https://github.com/apache/kafka/blob/0d56f1413557adabc736cae2dffcdc56a620403e/core/src/main/scala/kafka/log/Log.scala#L768].
The problem is that we do not update the associated `Replica` class's leader 
epoch cache - it still points to the old `LeaderEpochFileCache` instance.
This results in a `FileNotFoundException` when the broker is [elected as 
leader for the 
partition|https://github.com/apache/kafka/blob/255f4a6effdc71c273691859cd26c4138acad778/core/src/main/scala/kafka/cluster/Partition.scala#L312].
This has the unintended side effect of marking the log directory as offline, 
which makes every partition in that log directory unavailable on the 
affected broker.
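To make the failure mode concrete, here is a minimal, self-contained sketch of the stale-reference pattern (illustrative stand-in types, not the real `kafka.cluster` classes):
{code:scala}
import java.io.File

class LeaderEpochFileCache(val checkpoint: File) {
  def assign(epoch: Int, startOffset: Long): Unit = {
    // The real cache flushes to `checkpoint` here; once the old partition
    // directory has been renamed to "*-delete", the write fails with
    // FileNotFoundException.
    if (!checkpoint.getParentFile.exists())
      throw new java.io.FileNotFoundException(checkpoint.getPath)
  }
}

class Log(val dir: File) {
  // Re-created (pointing at the new directory) when the partition is moved.
  var leaderEpochCache =
    new LeaderEpochFileCache(new File(dir, "leader-epoch-checkpoint"))
}

class Replica(log: Log) {
  // BUG (as described above): captured once at construction time. After the
  // move, log.leaderEpochCache is a fresh instance, but this field still
  // references the cache whose checkpoint path now lives under the renamed
  // "*-delete" directory.
  val epochs: LeaderEpochFileCache = log.leaderEpochCache
}
{code}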
h2. Exception and logs

I reproduced this locally by running two brokers. The steps to reproduce:
# Create a partition replicated across the two brokers (A, B), with leader A
# Move partition leadership to B
# Alter the partition's log dirs on A
# Move partition leadership back to A
This results in a log directory structure on broker A similar to the following:
{code}
├── new_dir
│   ├── cleaner-offset-checkpoint
│   ├── log-start-offset-checkpoint
│   ├── meta.properties
│   ├── recovery-point-offset-checkpoint
│   ├── replication-offset-checkpoint
│   └── test_log_dir-0
│       ├── 00000000000000000000.index
│       ├── 00000000000000000000.log
│       ├── 00000000000000000000.timeindex
│       └── leader-epoch-checkpoint
└── old_dir
    ├── cleaner-offset-checkpoint
    ├── log-start-offset-checkpoint
    ├── meta.properties
    ├── recovery-point-offset-checkpoint
    ├── replication-offset-checkpoint
    └── test_log_dir-0.32e77c96939140f9a56a49b75ad8ec8d-delete
        ├── 00000000000000000000.index
        ├── 00000000000000000000.log
        ├── 00000000000000000000.timeindex
        ├── 00000000000000000009.snapshot
        └── leader-epoch-checkpoint
{code}
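For reference, step 3 above ("Alter the partition's log dirs") can also be triggered programmatically via the Admin API. A minimal sketch, assuming a local broker at localhost:9092 hosting the replica as broker 0 and /logs/new_dir as the destination (all repro-specific assumptions):
{code:scala}
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import org.apache.kafka.common.TopicPartitionReplica

object AlterLogDirs extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  val admin = AdminClient.create(props)

  // Move the replica of test_log_dir-0 hosted on broker 0 into /logs/new_dir.
  val replica = new TopicPartitionReplica("test_log_dir", 0, 0)
  admin.alterReplicaLogDirs(Collections.singletonMap(replica, "/logs/new_dir"))
    .all()
    .get()

  admin.close()
}
{code}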
 

 
Moving partition leadership back to A (broker 0) then produces the following logs:
{code:java}
[2019-03-04 15:36:56,854] INFO [Partition test_log_dir-0 broker=0] test_log_dir-0 starts at Leader Epoch 3 from offset 9. Previous Leader Epoch was: 2 (kafka.cluster.Partition)
[2019-03-04 15:36:56,855] WARN [LeaderEpochCache test_log_dir-0] New epoch entry EpochEntry(epoch=3, startOffset=9) caused truncation of conflicting entries ListBuffer(EpochEntry(epoch=1, startOffset=9)). Cache now contains 2 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2019-03-04 15:36:56,857] ERROR Error while writing to checkpoint file /logs/old_dir/test_log_dir-0/leader-epoch-checkpoint (kafka.server.LogDirFailureChannel)
java.io.FileNotFoundException: /logs/old_dir/test_log_dir-0/leader-epoch-checkpoint.tmp (No such file or directory)
    at java.base/java.io.FileOutputStream.open0(Native Method)
    at java.base/java.io.FileOutputStream.open(FileOutputStream.java:299)
    at java.base/java.io.FileOutputStream.<init>(FileOutputStream.java:238)
    at java.base/java.io.FileOutputStream.<init>(FileOutputStream.java:188)
    at kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:52)
    at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:50)
    at kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:64)
    at kafka.server.epoch.LeaderEpochFileCache.kafka$server$epoch$LeaderEpochFileCache$$flush(LeaderEpochFileCache.scala:219)
    at kafka.server.epoch.LeaderEpochFileCache$$anonfun$assign$1.apply$mcV$sp(LeaderEpochFileCache.scala:62)
    at kafka.server.epoch.LeaderEpochFileCache$$anonfun$assign$1.apply(LeaderEpochFileCache.scala:52)
    at kafka.server.epoch.LeaderEpochFileCache$$anonfun$assign$1.apply(LeaderEpochFileCache.scala:52)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259)
    at kafka.server.epoch.LeaderEpochFileCache.assign(LeaderEpochFileCache.scala:52)
    at kafka.cluster.Partition$$anonfun$5$$anonfun$apply$8.apply(Partition.scala:395)
    at kafka.cluster.Partition$$anonfun$5$$anonfun$apply$8.apply(Partition.scala:394)
    at scala.Option.foreach(Option.scala:257)
    at kafka.cluster.Partition$$anonfun$5.apply(Partition.scala:394)
    at kafka.cluster.Partition$$anonfun$5.apply(Partition.scala:367)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259)
    at kafka.cluster.Partition.makeLeader(Partition.scala:367)
    at kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:1162)
    at kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManager.scala:1160)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
    at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:1160)
    at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1072)
    at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:185)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:110)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
    at java.base/java.lang.Thread.run(Thread.java:844)
[2019-03-04 15:36:56,864] INFO [ReplicaManager broker=0] Stopping serving replicas in dir /logs/old_dir (kafka.server.ReplicaManager)
{code}
As the stack trace shows, the `LeaderEpochFileCache` held by `Replica#epochs` 
still points to the old `/logs/old_dir/test_log_dir-0/leader-epoch-checkpoint` 
file.
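One possible remedy (an assumption on our side, not necessarily the patch that will land): stop capturing the cache in `Replica` and resolve it through the current `Log` on every access. Reusing the stand-in types from the sketch above:
{code:scala}
class Replica(log: Log) {
  // A def instead of a val: the cache is looked up on every access, so the
  // fresh instance created during the log-dir move is picked up automatically.
  def epochs: LeaderEpochFileCache = log.leaderEpochCache
}
{code}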

> Log dir reassignment on followers fails with FileNotFoundException for the 
> leader epoch cache on leader election
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8036
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8036
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 1.1.1, 2.0.1
>            Reporter: Stanislav Kozlovski
>            Assignee: Stanislav Kozlovski
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
