[jira] [Updated] (KAFKA-8362) LogCleaner gets stuck after partition move between log directories
     [ https://issues.apache.org/jira/browse/KAFKA-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Julio Ng updated KAFKA-8362:
----------------------------
    Description: 
When a partition is moved from one log directory to another, its checkpoint entry in the cleaner-offset-checkpoint file is not removed from the source directory.

As a consequence, when we read the last firstDirtyOffset, we might get a stale value from the old checkpoint file.

Basically, we need to clean up the entry from the checkpoint file in the source directory once the move is completed.

The current issue is that the code in LogCleanerManager:
{noformat}
/**
 * @return the position processed for all logs.
 */
def allCleanerCheckpoints: Map[TopicPartition, Long] = {
  inLock(lock) {
    checkpoints.values.flatMap(checkpoint => {
      try {
        checkpoint.read()
      } catch {
        case e: KafkaStorageException =>
          error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
          Map.empty[TopicPartition, Long]
      }
    }).toMap
  }
}{noformat}
collapses the offsets when multiple entries exist for the same TopicPartition.

  was:
When a partition is moved from one directory to another, their checkpoint entry in cleaner-offset-checkpoint file is not removed from the source directory.

As a consequence when we read the last firstDirtyOffset, we might get a stale value from the old checkpoint file.

Basically, we need clean up the entry from the check point file in the source directory when the move is completed

The current issue is that the code in LogCleanerManager:

{{def allCleanerCheckpoints: Map[TopicPartition, Long] = {}}
{{  inLock(lock) {}}
{{    checkpoints.values.flatMap(checkpoint => {}}
{{      try {}}
{{        checkpoint.read()}}
{{      } catch {}}
{{        case e: KafkaStorageException =>}}
{{          error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)}}
{{          Map.empty[TopicPartition, Long]}}
{{      }}}
{{    }).toMap}}
{{  }}}
{{}}}

collapses the offsets when multiple entries exist for the topicPartition


> LogCleaner gets stuck after partition move between log directories
> -------------------------------------------------------------------
>
>                 Key: KAFKA-8362
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8362
>             Project: Kafka
>          Issue Type: Bug
>          Components: log cleaner
>            Reporter: Julio Ng
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
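The collapse described above comes from Scala's toMap semantics: when the same key appears more than once, the last pair seen wins. Below is a minimal standalone sketch (not Kafka code; the partition name and offsets are made up) showing why two cleaner-offset-checkpoint files that both mention the same partition resolve to whichever file happens to be iterated last:
{noformat}
// Standalone sketch (not Kafka code): two cleaner-offset-checkpoint files both
// contain an entry for the same partition after a move between log directories.
object CheckpointCollapseDemo extends App {
  // Hypothetical checkpoint contents; the source directory still holds a stale offset.
  val sourceDirCheckpoint = Map("demo-topic-84" -> 100L)   // stale entry left behind
  val destDirCheckpoint   = Map("demo-topic-84" -> 2500L)  // entry written after the move

  // Mirrors the shape of allCleanerCheckpoints: flatten all files, then toMap.
  // toMap keeps the last value seen for a duplicate key, so whichever checkpoint
  // file is iterated last silently wins -- firstDirtyOffset may come from the stale file.
  val collapsed = Seq(sourceDirCheckpoint, destDirCheckpoint).flatten.toMap
  println(collapsed)  // Map(demo-topic-84 -> 2500) in this order; iteration order decides in general
}
{noformat}
Because the real checkpoints map is keyed by directory and gives no ordering guarantee, whether the stale or the fresh offset survives is effectively arbitrary.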
[jira] [Created] (KAFKA-8362) LogCleaner gets stuck after partition move between log directories
Julio Ng created KAFKA-8362:
-------------------------------

             Summary: LogCleaner gets stuck after partition move between log directories
                 Key: KAFKA-8362
                 URL: https://issues.apache.org/jira/browse/KAFKA-8362
             Project: Kafka
          Issue Type: Bug
          Components: log cleaner
            Reporter: Julio Ng


When a partition is moved from one log directory to another, its checkpoint entry in the cleaner-offset-checkpoint file is not removed from the source directory.

As a consequence, when we read the last firstDirtyOffset, we might get a stale value from the old checkpoint file.

Basically, we need to clean up the entry from the checkpoint file in the source directory once the move is completed.

The current issue is that the code in LogCleanerManager:
{noformat}
def allCleanerCheckpoints: Map[TopicPartition, Long] = {
  inLock(lock) {
    checkpoints.values.flatMap(checkpoint => {
      try {
        checkpoint.read()
      } catch {
        case e: KafkaStorageException =>
          error(s"Failed to access checkpoint file ${checkpoint.file.getName} in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
          Map.empty[TopicPartition, Long]
      }
    }).toMap
  }
}{noformat}
collapses the offsets when multiple entries exist for the same TopicPartition.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
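The fix direction suggested in the description (remove the entry from the source directory's checkpoint file when the move completes) could look roughly like the sketch below. The helper parameters readCheckpoint and writeCheckpoint are placeholders standing in for the checkpoint file's read/write operations; they are not the actual LogCleanerManager API:
{noformat}
import java.io.File

// Hypothetical sketch of the proposed cleanup; names are illustrative only and
// do NOT correspond to real LogCleanerManager methods.
object CheckpointCleanupSketch {
  def removeSourceCheckpointEntry(
      readCheckpoint: File => Map[String, Long],            // e.g. parse cleaner-offset-checkpoint
      writeCheckpoint: (File, Map[String, Long]) => Unit,   // e.g. rewrite the file atomically
      sourceDir: File,
      topicPartition: String): Unit = {
    val entries = readCheckpoint(sourceDir)
    if (entries.contains(topicPartition))
      // Drop the moved partition so a stale firstDirtyOffset can no longer be
      // read from the source directory's checkpoint file.
      writeCheckpoint(sourceDir, entries - topicPartition)
  }
}
{noformat}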
[jira] [Commented] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown
     [ https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399022#comment-16399022 ]

Julio Ng commented on KAFKA-6649:
---------------------------------

Our investigation of the OutOfRangeException doesn't look like it had the same sequence of events as those described in KAFKA-3978. However, I was wondering whether it is by design that any non-fatal exception propagated out of the doWork call will stop a ShutdownableThread.

> ReplicaFetcher stopped after non fatal exception is thrown
> -----------------------------------------------------------
>
>                 Key: KAFKA-6649
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6649
>             Project: Kafka
>          Issue Type: Bug
>          Components: replication
>    Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1
>            Reporter: Julio Ng
>            Priority: Major
>
> We have seen several under-replicated partitions, usually triggered by topic creation. After digging into the logs, we see the below:
> {noformat}
> [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition [[TOPIC_NAME_REMOVED]]-84 offset 2098535
>  at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>  at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
>  at scala.Option.foreach(Option.scala:257)
>  at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>  at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
>  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
>  at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot increment the log start offset to 2098535 of partition [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1
> [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat}
> It looks like after the ReplicaFetcherThread is stopped, the replicas start to lag behind, presumably because we are not fetching from the leader anymore. Further examining ShutdownableThread.scala:
> {noformat}
> override def run(): Unit = {
>   info("Starting")
>   try {
>     while (isRunning)
>       doWork()
>   } catch {
>     case e: FatalExitError =>
>       shutdownInitiated.countDown()
>       shutdownComplete.countDown()
>       info("Stopped")
>       Exit.exit(e.statusCode())
>     case e: Throwable =>
>       if (isRunning)
>         error("Error due to", e)
>   } finally {
>     shutdownComplete.countDown()
>   }
>   info("Stopped")
> }{noformat}
> For the Throwable (non-fatal) case, it just exits the while loop and the thread stops doing work. I am not sure whether this is the intended behavior of ShutdownableThread, or whether the exception should be caught so that we keep calling doWork().



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
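The alternative the comment asks about (keep calling doWork() after a non-fatal exception) would look roughly like the standalone sketch below. It is not the actual kafka.utils.ShutdownableThread implementation; the class name and log messages are made up for illustration:
{noformat}
import java.util.concurrent.atomic.AtomicBoolean

// Standalone sketch of the "catch and keep working" alternative (NOT the actual
// ShutdownableThread): non-fatal exceptions thrown by doWork() are caught inside
// the loop, so a single failed fetch does not permanently stop the thread.
abstract class RetryingWorkerThread(name: String) extends Thread(name) {
  private val running = new AtomicBoolean(true)

  def doWork(): Unit

  override def run(): Unit = {
    println(s"[$name] Starting")
    while (running.get()) {
      try {
        doWork()
      } catch {
        case _: InterruptedException => running.set(false)        // treat interruption as shutdown
        case e: Throwable => println(s"[$name] Error due to: $e")  // log and keep working
      }
    }
    println(s"[$name] Stopped")
  }

  def shutdown(): Unit = {
    running.set(false)
    interrupt()
    join()
  }
}
{noformat}
Whether retrying is actually safe here (e.g. for errors that would just repeat forever) is exactly the open question raised in this comment.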
[jira] [Updated] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown
     [ https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Julio Ng updated KAFKA-6649:
----------------------------
    Affects Version/s: 1.0.0
                       0.11.0.2
                       1.0.1

> ReplicaFetcher stopped after non fatal exception is thrown
> -----------------------------------------------------------
>
>                 Key: KAFKA-6649
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6649
>             Project: Kafka
>          Issue Type: Bug
>          Components: replication
>    Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1
>            Reporter: Julio Ng
>            Priority: Major
>
> We have seen several under-replicated partitions, usually triggered by topic creation. After digging into the logs, we see the below:
> {noformat}
> [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition [[TOPIC_NAME_REMOVED]]-84 offset 2098535
>  at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>  at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
>  at scala.Option.foreach(Option.scala:257)
>  at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>  at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
>  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
>  at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot increment the log start offset to 2098535 of partition [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1
> [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat}
> It looks like after the ReplicaFetcherThread is stopped, the replicas start to lag behind, presumably because we are not fetching from the leader anymore. Further examining ShutdownableThread.scala:
> {noformat}
> override def run(): Unit = {
>   info("Starting")
>   try {
>     while (isRunning)
>       doWork()
>   } catch {
>     case e: FatalExitError =>
>       shutdownInitiated.countDown()
>       shutdownComplete.countDown()
>       info("Stopped")
>       Exit.exit(e.statusCode())
>     case e: Throwable =>
>       if (isRunning)
>         error("Error due to", e)
>   } finally {
>     shutdownComplete.countDown()
>   }
>   info("Stopped")
> }{noformat}
> For the Throwable (non-fatal) case, it just exits the while loop and the thread stops doing work. I am not sure whether this is the intended behavior of ShutdownableThread, or whether the exception should be caught so that we keep calling doWork().



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown
Julio Ng created KAFKA-6649:
-------------------------------

             Summary: ReplicaFetcher stopped after non fatal exception is thrown
                 Key: KAFKA-6649
                 URL: https://issues.apache.org/jira/browse/KAFKA-6649
             Project: Kafka
          Issue Type: Bug
          Components: replication
    Affects Versions: 1.1.0
            Reporter: Julio Ng


We have seen several under-replicated partitions, usually triggered by topic creation. After digging into the logs, we see the below:
{noformat}
[2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
kafka.common.KafkaException: Error processing data for partition [[TOPIC_NAME_REMOVED]]-84 offset 2098535
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
 at scala.Option.foreach(Option.scala:257)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
 at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
 at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
 at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot increment the log start offset to 2098535 of partition [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1
[2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat}
It looks like after the ReplicaFetcherThread is stopped, the replicas start to lag behind, presumably because we are not fetching from the leader anymore.

Further examining ShutdownableThread.scala:
{noformat}
override def run(): Unit = {
  info("Starting")
  try {
    while (isRunning)
      doWork()
  } catch {
    case e: FatalExitError =>
      shutdownInitiated.countDown()
      shutdownComplete.countDown()
      info("Stopped")
      Exit.exit(e.statusCode())
    case e: Throwable =>
      if (isRunning)
        error("Error due to", e)
  } finally {
    shutdownComplete.countDown()
  }
  info("Stopped")
}{noformat}
For the Throwable (non-fatal) case, it just exits the while loop and the thread stops doing work. I am not sure whether this is the intended behavior of ShutdownableThread, or whether the exception should be caught so that we keep calling doWork().



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
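A standalone sketch (not Kafka code; the exception and iteration count are made up) that reproduces the reported behavior: one non-fatal exception escaping doWork() ends the while loop, the thread logs "Stopped", and no further work is done even though isRunning is still true:
{noformat}
// Standalone sketch mirroring the ShutdownableThread pattern quoted above: the
// catch block handles the Throwable once, but the while loop is never re-entered,
// so the thread goes quiet -- just like the stopped ReplicaFetcherThread.
object StoppedWorkerDemo extends App {
  @volatile var isRunning = true
  private var iterations = 0

  def doWork(): Unit = {
    iterations += 1
    if (iterations == 3)
      throw new RuntimeException("simulated non-fatal failure")  // made-up stand-in for the fetch error
    println(s"doWork iteration $iterations")
  }

  val worker = new Thread(new Runnable {
    override def run(): Unit = {
      println("Starting")
      try {
        while (isRunning)
          doWork()
      } catch {
        case e: Throwable =>
          if (isRunning) println(s"Error due to: $e")
      }
      println("Stopped")  // reached after the exception; the loop is never re-entered
    }
  })
  worker.start()
  worker.join()
  println(s"isRunning is still $isRunning, but the worker thread has exited")
}
{noformat}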