[jira] [Updated] (KAFKA-7441) Allow LogCleanerManager.resumeCleaning() to be used concurrently

2018-09-25 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7441:

Fix Version/s: 2.1.0

> Allow LogCleanerManager.resumeCleaning() to be used concurrently
> 
>
> Key: KAFKA-7441
> URL: https://issues.apache.org/jira/browse/KAFKA-7441
> Project: Kafka
>  Issue Type: Improvement
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
> Fix For: 2.1.0
>
>
> LogCleanerManger provides APIs abortAndPauseCleaning(TopicPartition) and 
> resumeCleaning(Iterable[TopicPartition]). The abortAndPauseCleaning(...) will 
> do nothing if the partition is already in paused state. And 
> resumeCleaning(..) will always clear the state for the partition if the 
> partition is in paused state. Also, resumeCleaning(...) will throw 
> IllegalStateException if the partition does not have any state (e.g. its 
> state is cleared).
>  
> This will cause problem in the following scenario:
> 1) Background thread invokes LogManager.cleanupLogs() which in turn does  
> abortAndPauseCleaning(...) for a given partition. Now this partition is in 
> paused state.
> 2) User requests deletion for this partition. Controller sends 
> StopReplicaRequest with delete=true for this partition. RequestHanderThread 
> calls abortAndPauseCleaning(...) followed by resumeCleaning(...) for the same 
> partition. Now there is no state for this partition.
> 3) Background thread invokes resumeCleaning(...) as part of 
> LogManager.cleanupLogs(). Because there is no state for this partition, it 
> causes IllegalStateException.
>  
> This issue can also happen before KAFKA-7322 if unclean leader election 
> triggers log truncation for a partition at the same time that the partition 
> is deleted upon user request. But unclean leader election is very rare. The 
> fix made in https://issues.apache.org/jira/browse/KAFKA-7322 makes this issue 
> much more frequent.
> The solution is to record the number of pauses.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7441) Allow LogCleanerManager.resumeCleaning() to be used concurrently

2018-09-25 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7441:

Reporter: xiongqi wu  (was: Dong Lin)

> Allow LogCleanerManager.resumeCleaning() to be used concurrently
> 
>
> Key: KAFKA-7441
> URL: https://issues.apache.org/jira/browse/KAFKA-7441
> Project: Kafka
>  Issue Type: Improvement
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
> Fix For: 2.1.0
>
>
> LogCleanerManger provides APIs abortAndPauseCleaning(TopicPartition) and 
> resumeCleaning(Iterable[TopicPartition]). The abortAndPauseCleaning(...) will 
> do nothing if the partition is already in paused state. And 
> resumeCleaning(..) will always clear the state for the partition if the 
> partition is in paused state. Also, resumeCleaning(...) will throw 
> IllegalStateException if the partition does not have any state (e.g. its 
> state is cleared).
>  
> This will cause problem in the following scenario:
> 1) Background thread invokes LogManager.cleanupLogs() which in turn does  
> abortAndPauseCleaning(...) for a given partition. Now this partition is in 
> paused state.
> 2) User requests deletion for this partition. Controller sends 
> StopReplicaRequest with delete=true for this partition. RequestHanderThread 
> calls abortAndPauseCleaning(...) followed by resumeCleaning(...) for the same 
> partition. Now there is no state for this partition.
> 3) Background thread invokes resumeCleaning(...) as part of 
> LogManager.cleanupLogs(). Because there is no state for this partition, it 
> causes IllegalStateException.
>  
> This issue can also happen before KAFKA-7322 if unclean leader election 
> triggers log truncation for a partition at the same time that the partition 
> is deleted upon user request. But unclean leader election is very rare. The 
> fix made in https://issues.apache.org/jira/browse/KAFKA-7322 makes this issue 
> much more frequent.
> The solution is to record the number of pauses.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7441) Allow LogCleanerManager.resumeCleaning() to be used concurrently

2018-09-25 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7441:
---

 Summary: Allow LogCleanerManager.resumeCleaning() to be used 
concurrently
 Key: KAFKA-7441
 URL: https://issues.apache.org/jira/browse/KAFKA-7441
 Project: Kafka
  Issue Type: Improvement
Reporter: Dong Lin
Assignee: xiongqi wu


LogCleanerManger provides APIs abortAndPauseCleaning(TopicPartition) and 
resumeCleaning(Iterable[TopicPartition]). The abortAndPauseCleaning(...) will 
do nothing if the partition is already in paused state. And resumeCleaning(..) 
will always clear the state for the partition if the partition is in paused 
state. Also, resumeCleaning(...) will throw IllegalStateException if the 
partition does not have any state (e.g. its state is cleared).

 

This will cause problem in the following scenario:

1) Background thread invokes LogManager.cleanupLogs() which in turn does  
abortAndPauseCleaning(...) for a given partition. Now this partition is in 
paused state.

2) User requests deletion for this partition. Controller sends 
StopReplicaRequest with delete=true for this partition. RequestHanderThread 
calls abortAndPauseCleaning(...) followed by resumeCleaning(...) for the same 
partition. Now there is no state for this partition.

3) Background thread invokes resumeCleaning(...) as part of 
LogManager.cleanupLogs(). Because there is no state for this partition, it 
causes IllegalStateException.

 

This issue can also happen before KAFKA-7322 if unclean leader election 
triggers log truncation for a partition at the same time that the partition is 
deleted upon user request. But unclean leader election is very rare. The fix 
made in https://issues.apache.org/jira/browse/KAFKA-7322 makes this issue much 
more frequent.

The solution is to record the number of pauses.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3190) KafkaProducer should not invoke callback in send()

2018-09-25 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-3190:

Fix Version/s: (was: 2.1.0)
   2.2.0

> KafkaProducer should not invoke callback in send()
> --
>
> Key: KAFKA-3190
> URL: https://issues.apache.org/jira/browse/KAFKA-3190
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.9.0.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Critical
> Fix For: 2.2.0
>
>
> Currently KafkaProducer will invoke callback.onComplete() if it receives an 
> ApiException during send(). This breaks the guarantee that callback will be 
> invoked in order. It seems ApiException in send() only comes from metadata 
> refresh. If so, we can probably simply throw it instead of invoking 
> callback().



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3190) KafkaProducer should not invoke callback in send()

2018-09-25 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16627813#comment-16627813
 ] 

Dong Lin commented on KAFKA-3190:
-

Since there has been no activity on the PR for several months, moving this out 
to 2.2.0

> KafkaProducer should not invoke callback in send()
> --
>
> Key: KAFKA-3190
> URL: https://issues.apache.org/jira/browse/KAFKA-3190
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.9.0.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Critical
> Fix For: 2.2.0
>
>
> Currently KafkaProducer will invoke callback.onComplete() if it receives an 
> ApiException during send(). This breaks the guarantee that callback will be 
> invoked in order. It seems ApiException in send() only comes from metadata 
> refresh. If so, we can probably simply throw it instead of invoking 
> callback().



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5066) KafkaMetricsConfig properties and description notably missing from documentation

2018-09-24 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-5066:

Fix Version/s: 2.1.0

> KafkaMetricsConfig properties and description notably missing from 
> documentation
> 
>
> Key: KAFKA-5066
> URL: https://issues.apache.org/jira/browse/KAFKA-5066
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: Ryan P
>Priority: Major
> Fix For: 2.1.0
>
>
> `KafkaMetrics` implementations do not appear to be exposed to all the Yammer 
> metrics exposed to implementations of the `KafkaMetricsReporter` 
> Currently the docs only cover the `metric.reporters` which allows clients to 
> configure a `MetricsReporter` plugin. Clients are then disappointed to learn 
> that this affords them access to only a small subset of metrics. 
> Proper monitoring of the broker requires access to the Yammer metrics which 
> clients can gain access to with a `KafkaMetricsReporter` plugin. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7332) Improve error message when trying to produce message without key for compacted topic

2018-09-19 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16621050#comment-16621050
 ] 

Dong Lin commented on KAFKA-7332:
-

[~wushujames] The problem described in KAFKA-4808 says that user today has to 
wait for timeout if they produce to a compacted topic without key and it should 
be better to throw non-retriable exception immediately without timeout. From 
design perspective, I think the idea make sense and it is good to address that.

In terms of its impact in practice, I personally don't find that issue to be 
important because in all cases I can think of (please correct me if I am 
wrong), the issue only happens the first time when user setups a new workflow 
and tries to produce to a compacted topic with null key. It is probably OK for 
user to spend a minute or so to receive this first error given that it probably 
takes much longer time to create and debug a new workflow anyway. Thus it may 
not be worthwhile to add a new exception to address that problem.

So it is nice to address KAFKA-4808 if other developers are interested in 
pushing progress for KAFKA-4808.

 

 

> Improve error message when trying to produce message without key for 
> compacted topic
> 
>
> Key: KAFKA-7332
> URL: https://issues.apache.org/jira/browse/KAFKA-7332
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 1.1.0
>Reporter: Patrik Kleindl
>Assignee: Manikumar
>Priority: Trivial
> Fix For: 2.1.0
>
>
> Goal:
> Return a specific error message like e.g. "Message without a key is not valid 
> for a compacted topic" when trying to produce such a message instead of a 
> CorruptRecordException.
>  
> > Yesterday we had the following exception:
> > 
> > Exception thrown when sending a message with key='null' and payload='...'
> > to topic sometopic:: org.apache.kafka.common.errors.CorruptRecordException:
> > This message has failed its CRC checksum, exceeds the valid size, or is
> > otherwise corrupt.
> > 
> > The cause was identified with the help of
> > 
> >[https://stackoverflow.com/questions/49098274/kafka-stream-get-corruptrecordexception]
> > 
> > Is it possible / would it makes sense to open an issue to improve the error
> > message for this case?
> > A simple "Message without a key is not valid for a compacted topic" would
> > suffice and point a user  in the right direction.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7332) Improve error message when trying to produce message without key for compacted topic

2018-09-18 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reassigned KAFKA-7332:
---

Assignee: Manikumar

> Improve error message when trying to produce message without key for 
> compacted topic
> 
>
> Key: KAFKA-7332
> URL: https://issues.apache.org/jira/browse/KAFKA-7332
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 1.1.0
>Reporter: Patrik Kleindl
>Assignee: Manikumar
>Priority: Trivial
> Fix For: 2.1.0
>
>
> Goal:
> Return a specific error message like e.g. "Message without a key is not valid 
> for a compacted topic" when trying to produce such a message instead of a 
> CorruptRecordException.
>  
> > Yesterday we had the following exception:
> > 
> > Exception thrown when sending a message with key='null' and payload='...'
> > to topic sometopic:: org.apache.kafka.common.errors.CorruptRecordException:
> > This message has failed its CRC checksum, exceeds the valid size, or is
> > otherwise corrupt.
> > 
> > The cause was identified with the help of
> > 
> >[https://stackoverflow.com/questions/49098274/kafka-stream-get-corruptrecordexception]
> > 
> > Is it possible / would it makes sense to open an issue to improve the error
> > message for this case?
> > A simple "Message without a key is not valid for a compacted topic" would
> > suffice and point a user  in the right direction.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7332) Improve error message when trying to produce message without key for compacted topic

2018-09-18 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-7332.
-
Resolution: Fixed

> Improve error message when trying to produce message without key for 
> compacted topic
> 
>
> Key: KAFKA-7332
> URL: https://issues.apache.org/jira/browse/KAFKA-7332
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 1.1.0
>Reporter: Patrik Kleindl
>Priority: Trivial
> Fix For: 2.1.0
>
>
> Goal:
> Return a specific error message like e.g. "Message without a key is not valid 
> for a compacted topic" when trying to produce such a message instead of a 
> CorruptRecordException.
>  
> > Yesterday we had the following exception:
> > 
> > Exception thrown when sending a message with key='null' and payload='...'
> > to topic sometopic:: org.apache.kafka.common.errors.CorruptRecordException:
> > This message has failed its CRC checksum, exceeds the valid size, or is
> > otherwise corrupt.
> > 
> > The cause was identified with the help of
> > 
> >[https://stackoverflow.com/questions/49098274/kafka-stream-get-corruptrecordexception]
> > 
> > Is it possible / would it makes sense to open an issue to improve the error
> > message for this case?
> > A simple "Message without a key is not valid for a compacted topic" would
> > suffice and point a user  in the right direction.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7332) Improve error message when trying to produce message without key for compacted topic

2018-09-18 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7332:

Fix Version/s: 2.1.0

> Improve error message when trying to produce message without key for 
> compacted topic
> 
>
> Key: KAFKA-7332
> URL: https://issues.apache.org/jira/browse/KAFKA-7332
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 1.1.0
>Reporter: Patrik Kleindl
>Priority: Trivial
> Fix For: 2.1.0
>
>
> Goal:
> Return a specific error message like e.g. "Message without a key is not valid 
> for a compacted topic" when trying to produce such a message instead of a 
> CorruptRecordException.
>  
> > Yesterday we had the following exception:
> > 
> > Exception thrown when sending a message with key='null' and payload='...'
> > to topic sometopic:: org.apache.kafka.common.errors.CorruptRecordException:
> > This message has failed its CRC checksum, exceeds the valid size, or is
> > otherwise corrupt.
> > 
> > The cause was identified with the help of
> > 
> >[https://stackoverflow.com/questions/49098274/kafka-stream-get-corruptrecordexception]
> > 
> > Is it possible / would it makes sense to open an issue to improve the error
> > message for this case?
> > A simple "Message without a key is not valid for a compacted topic" would
> > suffice and point a user  in the right direction.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7322) Fix race condition between log cleaner thread and log retention thread when topic cleanup policy is updated

2018-09-18 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7322:

Fix Version/s: 2.1.0

> Fix race condition between log cleaner thread and log retention thread when 
> topic cleanup policy is updated
> ---
>
> Key: KAFKA-7322
> URL: https://issues.apache.org/jira/browse/KAFKA-7322
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
> Fix For: 2.1.0
>
>
> The deletion thread will grab the log.lock when it tries to rename log 
> segment and schedule for actual deletion.
> The compaction thread only grabs the log.lock when it tries to replace the 
> original segments with the cleaned segment. The compaction thread doesn't 
> grab the log when it reads records from the original segments to build 
> offsetmap and new segments. As a result, if both deletion and compaction 
> threads work on the same log partition. We have a race condition. 
> This race happens when the topic cleanup policy is updated on the fly.  
> One case to hit this race condition:
> 1: topic clean up policy is "compact" initially 
> 2: log cleaner (compaction) thread picks up the partition for compaction and 
> still in progress
> 3: the topic clean up policy has been updated to "deletion"
> 4: retention thread pick up the topic partition and delete some old segments.
> 5: log cleaner thread reads from the deleted log and raise an IO exception. 
>  
> The proposed solution is to use "inprogress" map that cleaner manager has to 
> protect such a race.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7322) Fix race condition between log cleaner thread and log retention thread when topic cleanup policy is updated

2018-09-18 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-7322.
-
Resolution: Fixed

> Fix race condition between log cleaner thread and log retention thread when 
> topic cleanup policy is updated
> ---
>
> Key: KAFKA-7322
> URL: https://issues.apache.org/jira/browse/KAFKA-7322
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
> Fix For: 2.1.0
>
>
> The deletion thread will grab the log.lock when it tries to rename log 
> segment and schedule for actual deletion.
> The compaction thread only grabs the log.lock when it tries to replace the 
> original segments with the cleaned segment. The compaction thread doesn't 
> grab the log when it reads records from the original segments to build 
> offsetmap and new segments. As a result, if both deletion and compaction 
> threads work on the same log partition. We have a race condition. 
> This race happens when the topic cleanup policy is updated on the fly.  
> One case to hit this race condition:
> 1: topic clean up policy is "compact" initially 
> 2: log cleaner (compaction) thread picks up the partition for compaction and 
> still in progress
> 3: the topic clean up policy has been updated to "deletion"
> 4: retention thread pick up the topic partition and delete some old segments.
> 5: log cleaner thread reads from the deleted log and raise an IO exception. 
>  
> The proposed solution is to use "inprogress" map that cleaner manager has to 
> protect such a race.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7322) Fix race condition between compaction thread and retention thread when topic cleanup policy is updated

2018-09-18 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7322:

Summary: Fix race condition between compaction thread and retention thread 
when topic cleanup policy is updated  (was: race between compaction thread and 
retention thread when changing topic cleanup policy)

> Fix race condition between compaction thread and retention thread when topic 
> cleanup policy is updated
> --
>
> Key: KAFKA-7322
> URL: https://issues.apache.org/jira/browse/KAFKA-7322
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> The deletion thread will grab the log.lock when it tries to rename log 
> segment and schedule for actual deletion.
> The compaction thread only grabs the log.lock when it tries to replace the 
> original segments with the cleaned segment. The compaction thread doesn't 
> grab the log when it reads records from the original segments to build 
> offsetmap and new segments. As a result, if both deletion and compaction 
> threads work on the same log partition. We have a race condition. 
> This race happens when the topic cleanup policy is updated on the fly.  
> One case to hit this race condition:
> 1: topic clean up policy is "compact" initially 
> 2: log cleaner (compaction) thread picks up the partition for compaction and 
> still in progress
> 3: the topic clean up policy has been updated to "deletion"
> 4: retention thread pick up the topic partition and delete some old segments.
> 5: log cleaner thread reads from the deleted log and raise an IO exception. 
>  
> The proposed solution is to use "inprogress" map that cleaner manager has to 
> protect such a race.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7322) Fix race condition between log cleaner thread and log retention thread when topic cleanup policy is updated

2018-09-18 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7322:

Summary: Fix race condition between log cleaner thread and log retention 
thread when topic cleanup policy is updated  (was: Fix race condition between 
compaction thread and retention thread when topic cleanup policy is updated)

> Fix race condition between log cleaner thread and log retention thread when 
> topic cleanup policy is updated
> ---
>
> Key: KAFKA-7322
> URL: https://issues.apache.org/jira/browse/KAFKA-7322
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> The deletion thread will grab the log.lock when it tries to rename log 
> segment and schedule for actual deletion.
> The compaction thread only grabs the log.lock when it tries to replace the 
> original segments with the cleaned segment. The compaction thread doesn't 
> grab the log when it reads records from the original segments to build 
> offsetmap and new segments. As a result, if both deletion and compaction 
> threads work on the same log partition. We have a race condition. 
> This race happens when the topic cleanup policy is updated on the fly.  
> One case to hit this race condition:
> 1: topic clean up policy is "compact" initially 
> 2: log cleaner (compaction) thread picks up the partition for compaction and 
> still in progress
> 3: the topic clean up policy has been updated to "deletion"
> 4: retention thread pick up the topic partition and delete some old segments.
> 5: log cleaner thread reads from the deleted log and raise an IO exception. 
>  
> The proposed solution is to use "inprogress" map that cleaner manager has to 
> protect such a race.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5690) kafka-acls command should be able to list per principal

2018-09-17 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-5690.
-
Resolution: Fixed

> kafka-acls command should be able to list per principal
> ---
>
> Key: KAFKA-5690
> URL: https://issues.apache.org/jira/browse/KAFKA-5690
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Koelli Mungee
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently the `kafka-acls` command has a `--list` option that can list per 
> resource which is --topic  or --group  or --cluster. In order 
> to look at the ACLs for a particular principal the user needs to iterate 
> through the entire list to figure out what privileges a particular principal 
> has been granted. An option to list the ACL per principal would simplify this 
> process.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-5690) kafka-acls command should be able to list per principal

2018-09-16 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reopened KAFKA-5690:
-

> kafka-acls command should be able to list per principal
> ---
>
> Key: KAFKA-5690
> URL: https://issues.apache.org/jira/browse/KAFKA-5690
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Koelli Mungee
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently the `kafka-acls` command has a `--list` option that can list per 
> resource which is --topic  or --group  or --cluster. In order 
> to look at the ACLs for a particular principal the user needs to iterate 
> through the entire list to figure out what privileges a particular principal 
> has been granted. An option to list the ACL per principal would simplify this 
> process.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5690) kafka-acls command should be able to list per principal

2018-09-16 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-5690:

Fix Version/s: 2.1.0

> kafka-acls command should be able to list per principal
> ---
>
> Key: KAFKA-5690
> URL: https://issues.apache.org/jira/browse/KAFKA-5690
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Koelli Mungee
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently the `kafka-acls` command has a `--list` option that can list per 
> resource which is --topic  or --group  or --cluster. In order 
> to look at the ACLs for a particular principal the user needs to iterate 
> through the entire list to figure out what privileges a particular principal 
> has been granted. An option to list the ACL per principal would simplify this 
> process.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5690) kafka-acls command should be able to list per principal

2018-09-16 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-5690.
-
Resolution: Fixed

> kafka-acls command should be able to list per principal
> ---
>
> Key: KAFKA-5690
> URL: https://issues.apache.org/jira/browse/KAFKA-5690
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0, 0.11.0.0
>Reporter: Koelli Mungee
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently the `kafka-acls` command has a `--list` option that can list per 
> resource which is --topic  or --group  or --cluster. In order 
> to look at the ACLs for a particular principal the user needs to iterate 
> through the entire list to figure out what privileges a particular principal 
> has been granted. An option to list the ACL per principal would simplify this 
> process.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7096) Consumer should drop the data for unassigned topic partitions

2018-09-10 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7096:

Fix Version/s: 2.1.0

> Consumer should drop the data for unassigned topic partitions
> -
>
> Key: KAFKA-7096
> URL: https://issues.apache.org/jira/browse/KAFKA-7096
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>Priority: Major
> Fix For: 2.1.0
>
>
> currently if a client has assigned topics : T1, T2, T3 and calls poll(), the 
> poll might fetch data for partitions for all 3 topics T1, T2, T3. Now if the 
> client unassigns some topics (for example T3) and calls poll() we still hold 
> the data (for T3) in the completedFetches queue until we actually reach the 
> buffered data for the unassigned Topics (T3 in our example) on subsequent 
> poll() calls, at which point we drop that data. This process of holding the 
> data is unnecessary.
> When a client creates a topic, it takes time for the broker to fetch ACLs for 
> the topic. But during this time, the client will issue fetchRequest for the 
> topic, it will get response for the partitions of this topic. The response 
> consist of TopicAuthorizationException for each of the partitions. This 
> response for each partition is wrapped with a completedFetch and added to the 
> completedFetches queue. Now when the client calls the next poll() it sees the 
> TopicAuthorizationException from the first buffered CompletedFetch. At this 
> point the client chooses to sleep for 1.5 min as a backoff (as per the 
> design), hoping that the Broker fetches the ACL from ACL store in the 
> meantime. Actually the Broker has already fetched the ACL by this time. When 
> the client calls poll() after the sleep, it again sees the 
> TopicAuthorizationException from the second completedFetch and it sleeps 
> again. So it takes (1.5 * 60 * partitions) seconds before the client can see 
> any data. With this patch, the client when it sees the first 
> TopicAuthorizationException, it can all assign(EmptySet), which will get rid 
> of the buffered completedFetches (those with TopicAuthorizationException) and 
> it can again call assign(TopicPartitions) before calling poll(). With this 
> patch we found that client was able to get the records as soon as the Broker 
> fetched the ACLs from ACL store.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7383) Verify leader epoch in produce requests (KIP-359)

2018-09-09 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7383:

Fix Version/s: 2.1.0

> Verify leader epoch in produce requests (KIP-359)
> -
>
> Key: KAFKA-7383
> URL: https://issues.apache.org/jira/browse/KAFKA-7383
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.1.0
>
>
> Implementation of 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-359%3A+Verify+leader+epoch+in+produce+requests.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2018-09-09 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-3514:

Fix Version/s: 2.1.0

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: architecture, kip
> Fix For: 2.1.0
>
>
> KIP-353: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-353%3A+Improve+Kafka+Streams+Timestamp+Synchronization]
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late arrived record would keep that stream's timestamp low for 
> a period of time, and hence keep being process until that late record. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code:java}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code:java}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to be not advanced, 
> and hence the task timestamp as well since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.
> *Update*
> There is one more thing to consider (full discussion found here: 
> [http://search-hadoop.com/m/Kafka/uyzND1iKZJN1yz0E5?subj=Order+of+punctuate+and+process+in+a+stream+processor])
> {quote}Let's assume the following case.
>  - a stream processor that uses the Processor API
>  - context.schedule(1000) is called in the init()
>  - the processor reads only one topic that has one partition
>  - using custom timestamp extractor, but that timestamp is just a wall
>  clock time
>  Image the following events:
>  1., for 10 seconds I send in 5 messages / second
>  2., does not send any messages for 3 seconds
>  3., starts the 5 messages / second again
> I see that punctuate() is not called during the 3 seconds when I do not 
>  send any messages. This is ok according to the documentation, because 
>  there is not any new messages to trigger the punctuate() call. When the 
>  first few messages arrives after a restart the sending (point 3. above) I 
>  see the following sequence of method calls:
> 1., process() on the 1st message
>  2., punctuate() is called 3 times
>  3., process() on the 2nd message
>  4., process() on each following message
> What I would expect instead is that punctuate() is called first and then 
>  process() is called on the messages, because the first message's timestamp 
>  is already 3 seconds older then the last punctuate() was called, so the 
>  first message belongs after the 3 punctuate() calls.
> {quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4587) Rethink Unification of Caching with Dedupping

2018-09-09 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-4587:

Fix Version/s: 2.1.0

> Rethink Unification of Caching with Dedupping
> -
>
> Key: KAFKA-4587
> URL: https://issues.apache.org/jira/browse/KAFKA-4587
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
> Fix For: 2.1.0
>
>
> This is discussed in PR https://github.com/apache/kafka/pull/1588
> In order to support user-customizable state store suppliers in the DSL we did 
> the following:
> 1) Introduce a {{TupleForwarder}} to forward tuples from cached stores that 
> is wrapping user customized stores.
> 2) Narrow the scope to only dedup on forwarding if it is the default 
> CachingXXStore with wrapper RocksDB. 
> With this, the unification of dedupping and caching is less useful now, and 
> we are complicating the inner implementations for forwarding a lot. We need 
> to re-consider this feature with finer granularity of turning on / off 
> caching per store, potentially with explicit triggers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6556) allow to attach callbacks in kafka streams, to be triggered when a window is expired

2018-09-09 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-6556:

Fix Version/s: 2.1.0

> allow to attach callbacks in kafka streams, to be triggered when a window is 
> expired 
> -
>
> Key: KAFKA-6556
> URL: https://issues.apache.org/jira/browse/KAFKA-6556
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: igor mazor
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.1.0
>
>
> Allowing to attach callbacks in kafka streams, to be triggered when a window 
> is expired,
>  would help in use cases when the final state of the window is required.
>  It would be also useful if together with that functionally the user would be 
> able to control whether the callback would be triggered in addition to 
> emitting the normal change log down the stream, or only triggering the 
> callback when the window is expired. (for example in cases when only the 
> final window state is required, and any updates to the window state during 
> the window time interval are not important)  
> An example for use case could be left join with proper sql semantics:
>  A left join B currently would emit (a1,null) and (a1,b1) if b1 arrives 
> within the defined join time window.
>  what I would like is to have ONLY ONE result:
>  (a1,null) if no b1 arrived during the defined join time window, OR ONLY 
> (a1,b1) if b1 did arrived in the specified join time window.
>  One possible solution could be to use the current kafka streams left join 
> with downstream processor which would put the results in a windowed Ktable.
>  The window size would be same as for the left join operation, however, only 
> the final state of that window would be emitted down the stream once the time 
> window is expired.
>  So if the left join produces (a1, null) and after X minutes no (a1, b1) was 
> produced, eventually only (a1, null) would be emitted, on the other hand, if 
> the left join produces (a1, null) and after X-t minutes (a1, b1) was produced 
> by the left join operation => only (a1, b1) would be emitted eventually down 
> the stream after X minutes.
>  
> Another use case is when the window state is written to another kafka topic 
> which is then used to persist the window states into a db, However, many 
> times only the final window state
>  is required, and functionality to get only the last window state would help 
> in reducing load from the db, since only the final window state would be 
> persisted to the db, instead of multiple db writes for each window state 
> update. 
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6443) KTable involved in multiple joins could result in duplicate results

2018-09-09 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-6443:

Fix Version/s: 2.1.0

> KTable involved in multiple joins could result in duplicate results
> ---
>
> Key: KAFKA-6443
> URL: https://issues.apache.org/jira/browse/KAFKA-6443
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
> Fix For: 2.1.0
>
>
> Consider the following multi table-table joins:
> {code}
> table1.join(table2).join(table2);// "join" could be replaced with 
> "leftJoin" and "outerJoin"
> {code}
> where {{table2}} is involved multiple times in this multi-way joins. In this 
> case, when a new record from the source topic of {{table2}} is being 
> processing, it will send to two children down in the topology and hence may 
> resulting in duplicated join results depending on the join types.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4690) IllegalStateException using DeleteTopicsRequest when delete.topic.enable=false

2018-09-09 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-4690:

Fix Version/s: 2.1.0

> IllegalStateException using DeleteTopicsRequest when delete.topic.enable=false
> --
>
> Key: KAFKA-4690
> URL: https://issues.apache.org/jira/browse/KAFKA-4690
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0
> Environment: OS X
>Reporter: Jon Chiu
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.1.0
>
> Attachments: delete-topics-request.java
>
>
> There is no indication as to why the delete request fails. Perhaps an error 
> code?
> This can be reproduced with the following steps:
> 1. Start ZK and 1 broker (with default {{delete.topic.enable=false}})
> 2. Create a topic test
> {noformat}
> bin/kafka-topics.sh --zookeeper localhost:2181 \
>   --create --topic test --partition 1 --replication-factor 1
> {noformat}
> 3. Delete topic by sending a DeleteTopicsRequest
> 4. An error is returned
> {noformat}
> org.apache.kafka.common.errors.DisconnectException
> {noformat}
> or 
> {noformat}
> java.lang.IllegalStateException: Attempt to retrieve exception from future 
> which hasn't failed
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
>   at 
> io.confluent.adminclient.KafkaAdminClient.send(KafkaAdminClient.java:195)
>   at 
> io.confluent.adminclient.KafkaAdminClient.deleteTopic(KafkaAdminClient.java:152)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6880) Zombie replicas must be fenced

2018-09-09 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-6880:

Fix Version/s: 2.1.0

> Zombie replicas must be fenced
> --
>
> Key: KAFKA-6880
> URL: https://issues.apache.org/jira/browse/KAFKA-6880
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.1.0
>
>
> Let's say we have three replicas for a partition: 1, 2 ,and 3.
> In epoch 0, broker 1 is the leader and writes up to offset 50. Broker 2 
> replicates up to offset 50, but broker 3 is a little behind at offset 40. The 
> high watermark is 40. 
> Suppose that broker 2 has a zk session expiration event, but fails to detect 
> it or fails to reestablish a session (e.g. due to a bug like KAFKA-6879), and 
> it continues fetching from broker 1.
> For whatever reason, broker 3 is elected the leader for epoch 1 beginning at 
> offset 40. Broker 1 detects the leader change and truncates its log to offset 
> 40. Some new data is appended up to offset 60, which is fully replicated to 
> broker 1. Broker 2 continues fetching from broker 1 at offset 50, but gets 
> NOT_LEADER_FOR_PARTITION errors, which is retriable and hence broker 2 will 
> retry.
> After some time, broker 1 becomes the leader again for epoch 2. Broker 1 
> begins at offset 60. Broker 2 has not exhausted retries and is now able to 
> fetch at offset 50 and append the last 10 records in order to catch up. 
> However, because it did not observed the leader changes, it never saw the 
> need to truncate its log. Hence offsets 40-49 still reflect the uncommitted 
> changes from epoch 0. Neither KIP-101 nor KIP-279 can fix this because the 
> tail of the log is correct.
> The basic problem is that zombie replicas are not fenced properly by the 
> leader epoch. We actually observed a sequence roughly like this after a 
> broker had partially deadlocked from KAFKA-6879. We should add the leader 
> epoch to fetch requests so that the leader can fence the zombie replicas.
> A related problem is that we currently allow such zombie replicas to be added 
> to the ISR even if they are in an offline state. The problem is that the 
> controller will never elect them, so being part of the ISR does not give the 
> availability guarantee that is intended. This would also be fixed by 
> verifying replica leader epoch in fetch requests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6774) Improve default groupId behavior in consumer

2018-09-09 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-6774:

Fix Version/s: 2.1.0

> Improve default groupId behavior in consumer
> 
>
> Key: KAFKA-6774
> URL: https://issues.apache.org/jira/browse/KAFKA-6774
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.1.0
>
>
> At the moment, the default groupId in the consumer is "". If you try to use 
> this to subscribe() to a topic, the broker will reject the group as invalid. 
> On the other hand, if you use it with assign(), then the user will be able to 
> fetch and commit offsets using the empty groupId. Probably 99% of the time, 
> this is not what the user expects. Instead you would probably expect that if 
> no groupId is provided, then no committed offsets will be fetched at all and 
> we'll just use the auto reset behavior if we don't have a current position.
> Here are two potential solutions (both requiring a KIP):
> 1. Change the default to null. We will preserve the current behavior for 
> subscribe(). When using assign(), we will not bother fetching committed 
> offsets for the null groupId, and any attempt to commit offsets will raise an 
> error. The user can still use the empty groupId, but they have to specify it 
> explicitly.
> 2. Keep the current default, but change the consumer to treat this value as 
> if it were null as described in option 1. The argument for this behavior is 
> that using the empty groupId to commit offsets is inherently a dangerous 
> practice and should not be permitted. We'd have to convince ourselves that 
> we're fine not needing to allow the empty groupId for backwards compatibility 
> though.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4932) Add UUID Serde

2018-09-09 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-4932:

Fix Version/s: 2.1.0

> Add UUID Serde
> --
>
> Key: KAFKA-4932
> URL: https://issues.apache.org/jira/browse/KAFKA-4932
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Reporter: Jeff Klukas
>Assignee: Brandon Kirchner
>Priority: Minor
>  Labels: needs-kip, newbie
> Fix For: 2.1.0
>
>
> I propose adding serializers and deserializers for the java.util.UUID class.
> I have many use cases where I want to set the key of a Kafka message to be a 
> UUID. Currently, I need to turn UUIDs into strings or byte arrays and use 
> their associated Serdes, but it would be more convenient to serialize and 
> deserialize UUIDs directly.
> I'd propose that the serializer and deserializer use the 36-byte string 
> representation, calling UUID.toString and UUID.fromString, and then using the 
> existing StringSerializer / StringDeserializer to finish the job. We would 
> also wrap these in a Serde and modify the streams Serdes class to include 
> this in the list of supported types.
> Optionally, we could have the deserializer support a 16-byte representation 
> and it would check the size of the input byte array to determine whether it's 
> a binary or string representation of the UUID. It's not well defined whether 
> the most significant bits or least significant go first, so this deserializer 
> would have to support only one or the other.
> Similary, if the deserializer supported a 16-byte representation, there could 
> be two variants of the serializer, a UUIDStringSerializer and a 
> UUIDBytesSerializer.
> I would be willing to write this PR, but am looking for feedback about 
> whether there are significant concerns here around ambiguity of what the byte 
> representation of a UUID should be, or if there's desire to keep to list of 
> built-in Serdes minimal such that a PR would be unlikely to be accepted.
> KIP Link: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-206%3A+Add+support+for+UUID+serialization+and+deserialization



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6049) Kafka Streams: Add Cogroup in the DSL

2018-09-09 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-6049:

Fix Version/s: 2.1.0

> Kafka Streams: Add Cogroup in the DSL
> -
>
> Key: KAFKA-6049
> URL: https://issues.apache.org/jira/browse/KAFKA-6049
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: api, kip, user-experience
> Fix For: 2.1.0
>
>
> When multiple streams aggregate together to form a single larger object (e.g. 
> a shopping website may have a cart stream, a wish list stream, and a 
> purchases stream. Together they make up a Customer), it is very difficult to 
> accommodate this in the Kafka-Streams DSL: it generally requires you to group 
> and aggregate all of the streams to KTables then make multiple outer join 
> calls to end up with a KTable with your desired object. This will create a 
> state store for each stream and a long chain of ValueJoiners that each new 
> record must go through to get to the final object.
> Creating a cogroup method where you use a single state store will:
> * Reduce the number of gets from state stores. With the multiple joins when a 
> new value comes into any of the streams a chain reaction happens where the 
> join processor keep calling ValueGetters until we have accessed all state 
> stores.
> * Slight performance increase. As described above all ValueGetters are called 
> also causing all ValueJoiners to be called forcing a recalculation of the 
> current joined value of all other streams, impacting performance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7333) Protocol changes for KIP-320

2018-09-09 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-7333.
-
Resolution: Fixed

> Protocol changes for KIP-320
> 
>
> Key: KAFKA-7333
> URL: https://issues.apache.org/jira/browse/KAFKA-7333
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.1.0
>
>
> Implement protocol changes for 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7333) Protocol changes for KIP-320

2018-09-09 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7333:

Fix Version/s: 2.1.0

> Protocol changes for KIP-320
> 
>
> Key: KAFKA-7333
> URL: https://issues.apache.org/jira/browse/KAFKA-7333
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.1.0
>
>
> Implement protocol changes for 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6082) consider fencing zookeeper updates with controller epoch zkVersion

2018-09-07 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-6082.
-
Resolution: Fixed

> consider fencing zookeeper updates with controller epoch zkVersion
> --
>
> Key: KAFKA-6082
> URL: https://issues.apache.org/jira/browse/KAFKA-6082
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
> Fix For: 2.1.0
>
>
>  
> Kafka controller may fail to function properly (even after repeated 
> controller movement) due to the following sequence of events:
>  - User requests topic deletion
>  - Controller A deletes the partition znode
>  - Controller B becomes controller and reads the topic znode
>  - Controller A deletes the topic znode and remove the topic from the topic 
> deletion znode
>  - Controller B reads the partition znode and topic deletion znode
>  - According to controller B's context, the topic znode exists, the topic is 
> not listed for deletion, and some partition is not found for the given topic. 
> Then controller B will create topic znode with empty data (i.e. partition 
> assignment) and create the partition znodes.
>  - All controller after controller B will fail because there is not data in 
> the topic znode.
> The long term solution is to have a way to prevent old controller from 
> writing to zookeeper if it is not the active controller. One idea is to use 
> the zookeeper multi API (See 
> [https://zookeeper.apache.org/doc/r3.4.3/api/org/apache/zookeeper/ZooKeeper.html#multi(java.lang.Iterable))]
>  such that controller only writes to zookeeper if the zk version of the 
> controller epoch znode has not been changed.
> The short term solution is to let controller reads the topic deletion znode 
> first. If the topic is still listed in the topic deletion znode, then the new 
> controller will properly handle partition states of this topic without 
> creating partition znodes for this topic. And if the topic is not listed in 
> the topic deletion znode, then both the topic znode and the partition znodes 
> of this topic should have been deleted by the time the new controller tries 
> to read them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7211) MM should handle timeouts in commitSync

2018-09-04 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7211:

Fix Version/s: 2.1.0

> MM should handle timeouts in commitSync
> ---
>
> Key: KAFKA-7211
> URL: https://issues.apache.org/jira/browse/KAFKA-7211
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: huxihx
>Priority: Major
> Fix For: 2.1.0
>
>
> Now that we have KIP-266, the user can override `default.api.timeout.ms` for 
> the consumer so that commitSync does not block indefinitely. MM needs to be 
> updated to handle TimeoutException. We may also need some logic to handle 
> deleted topics. If MM attempts to commit an offset for a deleted topic, the 
> call will timeout and we should probably check if the topic exists and remove 
> the offset if it doesn't.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7211) MM should handle timeouts in commitSync

2018-09-04 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-7211.
-
Resolution: Fixed

> MM should handle timeouts in commitSync
> ---
>
> Key: KAFKA-7211
> URL: https://issues.apache.org/jira/browse/KAFKA-7211
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: huxihx
>Priority: Major
> Fix For: 2.1.0
>
>
> Now that we have KIP-266, the user can override `default.api.timeout.ms` for 
> the consumer so that commitSync does not block indefinitely. MM needs to be 
> updated to handle TimeoutException. We may also need some logic to handle 
> deleted topics. If MM attempts to commit an offset for a deleted topic, the 
> call will timeout and we should probably check if the topic exists and remove 
> the offset if it doesn't.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-31 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16598948#comment-16598948
 ] 

Dong Lin commented on KAFKA-7278:
-

[~niob] The stacktrace in that Jira seems similar to the issue fixed here. So 
there is good chance that we have fixed that issue as well.

 

> replaceSegments() should not call asyncDeleteSegment() for segments which 
> have been removed from segments list
> --
>
> Key: KAFKA-7278
> URL: https://issues.apache.org/jira/browse/KAFKA-7278
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every 
> segment listed in the `oldSegments`. oldSegments should be constructed from 
> Log.segments and only contain segments listed in Log.segments.
> However, Log.segments may be modified between the time oldSegments is 
> determined to the time Log.replaceSegments() is called. If there are 
> concurrent async deletion of the same log segment file, Log.replaceSegments() 
> will call asyncDeleteSegment() for a segment that does not exist and Kafka 
> server may shutdown the log directory due to NoSuchFileException.
> This is likely the root cause of 
> https://issues.apache.org/jira/browse/KAFKA-6188.
> Given the understanding of the problem, we should be able to fix the issue by 
> only deleting segment if the segment can be found in Log.segments.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (KAFKA-7295) Fix RequestHandlerAvgIdlePercent metric calculation

2018-08-29 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7295:

Comment: was deleted

(was: The issue is mitigated in 
https://issues.apache.org/jira/browse/KAFKA-7354)

> Fix RequestHandlerAvgIdlePercent metric calculation
> ---
>
> Key: KAFKA-7295
> URL: https://issues.apache.org/jira/browse/KAFKA-7295
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently the RequestHandlerAvgIdlePercent metric may be larger than 1 due to 
> the way it is calculated. This is counter-intuitive since by definition it is 
> supposed to be a percentage metric and its value should be in range [0, 1]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-7295) Fix RequestHandlerAvgIdlePercent metric calculation

2018-08-29 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reopened KAFKA-7295:
-

> Fix RequestHandlerAvgIdlePercent metric calculation
> ---
>
> Key: KAFKA-7295
> URL: https://issues.apache.org/jira/browse/KAFKA-7295
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently the RequestHandlerAvgIdlePercent metric may be larger than 1 due to 
> the way it is calculated. This is counter-intuitive since by definition it is 
> supposed to be a percentage metric and its value should be in range [0, 1]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7354) Fix IdlePercent and NetworkProcessorAvgIdlePercent metric calculation

2018-08-29 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7354:

Fix Version/s: 2.1.0
   2.0.1
   1.1.2

> Fix IdlePercent and NetworkProcessorAvgIdlePercent metric calculation
> -
>
> Key: KAFKA-7354
> URL: https://issues.apache.org/jira/browse/KAFKA-7354
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Major
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Currently, MBean 
> `kafka.network:type=Processor,name=IdlePercent,networkProcessor=*` and 
> `afka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent` could be 
> greater than 1. However, these two values represent a percentage which should 
> not exceed 1.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7295) Fix RequestHandlerAvgIdlePercent metric calculation

2018-08-29 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-7295.
-
Resolution: Fixed

The issue is mitigated in https://issues.apache.org/jira/browse/KAFKA-7354

> Fix RequestHandlerAvgIdlePercent metric calculation
> ---
>
> Key: KAFKA-7295
> URL: https://issues.apache.org/jira/browse/KAFKA-7295
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently the RequestHandlerAvgIdlePercent metric may be larger than 1 due to 
> the way it is calculated. This is counter-intuitive since by definition it is 
> supposed to be a percentage metric and its value should be in range [0, 1]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-08-23 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590545#comment-16590545
 ] 

Dong Lin commented on KAFKA-7040:
-

[~luwang] It looks like we need to fix this in Kafka to further reduce the 
chance of message loss. Currently the JIRA is not assigned to anyway. Would you 
have time to fix this (and probably with a test) or would you like other dev to 
work on this?

> The replica fetcher thread may truncate accepted messages during multiple 
> fast leadership transitions
> -
>
> Key: KAFKA-7040
> URL: https://issues.apache.org/jira/browse/KAFKA-7040
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Reporter: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> Consider the scenario where there are two brokers, broker0, and broker1, and 
> there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as 
> the leader and broker0 as the follower. The following sequence of events 
> happened on broker0
> 1. The replica fetcher thread on a broker0 issues a LeaderEpoch request to 
> broker1, and awaits to get the response
> 2. A LeaderAndISR request causes broker0 to become the leader for one 
> partition t1p0, which in turn will remove the partition t1p0 from the replica 
> fetcher thread
> 3. Broker0 accepts some messages from a producer
> 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and 
> broker0 to become the follower for partition t1p0. This will cause the 
> partition t1p0 to be added back to the replica fetcher thread on broker0.
> 5. The replica fetcher thread on broker0 receives a response for the 
> LeaderEpoch request issued in step 1, and truncates the accepted messages in 
> step3.
> The issue can be reproduced with the test from 
> https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea
> [1] Initially we set up broker0 to be the follower of two partitions instead 
> of just one, to avoid the shutting down of the replica fetcher thread when it 
> becomes idle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7128) Lagging high watermark can lead to committed data loss after ISR expansion

2018-08-22 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16589533#comment-16589533
 ] 

Dong Lin commented on KAFKA-7128:
-

[~apovzner] Yes you are right. Previously I stopped at the first issue without 
thinking beyond that.

> Lagging high watermark can lead to committed data loss after ISR expansion
> --
>
> Key: KAFKA-7128
> URL: https://issues.apache.org/jira/browse/KAFKA-7128
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Anna Povzner
>Priority: Major
>
> Some model checking exposed a weakness in the ISR expansion logic. We know 
> that the high watermark can go backwards after a leader failover, but we may 
> not have known that this can lead to the loss of committed data.
> Say we have three replicas: r1, r2, and r3. Initially, the ISR consists of 
> (r1, r2) and the leader is r1. r3 is a new replica which has not begun 
> fetching. The data up to offset 10 has been committed to the ISR. Here is the 
> initial state:
> State 1
> ISR: (r1, r2)
>  Leader: r1
>  r1: [hw=10, leo=10]
>  r2: [hw=5, leo=10]
>  r3: [hw=0, leo=0]
> Replica 1 then initiates shutdown (or fails) and leaves the ISR, which makes 
> r2 the new leader. The high watermark is still lagging r1.
> State 2
> ISR: (r2)
>  Leader: r2
>  r1 (offline): [hw=10, leo=10]
>  r2: [hw=5, leo=10]
>  r3: [hw=0, leo=0]
> Replica 3 then catch up to the high watermark on r2 and joins the ISR. 
> Perhaps it's high watermark is lagging behind r2, but this is unimportant.
> State 3
> ISR: (r2, r3)
>  Leader: r2
>  r1 (offline): [hw=10, leo=10]
>  r2: [hw=5, leo=10]
>  r3: [hw=0, leo=5]
> Now r2 fails and r3 is elected leader and is the only member of the ISR. The 
> committed data from offsets 5 to 10 has been lost.
> State 4
> ISR: (r3)
>  Leader: r3
>  r1 (offline): [hw=10, leo=10]
>  r2 (offline): [hw=5, leo=10]
>  r3: [hw=0, leo=5]
> The bug is the fact that we allowed r3 into the ISR after the local high 
> watermark had been reached. Since the follower does not know the true high 
> watermark for the previous leader's epoch, it should not allow a replica to 
> join the ISR until it has caught up to an offset within its own epoch.
> Note this is related to 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-08-21 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587596#comment-16587596
 ] 

Dong Lin commented on KAFKA-6188:
-

[~TeilaRei] I think the discussion for the fix is in 
https://issues.apache.org/jira/browse/KAFKA-7278. And 
https://issues.apache.org/jira/browse/KAFKA-7278 also has link to the PR 
[https://github.com/apache/kafka/pull/5491]. The PR has been merged to Kafka 
1.1, 2.0 and trunk branch. I think this will very likely fix the same issue 
discussed in this Jira KAFKA-6188. Can you take a look at the discussion 
history in KAFKA-7278? If there is reason to believe that won't fix the same 
issue here, we can re-open this ticket.

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Assignee: Dong Lin
>Priority: Blocker
>  Labels: windows
> Attachments: Segments are opened before deletion, 
> kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after a 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, stand alone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps to start again, and the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-08-21 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-6188.
-
Resolution: Fixed

Likely fixed in https://issues.apache.org/jira/browse/KAFKA-7278.

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Assignee: Dong Lin
>Priority: Blocker
>  Labels: windows
> Attachments: Segments are opened before deletion, 
> kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after a 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, stand alone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps to start again, and the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-20 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7278:

Fix Version/s: 2.1.0
   1.1.2

> replaceSegments() should not call asyncDeleteSegment() for segments which 
> have been removed from segments list
> --
>
> Key: KAFKA-7278
> URL: https://issues.apache.org/jira/browse/KAFKA-7278
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every 
> segment listed in the `oldSegments`. oldSegments should be constructed from 
> Log.segments and only contain segments listed in Log.segments.
> However, Log.segments may be modified between the time oldSegments is 
> determined to the time Log.replaceSegments() is called. If there are 
> concurrent async deletion of the same log segment file, Log.replaceSegments() 
> will call asyncDeleteSegment() for a segment that does not exist and Kafka 
> server may shutdown the log directory due to NoSuchFileException.
> This is likely the root cause of 
> https://issues.apache.org/jira/browse/KAFKA-6188.
> Given the understanding of the problem, we should be able to fix the issue by 
> only deleting segment if the segment can be found in Log.segments.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7313) KAFKA-7313; StopReplicaRequest should attempt to remove future replica for the partition only if future replica exists

2018-08-20 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7313:

Description: 
This patch fixes two issues:

1) Currently if a broker received StopReplicaRequest with delete=true for the 
same offline replica, the first StopRelicaRequest will show 
KafkaStorageException and the second StopRelicaRequest will show 
ReplicaNotAvailableException. This is because the first StopRelicaRequest will 
remove the mapping (tp -> ReplicaManager.OfflinePartition) from 
ReplicaManager.allPartitions before returning KafkaStorageException, thus the 
second StopRelicaRequest will not find this partition as offline.

This result appears to be inconsistent. And since the replica is already 
offline and broker will not be able to delete file for this replica, the 
StopReplicaRequest should fail without making any change and broker should 
still remember that this replica is offline. 

2) Currently if broker receives StopReplicaRequest with delete=true, the broker 
will attempt to remove future replica for the partition, which will cause 
KafkaStorageException in the StopReplicaResponse if this replica does not have 
future replica. It is problematic to always return KafkaStorageException in the 
response if future replica does not exist.

 

 

  was:
This patch fixes two issues:

 

1) Currently if a broker received StopReplicaRequest with delete=true for the 
same offline replica, the first StopRelicaRequest will show 
KafkaStorageException and the second StopRelicaRequest will show 
ReplicaNotAvailableException. This is because the first StopRelicaRequest will 
remove the mapping (tp -> ReplicaManager.OfflinePartition) from 
ReplicaManager.allPartitions before returning KafkaStorageException, thus the 
second StopRelicaRequest will not find this partition as offline.

This result appears to be inconsistent. And since the replica is already 
offline and broker will not be able to delete file for this replica, the 
StopReplicaRequest should fail without making any change and broker should 
still remember that this replica is offline.

 

2) Currently if broker receives StopReplicaRequest with delete=true, the broker 
will attempt to remove future replica for the partition, which will cause 
KafkaStorageException in the StopReplicaResponse if this replica does not have 
future replica. It is problematic to always return KafkaStorageException in the 
response if future replica does not exist.

 

 


> KAFKA-7313; StopReplicaRequest should attempt to remove future replica for 
> the partition only if future replica exists
> --
>
> Key: KAFKA-7313
> URL: https://issues.apache.org/jira/browse/KAFKA-7313
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> This patch fixes two issues:
> 1) Currently if a broker received StopReplicaRequest with delete=true for the 
> same offline replica, the first StopRelicaRequest will show 
> KafkaStorageException and the second StopRelicaRequest will show 
> ReplicaNotAvailableException. This is because the first StopRelicaRequest 
> will remove the mapping (tp -> ReplicaManager.OfflinePartition) from 
> ReplicaManager.allPartitions before returning KafkaStorageException, thus the 
> second StopRelicaRequest will not find this partition as offline.
> This result appears to be inconsistent. And since the replica is already 
> offline and broker will not be able to delete file for this replica, the 
> StopReplicaRequest should fail without making any change and broker should 
> still remember that this replica is offline. 
> 2) Currently if broker receives StopReplicaRequest with delete=true, the 
> broker will attempt to remove future replica for the partition, which will 
> cause KafkaStorageException in the StopReplicaResponse if this replica does 
> not have future replica. It is problematic to always return 
> KafkaStorageException in the response if future replica does not exist.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7313) KAFKA-7313; StopReplicaRequest should attempt to remove future replica for the partition only if future replica exists

2018-08-20 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7313:

Description: 
This patch fixes two issues:

 

1) Currently if a broker received StopReplicaRequest with delete=true for the 
same offline replica, the first StopRelicaRequest will show 
KafkaStorageException and the second StopRelicaRequest will show 
ReplicaNotAvailableException. This is because the first StopRelicaRequest will 
remove the mapping (tp -> ReplicaManager.OfflinePartition) from 
ReplicaManager.allPartitions before returning KafkaStorageException, thus the 
second StopRelicaRequest will not find this partition as offline.

This result appears to be inconsistent. And since the replica is already 
offline and broker will not be able to delete file for this replica, the 
StopReplicaRequest should fail without making any change and broker should 
still remember that this replica is offline.

 

2) Currently if broker receives StopReplicaRequest with delete=true, the broker 
will attempt to remove future replica for the partition, which will cause 
KafkaStorageException in the StopReplicaResponse if this replica does not have 
future replica. It is problematic to always return KafkaStorageException in the 
response if future replica does not exist.

 

 

  was:
Currently if a broker received StopReplicaRequest with delete=true for the same 
offline replica, the first StopRelicaRequest will show KafkaStorageException 
and the second StopRelicaRequest will show ReplicaNotAvailableException. This 
is because the first StopRelicaRequest will remove the mapping (tp -> 
ReplicaManager.OfflinePartition) from ReplicaManager.allPartitions before 
returning KafkaStorageException, thus the second StopRelicaRequest will not 
find this partition as offline.

This result appears to be inconsistent. And since the replica is already 
offline and broker will not be able to delete file for this replica, the 
StopReplicaRequest should fail without making any change and broker should 
still remember that this replica is offline.


> KAFKA-7313; StopReplicaRequest should attempt to remove future replica for 
> the partition only if future replica exists
> --
>
> Key: KAFKA-7313
> URL: https://issues.apache.org/jira/browse/KAFKA-7313
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> This patch fixes two issues:
>  
> 1) Currently if a broker received StopReplicaRequest with delete=true for the 
> same offline replica, the first StopRelicaRequest will show 
> KafkaStorageException and the second StopRelicaRequest will show 
> ReplicaNotAvailableException. This is because the first StopRelicaRequest 
> will remove the mapping (tp -> ReplicaManager.OfflinePartition) from 
> ReplicaManager.allPartitions before returning KafkaStorageException, thus the 
> second StopRelicaRequest will not find this partition as offline.
> This result appears to be inconsistent. And since the replica is already 
> offline and broker will not be able to delete file for this replica, the 
> StopReplicaRequest should fail without making any change and broker should 
> still remember that this replica is offline.
>  
> 2) Currently if broker receives StopReplicaRequest with delete=true, the 
> broker will attempt to remove future replica for the partition, which will 
> cause KafkaStorageException in the StopReplicaResponse if this replica does 
> not have future replica. It is problematic to always return 
> KafkaStorageException in the response if future replica does not exist.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7313) KAFKA-7313; StopReplicaRequest should attempt to remove future replica for the partition only if future replica exists

2018-08-20 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7313:

Summary: KAFKA-7313; StopReplicaRequest should attempt to remove future 
replica for the partition only if future replica exists  (was: 
StopReplicaRequest should not remove partition from 
ReplicaManager.allPartitions if the replica is offline)

> KAFKA-7313; StopReplicaRequest should attempt to remove future replica for 
> the partition only if future replica exists
> --
>
> Key: KAFKA-7313
> URL: https://issues.apache.org/jira/browse/KAFKA-7313
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently if a broker received StopReplicaRequest with delete=true for the 
> same offline replica, the first StopRelicaRequest will show 
> KafkaStorageException and the second StopRelicaRequest will show 
> ReplicaNotAvailableException. This is because the first StopRelicaRequest 
> will remove the mapping (tp -> ReplicaManager.OfflinePartition) from 
> ReplicaManager.allPartitions before returning KafkaStorageException, thus the 
> second StopRelicaRequest will not find this partition as offline.
> This result appears to be inconsistent. And since the replica is already 
> offline and broker will not be able to delete file for this replica, the 
> StopReplicaRequest should fail without making any change and broker should 
> still remember that this replica is offline.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7313) StopReplicaRequest should not remove partition from ReplicaManager.allPartitions if the replica is offline

2018-08-19 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7313:

Description: 
Currently if a broker received StopReplicaRequest with delete=true for the same 
offline replica, the first StopRelicaRequest will show KafkaStorageException 
and the second StopRelicaRequest will show ReplicaNotAvailableException. This 
is because the first StopRelicaRequest will remove the mapping (tp -> 
ReplicaManager.OfflinePartition) from ReplicaManager.allPartitions before 
returning KafkaStorageException, thus the second StopRelicaRequest will not 
find this partition as offline.

This result appears to be inconsistent. And since the replica is already 
offline and broker will not be able to delete file for this replica, the 
StopReplicaRequest should fail without making any change and broker should 
still remember that this replica is offline.

> StopReplicaRequest should not remove partition from 
> ReplicaManager.allPartitions if the replica is offline
> --
>
> Key: KAFKA-7313
> URL: https://issues.apache.org/jira/browse/KAFKA-7313
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently if a broker received StopReplicaRequest with delete=true for the 
> same offline replica, the first StopRelicaRequest will show 
> KafkaStorageException and the second StopRelicaRequest will show 
> ReplicaNotAvailableException. This is because the first StopRelicaRequest 
> will remove the mapping (tp -> ReplicaManager.OfflinePartition) from 
> ReplicaManager.allPartitions before returning KafkaStorageException, thus the 
> second StopRelicaRequest will not find this partition as offline.
> This result appears to be inconsistent. And since the replica is already 
> offline and broker will not be able to delete file for this replica, the 
> StopReplicaRequest should fail without making any change and broker should 
> still remember that this replica is offline.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7313) StopReplicaRequest should not remove partition from ReplicaManager.allPartitions if the replica is offline

2018-08-19 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7313:

Summary: StopReplicaRequest should not remove partition from 
ReplicaManager.allPartitions if the replica is offline  (was: 
StopReplicaRequest should not remove partition from 
ReplicaManager.allPartitions if partition is offline)

> StopReplicaRequest should not remove partition from 
> ReplicaManager.allPartitions if the replica is offline
> --
>
> Key: KAFKA-7313
> URL: https://issues.apache.org/jira/browse/KAFKA-7313
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7313) StopReplicaRequest should not remove partition from ReplicaManager.allPartitions if partition is offline

2018-08-19 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reassigned KAFKA-7313:
---

Assignee: Dong Lin

> StopReplicaRequest should not remove partition from 
> ReplicaManager.allPartitions if partition is offline
> 
>
> Key: KAFKA-7313
> URL: https://issues.apache.org/jira/browse/KAFKA-7313
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7313) StopReplicaRequest should not remove partition from ReplicaManager.allPartitions if partition is offline

2018-08-19 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7313:
---

 Summary: StopReplicaRequest should not remove partition from 
ReplicaManager.allPartitions if partition is offline
 Key: KAFKA-7313
 URL: https://issues.apache.org/jira/browse/KAFKA-7313
 Project: Kafka
  Issue Type: Improvement
Reporter: Dong Lin






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7297) Both read/write access to Log.segments should be protected by lock

2018-08-15 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7297:
---

 Summary: Both read/write access to Log.segments should be 
protected by lock
 Key: KAFKA-7297
 URL: https://issues.apache.org/jira/browse/KAFKA-7297
 Project: Kafka
  Issue Type: Improvement
Reporter: Dong Lin
Assignee: Dong Lin


Log.replaceSegments() updates segments in two steps. It first adds new segments 
and then remove old segments. Though this operation is protected by a lock, 
other read access such as Log.logSegments does not grab lock and thus these 
methods may return an inconsistent view of the segments.

As an example, say Log.replaceSegments() intends to replace segments [0, 100), 
[100, 200) with a new segment [0, 200). In this case if Log.logSegments is 
called right after the new segments are added, the method may return segments 
[0, 200), [100, 200) and messages in the range [100, 200) may be duplicated if 
caller choose to enumerate all messages in all segments returned by the method.

The solution is probably to protect read/write access to Log.segments with 
read/write lock.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7295) Fix RequestHandlerAvgIdlePercent metric calculation

2018-08-14 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7295:
---

 Summary: Fix RequestHandlerAvgIdlePercent metric calculation
 Key: KAFKA-7295
 URL: https://issues.apache.org/jira/browse/KAFKA-7295
 Project: Kafka
  Issue Type: Improvement
Reporter: Dong Lin
Assignee: Dong Lin


Currently the RequestHandlerAvgIdlePercent metric may be larger than 1 due to 
the way it is calculated. This is counter-intuitive since by definition it is 
supposed to be a percentage metric and its value should be in range [0, 1]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-12 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577707#comment-16577707
 ] 

Dong Lin commented on KAFKA-7278:
-

Yes `segment.changeFileSuffixes("", Log.DeletedFileSuffix)` is executed when 
the lock is hold. But the lock is released between step 2), 3) and 4) in the 
example sequence provided above.

> replaceSegments() should not call asyncDeleteSegment() for segments which 
> have been removed from segments list
> --
>
> Key: KAFKA-7278
> URL: https://issues.apache.org/jira/browse/KAFKA-7278
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every 
> segment listed in the `oldSegments`. oldSegments should be constructed from 
> Log.segments and only contain segments listed in Log.segments.
> However, Log.segments may be modified between the time oldSegments is 
> determined to the time Log.replaceSegments() is called. If there are 
> concurrent async deletion of the same log segment file, Log.replaceSegments() 
> will call asyncDeleteSegment() for a segment that does not exist and Kafka 
> server may shutdown the log directory due to NoSuchFileException.
> This is likely the root cause of 
> https://issues.apache.org/jira/browse/KAFKA-6188.
> Given the understanding of the problem, we should be able to fix the issue by 
> only deleting segment if the segment can be found in Log.segments.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-12 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577706#comment-16577706
 ] 

Dong Lin commented on KAFKA-7278:
-

[~ijuma] The exception is probably thrown from `segment.changeFileSuffixes("", 
Log.DeletedFileSuffix)`. Below is the stacktrace in the discussion of 
https://issues.apache.org/jira/browse/KAFKA-6188.

{code}

[2018-05-07 16:53:06,721] ERROR Failed to clean up log for 
__consumer_offsets-24 in dir /tmp/kafka-logs due to IOException 
(kafka.server.LogDirFailureChannel)
java.nio.file.NoSuchFileException: 
/tmp/kafka-logs/__consumer_offsets-24/.log
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)
at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
at java.nio.file.Files.move(Files.java:1395)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)
at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415)
at kafka.log.Log.asyncDeleteSegment(Log.scala:1601)
at kafka.log.Log.$anonfun$replaceSegments$1(Log.scala:1653)
at kafka.log.Log.$anonfun$replaceSegments$1$adapted(Log.scala:1648)
at scala.collection.immutable.List.foreach(List.scala:389)
at kafka.log.Log.replaceSegments(Log.scala:1648)
at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:535)
at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:462)
at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:461)
at scala.collection.immutable.List.foreach(List.scala:389)
at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
at kafka.log.Cleaner.clean(LogCleaner.scala:438)
at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
Suppressed: java.nio.file.NoSuchFileException: 
/tmp/kafka-logs/__consumer_offsets-24/.log -> 
/tmp/kafka-logs/__consumer_offsets-24/.log.deleted
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:396)
at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
at java.nio.file.Files.move(Files.java:1395)
at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:694)
... 16 more
[2018-05-07 16:53:06,725] INFO [ReplicaManager broker=0] Stopping serving 
replicas in dir /tmp/kafka-logs (kafka.server.ReplicaManager)
[2018-05-07 16:53:06,762] INFO Stopping serving logs in dir /tmp/kafka-logs 
(kafka.log.LogManager)
[2018-05-07 16:53:07,032] ERROR Shutdown broker because all log dirs in 
/tmp/kafka-logs have failed (kafka.log.LogManager)

{code}

> replaceSegments() should not call asyncDeleteSegment() for segments which 
> have been removed from segments list
> --
>
> Key: KAFKA-7278
> URL: https://issues.apache.org/jira/browse/KAFKA-7278
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every 
> segment listed in the `oldSegments`. oldSegments should be constructed from 
> Log.segments and only contain segments listed in Log.segments.
> However, Log.segments may be modified between the time oldSegments is 
> determined to the time Log.replaceSegments() is called. If there are 
> concurrent async deletion of the same log segment file, Log.replaceSegments() 
> will call asyncDeleteSegment() for a segment that does not exist and Kafka 
> server may shutdown the log directory due to NoSuchFileException.
> This is likely the root cause of 
> https://issues.apache.org/jira/browse/KAFKA-6188.
> Given the understanding of the problem, we should be able to fix the issue by 
> only deleting segment if the segment can be found in Log.segments.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-11 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577335#comment-16577335
 ] 

Dong Lin edited comment on KAFKA-7278 at 8/11/18 9:54 PM:
--

[~ijuma] Yeah the latest code in trunk seems to have this issue. The following 
sequence of events may happen:

1) There is segment1, segment2 and segment 3 for a given partition

2) LogCleaner determines to merge segment1 and segment2 into a new segment and 
will call Log.replaceSegments(..., oldSegments=[segment1, segment2])

3) Log retention is triggered and Log.deleteSegment(segment=segment1) is called 
and executed. This renames the files for segment1 from log directory.

4) Log.replaceSegments(oldSegments=[segment1, segment2]) is executed and 
Log.asyncDeleteSegment(segment1) is executed, which fails to find files for 
segment1 and throws IOException.

 


was (Author: lindong):
[~ijuma] Yeah the latest code in trunk seems to have this issue. The following 
sequence of events may happen:

1) There is segment1, segment2 and segment 3 for a given partition

2) LogCleaner determines to merge segment1 and segment2 into segment3 and will 
call Log.replaceSegments(..., oldSegments=[segment1, segment2])

3) Log retention is triggered and Log.deleteSegment(segment=segment1) is called 
and executed. This renames the files for segment1 from log directory.

4) Log.replaceSegments(oldSegments=[segment1, segment2]) is executed and 
Log.asyncDeleteSegment(segment1) is executed, which fails to find files for 
segment1 and throws IOException.

 

> replaceSegments() should not call asyncDeleteSegment() for segments which 
> have been removed from segments list
> --
>
> Key: KAFKA-7278
> URL: https://issues.apache.org/jira/browse/KAFKA-7278
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every 
> segment listed in the `oldSegments`. oldSegments should be constructed from 
> Log.segments and only contain segments listed in Log.segments.
> However, Log.segments may be modified between the time oldSegments is 
> determined to the time Log.replaceSegments() is called. If there are 
> concurrent async deletion of the same log segment file, Log.replaceSegments() 
> will call asyncDeleteSegment() for a segment that does not exist and Kafka 
> server may shutdown the log directory due to NoSuchFileException.
> This is likely the root cause of 
> https://issues.apache.org/jira/browse/KAFKA-6188.
> Given the understanding of the problem, we should be able to fix the issue by 
> only deleting segment if the segment can be found in Log.segments.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-11 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577335#comment-16577335
 ] 

Dong Lin commented on KAFKA-7278:
-

[~ijuma] Yeah the latest code in trunk seems to have this issue. The following 
sequence of events may happen:

1) There is segment1, segment2 and segment 3 for a given partition

2) LogCleaner determines to merge segment1 and segment2 into segment3 and will 
call Log.replaceSegments(..., oldSegments=[segment1, segment2])

3) Log retention is triggered and Log.deleteSegment(segment=segment1) is called 
and executed. This renames the files for segment1 from log directory.

4) Log.replaceSegments(oldSegments=[segment1, segment2]) is executed and 
Log.asyncDeleteSegment(segment1) is executed, which fails to find files for 
segment1 and throws IOException.

 

> replaceSegments() should not call asyncDeleteSegment() for segments which 
> have been removed from segments list
> --
>
> Key: KAFKA-7278
> URL: https://issues.apache.org/jira/browse/KAFKA-7278
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every 
> segment listed in the `oldSegments`. oldSegments should be constructed from 
> Log.segments and only contain segments listed in Log.segments.
> However, Log.segments may be modified between the time oldSegments is 
> determined to the time Log.replaceSegments() is called. If there are 
> concurrent async deletion of the same log segment file, Log.replaceSegments() 
> will call asyncDeleteSegment() for a segment that does not exist and Kafka 
> server may shutdown the log directory due to NoSuchFileException.
> This is likely the root cause of 
> https://issues.apache.org/jira/browse/KAFKA-6188.
> Given the understanding of the problem, we should be able to fix the issue by 
> only deleting segment if the segment can be found in Log.segments.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-08-11 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reassigned KAFKA-6188:
---

Assignee: Dong Lin

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Assignee: Dong Lin
>Priority: Blocker
>  Labels: windows
> Attachments: Segments are opened before deletion, 
> kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after a 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, stand alone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps to start again, and the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-08-11 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577252#comment-16577252
 ] 

Dong Lin commented on KAFKA-6188:
-

[~manme...@gmail.com] It seems rare to explicitly document what we don't 
support. In general open source projects will document what they support. Those 
that are not documented by default means it is not supported, right?

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: Segments are opened before deletion, 
> kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after a 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, stand alone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps to start again, and the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-08-11 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577251#comment-16577251
 ] 

Dong Lin commented on KAFKA-6188:
-

[~ijuma] [~manme...@gmail.com] [~TeilaRei] I found that problem that can likely 
cause the issue discussed here. The explanation and fix can be found in 
https://issues.apache.org/jira/browse/KAFKA-7278.

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: Segments are opened before deletion, 
> kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after a 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, stand alone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps to start again, and the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-08-11 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577251#comment-16577251
 ] 

Dong Lin edited comment on KAFKA-6188 at 8/11/18 5:27 PM:
--

[~ijuma] [~manme...@gmail.com] [~TeilaRei] I found a problem that can likely 
cause the issue discussed here. The explanation and fix can be found in 
https://issues.apache.org/jira/browse/KAFKA-7278.


was (Author: lindong):
[~ijuma] [~manme...@gmail.com] [~TeilaRei] I found that problem that can likely 
cause the issue discussed here. The explanation and fix can be found in 
https://issues.apache.org/jira/browse/KAFKA-7278.

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: Segments are opened before deletion, 
> kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after a 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, stand alone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps to start again, and the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-11 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7278:

Description: 
Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every 
segment listed in the `oldSegments`. oldSegments should be constructed from 
Log.segments and only contain segments listed in Log.segments.

However, Log.segments may be modified between the time oldSegments is 
determined to the time Log.replaceSegments() is called. If there are concurrent 
async deletion of the same log segment file, Log.replaceSegments() will call 
asyncDeleteSegment() for a segment that does not exist and Kafka server may 
shutdown the log directory due to NoSuchFileException.

This is likely the root cause of 
https://issues.apache.org/jira/browse/KAFKA-6188.

Given the understanding of the problem, we should be able to fix the issue by 
only deleting segment if the segment can be found in Log.segments.

 

 

> replaceSegments() should not call asyncDeleteSegment() for segments which 
> have been removed from segments list
> --
>
> Key: KAFKA-7278
> URL: https://issues.apache.org/jira/browse/KAFKA-7278
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every 
> segment listed in the `oldSegments`. oldSegments should be constructed from 
> Log.segments and only contain segments listed in Log.segments.
> However, Log.segments may be modified between the time oldSegments is 
> determined to the time Log.replaceSegments() is called. If there are 
> concurrent async deletion of the same log segment file, Log.replaceSegments() 
> will call asyncDeleteSegment() for a segment that does not exist and Kafka 
> server may shutdown the log directory due to NoSuchFileException.
> This is likely the root cause of 
> https://issues.apache.org/jira/browse/KAFKA-6188.
> Given the understanding of the problem, we should be able to fix the issue by 
> only deleting segment if the segment can be found in Log.segments.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-11 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7278:
---

 Summary: replaceSegments() should not call asyncDeleteSegment() 
for segments which have been removed from segments list
 Key: KAFKA-7278
 URL: https://issues.apache.org/jira/browse/KAFKA-7278
 Project: Kafka
  Issue Type: Improvement
Reporter: Dong Lin
Assignee: Dong Lin






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-08-11 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577240#comment-16577240
 ] 

Dong Lin commented on KAFKA-6188:
-

[~xmar] The log you provided does not seem to contain any IOException related 
to disk operation. It complains about IOException related to network connection 
to another broker, which is probably OK since broker will retry connection.

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: Segments are opened before deletion, 
> kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after a 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, stand alone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps to start again, and the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-08-11 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577239#comment-16577239
 ] 

Dong Lin commented on KAFKA-6188:
-

[~manme...@gmail.com] Thanks for your detailed information. Sorry for late 
reply. I was not monitoring this Jira ticket discussion. My understanding of 
your comment is that FIleSystemException is expected if Kafka tries to 
delete/rename a file which is already open. I checked the current logic in the 
Kafka 1.1.0 code. For log compacted topic, `replaceSegments()` will be called 
which in turn calls `asyncDeleteSegment()` and `segment.changeFileSuffixes`. 
Similarly for non-log-compacted topics, `deleteSegment()` calls 
`asyncDeleteSegment()` which also modified file without first closing the file. 
So it should affect every user constantly if this is an issue, right?

Given that this does not happen in Linux machine at LinkedIn, this issue seems 
to be specific Windows or NAS which requires user to close the file before 
modifying the file. Does this make sense?

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Priority: Blocker
>  Labels: windows
> Attachments: Segments are opened before deletion, 
> kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after a 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, stand alone tests worked as expected. 
> However, running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps to start again, and the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7147) Allow kafka-reassign-partitions.sh and kafka-log-dirs.sh to take admin client property file

2018-08-10 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-7147.
-
Resolution: Fixed

> Allow kafka-reassign-partitions.sh and kafka-log-dirs.sh to take admin client 
> property file
> ---
>
> Key: KAFKA-7147
> URL: https://issues.apache.org/jira/browse/KAFKA-7147
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently both ReassignPartitionsCommand and LogDirsCommand instantiates 
> AdminClient using bootstrap.servers and client.id provided by the user. Since 
> it does not provide other ssl-related properties, these tools will not be 
> able to talk to broker over SSL.
> In order to solve this problem, these tools should allow users to provide 
> property file containing configs to be passed to AdminClient.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6701) synchronize Log modification between delete cleanup and async delete

2018-08-10 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-6701.
-
Resolution: Fixed

The issue appears to have been fixed in 
https://issues.apache.org/jira/browse/KAFKA-5163. More specifically, 
https://issues.apache.org/jira/browse/KAFKA-5163 added method `Log.renameDir()` 
and this method will grab the per-log lock before making modification to the 
log's directory etc.

> synchronize Log modification between delete cleanup and async delete
> 
>
> Key: KAFKA-6701
> URL: https://issues.apache.org/jira/browse/KAFKA-6701
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sumant Tambe
>Assignee: Sumant Tambe
>Priority: Major
>
> Kafka broker crashes without any evident disk failures 
> From [~becket_qin]: This looks a bug in kafka when topic deletion and log 
> retention cleanup happen at the same time, the log retention cleanup may see 
> ClosedChannelException after the log has been renamed for async deletion.
> The root cause is that the topic deletion should have set the isClosed flag 
> of the partition log to true and the retention should not bother to do the 
> old log segments deletion when the log is closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-5928) Avoid redundant requests to zookeeper when reassign topic partition

2018-08-06 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reassigned KAFKA-5928:
---

Assignee: Genmao Yu

> Avoid redundant requests to zookeeper when reassign topic partition
> ---
>
> Key: KAFKA-5928
> URL: https://issues.apache.org/jira/browse/KAFKA-5928
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.2.1, 0.11.0.0, 1.0.0, 2.0.0
>Reporter: Genmao Yu
>Assignee: Genmao Yu
>Priority: Major
> Fix For: 2.1.0
>
>
> We mistakenly request topic level information according to partitions config 
> in the assignment json file. For example 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala#L550:
>  {code}
> val validPartitions = proposedPartitionAssignment.filter { case (p, _) => 
> validatePartition(zkUtils, p.topic, p.partition) } 
> {code} 
> If reassign 1000 partitions (in 10 topics), we need to request zookeeper 1000 
> times here. But actually we only need to request just 10 (topics) times. We 
> test a large-scale assignment, about 10K partitions. It takes tens of 
> minutes. After optimization, it will reduce to less than 1minute.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5928) Avoid redundant requests to zookeeper when reassign topic partition

2018-08-06 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-5928:

Fix Version/s: 2.1.0

> Avoid redundant requests to zookeeper when reassign topic partition
> ---
>
> Key: KAFKA-5928
> URL: https://issues.apache.org/jira/browse/KAFKA-5928
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.2.1, 0.11.0.0, 1.0.0, 2.0.0
>Reporter: Genmao Yu
>Priority: Major
> Fix For: 2.1.0
>
>
> We mistakenly request topic level information according to partitions config 
> in the assignment json file. For example 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala#L550:
>  {code}
> val validPartitions = proposedPartitionAssignment.filter { case (p, _) => 
> validatePartition(zkUtils, p.topic, p.partition) } 
> {code} 
> If reassign 1000 partitions (in 10 topics), we need to request zookeeper 1000 
> times here. But actually we only need to request just 10 (topics) times. We 
> test a large-scale assignment, about 10K partitions. It takes tens of 
> minutes. After optimization, it will reduce to less than 1minute.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6648) Fetcher.getTopicMetadata() should return all partitions for each requested topic

2018-08-03 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-6648:

Fix Version/s: 2.0.1

> Fetcher.getTopicMetadata() should return all partitions for each requested 
> topic
> 
>
> Key: KAFKA-6648
> URL: https://issues.apache.org/jira/browse/KAFKA-6648
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.11.0.2, 1.0.0
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
>Priority: Major
> Fix For: 2.0.1, 2.1.0
>
>
> {code}
> if (!shouldRetry) {
>HashMap> topicsPartitionInfos = new 
> HashMap<>();
>for (String topic : cluster.topics())
>   topicsPartitionInfos.put(topic, 
> cluster.availablePartitionsForTopic(topic));
>return topicsPartitionInfos;
> }
> {code}
> this leads to inconsistent behavior upstream, for example in 
> KafkaConsumer.partitionsFor(), where if there's valid metadata all partitions 
> would be returned, whereas if MD doesnt exist (or has expired) a subset of 
> partitions (only the healthy ones) would be returned.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6648) Fetcher.getTopicMetadata() should return all partitions for each requested topic

2018-08-03 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-6648:

Summary: Fetcher.getTopicMetadata() should return all partitions for each 
requested topic  (was: Fetcher.getTopicMetadata() only returns "healthy" 
partitions, not all)

> Fetcher.getTopicMetadata() should return all partitions for each requested 
> topic
> 
>
> Key: KAFKA-6648
> URL: https://issues.apache.org/jira/browse/KAFKA-6648
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.11.0.2, 1.0.0
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
>Priority: Major
> Fix For: 2.1.0
>
>
> {code}
> if (!shouldRetry) {
>HashMap> topicsPartitionInfos = new 
> HashMap<>();
>for (String topic : cluster.topics())
>   topicsPartitionInfos.put(topic, 
> cluster.availablePartitionsForTopic(topic));
>return topicsPartitionInfos;
> }
> {code}
> this leads to inconsistent behavior upstream, for example in 
> KafkaConsumer.partitionsFor(), where if there's valid metadata all partitions 
> would be returned, whereas if MD doesnt exist (or has expired) a subset of 
> partitions (only the healthy ones) would be returned.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7235) Use brokerZkNodeVersion to prevent broker from processing outdated controller request

2018-08-02 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7235:
---

 Summary: Use brokerZkNodeVersion to prevent broker from processing 
outdated controller request
 Key: KAFKA-7235
 URL: https://issues.apache.org/jira/browse/KAFKA-7235
 Project: Kafka
  Issue Type: Improvement
Reporter: Dong Lin
Assignee: Dong Lin


Currently a broker can process controller requests that are sent before the 
broker is restarted. This could cause a few problems. Here is one example:

Let's assume partitions p1 and p2 exists on broker1.

1) Controller generates LeaderAndIsrRequest with p1 to be sent to broker1.

2) Before controller sends the request, broker1 is quickly restarted.

3) The LeaderAndIsrRequest with p1 is delivered to broker1.

4) After processing the first LeaderAndIsrRequest, broker1 starts to checkpoint 
high watermark for all partitions that it owns. Thus it may overwrite high 
watermark checkpoint file with only the hw for partition p1. The hw for 
partition p2 is now lost, which could be a problem.

In general, the correctness of broker logic currently relies on a few 
assumption, e.g. the first LeaderAndIsrRequest received by broker should 
contain all partitions hosted by the broker, which could break if broker can 
receive controller requests that were generated before it restarts.

 

One reasonable solution to the problem is to include the 
expectedBrokeNodeZkVersion in the controller requests. Broker should remember 
the broker znode zkVersion after it registers itself in the zookeeper. Then 
broker can reject those controller requests whose expectedBrokeNodeZkVersion is 
different from its broker znode zkVersion.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7235) Use brokerZkNodeVersion to prevent broker from processing outdated controller request

2018-08-02 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7235:

Description: 
Currently a broker can process controller requests that are sent before the 
broker is restarted. This could cause a few problems. Here is one example:

Let's assume partitions p1 and p2 exists on broker1.

1) Controller generates LeaderAndIsrRequest with p1 to be sent to broker1.

2) Before controller sends the request, broker1 is quickly restarted.

3) The LeaderAndIsrRequest with p1 is delivered to broker1.

4) After processing the first LeaderAndIsrRequest, broker1 starts to checkpoint 
high watermark for all partitions that it owns. Thus it may overwrite high 
watermark checkpoint file with only the hw for partition p1. The hw for 
partition p2 is now lost, which could be a problem.

In general, the correctness of broker logic currently relies on a few 
assumption, e.g. the first LeaderAndIsrRequest received by broker should 
contain all partitions hosted by the broker, which could break if broker can 
receive controller requests that were generated before it restarts. 

One reasonable solution to the problem is to include the 
expectedBrokeNodeZkVersion in the controller requests. Broker should remember 
the broker znode zkVersion after it registers itself in the zookeeper. Then 
broker can reject those controller requests whose expectedBrokeNodeZkVersion is 
different from its broker znode zkVersion.

 

  was:
Currently a broker can process controller requests that are sent before the 
broker is restarted. This could cause a few problems. Here is one example:

Let's assume partitions p1 and p2 exists on broker1.

1) Controller generates LeaderAndIsrRequest with p1 to be sent to broker1.

2) Before controller sends the request, broker1 is quickly restarted.

3) The LeaderAndIsrRequest with p1 is delivered to broker1.

4) After processing the first LeaderAndIsrRequest, broker1 starts to checkpoint 
high watermark for all partitions that it owns. Thus it may overwrite high 
watermark checkpoint file with only the hw for partition p1. The hw for 
partition p2 is now lost, which could be a problem.

In general, the correctness of broker logic currently relies on a few 
assumption, e.g. the first LeaderAndIsrRequest received by broker should 
contain all partitions hosted by the broker, which could break if broker can 
receive controller requests that were generated before it restarts.

 

One reasonable solution to the problem is to include the 
expectedBrokeNodeZkVersion in the controller requests. Broker should remember 
the broker znode zkVersion after it registers itself in the zookeeper. Then 
broker can reject those controller requests whose expectedBrokeNodeZkVersion is 
different from its broker znode zkVersion.

 


> Use brokerZkNodeVersion to prevent broker from processing outdated controller 
> request
> -
>
> Key: KAFKA-7235
> URL: https://issues.apache.org/jira/browse/KAFKA-7235
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently a broker can process controller requests that are sent before the 
> broker is restarted. This could cause a few problems. Here is one example:
> Let's assume partitions p1 and p2 exists on broker1.
> 1) Controller generates LeaderAndIsrRequest with p1 to be sent to broker1.
> 2) Before controller sends the request, broker1 is quickly restarted.
> 3) The LeaderAndIsrRequest with p1 is delivered to broker1.
> 4) After processing the first LeaderAndIsrRequest, broker1 starts to 
> checkpoint high watermark for all partitions that it owns. Thus it may 
> overwrite high watermark checkpoint file with only the hw for partition p1. 
> The hw for partition p2 is now lost, which could be a problem.
> In general, the correctness of broker logic currently relies on a few 
> assumption, e.g. the first LeaderAndIsrRequest received by broker should 
> contain all partitions hosted by the broker, which could break if broker can 
> receive controller requests that were generated before it restarts. 
> One reasonable solution to the problem is to include the 
> expectedBrokeNodeZkVersion in the controller requests. Broker should remember 
> the broker znode zkVersion after it registers itself in the zookeeper. Then 
> broker can reject those controller requests whose expectedBrokeNodeZkVersion 
> is different from its broker znode zkVersion.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7180) In testHWCheckpointWithFailuresSingleLogSegment, wait until server1 has joined the ISR before shutting down server2

2018-07-29 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-7180.
-
Resolution: Fixed

> In testHWCheckpointWithFailuresSingleLogSegment, wait until server1 has 
> joined the ISR before shutting down server2
> ---
>
> Key: KAFKA-7180
> URL: https://issues.apache.org/jira/browse/KAFKA-7180
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.1.0, 2.0.0
>
>
> In the testHWCheckpointWithFailuresSingleLogSegment method, the test logic is 
> 1. shutdown server1 and then capture the leadership of a partition in the 
> variable "leader", which should be server2
> 2. shutdown server2 and wait until the leadership has changed to a broker 
> other than server2
> through the line 
> waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt 
> = Some(leader))
> However when we execute step 2 and shutdown server2, it's possible that 
> server1 has not caught up with the partition, and has not joined the ISR. 
> With unclean leader election turned off, the leadership cannot be transferred 
> to server1, causing the waited condition in step 2 to be never met. 
> The obvious fix is to wait until server1 has joined the ISR before shutting 
> down server2.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7180) In testHWCheckpointWithFailuresSingleLogSegment, wait until server1 has joined the ISR before shutting down server2

2018-07-29 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7180:

Fix Version/s: 2.1.0

> In testHWCheckpointWithFailuresSingleLogSegment, wait until server1 has 
> joined the ISR before shutting down server2
> ---
>
> Key: KAFKA-7180
> URL: https://issues.apache.org/jira/browse/KAFKA-7180
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.0.0, 2.1.0
>
>
> In the testHWCheckpointWithFailuresSingleLogSegment method, the test logic is 
> 1. shutdown server1 and then capture the leadership of a partition in the 
> variable "leader", which should be server2
> 2. shutdown server2 and wait until the leadership has changed to a broker 
> other than server2
> through the line 
> waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt 
> = Some(leader))
> However when we execute step 2 and shutdown server2, it's possible that 
> server1 has not caught up with the partition, and has not joined the ISR. 
> With unclean leader election turned off, the leadership cannot be transferred 
> to server1, causing the waited condition in step 2 to be never met. 
> The obvious fix is to wait until server1 has joined the ISR before shutting 
> down server2.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7180) In testHWCheckpointWithFailuresSingleLogSegment, wait until server1 has joined the ISR before shutting down server2

2018-07-29 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7180:

Fix Version/s: 2.0.0

> In testHWCheckpointWithFailuresSingleLogSegment, wait until server1 has 
> joined the ISR before shutting down server2
> ---
>
> Key: KAFKA-7180
> URL: https://issues.apache.org/jira/browse/KAFKA-7180
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.0.0, 2.1.0
>
>
> In the testHWCheckpointWithFailuresSingleLogSegment method, the test logic is 
> 1. shutdown server1 and then capture the leadership of a partition in the 
> variable "leader", which should be server2
> 2. shutdown server2 and wait until the leadership has changed to a broker 
> other than server2
> through the line 
> waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt 
> = Some(leader))
> However when we execute step 2 and shutdown server2, it's possible that 
> server1 has not caught up with the partition, and has not joined the ISR. 
> With unclean leader election turned off, the leadership cannot be transferred 
> to server1, causing the waited condition in step 2 to be never met. 
> The obvious fix is to wait until server1 has joined the ISR before shutting 
> down server2.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7126) Reduce number of rebalance for large consumer groups after a topic is created

2018-07-26 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-7126.
-
Resolution: Fixed

> Reduce number of rebalance for large consumer groups after a topic is created
> -
>
> Key: KAFKA-7126
> URL: https://issues.apache.org/jira/browse/KAFKA-7126
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Jon Lee
>Priority: Major
> Fix For: 2.0.0, 2.1.0
>
> Attachments: 1.diff
>
>
> For a group of 200 MirrorMaker consumers with patten-based topic 
> subscription, a single topic creation caused 50 rebalances for each of these 
> consumer over 5 minutes period. This causes the MM to significantly lag 
> behind during this 5 minutes period and the clusters may be considerably 
> out-of-sync during this period.
> Ideally we would like to trigger only 1 rebalance in the MM group after a 
> topic is created. And conceptually it should be doable.
>  
> Here is the explanation of this repeated consumer rebalance based on the 
> consumer rebalance logic in the latest Kafka code:
> 1) A topic of 10 partitions are created in the cluster and it matches the 
> subscription pattern of the MM consumers.
> 2) The leader of the MM consumer group detects the new topic after metadata 
> refresh. It triggers rebalance.
> 3) At time T0, the first rebalance finishes. 10 consumers are assigned 1 
> partition of this topic. The other 190 consumers are not assigned any 
> partition of this topic. At this moment, the newly created topic will appear 
> in `ConsumerCoordinator.subscriptions.subscription` for those consumers who 
> is assigned partition of this consumer or who has refreshed metadata before 
> time T0.
> 4) In the common case, half of the consumers has refreshed metadata before 
> the leader of the consumer group refreshed metadata. Thus around 100 + 10 = 
> 110 consumers has the newly created topic in 
> `ConsumerCoordinator.subscriptions.subscription`. The other 90 consumers do 
> not have this topic in `ConsumerCoordinator.subscriptions.subscription`.
> 5) For those 90 consumers, if any consumer refreshes metadata, it will add 
> this topic to `ConsumerCoordinator.subscriptions.subscription`, which causes 
> `ConsumerCoordinator.rejoinNeededOrPending()` to return true and triggers 
> another rebalance. If a few consumers refresh metadata almost at the same 
> time, they will jointly trigger one rebalance. Otherwise, they each trigger a 
> separate rebalance.
> 6) The default metadata.max.age.ms is 5 minutes. Thus in the worse case, 
> which is probably also the average case if number of consumers in the group 
> is large, the latest consumer will refresh its metadata 5 minutes after T0. 
> And the rebalance will be repeated during this 5 minutes interval.
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7126) Reduce number of rebalance for large consumer groups after a topic is created

2018-07-26 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7126:

Summary: Reduce number of rebalance for large consumer groups after a topic 
is created  (was: Reduce number of rebalance period for large consumer groups 
after a topic is created)

> Reduce number of rebalance for large consumer groups after a topic is created
> -
>
> Key: KAFKA-7126
> URL: https://issues.apache.org/jira/browse/KAFKA-7126
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Jon Lee
>Priority: Major
> Fix For: 2.0.0, 2.1.0
>
> Attachments: 1.diff
>
>
> For a group of 200 MirrorMaker consumers with patten-based topic 
> subscription, a single topic creation caused 50 rebalances for each of these 
> consumer over 5 minutes period. This causes the MM to significantly lag 
> behind during this 5 minutes period and the clusters may be considerably 
> out-of-sync during this period.
> Ideally we would like to trigger only 1 rebalance in the MM group after a 
> topic is created. And conceptually it should be doable.
>  
> Here is the explanation of this repeated consumer rebalance based on the 
> consumer rebalance logic in the latest Kafka code:
> 1) A topic of 10 partitions are created in the cluster and it matches the 
> subscription pattern of the MM consumers.
> 2) The leader of the MM consumer group detects the new topic after metadata 
> refresh. It triggers rebalance.
> 3) At time T0, the first rebalance finishes. 10 consumers are assigned 1 
> partition of this topic. The other 190 consumers are not assigned any 
> partition of this topic. At this moment, the newly created topic will appear 
> in `ConsumerCoordinator.subscriptions.subscription` for those consumers who 
> is assigned partition of this consumer or who has refreshed metadata before 
> time T0.
> 4) In the common case, half of the consumers has refreshed metadata before 
> the leader of the consumer group refreshed metadata. Thus around 100 + 10 = 
> 110 consumers has the newly created topic in 
> `ConsumerCoordinator.subscriptions.subscription`. The other 90 consumers do 
> not have this topic in `ConsumerCoordinator.subscriptions.subscription`.
> 5) For those 90 consumers, if any consumer refreshes metadata, it will add 
> this topic to `ConsumerCoordinator.subscriptions.subscription`, which causes 
> `ConsumerCoordinator.rejoinNeededOrPending()` to return true and triggers 
> another rebalance. If a few consumers refresh metadata almost at the same 
> time, they will jointly trigger one rebalance. Otherwise, they each trigger a 
> separate rebalance.
> 6) The default metadata.max.age.ms is 5 minutes. Thus in the worse case, 
> which is probably also the average case if number of consumers in the group 
> is large, the latest consumer will refresh its metadata 5 minutes after T0. 
> And the rebalance will be repeated during this 5 minutes interval.
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7126) Reduce number of rebalance period for large consumer groups after a topic is created

2018-07-26 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7126:

Fix Version/s: 2.1.0
   2.0.0

> Reduce number of rebalance period for large consumer groups after a topic is 
> created
> 
>
> Key: KAFKA-7126
> URL: https://issues.apache.org/jira/browse/KAFKA-7126
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Jon Lee
>Priority: Major
> Fix For: 2.0.0, 2.1.0
>
> Attachments: 1.diff
>
>
> For a group of 200 MirrorMaker consumers with patten-based topic 
> subscription, a single topic creation caused 50 rebalances for each of these 
> consumer over 5 minutes period. This causes the MM to significantly lag 
> behind during this 5 minutes period and the clusters may be considerably 
> out-of-sync during this period.
> Ideally we would like to trigger only 1 rebalance in the MM group after a 
> topic is created. And conceptually it should be doable.
>  
> Here is the explanation of this repeated consumer rebalance based on the 
> consumer rebalance logic in the latest Kafka code:
> 1) A topic of 10 partitions are created in the cluster and it matches the 
> subscription pattern of the MM consumers.
> 2) The leader of the MM consumer group detects the new topic after metadata 
> refresh. It triggers rebalance.
> 3) At time T0, the first rebalance finishes. 10 consumers are assigned 1 
> partition of this topic. The other 190 consumers are not assigned any 
> partition of this topic. At this moment, the newly created topic will appear 
> in `ConsumerCoordinator.subscriptions.subscription` for those consumers who 
> is assigned partition of this consumer or who has refreshed metadata before 
> time T0.
> 4) In the common case, half of the consumers has refreshed metadata before 
> the leader of the consumer group refreshed metadata. Thus around 100 + 10 = 
> 110 consumers has the newly created topic in 
> `ConsumerCoordinator.subscriptions.subscription`. The other 90 consumers do 
> not have this topic in `ConsumerCoordinator.subscriptions.subscription`.
> 5) For those 90 consumers, if any consumer refreshes metadata, it will add 
> this topic to `ConsumerCoordinator.subscriptions.subscription`, which causes 
> `ConsumerCoordinator.rejoinNeededOrPending()` to return true and triggers 
> another rebalance. If a few consumers refresh metadata almost at the same 
> time, they will jointly trigger one rebalance. Otherwise, they each trigger a 
> separate rebalance.
> 6) The default metadata.max.age.ms is 5 minutes. Thus in the worse case, 
> which is probably also the average case if number of consumers in the group 
> is large, the latest consumer will refresh its metadata 5 minutes after T0. 
> And the rebalance will be repeated during this 5 minutes interval.
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7190) Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID

2018-07-25 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556289#comment-16556289
 ] 

Dong Lin commented on KAFKA-7190:
-

Thanks you [~mjsax] [~guozhang] for the discussion.

> Under low traffic conditions purging repartition topics cause WARN statements 
> about  UNKNOWN_PRODUCER_ID 
> -
>
> Key: KAFKA-7190
> URL: https://issues.apache.org/jira/browse/KAFKA-7190
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, streams
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Bill Bejeck
>Assignee: lambdaliu
>Priority: Major
>
> When a streams application has little traffic, then it is possible that 
> consumer purging would delete
> even the last message sent by a producer (i.e., all the messages sent by
> this producer have been consumed and committed), and as a result, the broker
> would delete that producer's ID. The next time when this producer tries to
> send, it will get this UNKNOWN_PRODUCER_ID error code, but in this case,
> this error is retriable: the producer would just get a new producer id and
> retries, and then this time it will succeed. 
>  
> Possible fixes could be on the broker side, i.e., delaying the deletion of 
> the produderIDs for a more extended period or on the streams side developing 
> a more conservative approach to deleting offsets from repartition topics
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7128) Lagging high watermark can lead to committed data loss after ISR expansion

2018-07-25 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556277#comment-16556277
 ] 

Dong Lin commented on KAFKA-7128:
-

BTW, if we want to make sure there is no message loss, it may be worthwhile to 
work https://issues.apache.org/jira/browse/KAFKA-7040 as well.

> Lagging high watermark can lead to committed data loss after ISR expansion
> --
>
> Key: KAFKA-7128
> URL: https://issues.apache.org/jira/browse/KAFKA-7128
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Anna Povzner
>Priority: Major
>
> Some model checking exposed a weakness in the ISR expansion logic. We know 
> that the high watermark can go backwards after a leader failover, but we may 
> not have known that this can lead to the loss of committed data.
> Say we have three replicas: r1, r2, and r3. Initially, the ISR consists of 
> (r1, r2) and the leader is r1. r3 is a new replica which has not begun 
> fetching. The data up to offset 10 has been committed to the ISR. Here is the 
> initial state:
> State 1
> ISR: (r1, r2)
>  Leader: r1
>  r1: [hw=10, leo=10]
>  r2: [hw=5, leo=10]
>  r3: [hw=0, leo=0]
> Replica 1 then initiates shutdown (or fails) and leaves the ISR, which makes 
> r2 the new leader. The high watermark is still lagging r1.
> State 2
> ISR: (r2)
>  Leader: r2
>  r1 (offline): [hw=10, leo=10]
>  r2: [hw=5, leo=10]
>  r3: [hw=0, leo=0]
> Replica 3 then catch up to the high watermark on r2 and joins the ISR. 
> Perhaps it's high watermark is lagging behind r2, but this is unimportant.
> State 3
> ISR: (r2, r3)
>  Leader: r2
>  r1 (offline): [hw=10, leo=10]
>  r2: [hw=5, leo=10]
>  r3: [hw=0, leo=5]
> Now r2 fails and r3 is elected leader and is the only member of the ISR. The 
> committed data from offsets 5 to 10 has been lost.
> State 4
> ISR: (r3)
>  Leader: r3
>  r1 (offline): [hw=10, leo=10]
>  r2 (offline): [hw=5, leo=10]
>  r3: [hw=0, leo=5]
> The bug is the fact that we allowed r3 into the ISR after the local high 
> watermark had been reached. Since the follower does not know the true high 
> watermark for the previous leader's epoch, it should not allow a replica to 
> join the ISR until it has caught up to an offset within its own epoch.
> Note this is related to 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7128) Lagging high watermark can lead to committed data loss after ISR expansion

2018-07-25 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556274#comment-16556274
 ] 

Dong Lin commented on KAFKA-7128:
-

Thanks for the explanation [~hachikuji].

I slightly modified the description to provide state for each step for ease of 
discussion. It is not obvious from the description yet whether state 3 is 
possible. In order to move from state 2 to state 3, LEO in replica 3 needs to 
increase from 0 to 5, which means replica 3 needs to receive a FetchResponse 
containing messages up to offset 5. Since the hw in the FetchResponse should be 
5, should replica 3's hw be increased to 5 as well?

> Lagging high watermark can lead to committed data loss after ISR expansion
> --
>
> Key: KAFKA-7128
> URL: https://issues.apache.org/jira/browse/KAFKA-7128
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Anna Povzner
>Priority: Major
>
> Some model checking exposed a weakness in the ISR expansion logic. We know 
> that the high watermark can go backwards after a leader failover, but we may 
> not have known that this can lead to the loss of committed data.
> Say we have three replicas: r1, r2, and r3. Initially, the ISR consists of 
> (r1, r2) and the leader is r1. r3 is a new replica which has not begun 
> fetching. The data up to offset 10 has been committed to the ISR. Here is the 
> initial state:
> State 1
> ISR: (r1, r2)
>  Leader: r1
>  r1: [hw=10, leo=10]
>  r2: [hw=5, leo=10]
>  r3: [hw=0, leo=0]
> Replica 1 then initiates shutdown (or fails) and leaves the ISR, which makes 
> r2 the new leader. The high watermark is still lagging r1.
> State 2
> ISR: (r2)
>  Leader: r2
>  r1 (offline): [hw=10, leo=10]
>  r2: [hw=5, leo=10]
>  r3: [hw=0, leo=0]
> Replica 3 then catch up to the high watermark on r2 and joins the ISR. 
> Perhaps it's high watermark is lagging behind r2, but this is unimportant.
> State 3
> ISR: (r2, r3)
>  Leader: r2
>  r1 (offline): [hw=10, leo=10]
>  r2: [hw=5, leo=10]
>  r3: [hw=0, leo=5]
> Now r2 fails and r3 is elected leader and is the only member of the ISR. The 
> committed data from offsets 5 to 10 has been lost.
> State 4
> ISR: (r3)
>  Leader: r3
>  r1 (offline): [hw=10, leo=10]
>  r2 (offline): [hw=5, leo=10]
>  r3: [hw=0, leo=5]
> The bug is the fact that we allowed r3 into the ISR after the local high 
> watermark had been reached. Since the follower does not know the true high 
> watermark for the previous leader's epoch, it should not allow a replica to 
> join the ISR until it has caught up to an offset within its own epoch.
> Note this is related to 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7128) Lagging high watermark can lead to committed data loss after ISR expansion

2018-07-25 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7128:

Description: 
Some model checking exposed a weakness in the ISR expansion logic. We know that 
the high watermark can go backwards after a leader failover, but we may not 
have known that this can lead to the loss of committed data.

Say we have three replicas: r1, r2, and r3. Initially, the ISR consists of (r1, 
r2) and the leader is r1. r3 is a new replica which has not begun fetching. The 
data up to offset 10 has been committed to the ISR. Here is the initial state:

State 1
ISR: (r1, r2)
 Leader: r1
 r1: [hw=10, leo=10]
 r2: [hw=5, leo=10]
 r3: [hw=0, leo=0]

Replica 1 then initiates shutdown (or fails) and leaves the ISR, which makes r2 
the new leader. The high watermark is still lagging r1.

State 2
ISR: (r2)
 Leader: r2
 r1 (offline): [hw=10, leo=10]
 r2: [hw=5, leo=10]
 r3: [hw=0, leo=0]

Replica 3 then catch up to the high watermark on r2 and joins the ISR. Perhaps 
it's high watermark is lagging behind r2, but this is unimportant.

State 3
ISR: (r2, r3)
 Leader: r2
 r1 (offline): [hw=10, leo=10]
 r2: [hw=5, leo=10]
 r3: [hw=0, leo=5]

Now r2 fails and r3 is elected leader and is the only member of the ISR. The 
committed data from offsets 5 to 10 has been lost.

State 4
ISR: (r3)
 Leader: r3
 r1 (offline): [hw=10, leo=10]
 r2 (offline): [hw=5, leo=10]
 r3: [hw=0, leo=5]

The bug is the fact that we allowed r3 into the ISR after the local high 
watermark had been reached. Since the follower does not know the true high 
watermark for the previous leader's epoch, it should not allow a replica to 
join the ISR until it has caught up to an offset within its own epoch.

Note this is related to 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change]

  was:
Some model checking exposed a weakness in the ISR expansion logic. We know that 
the high watermark can go backwards after a leader failover, but we may not 
have known that this can lead to the loss of committed data. 

Say we have three replicas: r1, r2, and r3. Initially, the ISR consists of (r1, 
r2) and the leader is r1. r3 is a new replica which has not begun fetching. The 
data up to offset 10 has been committed to the ISR. Here is the initial state:

ISR: (r1, r2)
Leader: r1
r1: [hw=10, leo=10]
r2: [hw=5, leo=10]
r3: [hw=0, leo=0]

Replica 1 then initiates shutdown (or fails) and leaves the ISR, which makes r2 
the new leader. The high watermark is still lagging r1.

ISR: (r2)
Leader: r2
r1 (offline): [hw=10, leo=10]
r2: [hw=5, leo=10]
r3: [hw=0, leo=0]

Replica 3 then catch up to the high watermark on r2 and joins the ISR. Perhaps 
it's high watermark is lagging behind r2, but this is unimportant.

ISR: (r2, r3)
Leader: r2
r1 (offline): [hw=10, leo=10]
r2: [hw=5, leo=10]
r3: [hw=0, leo=5]

Now r2 fails and r3 is elected leader and is the only member of the ISR. The 
committed data from offsets 5 to 10 has been lost.

ISR: (r3)
Leader: r3
r1 (offline): [hw=10, leo=10]
r2 (offline): [hw=5, leo=10]
r3: [hw=0, leo=5]

The bug is the fact that we allowed r3 into the ISR after the local high 
watermark had been reached. Since the follower does not know the true high 
watermark for the previous leader's epoch, it should not allow a replica to 
join the ISR until it has caught up to an offset within its own epoch. 

Note this is related to 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-207%3A+Offsets+returned+by+ListOffsetsResponse+should+be+monotonically+increasing+even+during+a+partition+leader+change


> Lagging high watermark can lead to committed data loss after ISR expansion
> --
>
> Key: KAFKA-7128
> URL: https://issues.apache.org/jira/browse/KAFKA-7128
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Anna Povzner
>Priority: Major
>
> Some model checking exposed a weakness in the ISR expansion logic. We know 
> that the high watermark can go backwards after a leader failover, but we may 
> not have known that this can lead to the loss of committed data.
> Say we have three replicas: r1, r2, and r3. Initially, the ISR consists of 
> (r1, r2) and the leader is r1. r3 is a new replica which has not begun 
> fetching. The data up to offset 10 has been committed to the ISR. Here is the 
> initial state:
> State 1
> ISR: (r1, r2)
>  Leader: r1
>  r1: [hw=10, leo=10]
>  r2: [hw=5, leo=10]
>  r3: [hw=0, leo=0]
> Replica 1 then initiates shutdown (or fails) and leaves the ISR, which makes 
> r2 the new leader. The high watermark is still lagging r1.
> State 2
> ISR: (r2)
>  Leader: r2
>  r1 (offline): [hw=10, leo=10]
>  r2: [hw=5, leo=10]
>  r3: [hw=0, 

[jira] [Comment Edited] (KAFKA-7190) Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID

2018-07-25 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556053#comment-16556053
 ] 

Dong Lin edited comment on KAFKA-7190 at 7/25/18 5:53 PM:
--

[~mjsax] I guess one question is, if LSO is defined for a partition, will 
consumer be able to consume beyond this offset, e.g. in read-uncommitted mode? 
If it is possible, then it seems that we should allow user to be able to delete 
messages beyond LSO. Since consumer may have already consumed it, it does not 
unnecessarily cause data loss if message beyond LSO is deleted. If it is 
impossible, then I agree we should prevent user from deleting beyond LSO. What 
do you think?


was (Author: lindong):
[~mjsax] I guess one question is, if LSO is defined for a partition, will 
consumer be able to consume beyond this offset, e.g. in read-uncommitted mode? 
If it is possible, then it seems that we should allow user to be able to delete 
messages beyond LSO. Since consumer may have already consumed it, it does not 
unnecessarily cause data loss if message beyond LSO is deleted. If it is 
impossible, then I agree we prevent user from deleting beyond LSO. What do you 
think?

> Under low traffic conditions purging repartition topics cause WARN statements 
> about  UNKNOWN_PRODUCER_ID 
> -
>
> Key: KAFKA-7190
> URL: https://issues.apache.org/jira/browse/KAFKA-7190
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, streams
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Bill Bejeck
>Assignee: lambdaliu
>Priority: Major
>
> When a streams application has little traffic, then it is possible that 
> consumer purging would delete
> even the last message sent by a producer (i.e., all the messages sent by
> this producer have been consumed and committed), and as a result, the broker
> would delete that producer's ID. The next time when this producer tries to
> send, it will get this UNKNOWN_PRODUCER_ID error code, but in this case,
> this error is retriable: the producer would just get a new producer id and
> retries, and then this time it will succeed. 
>  
> Possible fixes could be on the broker side, i.e., delaying the deletion of 
> the produderIDs for a more extended period or on the streams side developing 
> a more conservative approach to deleting offsets from repartition topics
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7190) Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID

2018-07-25 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556053#comment-16556053
 ] 

Dong Lin commented on KAFKA-7190:
-

 

 

[~mjsax] I guess one question is, if LSO is defined for a partition, will 
consumer be able to consume beyond this offset, e.g. in read-uncommitted mode? 
If it is possible, then it seems that we should allow user to be able to delete 
messages beyond LSO. Since consumer may have already consumed it, it does not 
unnecessarily cause data loss if message beyond LSO is deleted. If it is 
impossible, then I agree we prevent user from deleting beyond LSO. What do you 
think?

 

 

> Under low traffic conditions purging repartition topics cause WARN statements 
> about  UNKNOWN_PRODUCER_ID 
> -
>
> Key: KAFKA-7190
> URL: https://issues.apache.org/jira/browse/KAFKA-7190
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, streams
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Bill Bejeck
>Assignee: lambdaliu
>Priority: Major
>
> When a streams application has little traffic, then it is possible that 
> consumer purging would delete
> even the last message sent by a producer (i.e., all the messages sent by
> this producer have been consumed and committed), and as a result, the broker
> would delete that producer's ID. The next time when this producer tries to
> send, it will get this UNKNOWN_PRODUCER_ID error code, but in this case,
> this error is retriable: the producer would just get a new producer id and
> retries, and then this time it will succeed. 
>  
> Possible fixes could be on the broker side, i.e., delaying the deletion of 
> the produderIDs for a more extended period or on the streams side developing 
> a more conservative approach to deleting offsets from repartition topics
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7190) Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID

2018-07-25 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556053#comment-16556053
 ] 

Dong Lin edited comment on KAFKA-7190 at 7/25/18 5:52 PM:
--

[~mjsax] I guess one question is, if LSO is defined for a partition, will 
consumer be able to consume beyond this offset, e.g. in read-uncommitted mode? 
If it is possible, then it seems that we should allow user to be able to delete 
messages beyond LSO. Since consumer may have already consumed it, it does not 
unnecessarily cause data loss if message beyond LSO is deleted. If it is 
impossible, then I agree we prevent user from deleting beyond LSO. What do you 
think?


was (Author: lindong):
 

 

[~mjsax] I guess one question is, if LSO is defined for a partition, will 
consumer be able to consume beyond this offset, e.g. in read-uncommitted mode? 
If it is possible, then it seems that we should allow user to be able to delete 
messages beyond LSO. Since consumer may have already consumed it, it does not 
unnecessarily cause data loss if message beyond LSO is deleted. If it is 
impossible, then I agree we prevent user from deleting beyond LSO. What do you 
think?

 

 

> Under low traffic conditions purging repartition topics cause WARN statements 
> about  UNKNOWN_PRODUCER_ID 
> -
>
> Key: KAFKA-7190
> URL: https://issues.apache.org/jira/browse/KAFKA-7190
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, streams
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Bill Bejeck
>Assignee: lambdaliu
>Priority: Major
>
> When a streams application has little traffic, then it is possible that 
> consumer purging would delete
> even the last message sent by a producer (i.e., all the messages sent by
> this producer have been consumed and committed), and as a result, the broker
> would delete that producer's ID. The next time when this producer tries to
> send, it will get this UNKNOWN_PRODUCER_ID error code, but in this case,
> this error is retriable: the producer would just get a new producer id and
> retries, and then this time it will succeed. 
>  
> Possible fixes could be on the broker side, i.e., delaying the deletion of 
> the produderIDs for a more extended period or on the streams side developing 
> a more conservative approach to deleting offsets from repartition topics
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7190) Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID

2018-07-25 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556011#comment-16556011
 ] 

Dong Lin commented on KAFKA-7190:
-

[~guozhang] Certainly, I am happy to make the modification after we agree on 
the solution.

Here is my thought regarding whether we should allow client to delete beyond 
last stable offset. The main reason we need to prevent client from deleting 
beyond HW is because otherwise follower may receive OutOfRangeException and the 
broker logic may be messed up. Is there similar concern in the broker if we 
delete beyond LSO?

If the goal of not deleting beyond LSO is to make sure that messages can be 
exposed to consumers before being deleted, then I am not sure if this change is 
justified. My understanding is that we currently rely on user to make sure that 
messages have been consumed by all consumer groups which need the message, 
before user calls deleteRecords() to delete the message. If user does follow 
this rule, then user will not delete beyond LSO regardless of the constraint in 
the broker. If user does not follow this rule, then even with the extra 
protection in the broker, user can still delete the messages before the LSO 
that has not been consumed yet. Does this make sense?

 

> Under low traffic conditions purging repartition topics cause WARN statements 
> about  UNKNOWN_PRODUCER_ID 
> -
>
> Key: KAFKA-7190
> URL: https://issues.apache.org/jira/browse/KAFKA-7190
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, streams
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Bill Bejeck
>Assignee: lambdaliu
>Priority: Major
>
> When a streams application has little traffic, then it is possible that 
> consumer purging would delete
> even the last message sent by a producer (i.e., all the messages sent by
> this producer have been consumed and committed), and as a result, the broker
> would delete that producer's ID. The next time when this producer tries to
> send, it will get this UNKNOWN_PRODUCER_ID error code, but in this case,
> this error is retriable: the producer would just get a new producer id and
> retries, and then this time it will succeed. 
>  
> Possible fixes could be on the broker side, i.e., delaying the deletion of 
> the produderIDs for a more extended period or on the streams side developing 
> a more conservative approach to deleting offsets from repartition topics
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7196) Remove heartbeat delayed operation for those removed consumers at the end of each rebalance

2018-07-23 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7196:

Fix Version/s: 2.1.0

> Remove heartbeat delayed operation for those removed consumers at the end of 
> each rebalance
> ---
>
> Key: KAFKA-7196
> URL: https://issues.apache.org/jira/browse/KAFKA-7196
> Project: Kafka
>  Issue Type: Bug
>  Components: core, purgatory
>Reporter: Lincong Li
>Assignee: Lincong Li
>Priority: Minor
> Fix For: 2.1.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> During the consumer group rebalance, when the joining group phase finishes, 
> the heartbeat delayed operation of the consumer that fails to rejoin the 
> group should be removed from the purgatory. Otherwise, even though the member 
> ID of the consumer has been removed from the group, its heartbeat delayed 
> operation is still registered in the purgatory and the heartbeat delayed 
> operation is going to timeout and then another unnecessary rebalance is 
> triggered because of it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7196) Remove heartbeat delayed operation for those removed consumers at the end of each rebalance

2018-07-23 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reassigned KAFKA-7196:
---

Assignee: Lincong Li

> Remove heartbeat delayed operation for those removed consumers at the end of 
> each rebalance
> ---
>
> Key: KAFKA-7196
> URL: https://issues.apache.org/jira/browse/KAFKA-7196
> Project: Kafka
>  Issue Type: Bug
>  Components: core, purgatory
>Reporter: Lincong Li
>Assignee: Lincong Li
>Priority: Minor
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> During the consumer group rebalance, when the joining group phase finishes, 
> the heartbeat delayed operation of the consumer that fails to rejoin the 
> group should be removed from the purgatory. Otherwise, even though the member 
> ID of the consumer has been removed from the group, its heartbeat delayed 
> operation is still registered in the purgatory and the heartbeat delayed 
> operation is going to timeout and then another unnecessary rebalance is 
> triggered because of it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7191) Add sensors for NumOfflineThread, FetchRequestRate and FetchRequestLocalTime in the follower broker

2018-07-22 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7191:
---

 Summary: Add sensors for NumOfflineThread, FetchRequestRate and 
FetchRequestLocalTime in the follower broker
 Key: KAFKA-7191
 URL: https://issues.apache.org/jira/browse/KAFKA-7191
 Project: Kafka
  Issue Type: Improvement
Reporter: Dong Lin
Assignee: Dong Lin


It will be useful to have NumOfflineThread to monitor the number of offline 
thread (e.g. ReplicaFetcherThread) in the broker so that system admin can be 
alerted when there is offline thread.

And we also need metrics for FetchRequestRate and FetchRequestLocalTime in the 
follower broker to monitor and debug the data replication performance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7152) replica should be in-sync if its LEO equals leader's LEO

2018-07-22 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16552112#comment-16552112
 ] 

Dong Lin commented on KAFKA-7152:
-

I realized one more point. In the common case, if replica fetcher thread for 
broker A has died on broker B, it is probably some partitions that still 
actively produced to and owned by this fetcher thread. In this case ISR for 
some partitions will shrink which can indicate problem in the cluster.

> replica should be in-sync if its LEO equals leader's LEO
> 
>
> Key: KAFKA-7152
> URL: https://issues.apache.org/jira/browse/KAFKA-7152
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently a replica will be moved out of ISR if follower has not fetched from 
> leader for 10 sec (default replica.lag.time.max.ms). This cases problem in 
> the following scenario:
> Say follower's ReplicaFetchThread needs to fetch 2k partitions from the 
> leader broker. Only 100 out of 2k partitions are actively being produced to 
> and therefore the total bytes in rate for those 2k partitions are small. The 
> following will happen:
>  
> 1) The follower's ReplicaFetcherThread sends FetchRequest for those 2k 
> partitions.
> 2) Because the total bytes-in-rate for those 2k partitions is very small, 
> follower is able to catch up and leader broker adds these 2k partitions to 
> ISR. Follower's lastCaughtUpTimeMs for all partitions are updated to the 
> current time T0.
> 3) Since follower has caught up for all 2k partitions, leader updates 2k 
> partition znodes to include the follower in the ISR. It may take 20 seconds 
> to write 2k partition znodes if each znode write operation takes 10 ms.
> 4) At T0 + 15, maybeShrinkIsr() is invoked on leader broker. Since there is 
> no FetchRequet from the follower for more than 10 seconds after T0, all those 
> 2k partitions will be considered as out of syn and the follower will be 
> removed from ISR.
> 5) The follower receives FetchResponse at least 20 seconds after T0. That 
> means the next FetchRequest from follower to leader will be after T0 + 20.
> The sequence of events described above will loop over time. There will be 
> constant churn of URP in the cluster even if follower can catch up with 
> leader's byte-in-rate. This reduces the cluster availability.
>  
> In order to address this problem, one simple approach is to keep follower in 
> the ISR as long as follower's LEO equals leader's LEO regardless of 
> follower's lastCaughtUpTimeMs. This is particularly useful if there are a lot 
> of inactive partitions in the cluster.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7152) replica should be in-sync if its LEO equals leader's LEO

2018-07-22 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16552105#comment-16552105
 ] 

Dong Lin edited comment on KAFKA-7152 at 7/22/18 6:06 PM:
--

Thanks for the clarification [~ijuma]. Yeah I understand immediate result of 
this patch, i.e. ISR won't shrink if a topic is not produced often and a 
replica thread dies. But I am trying to understand what is the concern with 
this behavior.

Let's assume replica.lag.time.max.ms uses the default value of 10 sec. In this 
case maybeShrinkIsr() will be called every 5 sec. We can assume for now that 
everything else is fast and take negligible time.

Regarding 1, you are right for a ProduceRequest which is sent long after the 
thread dies, the produce delay (and end-to-end latency) will be higher if we 
don't shrink ISR. More specifically, if we shrink ISR, leader won't wait for 
the problematic follower and ProduceRequest can succeed with 0 sec delay, 
whereas if we don't shrink ISRE, the ProduceRequest can succeed within 5 sec 
because ISR will be shrunk immediately when maybeShrinkIsr() is called.

But if we look at arbitrary ProduceRequest sent by the user, this would include 
the ProduceRequest that can potentially be sent immediately after the thread 
dies. For such ProduceRequest, the max ProduceRequest delay would also be 5 
sec. So the overall max produce delay (and end-to-end latency) does not change 
with this patch. And since only the first ProduceRequest may be affected, it 
probably does not affect the 99th and average produce delay. So this patch 
probably does not negatively affect the overall user experience regarding 
produce delay.

Also, I personally feel that ReplicaFetcherThread will die only if there is 
bug. In this case, it is probably not a big deal to have extra 5 seconds delay, 
given that there maybe bigger issue caused by bug, partition has been inactive 
for much longer than 5 seconds, and it only affects the first ProduceRequest.

 

Regarding 2, yes we should be able to monitor whether the ReplicaFetcherThread 
is running. It can currently be achieved by monitoring ISR but it is probably 
not a good way for this purpose. In general when ISR is shrink, system admin 
does not directly know whether it is because lag or because 
ReplicaFetcherThread dies, which means it can take a long time before system 
admin takes action. And it does not directly tell which broker has the issue.

 

In order to solver this problem, maybe it is better to add metrics in the 
broker to monitor the NumOfflineThread, FetchRequestRate and 
FetchRequestLocalTime in the follower (not the leader). The NumOfflineThread 
should increase whenever a thread has died abnormally in the broker. And system 
admin can be alerted if this metric increases above 0. The other two metrics 
can very be useful in debugging replication performance issue in the follower.

 

So it seems that in the short term this patch won't change the performance 
guarantee (e.g. max, average, 99th produce delay) for user. And it will 
slightly increase the produce delay when there is bug Kafka. And in the long 
term we probably want to add more metrics in Kafka to detect such bug and let 
system admin proactively resolves the issue. Does this sound good?

 

 

 

 


was (Author: lindong):
Thanks for the clarification [~ijuma]. Yeah I understand immediate result of 
this patch, i.e. ISR won't if a topic that is not produced often and a replica 
thread dies. But I am trying to understand what is the concern with this 
behavior.

Let's assume replica.lag.time.max.ms uses the default value of 10 sec. In this 
case maybeShrinkIsr() will be called every 5 sec. We can assume for now that 
everything else is fast and take negligible time.

Regarding 1, you are right for a ProduceRequest which is sent long after the 
thread dies, the produce delay (and end-to-end latency) will be higher if we 
don't shrink ISR. More specifically, if we shrink ISR, leader won't wait for 
the problematic follower and ProduceRequest can succeed with 0 sec delay, 
whereas if we don't shrink ISRE, the ProduceRequest can succeed within 5 sec 
because ISR will be shrunk immediately when maybeShrinkIsr() is called.

But if we look at arbitrary ProduceRequest sent by the user, this would include 
the ProduceRequest that can potentially be sent immediately after the thread 
dies. For such ProduceRequest, the max ProduceRequest delay would also be 5 
sec. So the overall max produce delay (and end-to-end latency) does not change 
with this patch. And since only the first ProduceRequest may be affected, it 
probably does not affect the 99th and average produce delay. So this patch 
probably does not negatively affect the overall user experience regarding 
produce delay.

Also, I personally feel that ReplicaFetcherThread will die only if there is 
bug. In this case, it is probably not a big deal to 

[jira] [Commented] (KAFKA-7152) replica should be in-sync if its LEO equals leader's LEO

2018-07-22 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16552110#comment-16552110
 ] 

Dong Lin commented on KAFKA-7152:
-

Hey [~junrao], I forgot to reply to the 2nd part of the point 3 earlier. Yes 
this issue can also be addressed by increasing the replica.lag.time.max.ms to 
20+ seconds. It is just that this approach can increase the max produce delay 
when there is slow or offline follower. So it will be better if the patch 
currently proposed can address the problem with less or no concern.

> replica should be in-sync if its LEO equals leader's LEO
> 
>
> Key: KAFKA-7152
> URL: https://issues.apache.org/jira/browse/KAFKA-7152
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently a replica will be moved out of ISR if follower has not fetched from 
> leader for 10 sec (default replica.lag.time.max.ms). This cases problem in 
> the following scenario:
> Say follower's ReplicaFetchThread needs to fetch 2k partitions from the 
> leader broker. Only 100 out of 2k partitions are actively being produced to 
> and therefore the total bytes in rate for those 2k partitions are small. The 
> following will happen:
>  
> 1) The follower's ReplicaFetcherThread sends FetchRequest for those 2k 
> partitions.
> 2) Because the total bytes-in-rate for those 2k partitions is very small, 
> follower is able to catch up and leader broker adds these 2k partitions to 
> ISR. Follower's lastCaughtUpTimeMs for all partitions are updated to the 
> current time T0.
> 3) Since follower has caught up for all 2k partitions, leader updates 2k 
> partition znodes to include the follower in the ISR. It may take 20 seconds 
> to write 2k partition znodes if each znode write operation takes 10 ms.
> 4) At T0 + 15, maybeShrinkIsr() is invoked on leader broker. Since there is 
> no FetchRequet from the follower for more than 10 seconds after T0, all those 
> 2k partitions will be considered as out of syn and the follower will be 
> removed from ISR.
> 5) The follower receives FetchResponse at least 20 seconds after T0. That 
> means the next FetchRequest from follower to leader will be after T0 + 20.
> The sequence of events described above will loop over time. There will be 
> constant churn of URP in the cluster even if follower can catch up with 
> leader's byte-in-rate. This reduces the cluster availability.
>  
> In order to address this problem, one simple approach is to keep follower in 
> the ISR as long as follower's LEO equals leader's LEO regardless of 
> follower's lastCaughtUpTimeMs. This is particularly useful if there are a lot 
> of inactive partitions in the cluster.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7152) replica should be in-sync if its LEO equals leader's LEO

2018-07-22 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16552105#comment-16552105
 ] 

Dong Lin commented on KAFKA-7152:
-

Thanks for the clarification [~ijuma]. Yeah I understand immediate result of 
this patch, i.e. ISR won't if a topic that is not produced often and a replica 
thread dies. But I am trying to understand what is the concern with this 
behavior.

Let's assume replica.lag.time.max.ms uses the default value of 10 sec. In this 
case maybeShrinkIsr() will be called every 5 sec. We can assume for now that 
everything else is fast and take negligible time.

Regarding 1, you are right for a ProduceRequest which is sent long after the 
thread dies, the produce delay (and end-to-end latency) will be higher if we 
don't shrink ISR. More specifically, if we shrink ISR, leader won't wait for 
the problematic follower and ProduceRequest can succeed with 0 sec delay, 
whereas if we don't shrink ISRE, the ProduceRequest can succeed within 5 sec 
because ISR will be shrunk immediately when maybeShrinkIsr() is called.

But if we look at arbitrary ProduceRequest sent by the user, this would include 
the ProduceRequest that can potentially be sent immediately after the thread 
dies. For such ProduceRequest, the max ProduceRequest delay would also be 5 
sec. So the overall max produce delay (and end-to-end latency) does not change 
with this patch. And since only the first ProduceRequest may be affected, it 
probably does not affect the 99th and average produce delay. So this patch 
probably does not negatively affect the overall user experience regarding 
produce delay.

Also, I personally feel that ReplicaFetcherThread will die only if there is 
bug. In this case, it is probably not a big deal to have extra 5 seconds delay, 
given that there maybe bigger issue caused by bug, partition has been inactive 
for much longer than 5 seconds, and it only affects the first ProduceRequest.

 

Regarding 2, yes we should be able to monitor whether the ReplicaFetcherThread 
is running. It can currently be achieved by monitoring ISR but it is probably 
not a good way for this purpose. In general when ISR is shrink, system admin 
does not directly know whether it is because lag or because 
ReplicaFetcherThread dies, which means it can take a long time before system 
admin takes action. And it does not directly tell which broker has the issue.

 

In order to solver this problem, maybe it is better to add metrics in the 
broker to monitor the NumOfflineThread, FetchRequestRate and 
FetchRequestLocalTime in the follower (not the leader). The NumOfflineThread 
should increase whenever a thread has died abnormally in the broker. And system 
admin can be alerted if this metric increases above 0. The other two metrics 
can very be useful in debugging replication performance issue in the follower.

 

So it seems that in the short term this patch won't change the performance 
guarantee (e.g. max, average, 99th produce delay) for user. And it will 
slightly increase the produce delay when there is bug Kafka. And in the long 
term we probably want to add more metrics in Kafka to detect such bug and let 
system admin proactively resolves the issue. Does this sound good?

 

 

 

 

> replica should be in-sync if its LEO equals leader's LEO
> 
>
> Key: KAFKA-7152
> URL: https://issues.apache.org/jira/browse/KAFKA-7152
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently a replica will be moved out of ISR if follower has not fetched from 
> leader for 10 sec (default replica.lag.time.max.ms). This cases problem in 
> the following scenario:
> Say follower's ReplicaFetchThread needs to fetch 2k partitions from the 
> leader broker. Only 100 out of 2k partitions are actively being produced to 
> and therefore the total bytes in rate for those 2k partitions are small. The 
> following will happen:
>  
> 1) The follower's ReplicaFetcherThread sends FetchRequest for those 2k 
> partitions.
> 2) Because the total bytes-in-rate for those 2k partitions is very small, 
> follower is able to catch up and leader broker adds these 2k partitions to 
> ISR. Follower's lastCaughtUpTimeMs for all partitions are updated to the 
> current time T0.
> 3) Since follower has caught up for all 2k partitions, leader updates 2k 
> partition znodes to include the follower in the ISR. It may take 20 seconds 
> to write 2k partition znodes if each znode write operation takes 10 ms.
> 4) At T0 + 15, maybeShrinkIsr() is invoked on leader broker. Since there is 
> no FetchRequet from the follower for more than 10 seconds after T0, all those 
> 2k partitions will be considered as out of syn and the follower 

[jira] [Commented] (KAFKA-7152) replica should be in-sync if its LEO equals leader's LEO

2018-07-21 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551868#comment-16551868
 ] 

Dong Lin commented on KAFKA-7152:
-

Thanks for your detailed comment Jun.

Regarding the 2nd part of 1, in the event that the follower replica fetcher 
thread has died, the replica will only stay in ISR if the follower replica LEO 
== leader replica LEO. So it is probably safe. Would we have new issue here?

Regarding 1st part of 1 and also 2, the current patch addresses the scenario 
that all topics byte in rate is slow but number of partitions is high, which 
can further be split into two scenarios. One scenario is that most partitions 
are inactive, which is fixed by this patch. The other scenario is that all 
partitions are active and each partition has constant extremely low traffic. 
This one is not handled but the patch but it seems like less common than the 
previous one. So it looks like this patch can fixed an important subset of the 
problem. I agree that we need to look into asyc ZK for ISR update to fix the 
latter scenario.

Regarding 3, the issue was observed in a small cluster that has large number of 
inactive partitions. I have kind of made up the number in the description as I 
feel these numbers are probably sufficient to explain the problem and may be on 
the same order of the magnitude of the real number.



> replica should be in-sync if its LEO equals leader's LEO
> 
>
> Key: KAFKA-7152
> URL: https://issues.apache.org/jira/browse/KAFKA-7152
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently a replica will be moved out of ISR if follower has not fetched from 
> leader for 10 sec (default replica.lag.time.max.ms). This cases problem in 
> the following scenario:
> Say follower's ReplicaFetchThread needs to fetch 2k partitions from the 
> leader broker. Only 100 out of 2k partitions are actively being produced to 
> and therefore the total bytes in rate for those 2k partitions are small. The 
> following will happen:
>  
> 1) The follower's ReplicaFetcherThread sends FetchRequest for those 2k 
> partitions.
> 2) Because the total bytes-in-rate for those 2k partitions is very small, 
> follower is able to catch up and leader broker adds these 2k partitions to 
> ISR. Follower's lastCaughtUpTimeMs for all partitions are updated to the 
> current time T0.
> 3) Since follower has caught up for all 2k partitions, leader updates 2k 
> partition znodes to include the follower in the ISR. It may take 20 seconds 
> to write 2k partition znodes if each znode write operation takes 10 ms.
> 4) At T0 + 15, maybeShrinkIsr() is invoked on leader broker. Since there is 
> no FetchRequet from the follower for more than 10 seconds after T0, all those 
> 2k partitions will be considered as out of syn and the follower will be 
> removed from ISR.
> 5) The follower receives FetchResponse at least 20 seconds after T0. That 
> means the next FetchRequest from follower to leader will be after T0 + 20.
> The sequence of events described above will loop over time. There will be 
> constant churn of URP in the cluster even if follower can catch up with 
> leader's byte-in-rate. This reduces the cluster availability.
>  
> In order to address this problem, one simple approach is to keep follower in 
> the ISR as long as follower's LEO equals leader's LEO regardless of 
> follower's lastCaughtUpTimeMs. This is particularly useful if there are a lot 
> of inactive partitions in the cluster.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7152) replica should be in-sync if its LEO equals leader's LEO

2018-07-21 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551851#comment-16551851
 ] 

Dong Lin commented on KAFKA-7152:
-

[~ijuma] Kafka at LinkedIn is not using asynchronous zookeeper client yet so it 
is not clear whether the problem exists in 1.1. In general we probably want the 
resource usage in the broker to be proportional to the traffic from user and 
avoid spending resource on partitions that are inactive.

> replica should be in-sync if its LEO equals leader's LEO
> 
>
> Key: KAFKA-7152
> URL: https://issues.apache.org/jira/browse/KAFKA-7152
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently a replica will be moved out of ISR if follower has not fetched from 
> leader for 10 sec (default replica.lag.time.max.ms). This cases problem in 
> the following scenario:
> Say follower's ReplicaFetchThread needs to fetch 2k partitions from the 
> leader broker. Only 100 out of 2k partitions are actively being produced to 
> and therefore the total bytes in rate for those 2k partitions are small. The 
> following will happen:
>  
> 1) The follower's ReplicaFetcherThread sends FetchRequest for those 2k 
> partitions.
> 2) Because the total bytes-in-rate for those 2k partitions is very small, 
> follower is able to catch up and leader broker adds these 2k partitions to 
> ISR. Follower's lastCaughtUpTimeMs for all partitions are updated to the 
> current time T0.
> 3) Since follower has caught up for all 2k partitions, leader updates 2k 
> partition znodes to include the follower in the ISR. It may take 20 seconds 
> to write 2k partition znodes if each znode write operation takes 10 ms.
> 4) At T0 + 15, maybeShrinkIsr() is invoked on leader broker. Since there is 
> no FetchRequet from the follower for more than 10 seconds after T0, all those 
> 2k partitions will be considered as out of syn and the follower will be 
> removed from ISR.
> 5) The follower receives FetchResponse at least 20 seconds after T0. That 
> means the next FetchRequest from follower to leader will be after T0 + 20.
> The sequence of events described above will loop over time. There will be 
> constant churn of URP in the cluster even if follower can catch up with 
> leader's byte-in-rate. This reduces the cluster availability.
>  
> In order to address this problem, one simple approach is to keep follower in 
> the ISR as long as follower's LEO equals leader's LEO regardless of 
> follower's lastCaughtUpTimeMs. This is particularly useful if there are a lot 
> of inactive partitions in the cluster.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7152) replica should be in-sync if its LEO equals leader's LEO

2018-07-21 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-7152.
-
Resolution: Fixed

> replica should be in-sync if its LEO equals leader's LEO
> 
>
> Key: KAFKA-7152
> URL: https://issues.apache.org/jira/browse/KAFKA-7152
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently a replica will be moved out of ISR if follower has not fetched from 
> leader for 10 sec (default replica.lag.time.max.ms). This cases problem in 
> the following scenario:
> Say follower's ReplicaFetchThread needs to fetch 2k partitions from the 
> leader broker. Only 100 out of 2k partitions are actively being produced to 
> and therefore the total bytes in rate for those 2k partitions are small. The 
> following will happen:
>  
> 1) The follower's ReplicaFetcherThread sends FetchRequest for those 2k 
> partitions.
> 2) Because the total bytes-in-rate for those 2k partitions is very small, 
> follower is able to catch up and leader broker adds these 2k partitions to 
> ISR. Follower's lastCaughtUpTimeMs for all partitions are updated to the 
> current time T0.
> 3) Since follower has caught up for all 2k partitions, leader updates 2k 
> partition znodes to include the follower in the ISR. It may take 20 seconds 
> to write 2k partition znodes if each znode write operation takes 10 ms.
> 4) At T0 + 15, maybeShrinkIsr() is invoked on leader broker. Since there is 
> no FetchRequet from the follower for more than 10 seconds after T0, all those 
> 2k partitions will be considered as out of syn and the follower will be 
> removed from ISR.
> 5) The follower receives FetchResponse at least 20 seconds after T0. That 
> means the next FetchRequest from follower to leader will be after T0 + 20.
> The sequence of events described above will loop over time. There will be 
> constant churn of URP in the cluster even if follower can catch up with 
> leader's byte-in-rate. This reduces the cluster availability.
>  
> In order to address this problem, one simple approach is to keep follower in 
> the ISR as long as follower's LEO equals leader's LEO regardless of 
> follower's lastCaughtUpTimeMs. This is particularly useful if there are a lot 
> of inactive partitions in the cluster.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7177) Update 2.0 documentation to reflect changed quota behaviors by KIP-219

2018-07-21 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7177:

Priority: Minor  (was: Major)

> Update 2.0 documentation to reflect changed quota behaviors by KIP-219 
> ---
>
> Key: KAFKA-7177
> URL: https://issues.apache.org/jira/browse/KAFKA-7177
> Project: Kafka
>  Issue Type: Task
>  Components: documentation
>Reporter: Jon Lee
>Assignee: Jon Lee
>Priority: Minor
> Fix For: 2.1.0
>
>
> KIP-219 changed the way quota violation is communicated between clients and 
> brokers. Documentation should be updated accordingly. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7177) Update 2.0 documentation to reflect changed quota behaviors by KIP-219

2018-07-21 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7177:

Fix Version/s: 2.1.0

> Update 2.0 documentation to reflect changed quota behaviors by KIP-219 
> ---
>
> Key: KAFKA-7177
> URL: https://issues.apache.org/jira/browse/KAFKA-7177
> Project: Kafka
>  Issue Type: Task
>  Components: documentation
>Reporter: Jon Lee
>Assignee: Jon Lee
>Priority: Major
> Fix For: 2.1.0
>
>
> KIP-219 changed the way quota violation is communicated between clients and 
> brokers. Documentation should be updated accordingly. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7177) Update 2.0 documentation to reflect changed quota behaviors by KIP-219

2018-07-21 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-7177.
-
Resolution: Fixed

> Update 2.0 documentation to reflect changed quota behaviors by KIP-219 
> ---
>
> Key: KAFKA-7177
> URL: https://issues.apache.org/jira/browse/KAFKA-7177
> Project: Kafka
>  Issue Type: Task
>  Components: documentation
>Reporter: Jon Lee
>Assignee: Jon Lee
>Priority: Major
>
> KIP-219 changed the way quota violation is communicated between clients and 
> brokers. Documentation should be updated accordingly. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


<    1   2   3   4   5   6   7   >