[jira] [Updated] (KAFKA-14071) Kafka request handler threads saturated when moving a partition

2022-07-12 Thread Qinghui Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qinghui Xu updated KAFKA-14071:
---
Description: 
*Kafka version:* 2.7.1

 

*Our scenario:*

Each server has 72 cores and runs with around 100 request handler threads and 
50 network handler threads.

Many (a few hundred) consumers read from the same topic (and the same 
partitions), as they do not belong to the same consumer group.

Many (hundreds of) producers also produce data into the same topic, with a 
throughput of around 100 KB/s.

 

*The procedure to reproduce it:*

Move a partition's leader replica to a new broker that was not previously a 
follower (meaning it has no data for that partition)

 

*Observation:*

All Kafka request handler threads are saturated. After analyzing the thread 
dump, it seems most of them are trying to read the same log segment file, 
which requires locking a monitor on a specific object inside 
`sun.nio.ch.FileChannelImpl`.
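
For illustration, a minimal, self-contained sketch of the suspected pattern (this 
is not the actual Kafka read path; it assumes the contended monitor is the lock 
that serializes relative reads on a single shared FileChannel, and the file path 
is hypothetical):
{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class SharedChannelContention {
    public static void main(String[] args) throws IOException {
        // One channel shared by all reader threads, as a leader's log segment would be.
        FileChannel channel = FileChannel.open(Paths.get("/tmp/segment.log"),
                                               StandardOpenOption.READ);
        Runnable reader = () -> {
            ByteBuffer buf = ByteBuffer.allocate(64 * 1024);
            try {
                // Relative reads advance the shared position and are serialized
                // internally by the channel, so the threads below effectively
                // take turns instead of reading in parallel.
                while (channel.read(buf) > 0) {
                    buf.clear();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        };
        for (int i = 0; i < 100; i++) {
            new Thread(reader, "request-handler-" + i).start();
        }
    }
}
{code}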

 

*Other remarks:*

The problem is not reproduced for a simple leadership transition between 
existing replicas. For example, shutting down the leader broker, or moving the 
leader to another follower with the kafka reassignment script, works fine.

  was:
*Kafka version:* 2.7.1

*Our scenarios:*

Each server has 72 cores, and running with around 100 request handler threads, 
50 network handler threads.

Many (a few hundreds) readers consuming from the same topic (and the same 
partitions) as they don't belong to the same consumer group.

Many (hundreds) producers are also producing data into the same topic, with a 
throughput around 100KB/s.

 

*The procedure to reproduce it:*

Move a partition leader replica to a new broker which was not the follower 
(meaning it does not have data for that partition)

 

*Observation:*

All Kafka request handler threads are overloaded. After an analysis of the 
threaddump, it seems most of them are trying to read the same log segment file 
which requires locking a monitor on a specific object in the 
`sun.nio.ch.FileChannelImpl`.

 

*Other remarks:*

Problem is not reproduced when it's a simple leadership transition between the 
replicas. For example, we try to shut down the leader broker, or move leader to 
another follower using kafka assignment script, it's working fine.


> Kafka request handler threads saturated when moving a partition
> ---
>
> Key: KAFKA-14071
> URL: https://issues.apache.org/jira/browse/KAFKA-14071
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Qinghui Xu
>Priority: Major
>
> *Kafka version:* 2.7.1
>  
> *Our scenario:*
> Each server has 72 cores and runs with around 100 request handler threads and 
> 50 network handler threads.
> Many (a few hundred) consumers read from the same topic (and the same 
> partitions), as they do not belong to the same consumer group.
> Many (hundreds of) producers also produce data into the same topic, with a 
> throughput of around 100 KB/s.
>  
> *The procedure to reproduce it:*
> Move a partition's leader replica to a new broker that was not previously a 
> follower (meaning it has no data for that partition)
>  
> *Observation:*
> All Kafka request handler threads are saturated. After analyzing the thread 
> dump, it seems most of them are trying to read the same log segment file, 
> which requires locking a monitor on a specific object inside 
> `sun.nio.ch.FileChannelImpl`.
>  
> *Other remarks:*
> The problem is not reproduced for a simple leadership transition between 
> existing replicas. For example, shutting down the leader broker, or moving 
> the leader to another follower with the kafka reassignment script, works fine.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14071) Kafka request handler threads saturated when moving a partition

2022-07-12 Thread Qinghui Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qinghui Xu updated KAFKA-14071:
---
Description: 
*Kafka version:* 2.7.1


*Our scenarios:*

Each server has 72 cores and runs with around 100 request handler threads and 
50 network handler threads.

Many (a few hundred) consumers read from the same topic (and the same 
partitions), as they do not belong to the same consumer group.

Many (hundreds of) producers also produce data into the same topic, with a 
throughput of around 100 KB/s.

 

*The procedure to reproduce it:*

Move a partition's leader replica to a new broker that was not previously a 
follower (meaning it has no data for that partition)

 

*Observation:*

All Kafka request handler threads are saturated. After analyzing the thread 
dump, it seems most of them are trying to read the same log segment file, 
which requires locking a monitor on a specific object inside 
`sun.nio.ch.FileChannelImpl`.

 

*Other remarks:*

The problem is not reproduced for a simple leadership transition between 
existing replicas. For example, shutting down the leader broker, or moving the 
leader to another follower with the kafka reassignment script, works fine.

  was:
*Our scenarios:*

Many (a few hundreds) readers consuming from the same topic (and the same 
partitions) as they don't belong to the same consumer group.

Many (hundreds) producers are also producing data into the same topic, with a 
throughput around 100KB/s.

*The procedure to reproduce it:*

Move a partition leader replica to a new broker which was not the follower 
(meaning it does not have data for that partition)

 

*Observation:*

All Kafka request handler threads are overloaded. After an analysis of the 
threaddump, it seems most of them are trying to read the same log segment file 
which requires locking a monitor on a specific object in the 
`sun.nio.ch.FileChannelImpl`.

 

*Other remarks:*

Problem is not reproduced when it's a simple leadership transition between the 
replicas. For example, we try to shut down the leader broker, or move leader to 
another follower using kafka assignment script, it's working fine.


> Kafka request handler threads saturated when moving a partition
> ---
>
> Key: KAFKA-14071
> URL: https://issues.apache.org/jira/browse/KAFKA-14071
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Qinghui Xu
>Priority: Major
>
> *Kafka version:* 2.7.1
> *Our scenarios:*
> Each server has 72 cores, and running with around 100 request handler 
> threads, 50 network handler threads.
> Many (a few hundreds) readers consuming from the same topic (and the same 
> partitions) as they don't belong to the same consumer group.
> Many (hundreds) producers are also producing data into the same topic, with a 
> throughput around 100KB/s.
>  
> *The procedure to reproduce it:*
> Move a partition leader replica to a new broker which was not the follower 
> (meaning it does not have data for that partition)
>  
> *Observation:*
> All Kafka request handler threads are overloaded. After an analysis of the 
> threaddump, it seems most of them are trying to read the same log segment 
> file which requires locking a monitor on a specific object in the 
> `sun.nio.ch.FileChannelImpl`.
>  
> *Other remarks:*
> Problem is not reproduced when it's a simple leadership transition between 
> the replicas. For example, we try to shut down the leader broker, or move 
> leader to another follower using kafka assignment script, it's working fine.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14071) Kafka request handler threads saturated when moving a partition

2022-07-12 Thread Qinghui Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14071?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qinghui Xu updated KAFKA-14071:
---
Description: 
*Kafka version:* 2.7.1

*Our scenarios:*

Each server has 72 cores and runs with around 100 request handler threads and 
50 network handler threads.

Many (a few hundred) consumers read from the same topic (and the same 
partitions), as they do not belong to the same consumer group.

Many (hundreds of) producers also produce data into the same topic, with a 
throughput of around 100 KB/s.

 

*The procedure to reproduce it:*

Move a partition's leader replica to a new broker that was not previously a 
follower (meaning it has no data for that partition)

 

*Observation:*

All Kafka request handler threads are saturated. After analyzing the thread 
dump, it seems most of them are trying to read the same log segment file, 
which requires locking a monitor on a specific object inside 
`sun.nio.ch.FileChannelImpl`.

 

*Other remarks:*

The problem is not reproduced for a simple leadership transition between 
existing replicas. For example, shutting down the leader broker, or moving the 
leader to another follower with the kafka reassignment script, works fine.

  was:
*Kafka version:* 2.7.1


*Our scenarios:*

Each server has 72 cores, and running with around 100 request handler threads, 
50 network handler threads.

Many (a few hundreds) readers consuming from the same topic (and the same 
partitions) as they don't belong to the same consumer group.

Many (hundreds) producers are also producing data into the same topic, with a 
throughput around 100KB/s.

 

*The procedure to reproduce it:*

Move a partition leader replica to a new broker which was not the follower 
(meaning it does not have data for that partition)

 

*Observation:*

All Kafka request handler threads are overloaded. After an analysis of the 
threaddump, it seems most of them are trying to read the same log segment file 
which requires locking a monitor on a specific object in the 
`sun.nio.ch.FileChannelImpl`.

 

*Other remarks:*

Problem is not reproduced when it's a simple leadership transition between the 
replicas. For example, we try to shut down the leader broker, or move leader to 
another follower using kafka assignment script, it's working fine.


> Kafka request handler threads saturated when moving a partition
> ---
>
> Key: KAFKA-14071
> URL: https://issues.apache.org/jira/browse/KAFKA-14071
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Qinghui Xu
>Priority: Major
>
> *Kafka version:* 2.7.1
> *Our scenarios:*
> Each server has 72 cores, and running with around 100 request handler 
> threads, 50 network handler threads.
> Many (a few hundreds) readers consuming from the same topic (and the same 
> partitions) as they don't belong to the same consumer group.
> Many (hundreds) producers are also producing data into the same topic, with a 
> throughput around 100KB/s.
>  
> *The procedure to reproduce it:*
> Move a partition leader replica to a new broker which was not the follower 
> (meaning it does not have data for that partition)
>  
> *Observation:*
> All Kafka request handler threads are overloaded. After an analysis of the 
> threaddump, it seems most of them are trying to read the same log segment 
> file which requires locking a monitor on a specific object in the 
> `sun.nio.ch.FileChannelImpl`.
>  
> *Other remarks:*
> Problem is not reproduced when it's a simple leadership transition between 
> the replicas. For example, we try to shut down the leader broker, or move 
> leader to another follower using kafka assignment script, it's working fine.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14071) Kafka request handler threads saturated when moving a partition

2022-07-12 Thread Qinghui Xu (Jira)
Qinghui Xu created KAFKA-14071:
--

 Summary: Kafka request handler threads saturated when moving a 
partition
 Key: KAFKA-14071
 URL: https://issues.apache.org/jira/browse/KAFKA-14071
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Qinghui Xu


*Our scenarios:*

Many (a few hundred) consumers read from the same topic (and the same 
partitions), as they do not belong to the same consumer group.

Many (hundreds of) producers also produce data into the same topic, with a 
throughput of around 100 KB/s.

*The procedure to reproduce it:*

Move a partition's leader replica to a new broker that was not previously a 
follower (meaning it has no data for that partition)

 

*Observation:*

All Kafka request handler threads are saturated. After analyzing the thread 
dump, it seems most of them are trying to read the same log segment file, 
which requires locking a monitor on a specific object inside 
`sun.nio.ch.FileChannelImpl`.

 

*Other remarks:*

The problem is not reproduced for a simple leadership transition between 
existing replicas. For example, shutting down the leader broker, or moving the 
leader to another follower with the kafka reassignment script, works fine.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-9982) [kafka-connect] Source connector does not guarantee at least once delivery

2020-05-30 Thread Qinghui Xu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17120173#comment-17120173
 ] 

Qinghui Xu commented on KAFKA-9982:
---

Thanks [~ChrisEgerton], very nice pointers; now it starts to make sense to me. 
But I still have doubts about some scenarios, so I'll run some tests to check 
and will keep you posted.

> [kafka-connect] Source connector does not guarantee at least once delivery
> --
>
> Key: KAFKA-9982
> URL: https://issues.apache.org/jira/browse/KAFKA-9982
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Qinghui Xu
>Priority: Major
>
> In kafka-connect runtime, the WorkerSourceTask is responsible for sending 
> records to the destination topics and managing the source offset commit. 
> Committed offsets are then used later for recovery of tasks during rebalance 
> or restart.
> But there are two concerns when looking into the WorkerSourceTask 
> implementation:
>  * When producer fail to send records, there's no retry but just skipping 
> offset commit and then execute next loop (poll for new records)
>  * The offset commit and effectively sending records over network are in fact 
> asynchronous, which means the offset commit could happen before records are 
> received by brokers, and a rebalance/restart in this gap could lead to 
> message loss.
> The conclusion is thus that the source connector does not support at least 
> once semantics by default (without the plugin implementation making extra 
> effort itself). I consider this as a bug.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9982) [kafka-connect] Source connector does not guarantee at least once delivery

2020-05-28 Thread Qinghui Xu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118591#comment-17118591
 ] 

Qinghui Xu commented on KAFKA-9982:
---

For now, as there are no metrics provided by the framework, it's difficult to 
track whether messages are lost (or not).

> [kafka-connect] Source connector does not guarantee at least once delivery
> --
>
> Key: KAFKA-9982
> URL: https://issues.apache.org/jira/browse/KAFKA-9982
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Qinghui Xu
>Priority: Major
>
> In kafka-connect runtime, the WorkerSourceTask is responsible for sending 
> records to the destination topics and managing the source offset commit. 
> Committed offsets are then used later for recovery of tasks during rebalance 
> or restart.
> But there are two concerns when looking into the WorkerSourceTask 
> implementation:
>  * When producer fail to send records, there's no retry but just skipping 
> offset commit and then execute next loop (poll for new records)
>  * The offset commit and effectively sending records over network are in fact 
> asynchronous, which means the offset commit could happen before records are 
> received by brokers, and a rebalance/restart in this gap could lead to 
> message loss.
> The conclusion is thus that the source connector does not support at least 
> once semantics by default (without the plugin implementation making extra 
> effort itself). I consider this as a bug.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9982) [kafka-connect] Source connector does not guarantee at least once delivery

2020-05-28 Thread Qinghui Xu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118586#comment-17118586
 ] 

Qinghui Xu commented on KAFKA-9982:
---

Hi [~ChrisEgerton], sorry for the delay.

> As of the fix for https://issues.apache.org/jira/browse/KAFKA-8586, the 
>framework will cease processing records from a source task if it fails to send 
>a record to Kafka.
This indeed addresses my concern about failure retries.

> The framework does use an entirely different producer to write source offsets 
>to Kafka, but no offsets are written to Kafka unless the record they 
>correspond to has been ack'd by the broker and safely made it to Kafka.

I don't see how this is guaranteed, though. As the offset commit and the 
producer's sending of records are asynchronous, the two can happen in any 
order, and if the task is lost/restarted in between, there is a chance that 
the offset is committed while the records have not yet been sent.

> [kafka-connect] Source connector does not guarantee at least once delivery
> --
>
> Key: KAFKA-9982
> URL: https://issues.apache.org/jira/browse/KAFKA-9982
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Qinghui Xu
>Priority: Major
>
> In kafka-connect runtime, the WorkerSourceTask is responsible for sending 
> records to the destination topics and managing the source offset commit. 
> Committed offsets are then used later for recovery of tasks during rebalance 
> or restart.
> But there are two concerns when looking into the WorkerSourceTask 
> implementation:
>  * When producer fail to send records, there's no retry but just skipping 
> offset commit and then execute next loop (poll for new records)
>  * The offset commit and effectively sending records over network are in fact 
> asynchronous, which means the offset commit could happen before records are 
> received by brokers, and a rebalance/restart in this gap could lead to 
> message loss.
> The conclusion is thus that the source connector does not support at least 
> once semantics by default (without the plugin implementation making extra 
> effort itself). I consider this as a bug.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9982) [kafka-connect] Source connector does not guarantee at least once delivery

2020-05-12 Thread Qinghui Xu (Jira)
Qinghui Xu created KAFKA-9982:
-

 Summary: [kafka-connect] Source connector does not guarantee at 
least once delivery
 Key: KAFKA-9982
 URL: https://issues.apache.org/jira/browse/KAFKA-9982
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.5.0
Reporter: Qinghui Xu


In the kafka-connect runtime, WorkerSourceTask is responsible for sending 
records to the destination topics and managing the source offset commits. 
Committed offsets are later used to recover tasks during a rebalance or 
restart.

But there are two concerns when looking into the WorkerSourceTask 
implementation:
 * When the producer fails to send records, there is no retry; the offset 
commit is simply skipped and the next loop iteration starts (polling for new 
records)
 * The offset commit and the actual sending of records over the network are 
asynchronous, which means the offset commit can happen before the records are 
received by the brokers, and a rebalance/restart in this gap can lead to 
message loss.

The conclusion is thus that the source connector does not support at-least-once 
semantics by default (without the plugin implementation making an extra effort 
itself). I consider this a bug.
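
A hedged sketch of the ordering that would avoid that gap: only commit source 
offsets after the producer has acknowledged the corresponding sends. The names 
below (processBatch, commitOffsets) are hypothetical stand-ins, not the actual 
WorkerSourceTask API:
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AtLeastOnceLoop {
    // Hypothetical helper: commit the source offsets for a batch (stand-in for
    // the framework's offset writer).
    static void commitOffsets(List<ProducerRecord<byte[], byte[]>> batch) { /* ... */ }

    static void processBatch(Producer<byte[], byte[]> producer,
                             List<ProducerRecord<byte[], byte[]>> batch) throws Exception {
        List<Future<RecordMetadata>> acks = new ArrayList<>();
        for (ProducerRecord<byte[], byte[]> record : batch) {
            acks.add(producer.send(record));
        }
        // Block until every record of the batch is acknowledged by the brokers.
        for (Future<RecordMetadata> ack : acks) {
            ack.get();
        }
        // Only now is it safe to commit the source offsets: a restart before
        // this point replays the batch (at-least-once) instead of skipping it.
        commitOffsets(batch);
    }
}
{code}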



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9278) Avoid using static instance of DisconnectException

2019-12-06 Thread Qinghui Xu (Jira)
Qinghui Xu created KAFKA-9278:
-

 Summary: Avoid using static instance of DisconnectException
 Key: KAFKA-9278
 URL: https://issues.apache.org/jira/browse/KAFKA-9278
 Project: Kafka
  Issue Type: Bug
Reporter: Qinghui Xu


For every `java.lang.Throwable` instance, a stack trace is captured at 
instantiation time. A static instance of DisconnectException therefore carries 
the stack trace of the thread that created it, which is irrelevant (and 
misleading) for all the calls (in other threads) that reuse it.
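
A minimal illustration of the problem (the class and method names other than the 
exception are hypothetical): the stack trace attached to a shared static 
instance always points at the initializer that created it, not at the code path 
that actually hit the disconnect:
{code:java}
import org.apache.kafka.common.errors.DisconnectException;

public class StaticExceptionDemo {
    // Created once, during class initialization: the stack trace captured here
    // records this initializer, not any later caller.
    static final DisconnectException SHARED = new DisconnectException("disconnected");

    static void failFromNetworkThread() {
        // Every caller rethrows the same instance, so any log of its stack
        // trace shows the static-initializer frame instead of this method.
        throw SHARED;
    }

    public static void main(String[] args) {
        try {
            failFromNetworkThread();
        } catch (DisconnectException e) {
            e.printStackTrace(); // misleading: points at the static initializer
        }
    }
}
{code}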



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-8889) Root cause is lost for FetchSessionHandler.handleError

2019-09-09 Thread Qinghui Xu (Jira)
Qinghui Xu created KAFKA-8889:
-

 Summary: Root cause is lost for FetchSessionHandler.handleError
 Key: KAFKA-8889
 URL: https://issues.apache.org/jira/browse/KAFKA-8889
 Project: Kafka
  Issue Type: Bug
Reporter: Qinghui Xu


FetchSessionHandler does not log errors properly and produces error messages 
whose root cause is sometimes hard to understand.

More precisely, FetchSessionHandler.handleError prints Throwable.toString, 
which is merely a summary message, and loses the stack trace of the error. For 
example, we can get messages like the following, without any clue about why:
{code:java}
Error sending fetch request (sessionId=1655236778, epoch=99) to node 01: 
org.apache.kafka.common.errors.DisconnectException {code}
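
A hedged sketch of the kind of change suggested here (a generic handler, not the 
actual FetchSessionHandler code): pass the Throwable itself to the logger 
instead of interpolating Throwable.toString(), so the stack trace of the root 
cause is preserved:
{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FetchErrorLogging {
    private static final Logger log = LoggerFactory.getLogger(FetchErrorLogging.class);

    // Loses the stack trace: only Throwable.toString() ends up in the log line.
    static void handleErrorLossy(int sessionId, int epoch, Throwable t) {
        log.info("Error sending fetch request (sessionId={}, epoch={}) to node: {}",
                 sessionId, epoch, t.toString());
    }

    // Keeps the stack trace: a Throwable passed as the last argument is logged
    // in full by SLF4J, so the root cause stays visible.
    static void handleErrorWithCause(int sessionId, int epoch, Throwable t) {
        log.info("Error sending fetch request (sessionId={}, epoch={}) to node",
                 sessionId, epoch, t);
    }
}
{code}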



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-8790) [kafka-connect] KafkaBaseLog.WorkThread not recoverable

2019-08-13 Thread Qinghui Xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qinghui Xu updated KAFKA-8790:
--
Description: 
We have a Kafka (source) connector that copies data from a source Kafka cluster 
to a target cluster. The connector is deployed to a set of workers running on 
mesos, so the lifecycle of the workers is managed by mesos. Workers should be 
recovered by mesos in case of failure, and source tasks then rely on Kafka 
Connect's KafkaOffsetBackingStore to recover the offsets and proceed.

Recently we witnessed an unrecoverable situation, though: a worker was not 
doing anything after a network reset on the host where it was running. More 
specifically, it seems that the Kafka Connect tasks on that worker stopped 
polling the source Kafka cluster, because their consumers were stuck in a 
rebalance state.

After some digging, we found that the thread handling the source task offset 
recovery was dead, which left all rebalancing tasks stuck in the state of 
reading back the offsets. The log we saw in our connect task:
{code:java}
2019-08-12 14:29:28,089 ERROR Unexpected exception in Thread[KafkaBasedLog Work 
Thread - kc_replicator_offsets,5,main] 
(org.apache.kafka.connect.util.KafkaBasedLog)
org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times 
in 30001ms{code}
As far as I can see 
([https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L339]),
 the thread dies in case of error while the worker stays alive, which leaves a 
worker without the thread to recover offsets; all tasks on that worker are then 
unrecoverable and stay stuck after a failure.

 

Ideally, the solution to this issue would be the following (a rough sketch 
follows this list):
 * Make the KafkaBasedLog work thread recover from recoverable errors such as 
`TimeoutException` or, more generally, `RetriableException` (maybe 
`KafkaException` is too generic to handle, and some of those might be 
unrecoverable)
 * If several consecutive retries in the first case fail (we might then 
consider the error unrecoverable), we might want the worker to be shut down so 
that tasks can be rebalanced elsewhere.
 * In case of an unrecoverable error (such as a generic `Throwable`), the 
"KafkaBasedLog Work Thread" would die, but it needs to trigger the worker's 
shutdown (e.g. a finally clause calling System.exit) in order to avoid the 
worker's tasks idling after the failure. The worker lifecycle management (in 
our case, mesos) will then restart the worker elsewhere
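
A rough, hypothetical sketch of that behavior (names and the retry bound are 
made up; this is not the actual KafkaBasedLog code): retry on 
`RetriableException` a bounded number of times, and exit the process on 
anything unrecoverable so the scheduler can restart the worker elsewhere:
{code:java}
import org.apache.kafka.common.errors.RetriableException;

public class WorkThreadSketch extends Thread {
    private static final int MAX_RETRIES = 10; // hypothetical bound
    private final Runnable readToLogEnd;       // stand-in for the offset read loop

    WorkThreadSketch(Runnable readToLogEnd) {
        super("KafkaBasedLog Work Thread (sketch)");
        this.readToLogEnd = readToLogEnd;
    }

    @Override
    public void run() {
        int retries = 0;
        try {
            while (!isInterrupted()) {
                try {
                    readToLogEnd.run();
                    retries = 0; // reset after a successful pass
                } catch (RetriableException e) {
                    if (++retries > MAX_RETRIES) {
                        throw e; // give up: treat as unrecoverable
                    }
                    try {
                        // back off briefly before retrying the recoverable error
                        Thread.sleep(1000L * retries);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return; // asked to stop; not a fatal error
                    }
                }
            }
        } catch (Throwable t) {
            // Unrecoverable: take the whole worker down so the external
            // scheduler (mesos in our case) restarts it, instead of leaving
            // the worker's tasks idle.
            t.printStackTrace();
            System.exit(1);
        }
    }
}
{code}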

 

  was:
We have a kafka (source) connector that's copying data from some kafka cluster 
to the target cluster. The connector is deployed to a bunch of workers running 
on mesos, thus the lifecycle of the workers are managed by mesos. Workers 
should be recovered by mesos in case of failure, and then source tasks will 
rely on kafka connect's KafkaOffsetBackingStore to recover the offsets to 
proceed.

Recently we witness some unrecoverable situation, though: worker is not doing 
anything after some network reset on the host where the worker is running. More 
specifically, it seems that the kafka connect tasks' on that worker stop to 
poll source kafka cluster, because the consumers are stuck in a rebalance state.

After some digging, we found that the thread to handle the source task offset 
recovery is dead, which makes the all rebalancing tasks stuck in the state of 
reading back the offset. The log we saw in our connect task:
{code:java}
2019-08-12 14:29:28,089 ERROR Unexpected exception in Thread[KafkaBasedLog Work 
Thread - kc_replicator_offsets,5,main] 
(org.apache.kafka.connect.util.KafkaBasedLog)
org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times 
in 30001ms{code}
As far as I can see 
([https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L339]),
 the thread will be dead in case of error, while the worker is still alive, 
which means a worker without the thread to recover offset thus all tasks on 
that worker are not recoverable and will stuck in case of failure.

 

Solution to fix this issue will ideally either of the following:
 * Make the KafkaBasedLog Work Thread recoverable from error
 * Or KafkaBasedLog Work Thread death should make the worker exit (a finally 
clause to call System.exit), then the worker lifecycle management (in our case, 
it's mesos) will restart the worker elsewhere

 


> [kafka-connect] KafkaBaseLog.WorkThread not recoverable
> ---
>
> Key: KAFKA-8790
> URL: https://issues.apache.org/jira/browse/KAFKA-8790
> Project: Kafka
>  Issue Type: Bug
>Reporter: Qinghui Xu
>Priority: Major
>
> We have a kafka (source) connector that's copying data from some kafka 
> cluster to 

[jira] [Commented] (KAFKA-8790) [kafka-connect] KafkaBaseLog.WorkThread not recoverable

2019-08-12 Thread Qinghui Xu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16905506#comment-16905506
 ] 

Qinghui Xu commented on KAFKA-8790:
---

[~pgwhalen]
Thanks for the hint, I'll have a look at your PR.

> [kafka-connect] KafkaBaseLog.WorkThread not recoverable
> ---
>
> Key: KAFKA-8790
> URL: https://issues.apache.org/jira/browse/KAFKA-8790
> Project: Kafka
>  Issue Type: Bug
>Reporter: Qinghui Xu
>Priority: Major
>
> We have a kafka (source) connector that's copying data from some kafka 
> cluster to the target cluster. The connector is deployed to a bunch of 
> workers running on mesos, thus the lifecycle of the workers are managed by 
> mesos. Workers should be recovered by mesos in case of failure, and then 
> source tasks will rely on kafka connect's KafkaOffsetBackingStore to recover 
> the offsets to proceed.
> Recently we witness some unrecoverable situation, though: worker is not doing 
> anything after some network reset on the host where the worker is running. 
> More specifically, it seems that the kafka connect tasks' on that worker stop 
> to poll source kafka cluster, because the consumers are stuck in a rebalance 
> state.
> After some digging, we found that the thread to handle the source task offset 
> recovery is dead, which makes the all rebalancing tasks stuck in the state of 
> reading back the offset. The log we saw in our connect task:
> {code:java}
> 2019-08-12 14:29:28,089 ERROR Unexpected exception in Thread[KafkaBasedLog 
> Work Thread - kc_replicator_offsets,5,main] 
> (org.apache.kafka.connect.util.KafkaBasedLog)
> org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by 
> times in 30001ms{code}
> As far as I can see 
> ([https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L339]),
>  the thread will be dead in case of error, while the worker is still alive, 
> which means a worker without the thread to recover offset thus all tasks on 
> that worker are not recoverable and will stuck in case of failure.
>  
> Solution to fix this issue will ideally either of the following:
>  * Make the KafkaBasedLog Work Thread recoverable from error
>  * Or KafkaBasedLog Work Thread death should make the worker exit (a finally 
> clause to call System.exit), then the worker lifecycle management (in our 
> case, it's mesos) will restart the worker elsewhere
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8790) [kafka-connect] KafkaBaseLog.WorkThread not recoverable

2019-08-12 Thread Qinghui Xu (JIRA)
Qinghui Xu created KAFKA-8790:
-

 Summary: [kafka-connect] KafkaBaseLog.WorkThread not recoverable
 Key: KAFKA-8790
 URL: https://issues.apache.org/jira/browse/KAFKA-8790
 Project: Kafka
  Issue Type: Bug
Reporter: Qinghui Xu


We have a Kafka (source) connector that copies data from a source Kafka cluster 
to a target cluster. The connector is deployed to a set of workers running on 
mesos, so the lifecycle of the workers is managed by mesos. Workers should be 
recovered by mesos in case of failure, and source tasks then rely on Kafka 
Connect's KafkaOffsetBackingStore to recover the offsets and proceed.

Recently we witnessed an unrecoverable situation, though: a worker was not 
doing anything after a network reset on the host where it was running. More 
specifically, it seems that the Kafka Connect tasks on that worker stopped 
polling the source Kafka cluster, because their consumers were stuck in a 
rebalance state.

After some digging, we found that the thread handling the source task offset 
recovery was dead, which left all rebalancing tasks stuck in the state of 
reading back the offsets. The log we saw in our connect task:
{code:java}
2019-08-12 14:29:28,089 ERROR Unexpected exception in Thread[KafkaBasedLog Work 
Thread - kc_replicator_offsets,5,main] 
(org.apache.kafka.connect.util.KafkaBasedLog)
org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times 
in 30001ms{code}
As far as I can see 
([https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L339]),
 the thread dies in case of error while the worker stays alive, which leaves a 
worker without the thread to recover offsets; all tasks on that worker are then 
unrecoverable and stay stuck after a failure.

 

Ideally, the solution to this issue would be either of the following:
 * Make the KafkaBasedLog work thread recoverable from errors
 * Or have the death of the KafkaBasedLog work thread make the worker exit (a 
finally clause calling System.exit), so that the worker lifecycle management 
(in our case, mesos) restarts the worker elsewhere

 



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (KAFKA-8735) BrokerMetadataCheckPoint should check metadata.properties existence itself

2019-07-31 Thread Qinghui Xu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16897153#comment-16897153
 ] 

Qinghui Xu edited comment on KAFKA-8735 at 7/31/19 1:03 PM:


Errors during the tests:
{code:java}
2019-07-30 15:36:31 ERROR BrokerMetadataCheckpoint:74 - Failed to read 
meta.properties file under dir 
/var/folders/h9/msx_bvyj4x1cmcc6wmndg0y416l7n9/T/junit4372466223918887888/junit4263066398108675507/meta.properties
 due to 
/var/folders/h9/msx_bvyj4x1cmcc6wmndg0y416l7n9/T/junit4372466223918887888/junit4263066398108675507/meta.properties
2019-07-30 15:36:31 ERROR KafkaServer:76 - Fail to read meta.properties under 
log directory 
/var/folders/h9/msx_bvyj4x1cmcc6wmndg0y416l7n9/T/junit4372466223918887888/junit4263066398108675507
java.nio.file.NoSuchFileException: 
/var/folders/h9/msx_bvyj4x1cmcc6wmndg0y416l7n9/T/junit4372466223918887888/junit4263066398108675507/meta.properties
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at 
sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at 
java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
at java.nio.file.Files.newInputStream(Files.java:152)
at org.apache.kafka.common.utils.Utils.loadProps(Utils.java:574)
at 
kafka.server.BrokerMetadataCheckpoint.liftedTree2$1(BrokerMetadataCheckpoint.scala:63)
at kafka.server.BrokerMetadataCheckpoint.read(BrokerMetadataCheckpoint.scala:62)
at 
kafka.server.KafkaServer$$anonfun$getBrokerIdAndOfflineDirs$1.apply(KafkaServer.scala:668)
at 
kafka.server.KafkaServer$$anonfun$getBrokerIdAndOfflineDirs$1.apply(KafkaServer.scala:666)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at kafka.server.KafkaServer.getBrokerIdAndOfflineDirs(KafkaServer.scala:666)
at kafka.server.KafkaServer.startup(KafkaServer.scala:209)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
...
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 

[jira] [Commented] (KAFKA-8735) BrokerMetadataCheckPoint should check metadata.properties existence itself

2019-07-31 Thread Qinghui Xu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16897153#comment-16897153
 ] 

Qinghui Xu commented on KAFKA-8735:
---

Errors during the tests:
{code:java}
2019-07-30 15:36:31 ERROR BrokerMetadataCheckpoint:74 - Failed to read 
meta.properties file under dir 
/var/folders/h9/msx_bvyj4x1cmcc6wmndg0y416l7n9/T/junit4372466223918887888/junit4263066398108675507/meta.properties
 due to 
/var/folders/h9/msx_bvyj4x1cmcc6wmndg0y416l7n9/T/junit4372466223918887888/junit4263066398108675507/meta.properties
2019-07-30 15:36:31 ERROR KafkaServer:76 - Fail to read meta.properties under 
log directory 
/var/folders/h9/msx_bvyj4x1cmcc6wmndg0y416l7n9/T/junit4372466223918887888/junit4263066398108675507
java.nio.file.NoSuchFileException: 
/var/folders/h9/msx_bvyj4x1cmcc6wmndg0y416l7n9/T/junit4372466223918887888/junit4263066398108675507/meta.properties
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at 
sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at 
java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
at java.nio.file.Files.newInputStream(Files.java:152)
at org.apache.kafka.common.utils.Utils.loadProps(Utils.java:574)
at 
kafka.server.BrokerMetadataCheckpoint.liftedTree2$1(BrokerMetadataCheckpoint.scala:63)
at kafka.server.BrokerMetadataCheckpoint.read(BrokerMetadataCheckpoint.scala:62)
at 
kafka.server.KafkaServer$$anonfun$getBrokerIdAndOfflineDirs$1.apply(KafkaServer.scala:668)
at 
kafka.server.KafkaServer$$anonfun$getBrokerIdAndOfflineDirs$1.apply(KafkaServer.scala:666)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at kafka.server.KafkaServer.getBrokerIdAndOfflineDirs(KafkaServer.scala:666)
at kafka.server.KafkaServer.startup(KafkaServer.scala:209)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
...
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 

[jira] [Created] (KAFKA-8735) BrokerMetadataCheckPoint should check metadata.properties existence itself

2019-07-30 Thread Qinghui Xu (JIRA)
Qinghui Xu created KAFKA-8735:
-

 Summary: BrokerMetadataCheckPoint should check metadata.properties 
existence itself 
 Key: KAFKA-8735
 URL: https://issues.apache.org/jira/browse/KAFKA-8735
 Project: Kafka
  Issue Type: Improvement
Reporter: Qinghui Xu


BrokerMetadataCheckPoint tries to read meta.properties from the log directory 
during server startup, and it relies on org.apache.kafka.common.utils.Utils 
(from org.apache.kafka:kafka-clients) to load the properties file in a given 
directory.

During the process, the case in which the properties file does not exist must 
be handled as a normal situation, not as an error. Currently, 
BrokerMetadataCheckPoint relies on the behavior of 
`org.apache.kafka.common.utils.Utils#loadProps` to find out whether the file 
exists: if the properties file is absent, it expects NoSuchFileException (for 
branch 2.1 and above), whereas it expected FileNotFoundException (for branch 
2.0 and before). Given that the `org.apache.kafka.common.utils.Utils#loadProps` 
signature declares only IOException, this exception pattern matching is a leak 
of abstraction that makes BrokerMetadataCheckPoint depend on the implementation 
details of `org.apache.kafka.common.utils.Utils#loadProps`.

This makes BrokerMetadataCheckPoint very fragile, especially when 
`org.apache.kafka.common.utils.Utils` and 
`kafka.server.BrokerMetadataCheckPoint` come from different artifacts, an 
example of which I just ran into:
 * We have a project A that depends on project B, and project B has a 
compile-time dependency on `org.apache.kafka:kafka-clients`. A relies on 
`org.apache.kafka:kafka_2.11` in its tests: it spawns some kafka brokers in 
the tests.
 * At first A and B both use kafka libraries 2.0.1, and everything works fine
 * At some point a newer version of B upgrades `org.apache.kafka:kafka-clients` 
to 2.3.0
 * When A wants to use the newer version of B, its tests break because the 
kafka brokers fail to start: now `org.apache.kafka.common.utils.Utils` (2.3.0) 
throws NoSuchFileException while BrokerMetadataCheckPoint (2.0.1) expects to 
catch FileNotFoundException

It would be much more reliable for BrokerMetadataCheckPoint to check the file 
existence before trying to load the properties from the file.
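
A minimal sketch of that check (a hypothetical snippet, not the actual 
BrokerMetadataCheckpoint code), so the absence of the file is handled 
explicitly instead of through the exception type thrown by loadProps:
{code:java}
import java.io.File;
import java.io.IOException;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.utils.Utils;

public class MetaPropertiesReader {
    // Returns empty when the file does not exist, which is a normal first-start
    // situation, and only propagates IOException for real read failures.
    static Optional<Properties> readMetaProperties(File logDir) throws IOException {
        File propsFile = new File(logDir, "meta.properties");
        if (!propsFile.exists()) {
            return Optional.empty();
        }
        return Optional.of(Utils.loadProps(propsFile.getAbsolutePath()));
    }
}
{code}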



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)