[ https://issues.apache.org/jira/browse/KAFKA-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Julio Ng updated KAFKA-8362:
----------------------------
    Description: 
When a partition is moved from one log directory to another, its checkpoint entry 
in the cleaner-offset-checkpoint file is not removed from the source directory.

As a consequence, when we read the last firstDirtyOffset, we might get a stale 
value from the old checkpoint file.

Basically, we need to clean up the entry in the checkpoint file of the source 
directory when the move is completed.
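
A minimal sketch of that cleanup, assuming the file can be rewritten through an 
OffsetCheckpointFile-style read()/write() API; the helper name and call site are 
hypothetical, not the actual fix:
{noformat}
import java.io.File

import kafka.server.checkpoints.OffsetCheckpointFile
import org.apache.kafka.common.TopicPartition

// Hypothetical helper (illustration only): once the partition has been moved to
// the destination directory, drop its entry from the source directory's
// cleaner-offset-checkpoint so a stale offset can no longer be read from it.
def removeSourceCheckpointEntry(sourceLogDir: File, tp: TopicPartition): Unit = {
  val checkpoint = new OffsetCheckpointFile(new File(sourceLogDir, "cleaner-offset-checkpoint"))
  val existing = checkpoint.read()
  if (existing.contains(tp))
    checkpoint.write(existing - tp)
}
{noformat}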

The current issue is that the code in LogCleanerManager:
{noformat}
/**
 * @return the position processed for all logs.
 */
def allCleanerCheckpoints: Map[TopicPartition, Long] = {
  inLock(lock) {
    checkpoints.values.flatMap(checkpoint => {
      try {
        checkpoint.read()
      } catch {
        case e: KafkaStorageException =>
          error(s"Failed to access checkpoint file ${checkpoint.file.getName} 
in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
          Map.empty[TopicPartition, Long]
      }
    }).toMap
  }
}{noformat}
silently collapses the offsets when multiple checkpoint files contain an entry for 
the same TopicPartition.
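
To make the failure mode concrete (topic name and offsets below are made up): if both 
the source and the destination directory still carry a checkpoint entry for the same 
partition, flatMap(...).toMap keeps whichever value is seen last in iteration order, 
which can be the stale one.
{noformat}
import org.apache.kafka.common.TopicPartition

val tp = new TopicPartition("foo", 0)
val destDirCheckpoint   = Map(tp -> 250L) // up-to-date entry in the destination log dir
val sourceDirCheckpoint = Map(tp -> 100L) // stale entry left behind in the source log dir

// Mirrors what allCleanerCheckpoints does: flatten the per-directory maps into one Map.
val collapsed = Seq(destDirCheckpoint, sourceDirCheckpoint).flatMap(identity).toMap

// collapsed(tp) == 100L: the last entry wins, so the stale firstDirtyOffset
// silently shadows the real one whenever the stale file happens to be read last.
{noformat}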

> LogCleaner gets stuck after partition move between log directories
> ------------------------------------------------------------------
>
>                 Key: KAFKA-8362
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8362
>             Project: Kafka
>          Issue Type: Bug
>          Components: log cleaner
>            Reporter: Julio Ng
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
