[jira] [Updated] (KAFKA-8362) LogCleaner gets stuck after partition move between log directories

2019-05-13 Thread Julio Ng (JIRA)


 [ https://issues.apache.org/jira/browse/KAFKA-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Julio Ng updated KAFKA-8362:

Description: 
When a partition is moved from one log directory to another, its checkpoint entry
in the cleaner-offset-checkpoint file is not removed from the source directory.

As a consequence, when we read the last firstDirtyOffset, we might get a stale
value from the old checkpoint file.

Basically, we need to clean up the entry in the checkpoint file in the source
directory when the move is completed.

The current issue is that the code in LogCleanerManager:
{noformat}
/**
 * @return the position processed for all logs.
 */
def allCleanerCheckpoints: Map[TopicPartition, Long] = {
  inLock(lock) {
    checkpoints.values.flatMap(checkpoint => {
      try {
        checkpoint.read()
      } catch {
        case e: KafkaStorageException =>
          error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
          Map.empty[TopicPartition, Long]
      }
    }).toMap
  }
}{noformat}
collapses the offsets when multiple entries exist for the same TopicPartition.
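
For illustration, a minimal self-contained Scala sketch (not Kafka code; it only assumes kafka-clients on the classpath for TopicPartition) of why the final toMap is the problem: for duplicate keys the last pair encountered wins, so whether the stale source-directory offset or the correct destination offset survives depends purely on the iteration order of the per-directory checkpoint maps:
{noformat}
import org.apache.kafka.common.TopicPartition

object CheckpointCollapseDemo extends App {
  val tp = new TopicPartition("my-topic", 0)

  // Hypothetical contents of cleaner-offset-checkpoint in the source and
  // destination log dirs after a move; the stale source entry was never removed.
  val sourceDir      = Map(tp -> 1000L) // stale firstDirtyOffset
  val destinationDir = Map(tp -> 5000L) // correct firstDirtyOffset

  // allCleanerCheckpoints effectively flattens all per-directory maps and
  // calls toMap; for duplicate keys the last entry wins.
  println(Seq(destinationDir, sourceDir).flatten.toMap(tp)) // 1000 -- stale wins
  println(Seq(sourceDir, destinationDir).flatten.toMap(tp)) // 5000 -- correct wins
}
{noformat}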

  was:
When a partition is moved from one log directory to another, its checkpoint entry
in the cleaner-offset-checkpoint file is not removed from the source directory.

As a consequence, when we read the last firstDirtyOffset, we might get a stale
value from the old checkpoint file.

Basically, we need to clean up the entry in the checkpoint file in the source
directory when the move is completed.

The current issue is that the code in LogCleanerManager:

{{def allCleanerCheckpoints: Map[TopicPartition, Long] = {}}
{{  inLock(lock) {}}
{{    checkpoints.values.flatMap(checkpoint => {}}
{{      try {}}
{{        checkpoint.read()}}
{{      } catch {}}
{{        case e: KafkaStorageException =>}}
{{          error(s"Failed to access checkpoint file ${checkpoint.file.getName} 
in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)}}
{{          Map.empty[TopicPartition, Long]}}
{{      }}}
{{    }).toMap}}
{{  }}}
{{}}}

collapses the offsets when multiple entries exist for the same TopicPartition.


> LogCleaner gets stuck after partition move between log directories
> --
>
> Key: KAFKA-8362
> URL: https://issues.apache.org/jira/browse/KAFKA-8362
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Reporter: Julio Ng
>Priority: Major
>
> When a partition is moved from one log directory to another, its checkpoint
> entry in the cleaner-offset-checkpoint file is not removed from the source
> directory.
> As a consequence, when we read the last firstDirtyOffset, we might get a stale
> value from the old checkpoint file.
> Basically, we need to clean up the entry in the checkpoint file in the source
> directory when the move is completed.
> The current issue is that the code in LogCleanerManager:
> {noformat}
> /**
>  * @return the position processed for all logs.
>  */
> def allCleanerCheckpoints: Map[TopicPartition, Long] = {
>   inLock(lock) {
>     checkpoints.values.flatMap(checkpoint => {
>       try {
>         checkpoint.read()
>       } catch {
>         case e: KafkaStorageException =>
>           error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
>           Map.empty[TopicPartition, Long]
>       }
>     }).toMap
>   }
> }{noformat}
> collapses the offsets when multiple entries exist for the same TopicPartition.





[jira] [Created] (KAFKA-8362) LogCleaner gets stuck after partition move between log directories

2019-05-13 Thread Julio Ng (JIRA)
Julio Ng created KAFKA-8362:
---

 Summary: LogCleaner gets stuck after partition move between log 
directories
 Key: KAFKA-8362
 URL: https://issues.apache.org/jira/browse/KAFKA-8362
 Project: Kafka
  Issue Type: Bug
  Components: log cleaner
Reporter: Julio Ng


When a partition is moved from one log directory to another, its checkpoint entry
in the cleaner-offset-checkpoint file is not removed from the source directory.

As a consequence, when we read the last firstDirtyOffset, we might get a stale
value from the old checkpoint file.

Basically, we need to clean up the entry in the checkpoint file in the source
directory when the move is completed.

The current issue is that the code in LogCleanerManager:
{noformat}
def allCleanerCheckpoints: Map[TopicPartition, Long] = {
  inLock(lock) {
    checkpoints.values.flatMap(checkpoint => {
      try {
        checkpoint.read()
      } catch {
        case e: KafkaStorageException =>
          error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
          Map.empty[TopicPartition, Long]
      }
    }).toMap
  }
}{noformat}
collapses the offsets when multiple entries exist for the same TopicPartition.
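
As a possible direction for the fix (only a sketch; CheckpointIO below is a hypothetical stand-in, not Kafka's real checkpoint file classes): when the move completes, drop the partition's entry from the source directory's cleaner-offset-checkpoint so it can no longer shadow the destination's value:
{noformat}
import java.io.File
import org.apache.kafka.common.TopicPartition

// Hypothetical abstraction over reading/writing a cleaner-offset-checkpoint
// file; Kafka's own checkpoint classes would play this role in a real patch.
trait CheckpointIO {
  def read(file: File): Map[TopicPartition, Long]
  def write(file: File, offsets: Map[TopicPartition, Long]): Unit
}

object CleanerCheckpointCleanup {
  private val CheckpointFilename = "cleaner-offset-checkpoint"

  // After a partition move completes, remove tp's entry from the *source*
  // directory's checkpoint file so a stale firstDirtyOffset cannot survive.
  def removeSourceEntry(io: CheckpointIO, sourceLogDir: File, tp: TopicPartition): Unit = {
    val file = new File(sourceLogDir, CheckpointFilename)
    val current = io.read(file)
    if (current.contains(tp))
      io.write(file, current - tp)
  }
}
{noformat}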





[jira] [Commented] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown

2018-03-14 Thread Julio Ng (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16399022#comment-16399022 ]

Julio Ng commented on KAFKA-6649:
-

Our investigation of the OffsetOutOfRangeException doesn't look like it involved
the same sequence of events as those described in KAFKA-3978.

However, I was wondering whether it is by design that any non-fatal exception
propagating out of the doWork() call stops a ShutdownableThread.
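
To make the question concrete, here is a small self-contained Scala snippet (a toy copy of the quoted run() shape, not the real kafka.utils.ShutdownableThread) showing that a single non-fatal exception escaping doWork() ends the loop: the error is logged, the thread dies quietly, and the process keeps running without it:
{noformat}
object StoppedLoopDemo extends App {
  @volatile var isRunning = true
  var calls = 0

  // Simulate doWork() throwing a non-fatal exception on the third call.
  def doWork(): Unit = {
    calls += 1
    if (calls == 3) throw new IllegalStateException("simulated OffsetOutOfRangeException")
  }

  val t = new Thread(() =>
    try {
      while (isRunning) doWork()
    } catch {
      case e: Throwable => if (isRunning) println(s"Error due to $e")
    } finally {
      println("Stopped")
    }
  )
  t.start()
  t.join()
  println(s"doWork() ran $calls times; the thread is dead but the process is not")
}
{noformat}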

> ReplicaFetcher stopped after non fatal exception is thrown
> --
>
> Key: KAFKA-6649
> URL: https://issues.apache.org/jira/browse/KAFKA-6649
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Julio Ng
>Priority: Major
>
> We have seen several under-replicated partitions, usually triggered by topic
> creation. After digging in the logs, we see the below:
> {noformat}
> [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> [[TOPIC_NAME_REMOVED]]-84 offset 2098535
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
>  at scala.Option.foreach(Option.scala:257)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 2098535 of partition 
> [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1
> [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat}
> It looks like after the ReplicaFetcherThread is stopped, the replicas
> start to lag behind, presumably because we are no longer fetching from the
> leader. Examining ShutdownableThread.scala further:
> {noformat}
> override def run(): Unit = {
>   info("Starting")
>   try {
>     while (isRunning)
>       doWork()
>   } catch {
>     case e: FatalExitError =>
>       shutdownInitiated.countDown()
>       shutdownComplete.countDown()
>       info("Stopped")
>       Exit.exit(e.statusCode())
>     case e: Throwable =>
>       if (isRunning)
>         error("Error due to", e)
>   } finally {
>     shutdownComplete.countDown()
>   }
>   info("Stopped")
> }{noformat}
> For the Throwable (non-fatal) case, it just exits the while loop and the
> thread stops doing work. I am not sure whether this is the intended behavior
> of ShutdownableThread, or whether the exception should be caught so that we
> keep calling doWork().
>  





[jira] [Updated] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown

2018-03-13 Thread Julio Ng (JIRA)

 [ https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Julio Ng updated KAFKA-6649:

Affects Version/s: 1.0.0
   0.11.0.2
   1.0.1

> ReplicaFetcher stopped after non fatal exception is thrown
> --
>
> Key: KAFKA-6649
> URL: https://issues.apache.org/jira/browse/KAFKA-6649
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Julio Ng
>Priority: Major
>
> We have seen several under-replicated partitions, usually triggered by topic
> creation. After digging in the logs, we see the below:
> {noformat}
> [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> [[TOPIC_NAME_REMOVED]]-84 offset 2098535
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
>  at scala.Option.foreach(Option.scala:257)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 2098535 of partition 
> [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1
> [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat}
> It looks like after the ReplicaFetcherThread is stopped, the replicas
> start to lag behind, presumably because we are no longer fetching from the
> leader. Examining ShutdownableThread.scala further:
> {noformat}
> override def run(): Unit = {
>   info("Starting")
>   try {
>     while (isRunning)
>       doWork()
>   } catch {
>     case e: FatalExitError =>
>       shutdownInitiated.countDown()
>       shutdownComplete.countDown()
>       info("Stopped")
>       Exit.exit(e.statusCode())
>     case e: Throwable =>
>       if (isRunning)
>         error("Error due to", e)
>   } finally {
>     shutdownComplete.countDown()
>   }
>   info("Stopped")
> }{noformat}
> For the Throwable (non-fatal) case, it just exits the while loop and the
> thread stops doing work. I am not sure whether this is the intended behavior
> of ShutdownableThread, or whether the exception should be caught so that we
> keep calling doWork().
>  





[jira] [Created] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown

2018-03-13 Thread Julio Ng (JIRA)
Julio Ng created KAFKA-6649:
---

 Summary: ReplicaFetcher stopped after non fatal exception is thrown
 Key: KAFKA-6649
 URL: https://issues.apache.org/jira/browse/KAFKA-6649
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 1.1.0
Reporter: Julio Ng


We have seen several under-replicated partitions, usually triggered by topic
creation. After digging in the logs, we see the below:
{noformat}
[2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, 
fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
kafka.common.KafkaException: Error processing data for partition 
[[TOPIC_NAME_REMOVED]]-84 offset 2098535
 at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
 at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
 at scala.Option.foreach(Option.scala:257)
 at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
 at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
 at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
 at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
 at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
increment the log start offset to 2098535 of partition 
[[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1
[2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, 
fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat}
It looks like after the ReplicaFetcherThread is stopped, the replicas start to
lag behind, presumably because we are no longer fetching from the leader.
Examining ShutdownableThread.scala further:
{noformat}
override def run(): Unit = {
  info("Starting")
  try {
    while (isRunning)
      doWork()
  } catch {
    case e: FatalExitError =>
      shutdownInitiated.countDown()
      shutdownComplete.countDown()
      info("Stopped")
      Exit.exit(e.statusCode())
    case e: Throwable =>
      if (isRunning)
        error("Error due to", e)
  } finally {
    shutdownComplete.countDown()
  }
  info("Stopped")
}{noformat}
For the Throwable (non-fatal) case, it just exits the while loop and the thread
stops doing work. I am not sure whether this is the intended behavior of
ShutdownableThread, or whether the exception should be caught so that we keep
calling doWork().
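
One possible alternative, if continuing is the desired behavior (just a sketch, not a proposal for the real ShutdownableThread; FatalExitError handling and the shutdown latches are deliberately elided): move the catch inside the loop so non-fatal errors are logged and doWork() keeps being called, ideally with some backoff:
{noformat}
import scala.util.control.NonFatal

object ResilientLoopSketch {
  // Sketch only: a run loop that survives non-fatal exceptions from doWork()
  // instead of terminating on the first one.
  def resilientRun(isRunning: () => Boolean, doWork: () => Unit): Unit = {
    println("Starting")
    while (isRunning()) {
      try {
        doWork()
      } catch {
        case NonFatal(e) =>
          // Log and keep calling doWork(); a real change likely wants bounded backoff.
          println(s"Error due to $e; continuing")
          Thread.sleep(1000L)
      }
    }
    println("Stopped")
  }
}
{noformat}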

 


