[jira] [Updated] (KAFKA-14151) Add validation to fail fast when base offsets are incorrectly assigned to batches

2022-08-09 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-14151:
-
Summary: Add validation to fail fast when base offsets are incorrectly 
assigned to batches  (was: Add additional validation to protect on-disk log 
segment data from being corrupted)

> Add validation to fail fast when base offsets are incorrectly assigned to 
> batches
> -
>
> Key: KAFKA-14151
> URL: https://issues.apache.org/jira/browse/KAFKA-14151
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Reporter: Vincent Jiang
>Priority: Major
>
> We saw a case where records with incorrect offsets were being written to log 
> segment on-disk data due to environmental issues (bug in old version JVM 
> JIT). We should consider adding additional validation to detect this scenario 
> and fail fast.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14151) Add additional validation to protect on-disk log segment data from being corrupted

2022-08-09 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-14151:
-
Description: We saw a case where records with incorrect offsets were being 
written to log segment on-disk data due to environmental issues (bug in old 
version JVM JIT). We should consider adding additional validation to detect 
this scenario and fail fast.  (was: We received escalations reporting bad 
records being written to log segment on-disk data due to environmental issues 
(bug in old version JVM jit).  We should consider adding additional validation 
to  protect the on-disk data from being corrupted by inadvertent bugs or 
environmental issues)

> Add additional validation to protect on-disk log segment data from being 
> corrupted
> --
>
> Key: KAFKA-14151
> URL: https://issues.apache.org/jira/browse/KAFKA-14151
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Reporter: Vincent Jiang
>Priority: Major
>
> We saw a case where records with incorrect offsets were being written to log 
> segment on-disk data due to environmental issues (bug in old version JVM 
> JIT). We should consider adding additional validation to detect this scenario 
> and fail fast.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-8522) Tombstones can survive forever

2021-09-20 Thread Dhruvil Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17417849#comment-17417849
 ] 

Dhruvil Shah commented on KAFKA-8522:
-

The related PR is here for reference: https://github.com/apache/kafka/pull/10914

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Assignee: Richard Yu
>Priority: Minor
> Fix For: 3.1.0
>
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment reseting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12520) Producer state is needlessly rebuilt on startup

2021-03-22 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-12520:


Assignee: Dhruvil Shah

> Producer state is needlessly rebuilt on startup
> ---
>
> Key: KAFKA-12520
> URL: https://issues.apache.org/jira/browse/KAFKA-12520
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> When we find a {{.swap}} file on startup, we typically want to rename and 
> replace it as {{.log}}, {{.index}}, {{.timeindex}}, etc. as a way to complete 
> any ongoing replace operations. These swap files are usually known to have 
> been flushed to disk before the replace operation begins.
> One flaw in the current logic is that when we recover these swap files on 
> startup, we end up truncating the producer state and rebuild it from scratch. 
> This is unneeded as the replace operation does not mutate the producer state 
> by itself. It is only meant to replace the {{.log}} file along with 
> corresponding indices.
> Because of this unneeded producer state rebuild operation, we have seen 
> multi-hour startup times for clusters that have large compacted topics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12520) Producer state is needlessly rebuilt on startup

2021-03-22 Thread Dhruvil Shah (Jira)
Dhruvil Shah created KAFKA-12520:


 Summary: Producer state is needlessly rebuilt on startup
 Key: KAFKA-12520
 URL: https://issues.apache.org/jira/browse/KAFKA-12520
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah


When we find a {{.swap}} file on startup, we typically want to rename and 
replace it as {{.log}}, {{.index}}, {{.timeindex}}, etc. as a way to complete 
any ongoing replace operations. These swap files are usually known to have been 
flushed to disk before the replace operation begins.

One flaw in the current logic is that when we recover these swap files on 
startup, we end up truncating the producer state and rebuild it from scratch. 
This is unneeded as the replace operation does not mutate the producer state by 
itself. It is only meant to replace the {{.log}} file along with corresponding 
indices.

Because of this unneeded producer state rebuild operation, we have seen 
multi-hour startup times for clusters that have large compacted topics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12254) MirrorMaker 2.0 creates destination topic with default configs

2021-03-01 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-12254:
-
Affects Version/s: 2.4.0
   2.5.0
   2.4.1
   2.6.0
   2.5.1
   2.7.0
   2.6.1

> MirrorMaker 2.0 creates destination topic with default configs
> --
>
> Key: KAFKA-12254
> URL: https://issues.apache.org/jira/browse/KAFKA-12254
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0, 2.5.0, 2.4.1, 2.6.0, 2.5.1, 2.7.0, 2.6.1, 2.8.0
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Blocker
> Fix For: 3.0.0
>
>
> `MirrorSourceConnector` implements the logic for replicating data, 
> configurations, and other metadata between the source and destination 
> clusters. This includes the tasks below:
>  # `refreshTopicPartitions` for syncing topics / partitions from source to 
> destination.
>  # `syncTopicConfigs` for syncing topic configurations from source to 
> destination.
> A limitation is that `computeAndCreateTopicPartitions` creates topics with 
> default configurations on the destination cluster. A separate async task 
> `syncTopicConfigs` is responsible for syncing the topic configs. Before that 
> sync happens, topic configurations could be out of sync between the two 
> clusters.
> In the worst case, this could lead to data loss eg. when we have a compacted 
> topic being mirrored between clusters which is incorrectly created with the 
> default configuration of `cleanup.policy = delete` on the destination before 
> the configurations are sync'd via `syncTopicConfigs`.
> Here is an example of the divergence:
> Source Topic:
> ```
> Topic: foobar PartitionCount: 1 ReplicationFactor: 1 Configs: 
> cleanup.policy=compact,segment.bytes=1073741824
> ```
> Destination Topic:
> ```
> Topic: A.foobar PartitionCount: 1 ReplicationFactor: 1 Configs: 
> segment.bytes=1073741824
> ```
> A safer approach is to ensure that the right configurations are set on the 
> destination cluster before data is replicated to it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8929) MM2 system tests

2021-03-01 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-8929:

Priority: Major  (was: Minor)

> MM2 system tests
> 
>
> Key: KAFKA-8929
> URL: https://issues.apache.org/jira/browse/KAFKA-8929
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Ryanne Dolan
>Priority: Major
>  Labels: test
>
> Add system tests for MM2 driver. Should resemble existing mirror-maker system 
> tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12254) MirrorMaker 2.0 creates destination topic with default configs

2021-03-01 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-12254:
-
Priority: Blocker  (was: Major)

> MirrorMaker 2.0 creates destination topic with default configs
> --
>
> Key: KAFKA-12254
> URL: https://issues.apache.org/jira/browse/KAFKA-12254
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Blocker
> Fix For: 3.0.0
>
>
> `MirrorSourceConnector` implements the logic for replicating data, 
> configurations, and other metadata between the source and destination 
> clusters. This includes the tasks below:
>  # `refreshTopicPartitions` for syncing topics / partitions from source to 
> destination.
>  # `syncTopicConfigs` for syncing topic configurations from source to 
> destination.
> A limitation is that `computeAndCreateTopicPartitions` creates topics with 
> default configurations on the destination cluster. A separate async task 
> `syncTopicConfigs` is responsible for syncing the topic configs. Before that 
> sync happens, topic configurations could be out of sync between the two 
> clusters.
> In the worst case, this could lead to data loss eg. when we have a compacted 
> topic being mirrored between clusters which is incorrectly created with the 
> default configuration of `cleanup.policy = delete` on the destination before 
> the configurations are sync'd via `syncTopicConfigs`.
> Here is an example of the divergence:
> Source Topic:
> ```
> Topic: foobar PartitionCount: 1 ReplicationFactor: 1 Configs: 
> cleanup.policy=compact,segment.bytes=1073741824
> ```
> Destination Topic:
> ```
> Topic: A.foobar PartitionCount: 1 ReplicationFactor: 1 Configs: 
> segment.bytes=1073741824
> ```
> A safer approach is to ensure that the right configurations are set on the 
> destination cluster before data is replicated to it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12254) MirrorMaker 2.0 creates destination topic with default configs

2021-03-01 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-12254:
-
Affects Version/s: 2.8.0

> MirrorMaker 2.0 creates destination topic with default configs
> --
>
> Key: KAFKA-12254
> URL: https://issues.apache.org/jira/browse/KAFKA-12254
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Blocker
> Fix For: 3.0.0
>
>
> `MirrorSourceConnector` implements the logic for replicating data, 
> configurations, and other metadata between the source and destination 
> clusters. This includes the tasks below:
>  # `refreshTopicPartitions` for syncing topics / partitions from source to 
> destination.
>  # `syncTopicConfigs` for syncing topic configurations from source to 
> destination.
> A limitation is that `computeAndCreateTopicPartitions` creates topics with 
> default configurations on the destination cluster. A separate async task 
> `syncTopicConfigs` is responsible for syncing the topic configs. Before that 
> sync happens, topic configurations could be out of sync between the two 
> clusters.
> In the worst case, this could lead to data loss eg. when we have a compacted 
> topic being mirrored between clusters which is incorrectly created with the 
> default configuration of `cleanup.policy = delete` on the destination before 
> the configurations are sync'd via `syncTopicConfigs`.
> Here is an example of the divergence:
> Source Topic:
> ```
> Topic: foobar PartitionCount: 1 ReplicationFactor: 1 Configs: 
> cleanup.policy=compact,segment.bytes=1073741824
> ```
> Destination Topic:
> ```
> Topic: A.foobar PartitionCount: 1 ReplicationFactor: 1 Configs: 
> segment.bytes=1073741824
> ```
> A safer approach is to ensure that the right configurations are set on the 
> destination cluster before data is replicated to it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12254) MirrorMaker 2.0 creates destination topic with default configs

2021-02-25 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-12254:


Assignee: Dhruvil Shah

> MirrorMaker 2.0 creates destination topic with default configs
> --
>
> Key: KAFKA-12254
> URL: https://issues.apache.org/jira/browse/KAFKA-12254
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> `MirrorSourceConnector` implements the logic for replicating data, 
> configurations, and other metadata between the source and destination 
> clusters. This includes the tasks below:
>  # `refreshTopicPartitions` for syncing topics / partitions from source to 
> destination.
>  # `syncTopicConfigs` for syncing topic configurations from source to 
> destination.
> A limitation is that `computeAndCreateTopicPartitions` creates topics with 
> default configurations on the destination cluster. A separate async task 
> `syncTopicConfigs` is responsible for syncing the topic configs. Before that 
> sync happens, topic configurations could be out of sync between the two 
> clusters.
> In the worst case, this could lead to data loss eg. when we have a compacted 
> topic being mirrored between clusters which is incorrectly created with the 
> default configuration of `cleanup.policy = delete` on the destination before 
> the configurations are sync'd via `syncTopicConfigs`.
> Here is an example of the divergence:
> Source Topic:
> ```
> Topic: foobar PartitionCount: 1 ReplicationFactor: 1 Configs: 
> cleanup.policy=compact,segment.bytes=1073741824
> ```
> Destination Topic:
> ```
> Topic: A.foobar PartitionCount: 1 ReplicationFactor: 1 Configs: 
> segment.bytes=1073741824
> ```
> A safer approach is to ensure that the right configurations are set on the 
> destination cluster before data is replicated to it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12254) MirrorMaker 2.0 creates destination topic with default configs

2021-01-29 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-12254:
-
Description: 
`MirrorSourceConnector` implements the logic for replicating data, 
configurations, and other metadata between the source and destination clusters. 
This includes the tasks below:
 # `refreshTopicPartitions` for syncing topics / partitions from source to 
destination.
 # `syncTopicConfigs` for syncing topic configurations from source to 
destination.

A limitation is that `computeAndCreateTopicPartitions` creates topics with 
default configurations on the destination cluster. A separate async task 
`syncTopicConfigs` is responsible for syncing the topic configs. Before that 
sync happens, topic configurations could be out of sync between the two 
clusters.

In the worst case, this could lead to data loss eg. when we have a compacted 
topic being mirrored between clusters which is incorrectly created with the 
default configuration of `cleanup.policy = delete` on the destination before 
the configurations are sync'd via `syncTopicConfigs`.

Here is an example of the divergence:

Source Topic:

```

Topic: foobar PartitionCount: 1 ReplicationFactor: 1 Configs: 
cleanup.policy=compact,segment.bytes=1073741824

```

Destination Topic:

```

Topic: A.foobar PartitionCount: 1 ReplicationFactor: 1 Configs: 
segment.bytes=1073741824

```

A safer approach is to ensure that the right configurations are set on the 
destination cluster before data is replicated to it.

  was:
`MirrorSourceConnector` implements the logic for replicating data, 
configurations, and other metadata between the source and destination clusters. 
This includes the tasks below:
 # `refreshTopicPartitions` for syncing topics / partitions from source to 
destination.
 # `syncTopicConfigs` for syncing topic configurations from source to 
destination.

A limitation is that `computeAndCreateTopicPartitions` creates topics with 
default configurations on the destination cluster. A separate async task 
`syncTopicConfigs` is responsible for syncing the topic configs. Before that 
sync happens, topic configurations could be out of sync between the two 
clusters.

In the worst case, this could lead to data loss eg. when we have a compacted 
topic being mirrored between clusters which is incorrectly created with the 
default configuration of `cleanup.policy = delete` on the destination before 
the configurations are sync'd via `syncTopicConfigs`.

Here is an example of the divergence:

Source Topic:

```

Topic: foobar PartitionCount: 1 ReplicationFactor: 1 Configs: 
cleanup.policy=compact,segment.bytes=1073741824

```

Destination Topic:

```

Topic: A.foobar PartitionCount: 1 ReplicationFactor: 1 Configs: 
segment.bytes=1073741824

```


> MirrorMaker 2.0 creates destination topic with default configs
> --
>
> Key: KAFKA-12254
> URL: https://issues.apache.org/jira/browse/KAFKA-12254
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dhruvil Shah
>Priority: Major
>
> `MirrorSourceConnector` implements the logic for replicating data, 
> configurations, and other metadata between the source and destination 
> clusters. This includes the tasks below:
>  # `refreshTopicPartitions` for syncing topics / partitions from source to 
> destination.
>  # `syncTopicConfigs` for syncing topic configurations from source to 
> destination.
> A limitation is that `computeAndCreateTopicPartitions` creates topics with 
> default configurations on the destination cluster. A separate async task 
> `syncTopicConfigs` is responsible for syncing the topic configs. Before that 
> sync happens, topic configurations could be out of sync between the two 
> clusters.
> In the worst case, this could lead to data loss eg. when we have a compacted 
> topic being mirrored between clusters which is incorrectly created with the 
> default configuration of `cleanup.policy = delete` on the destination before 
> the configurations are sync'd via `syncTopicConfigs`.
> Here is an example of the divergence:
> Source Topic:
> ```
> Topic: foobar PartitionCount: 1 ReplicationFactor: 1 Configs: 
> cleanup.policy=compact,segment.bytes=1073741824
> ```
> Destination Topic:
> ```
> Topic: A.foobar PartitionCount: 1 ReplicationFactor: 1 Configs: 
> segment.bytes=1073741824
> ```
> A safer approach is to ensure that the right configurations are set on the 
> destination cluster before data is replicated to it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12254) MirrorMaker 2.0 creates destination topic with default configs

2021-01-29 Thread Dhruvil Shah (Jira)
Dhruvil Shah created KAFKA-12254:


 Summary: MirrorMaker 2.0 creates destination topic with default 
configs
 Key: KAFKA-12254
 URL: https://issues.apache.org/jira/browse/KAFKA-12254
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah


`MirrorSourceConnector` implements the logic for replicating data, 
configurations, and other metadata between the source and destination clusters. 
This includes the tasks below:
 # `refreshTopicPartitions` for syncing topics / partitions from source to 
destination.
 # `syncTopicConfigs` for syncing topic configurations from source to 
destination.

A limitation is that `computeAndCreateTopicPartitions` creates topics with 
default configurations on the destination cluster. A separate async task 
`syncTopicConfigs` is responsible for syncing the topic configs. Before that 
sync happens, topic configurations could be out of sync between the two 
clusters.

In the worst case, this could lead to data loss eg. when we have a compacted 
topic being mirrored between clusters which is incorrectly created with the 
default configuration of `cleanup.policy = delete` on the destination before 
the configurations are sync'd via `syncTopicConfigs`.

Here is an example of the divergence:

Source Topic:

```

Topic: foobar PartitionCount: 1 ReplicationFactor: 1 Configs: 
cleanup.policy=compact,segment.bytes=1073741824

```

Destination Topic:

```

Topic: A.foobar PartitionCount: 1 ReplicationFactor: 1 Configs: 
segment.bytes=1073741824

```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10518) Consumer fetches could be inefficient when lags are unbalanced

2020-09-23 Thread Dhruvil Shah (Jira)
Dhruvil Shah created KAFKA-10518:


 Summary: Consumer fetches could be inefficient when lags are 
unbalanced
 Key: KAFKA-10518
 URL: https://issues.apache.org/jira/browse/KAFKA-10518
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah


Consumer fetches are inefficient when lags are imbalanced across partitions, 
due to head of the line blocking and the behavior of blocking for `max.wait.ms` 
until data is available.

When the consumer receives a fetch response, it prepares the next fetch request 
and sends it out. The caveat is that the subsequent fetch request would 
explicitly exclude partitions for which the consumer received data in the 
previous round. This is to allow the consumer application to drain the data for 
those partitions, until the consumer fetches the other partitions it is 
subscribed to.

This behavior does not play out too well if the consumer is consuming when the 
lag is unbalanced, because it would receive data for the partitions it is 
lagging on, and then it would send a fetch request for partitions that do not 
have any data (or have little data). The latter will end up blocking for 
fetch.max.wait.ms on the broker before an empty response is sent back. This 
slows down the consumer’s overall consumption throughput since 
fetch.max.wait.ms is 500ms by default.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10517) Inefficient consumer processing with fetch sessions

2020-09-23 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-10517:
-
Description: 
With the introduction of fetch sessions, the consumer and the broker share a 
unified view of the partitions being consumed and their current state 
(fetch_offset, last_propagated_hwm, last_propagated_start_offset, etc.). The 
consumer is still expected to consume in a round robin manner, however, we have 
observed certain cases where this is not the case.

Because of how we perform memory management on the consumer and implement fetch 
pipelining, we exclude partitions from a `FetchRequest` when they have not been 
drained by the application. This is done by adding these partitions to the 
`toForget` list in the `FetchRequest`. When partitions are added to the 
`toForget` list, the broker removes these partitions from its session cache. 
This causes bit of a divergence between the broker's and the client's view of 
the metadata.

When forgotten partitions are added back to the Fetch after the application 
have drained them, the server will immediately add them back to the session 
cache and return a response for them, even if there is no corresponding data. 
This re-triggers the behavior on the consumer to put this partition on the 
`toForget` list incorrectly, even though no data for the partition may have 
been returned.

We have seen this behavior to cause an imbalance in lags across partitions as 
the consumer no longer obeys the round-robin sequence given that the partitions 
keep shuffling between the `toForget` and `toSend` lists.

At a high level, this is caused due to the out of sync session caches on the 
consumer and broker. This ends up in a state where the partition balance is 
being maintained by external factors (such as whether metadata was returned for 
a partition), rather than following the round-robin ordering.

  was:
With the introduction of fetch sessions, the consumer and the broker share a 
unified view of the partitions being consumed and their current state 
(fetch_offset, last_propagated_hwm, last_propagated_start_offset, etc.). The 
consumer is still expected to consume in a round robin manner, however, we have 
observed certain cases where this is not the case.

Because of how we perform memory management on the consumer and implement fetch 
pipelining, we exclude partitions from a FetchRequest when they have not been 
drained by the application. This is done by adding these partitions to the 
`toForget` list in the `FetchRequest`. When partitions are added to the 
`toForget` list, the broker removes these partitions from its session cache. 
This causes bit of a divergence between the broker's and the client's view of 
the metadata.

When forgotten partitions are added back to the Fetch after the application 
have drained them, the server will immediately add them back to the session 
cache and return a response for them, even if there is no corresponding data. 
This re-triggers the behavior on the consumer to put this partition on the 
`toForget` list incorrectly, even though no data for the partition may have 
been returned.

We have seen this behavior to cause an imbalance in lags across partitions as 
the consumer no longer obeys the round-robin sequence given that the partitions 
keep shuffling between the `toForget` and `toSend` lists.

At a high level, this is caused due to the out of sync session caches on the 
consumer and broker. This ends up in a state where the partition balance is 
being maintained by external factors (such as whether metadata was returned for 
a partition), rather than following the round-robin ordering.


> Inefficient consumer processing with fetch sessions
> ---
>
> Key: KAFKA-10517
> URL: https://issues.apache.org/jira/browse/KAFKA-10517
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dhruvil Shah
>Priority: Major
>
> With the introduction of fetch sessions, the consumer and the broker share a 
> unified view of the partitions being consumed and their current state 
> (fetch_offset, last_propagated_hwm, last_propagated_start_offset, etc.). The 
> consumer is still expected to consume in a round robin manner, however, we 
> have observed certain cases where this is not the case.
> Because of how we perform memory management on the consumer and implement 
> fetch pipelining, we exclude partitions from a `FetchRequest` when they have 
> not been drained by the application. This is done by adding these partitions 
> to the `toForget` list in the `FetchRequest`. When partitions are added to 
> the `toForget` list, the broker removes these partitions from its session 
> cache. This causes bit of a divergence between the broker's and the client's 
> view of the metadata.
> When forgotten partitions are added back 

[jira] [Created] (KAFKA-10517) Inefficient consumer processing with fetch sessions

2020-09-23 Thread Dhruvil Shah (Jira)
Dhruvil Shah created KAFKA-10517:


 Summary: Inefficient consumer processing with fetch sessions
 Key: KAFKA-10517
 URL: https://issues.apache.org/jira/browse/KAFKA-10517
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah


With the introduction of fetch sessions, the consumer and the broker share a 
unified view of the partitions being consumed and their current state 
(fetch_offset, last_propagated_hwm, last_propagated_start_offset, etc.). The 
consumer is still expected to consume in a round robin manner, however, we have 
observed certain cases where this is not the case.

Because of how we perform memory management on the consumer and implement fetch 
pipelining, we exclude partitions from a FetchRequest when they have not been 
drained by the application. This is done by adding these partitions to the 
`toForget` list in the `FetchRequest`. When partitions are added to the 
`toForget` list, the broker removes these partitions from its session cache. 
This causes bit of a divergence between the broker's and the client's view of 
the metadata.

When forgotten partitions are added back to the Fetch after the application 
have drained them, the server will immediately add them back to the session 
cache and return a response for them, even if there is no corresponding data. 
This re-triggers the behavior on the consumer to put this partition on the 
`toForget` list incorrectly, even though no data for the partition may have 
been returned.

We have seen this behavior to cause an imbalance in lags across partitions as 
the consumer no longer obeys the round-robin sequence given that the partitions 
keep shuffling between the `toForget` and `toSend` lists.

At a high level, this is caused due to the out of sync session caches on the 
consumer and broker. This ends up in a state where the partition balance is 
being maintained by external factors (such as whether metadata was returned for 
a partition), rather than following the round-robin ordering.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10471) TimeIndex handling may cause data loss in certain back to back failure

2020-09-08 Thread Dhruvil Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17192494#comment-17192494
 ] 

Dhruvil Shah commented on KAFKA-10471:
--

It may be nice to use the time index sanity check to catch issues where an 
index file is not in a state we expect. We used to perform a sanity check for 
indices associated with all segments, but with the changes to load segments 
lazily, we dropped those checks. If we could reintroduce those checks back 
safely, that may be sufficient to catch and fix such cases. We are taking a 
similar approach in https://issues.apache.org/jira/browse/KAFKA-10207, so that 
might help here too.

> TimeIndex handling may cause data loss in certain back to back failure
> --
>
> Key: KAFKA-10471
> URL: https://issues.apache.org/jira/browse/KAFKA-10471
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Reporter: Rohit Shekhar
>Priority: Critical
>
> # Active segment for log A going clean shutdown - trim the time index to the 
> latest fill value, set the clean shutdown marker.
>  # Broker restarts, loading logs - no recovery due to clean shutdown marker, 
> log A recovers with the previous active segment as current. It also resized 
> the TimeIndex to the max.
>  #  Before all the log loads, the broker had a hard shutdown causing a clean 
> shutdown marker left as is.
>  #  Broker restarts, log A skips recovery due to the presence of a clean 
> shutdown marker but the TimeIndex file assumes the resized file from the 
> previous instance is all full (it assumes either file is newly created or is 
> full with valid value).
>  # The first append to the active segment will result in roll and TimeIndex 
> will be rolled with the timestamp value of the last valid entry (0)
>  # Segment's largest timestamp gives 0 (this can cause premature deletion of 
> data due to retention.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10207) Untrimmed Index files cause premature log segment deletions on startup

2020-07-07 Thread Dhruvil Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17152915#comment-17152915
 ] 

Dhruvil Shah commented on KAFKA-10207:
--

Hmm, this seems pretty strange. Do you know if there was a specific jvm issue 
that was fixed to address the problem with `RandomAccessFile#setLength`?

> Untrimmed Index files cause premature log segment deletions on startup
> --
>
> Key: KAFKA-10207
> URL: https://issues.apache.org/jira/browse/KAFKA-10207
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 2.4.0, 2.3.1, 2.4.1
>Reporter: Johnny Malizia
>Priority: Major
>
> [KIP-263|https://cwiki.apache.org/confluence/display/KAFKA/KIP-263%3A+Allow+broker+to+skip+sanity+check+of+inactive+segments+on+broker+startup#KIP263:Allowbrokertoskipsanitycheckofinactivesegmentsonbrokerstartup-Evaluation]
>  appears to have introduced a change explicitly deciding to not call the 
> sanityCheck method on the time or offset index files that are loaded by Kafka 
> at startup. I found a particularly nasty bug using the following configuration
> {code:java}
> jvm=1.8.0_191 zfs=0.6.5.6 kernel=4.4.0-1013-aws kafka=2.4.1{code}
> The bug was that the retention period for a topic or even the broker level 
> configuration seemed to not be respected, no matter what, when the broker 
> started up it would decide that all log segments on disk were breaching the 
> retention window and the data would be purged away.
>  
> {code:java}
> Found deletable segments with base offsets [11610665,12130396,12650133] due 
> to retention time 8640ms breach {code}
> {code:java}
> Rolled new log segment at offset 12764291 in 1 ms. (kafka.log.Log)
> Scheduling segments for deletion List(LogSegment(baseOffset=11610665, 
> size=1073731621, lastModifiedTime=1592532125000, largestTime=0), 
> LogSegment(baseOffset=12130396, size=1073727967, 
> lastModifiedTime=1592532462000, largestTime=0), 
> LogSegment(baseOffset=12650133, size=235891971, 
> lastModifiedTime=1592532531000, largestTime=0)) {code}
> Further logging showed that this issue was happening when loading the files, 
> indicating the final writes to trim the index were not successful
> {code:java}
> DEBUG Loaded index file 
> /mnt/kafka-logs/test_topic-0/17221277.timeindex with maxEntries = 
> 873813, maxIndexSize = 10485760, entries = 873813, lastOffset = 
> TimestampOffset(0,17221277), file position = 10485756 
> (kafka.log.TimeIndex){code}
>  It looks like the initially file is preallocated (10MB by default) and index 
> entries are added over time. When it's time to roll to a new log segment, the 
> index file is supposed to be trimmed, removing any 0 bytes left at the tail 
> from the initial allocation. But, in some cases that doesn't seem to happen 
> successfully. Because 0 bytes at the tail may not have been removed, when the 
> index is loaded again after restarting Kafka, the buffer seeks the position 
> to the end and the next timestamp is 0 and this leads to a premature TTL 
> deletion of the log segments.
>  
> I tracked the issue down to being caused by the jvm version being used as 
> upgrading resolved this issue, but I think that Kafka should never delete 
> data by mistake like this as doing a rolling restart with this bug in place 
> would cause complete data-loss across the cluster.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9961) Brokers may be left in an inconsistent state after reassignment

2020-05-05 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-9961:

Description: 
When completing a reassignment, the controller sends StopReplicaRequest to 
replicas that are not in the target assignment and removes them from the 
assignment in ZK. We do not have any retry mechanism to ensure that the broker 
is able to process the StopReplicaRequest successfully. Under certain 
circumstances, this could leave brokers in an inconsistent state, where they 
continue being the follower for this partition and end up with an inconsistent 
metadata cache.

We have seen messages like the following being spammed in the broker logs when 
we get into this situation:
{code:java}
While recording the replica LEO, the partition topic-1 hasn't been created.
{code}
This happens because the broker has neither received an updated 
LeaderAndIsrRequest for the new leader nor a StopReplicaRequest from the 
controller when the replica was removed from the assignment.

Note that we would require a restart of the affected broker to fix this 
situation. A controller failover would not fix it as the broker could continue 
being a replica for the partition until it receives a StopReplicaRequest, which 
would never happen in this case.

There seem to be couple of problems we should address:
 # We need a mechanism to retry replica deletions after partition reassignment 
is complete. The main challenge here is to be able to deal with cases where a 
broker has been decommissioned and may never come back up.
 # We could perhaps consider a mechanism to reconcile replica states across 
brokers, something similar to the solution proposed in 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-550%3A+Mechanism+to+Delete+Stray+Partitions+on+Broker].

  was:
When completing a reassignment, the controller sends StopReplicaRequest to 
replicas that are not in the target assignment and removes them from the 
assignment in ZK. We do not have any retry mechanism to ensure that the broker 
is able to process the StopReplicaRequest successfully. Under certain 
circumstances, this could leave brokers in an inconsistent state, where they 
continue being the follower for this partition and end up with an inconsistent 
metadata cache.

We have seen messages like the following being spammed in the broker logs when 
we get into this situation:
{code:java}
While recording the replica LEO, the partition topic-1 hasn't been created.
{code}
This happens because the broker has not an updated LeaderAndIsrRequest for the 
new leader nor a StopReplicaRequest from the controller when the replica was 
removed from the assignment.

Note that we would require a restart of the affected broker to fix this 
situation. A controller failover would not fix it as the broker could continue 
being a replica for the partition until it receives a StopReplicaRequest, which 
would never happen in this case.

There seem to be couple of problems we should address:
 # We need a mechanism to retry replica deletions after partition reassignment 
is complete. The main challenge here is to be able to deal with cases where a 
broker has been decommissioned and may never come back up.
 # We could perhaps consider a mechanism to reconcile replica states across 
brokers, something similar to the solution proposed in 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-550%3A+Mechanism+to+Delete+Stray+Partitions+on+Broker].


> Brokers may be left in an inconsistent state after reassignment
> ---
>
> Key: KAFKA-9961
> URL: https://issues.apache.org/jira/browse/KAFKA-9961
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dhruvil Shah
>Priority: Major
>
> When completing a reassignment, the controller sends StopReplicaRequest to 
> replicas that are not in the target assignment and removes them from the 
> assignment in ZK. We do not have any retry mechanism to ensure that the 
> broker is able to process the StopReplicaRequest successfully. Under certain 
> circumstances, this could leave brokers in an inconsistent state, where they 
> continue being the follower for this partition and end up with an 
> inconsistent metadata cache.
> We have seen messages like the following being spammed in the broker logs 
> when we get into this situation:
> {code:java}
> While recording the replica LEO, the partition topic-1 hasn't been created.
> {code}
> This happens because the broker has neither received an updated 
> LeaderAndIsrRequest for the new leader nor a StopReplicaRequest from the 
> controller when the replica was removed from the assignment.
> Note that we would require a restart of the affected broker to fix this 
> situation. A controller failover would not fix it as the broker could 
> continue being a replica for the 

[jira] [Created] (KAFKA-9961) Brokers may be left in an inconsistent state after reassignment

2020-05-05 Thread Dhruvil Shah (Jira)
Dhruvil Shah created KAFKA-9961:
---

 Summary: Brokers may be left in an inconsistent state after 
reassignment
 Key: KAFKA-9961
 URL: https://issues.apache.org/jira/browse/KAFKA-9961
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah


When completing a reassignment, the controller sends StopReplicaRequest to 
replicas that are not in the target assignment and removes them from the 
assignment in ZK. We do not have any retry mechanism to ensure that the broker 
is able to process the StopReplicaRequest successfully. Under certain 
circumstances, this could leave brokers in an inconsistent state, where they 
continue being the follower for this partition and end up with an inconsistent 
metadata cache.

We have seen messages like the following being spammed in the broker logs when 
we get into this situation:
{code:java}
While recording the replica LEO, the partition topic-1 hasn't been created.
{code}
This happens because the broker has not an updated LeaderAndIsrRequest for the 
new leader nor a StopReplicaRequest from the controller when the replica was 
removed from the assignment.

Note that we would require a restart of the affected broker to fix this 
situation. A controller failover would not fix it as the broker could continue 
being a replica for the partition until it receives a StopReplicaRequest, which 
would never happen in this case.

There seem to be couple of problems we should address:
 # We need a mechanism to retry replica deletions after partition reassignment 
is complete. The main challenge here is to be able to deal with cases where a 
broker has been decommissioned and may never come back up.
 # We could perhaps consider a mechanism to reconcile replica states across 
brokers, something similar to the solution proposed in 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-550%3A+Mechanism+to+Delete+Stray+Partitions+on+Broker].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9956) Authorizer APIs may be invoked more than once for a given request

2020-05-05 Thread Dhruvil Shah (Jira)
Dhruvil Shah created KAFKA-9956:
---

 Summary: Authorizer APIs may be invoked more than once for a given 
request
 Key: KAFKA-9956
 URL: https://issues.apache.org/jira/browse/KAFKA-9956
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah


Authorizer#authorize may be invoked more than once in some cases for a given 
request. I noticed this in for `DescribeConfigsRequest` but other requests 
could be affected as well.

The reason for this is the misuse of the scala `partition` API in code like 
this:
{code:java}
val (authorizedResources, unauthorizedResources) = 
describeConfigsRequest.resources.asScala.partition { resource =>
  resource.`type` match {
case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER =>
  authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, CLUSTER_NAME)
case ConfigResource.Type.TOPIC =>
  authorize(request.context, DESCRIBE_CONFIGS, TOPIC, resource.name)
case rt => throw new InvalidRequestException(s"Unexpected resource type $rt 
for resource ${resource.name}")
  }
}
{code}
As per Scala docs, the `partition` API could traverse the collection twice, 
depending on the implementation. 
[https://www.scala-lang.org/api/current/scala/collection/Iterable.html#partition(p:A=%3EBoolean):(C,C)]

It is also not a good practice to include side effects as part of the lambda 
passed into `partition`. We should clean up such usages.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9772) Transactional offset commit fails with IllegalStateException

2020-03-26 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-9772:

Description: 
{code:java}
Trying to complete a transactional offset commit for producerId 7090 and 
groupId application-id even though the offset commit record itself hasn't been 
appended to the log.{code}
{code:java}
java.lang.IllegalStateException: Trying to complete a transactional offset 
commit for producerId 7090 and groupId application-id even though the offset 
commit record itself hasn't been appended to the log. at 
kafka.coordinator.group.GroupMetadata.$anonfun$completePendingTxnOffsetCommit$2(GroupMetadata.scala:677)
 at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149) at 
scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237) at 
scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230) at 
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44) at 
scala.collection.mutable.HashMap.foreach(HashMap.scala:149) at 
kafka.coordinator.group.GroupMetadata.$anonfun$completePendingTxnOffsetCommit$1(GroupMetadata.scala:674)
 at 
kafka.coordinator.group.GroupMetadata.completePendingTxnOffsetCommit(GroupMetadata.scala:673)
 at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$handleTxnCompletion$2(GroupMetadataManager.scala:874)
 at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:228) at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$handleTxnCompletion$1(GroupMetadataManager.scala:873)
 at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at 
kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:870)
 at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$scheduleHandleTxnCompletion$1(GroupMetadataManager.scala:865)
 at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:834){code}

  was:
{code:java}
java.lang.IllegalStateException: Trying to complete a transactional offset 
commit for producerId 7090 and groupId application-id even though the offset 
commit record itself hasn't been appended to the log. at 
kafka.coordinator.group.GroupMetadata.$anonfun$completePendingTxnOffsetCommit$2(GroupMetadata.scala:677)
 at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149) at 
scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237) at 
scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230) at 
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44) at 
scala.collection.mutable.HashMap.foreach(HashMap.scala:149) at 
kafka.coordinator.group.GroupMetadata.$anonfun$completePendingTxnOffsetCommit$1(GroupMetadata.scala:674)
 at 
kafka.coordinator.group.GroupMetadata.completePendingTxnOffsetCommit(GroupMetadata.scala:673)
 at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$handleTxnCompletion$2(GroupMetadataManager.scala:874)
 at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:228) at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$handleTxnCompletion$1(GroupMetadataManager.scala:873)
 at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at 
kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:870)
 at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$scheduleHandleTxnCompletion$1(GroupMetadataManager.scala:865)
 at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:834){code}


> Transactional offset commit fails with IllegalStateException
> 
>
> Key: KAFKA-9772
> URL: https://issues.apache.org/jira/browse/KAFKA-9772
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dhruvil Shah
>Priority: Major
>
> {code:java}
> Trying to complete a transactional offset 

[jira] [Updated] (KAFKA-9772) Transactional offset commit fails with IllegalStateException

2020-03-26 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-9772:

Description: 
{code:java}
java.lang.IllegalStateException: Trying to complete a transactional offset 
commit for producerId 7090 and groupId application-id even though the offset 
commit record itself hasn't been appended to the log. at 
kafka.coordinator.group.GroupMetadata.$anonfun$completePendingTxnOffsetCommit$2(GroupMetadata.scala:677)
 at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149) at 
scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237) at 
scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230) at 
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44) at 
scala.collection.mutable.HashMap.foreach(HashMap.scala:149) at 
kafka.coordinator.group.GroupMetadata.$anonfun$completePendingTxnOffsetCommit$1(GroupMetadata.scala:674)
 at 
kafka.coordinator.group.GroupMetadata.completePendingTxnOffsetCommit(GroupMetadata.scala:673)
 at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$handleTxnCompletion$2(GroupMetadataManager.scala:874)
 at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:228) at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$handleTxnCompletion$1(GroupMetadataManager.scala:873)
 at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at 
kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:870)
 at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$scheduleHandleTxnCompletion$1(GroupMetadataManager.scala:865)
 at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:834){code}

  was:java.lang.IllegalStateException: Trying to complete a transactional 
offset commit for producerId 7090 and groupId application-id even though the 
offset commit record itself hasn't been appended to the log. at 
kafka.coordinator.group.GroupMetadata.$anonfun$completePendingTxnOffsetCommit$2(GroupMetadata.scala:677)
 at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149) at 
scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237) at 
scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230) at 
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44) at 
scala.collection.mutable.HashMap.foreach(HashMap.scala:149) at 
kafka.coordinator.group.GroupMetadata.$anonfun$completePendingTxnOffsetCommit$1(GroupMetadata.scala:674)
 at 
kafka.coordinator.group.GroupMetadata.completePendingTxnOffsetCommit(GroupMetadata.scala:673)
 at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$handleTxnCompletion$2(GroupMetadataManager.scala:874)
 at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:228) at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$handleTxnCompletion$1(GroupMetadataManager.scala:873)
 at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at 
kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:870)
 at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$scheduleHandleTxnCompletion$1(GroupMetadataManager.scala:865)
 at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:834)


> Transactional offset commit fails with IllegalStateException
> 
>
> Key: KAFKA-9772
> URL: https://issues.apache.org/jira/browse/KAFKA-9772
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dhruvil Shah
>Priority: Major
>
> {code:java}
> java.lang.IllegalStateException: Trying to complete a transactional offset 
> commit for producerId 7090 and groupId application-id even though the offset 
> commit record itself hasn't been appended to the log. at 
> 

[jira] [Created] (KAFKA-9772) Transactional offset commit fails with IllegalStateException

2020-03-26 Thread Dhruvil Shah (Jira)
Dhruvil Shah created KAFKA-9772:
---

 Summary: Transactional offset commit fails with 
IllegalStateException
 Key: KAFKA-9772
 URL: https://issues.apache.org/jira/browse/KAFKA-9772
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah


java.lang.IllegalStateException: Trying to complete a transactional offset 
commit for producerId 7090 and groupId application-id even though the offset 
commit record itself hasn't been appended to the log. at 
kafka.coordinator.group.GroupMetadata.$anonfun$completePendingTxnOffsetCommit$2(GroupMetadata.scala:677)
 at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149) at 
scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237) at 
scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230) at 
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44) at 
scala.collection.mutable.HashMap.foreach(HashMap.scala:149) at 
kafka.coordinator.group.GroupMetadata.$anonfun$completePendingTxnOffsetCommit$1(GroupMetadata.scala:674)
 at 
kafka.coordinator.group.GroupMetadata.completePendingTxnOffsetCommit(GroupMetadata.scala:673)
 at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$handleTxnCompletion$2(GroupMetadataManager.scala:874)
 at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:228) at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$handleTxnCompletion$1(GroupMetadataManager.scala:873)
 at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at 
kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:870)
 at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$scheduleHandleTxnCompletion$1(GroupMetadataManager.scala:865)
 at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 at java.base/java.lang.Thread.run(Thread.java:834)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997882#comment-16997882
 ] 

Dhruvil Shah edited comment on KAFKA-9307 at 12/17/19 4:42 AM:
---

Exception during step 6 that led to partial completion of become-follower 
transition:
{code:java}
[2019-12-12 03:08:17,864] ERROR [KafkaApi-3] Error when handling request: 
clientId=2, correlationId=1, api=LEADER_AND_ISR,
...
{topic=__transaction_state,partition_states=[{...
{partition=41,controller_epoch=16,leader=4,leader_epoch=112,isr=[2,4,1],zk_version=208,replicas=[3,4,2,1],is_new=false}
...
org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = 
Session expired for /brokers/topics/__transaction_state
at org.apache.zookeeper.KeeperException.create(KeeperException.java:130)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
at 
kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:537)
at 
kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:579)
at 
kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:574)
at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at 
kafka.zk.KafkaZkClient.getReplicaAssignmentForTopics(KafkaZkClient.scala:574)
at 
kafka.zk.KafkaZkClient.getTopicPartitionCount(KafkaZkClient.scala:624)
at 
kafka.coordinator.transaction.TransactionStateManager.getTransactionTopicPartitionCount(TransactionStateManager.scala:279)
at 
kafka.coordinator.transaction.TransactionStateManager.validateTransactionTopicPartitionCountIsStable(TransactionStateManager.scala:465)
at 
kafka.coordinator.transaction.TransactionStateManager.removeTransactionsForTxnTopicPartition(TransactionStateManager.scala:434)
at 
kafka.coordinator.transaction.TransactionCoordinator.handleTxnEmigration(TransactionCoordinator.scala:282)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:190)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:186)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$onLeadershipChange$1(KafkaApis.scala:186)
at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202)
at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202)
at 
kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1153)
at 
kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:202) 
{code}


was (Author: dhruvilshah):
Exception during step 6 that led to partial completion of become-follower 
transition:
{code:java}
 2019-12-12 03:11:56,320] ERROR [Transaction State Manager 3]: The metadata 
cache for txn partition 41 has already exist with epoch 111 and 6 entries while 
trying to add to it; this should not happen 
(kafka.coordinator.transaction.TransactionStateManager) [2019-12-12 
03:11:56,320] ERROR Uncaught exception in scheduled task 
'load-txns-for-partition-__transaction_state-41' (kafka.utils.KafkaScheduler) 
java.lang.IllegalStateException: The metadata cache for txn partition 41 has 
already exist with epoch 111 and 6 entries while trying to add to it; this 
should not happen at 
kafka.coordinator.transaction.TransactionStateManager.addLoadedTransactionsToCache(TransactionStateManager.scala:369)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply$mcV$sp(TransactionStateManager.scala:394)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at 
kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259) at 
kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1(TransactionStateManager.scala:392)
 at 

[jira] [Commented] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997885#comment-16997885
 ] 

Dhruvil Shah commented on KAFKA-9307:
-

Become-follower transition ignored during step 7:
{code:java}
[2019-12-12 03:08:18,241] TRACE [Broker id=3] Received LeaderAndIsr request 
PartitionState(controllerEpoch=16, leader=4, leaderEpoch=112, isr=2,4,1, 
zkVersion=208, replicas=3,4,2,1, isNew=false) correlation id 1 from controller 
2 epoch 16 for partition __transaction_state-41 (state.change.logger)
[2019-12-12 03:08:18,247] WARN [Broker id=3] Ignoring LeaderAndIsr request from 
controller 2 with correlation id 1 epoch 16 for partition 
__transaction_state-41 since its associated leader epoch 112 is not higher than 
the current leader epoch 112 (state.change.logger)
{code}

> Transaction coordinator could be left in unknown state after ZK session 
> timeout
> ---
>
> Key: KAFKA-9307
> URL: https://issues.apache.org/jira/browse/KAFKA-9307
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> We observed a case where the transaction coordinator could not load 
> transaction state from __transaction-state topic partition. Clients would 
> continue seeing COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker 
> hosting the coordinator is restarted.
> This is the sequence of events that leads to the issue:
>  # The broker is the leader of one (or more) transaction state topic 
> partitions.
>  # The broker loses its ZK session due to a network issue.
>  # Broker reestablishes session with ZK, though there are still transient 
> network issues.
>  # Broker is made follower of the transaction state topic partition it was 
> leading earlier.
>  # During the become-follower transition, the broker loses its ZK session 
> again.
>  # The become-follower transition for this broker fails in-between, leaving 
> us in a partial leader / partial follower state for the transaction topic. 
> This meant that we could not unload the transaction metadata. However, the 
> broker successfully caches the leader epoch of associated with the 
> LeaderAndIsrRequest.
>  # Later, when the ZK session is finally established successfully, the broker 
> ignores the become-follower transition as the leader epoch was same as the 
> one it had cached. This prevented the transaction metadata from being 
> unloaded.
>  # Because this partition was a partial follower, we had setup replica 
> fetchers. The partition continued to fetch from the leader until it was made 
> part of the ISR.
>  # Once it was part of the ISR, preferred leader election kicked in and 
> elected this broker as the leader.
>  # When processing the become-leader transition, the transaction state load 
> operation failed as we already had transaction metadata loaded at a previous 
> epoch.
>  # This meant that this partition was left in the "loading" state and we thus 
> returned COORDINATOR_LOAD_IN_PROGRESS errors.
> Restarting the broker that hosts the transaction state coordinator is the 
> only way to recover from this situation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-9307:

Description: 
We observed a case where the transaction coordinator could not load transaction 
state from __transaction-state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 # The broker is the leader of one (or more) transaction state topic partitions.
 # The broker loses its ZK session due to a network issue.
 # Broker reestablishes session with ZK, though there are still transient 
network issues.
 # Broker is made follower of the transaction state topic partition it was 
leading earlier.
 # During the become-follower transition, the broker loses its ZK session again.
 # The become-follower transition for this broker fails in-between, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch of associated with the LeaderAndIsrRequest.
 # Later, when the ZK session is finally established successfully, the broker 
ignores the become-follower transition as the leader epoch was same as the one 
it had cached. This prevented the transaction metadata from being unloaded.
 # Because this partition was a partial follower, we had setup replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.
 # Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.
 # When processing the become-leader transition, the transaction state load 
operation failed as we already had transaction metadata loaded at a previous 
epoch.
 # This meant that this partition was left in the "loading" state and we thus 
returned COORDINATOR_LOAD_IN_PROGRESS errors.

Restarting the broker that hosts the transaction state coordinator is the only 
way to recover from this situation.

  was:
We observed a case where the transaction coordinator could not load transaction 
state from __transaction-state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 # The broker is the leader of one (or more) transaction state topic partitions.
 # The broker loses its ZK session due to a network issue.
 # Broker reestablishes session with ZK, though there are still transient 
network issues.
 # Broker is made follower of the transaction state topic partition it was 
leading earlier.
 # During the become-follower transition, the broker loses its ZK session again.
 # The become-follower transition for this broker fails in-between, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch of associated with the LeaderAndIsrRequest.
 # Later, when the ZK session is finally established successfully, the broker 
ignores the become-follower transition as the leader epoch was same as the one 
it had cached. This prevented the transaction metadata from being unloaded.
 # Because this partition was a partial follower, we had setup replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.
 # Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.
 # When processing the become-leader transition, the transaction state load 
operation failed as we already had transaction metadata loaded at a previous 
epoch.
 # This meant that this partition was left in the "loading" state and we thus 
returned COORDINATOR_LOAD_IN_PROGRESS errors.
 # Broker restart fixed this partial in-memory state and we were able to resume 
processing for transactions.


> Transaction coordinator could be left in unknown state after ZK session 
> timeout
> ---
>
> Key: KAFKA-9307
> URL: https://issues.apache.org/jira/browse/KAFKA-9307
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> We observed a case where the transaction coordinator could not load 
> transaction state from __transaction-state topic partition. Clients would 
> continue seeing COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker 
> hosting the coordinator is restarted.
> This is the sequence of events that leads to the issue:
>  # The broker is the leader of one (or more) transaction state topic 
> partitions.
>  # The 

[jira] [Updated] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-9307:

Description: 
We observed a case where the transaction coordinator could not load transaction 
state from __transaction-state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 # The broker is the leader of one (or more) transaction state topic partitions.
 # The broker loses its ZK session due to a network issue.
 # Broker reestablishes session with ZK, though there are still transient 
network issues.
 # Broker is made follower of the transaction state topic partition it was 
leading earlier.
 # During the become-follower transition, the broker loses its ZK session again.
 # The become-follower transition for this broker fails in-between, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch of associated with the LeaderAndIsrRequest.
 # Later, when the ZK session is finally established successfully, the broker 
ignores the become-follower transition as the leader epoch was same as the one 
it had cached. This prevented the transaction metadata from being unloaded.
 # Because this partition was a partial follower, we had setup replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.
 # Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.
 # When processing the become-leader transition, the transaction state load 
operation failed as we already had transaction metadata loaded at a previous 
epoch.
 # This meant that this partition was left in the "loading" state and we thus 
returned COORDINATOR_LOAD_IN_PROGRESS errors.
 # Broker restart fixed this partial in-memory state and we were able to resume 
processing for transactions.

  was:
We observed a case where the transaction coordinator could not load transaction 
state from __transaction-state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 # The broker is the leader of one (or more) transaction state topic partitions.
 # The broker loses its ZK session due to a network issue.
 # Broker reestablishes session with ZK, though there are still transient 
network issues.
 # Broker is made follower of the transaction state topic partition it was 
leading earlier.
 # During the become-follower transition, the broker loses its ZK session again.
 # The become-follower transition for this broker fails in-between, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch of associated with the LeaderAndIsrRequest.
 # Later, when the ZK session is finally established successfully, the broker 
ignores the become-follower transition as the leader epoch was same as the one 
it had cached. This prevented the transaction metadata from being unloaded.
 # Because this partition was a partial follower, we had setup replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.
 # Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.
 # When processing the become-leader transition, the operation failed as we 
already had transaction metadata loaded at a previous epoch.
 # This meant that this partition was left in the "loading" state and we thus 
returned COORDINATOR_LOAD_IN_PROGRESS errors.
 # Broker restart fixed this partial in-memory state and we were able to resume 
processing for transactions.


> Transaction coordinator could be left in unknown state after ZK session 
> timeout
> ---
>
> Key: KAFKA-9307
> URL: https://issues.apache.org/jira/browse/KAFKA-9307
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> We observed a case where the transaction coordinator could not load 
> transaction state from __transaction-state topic partition. Clients would 
> continue seeing COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker 
> hosting the coordinator is restarted.
> This is the sequence of events that leads to the issue:
>  # The broker is the leader of one (or more) transaction state topic 
> partitions.
>  # The broker loses its ZK session due to a 

[jira] [Commented] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997883#comment-16997883
 ] 

Dhruvil Shah commented on KAFKA-9307:
-

Exception during step 10 that caused transaction metadata load to fail after 
become-leader transition:
{code:java}
2019-12-12 03:11:56,320] ERROR [Transaction State Manager 3]: The metadata 
cache for txn partition 41 has already exist with epoch 111 and 6 entries while 
trying to add to it; this should not happen 
(kafka.coordinator.transaction.TransactionStateManager)
[2019-12-12 03:11:56,320] ERROR Uncaught exception in scheduled task 
'load-txns-for-partition-__transaction_state-41' (kafka.utils.KafkaScheduler)
java.lang.IllegalStateException: The metadata cache for txn partition 41 has 
already exist with epoch 111 and 6 entries while trying to add to it; this 
should not happen
at 
kafka.coordinator.transaction.TransactionStateManager.addLoadedTransactionsToCache(TransactionStateManager.scala:369)
at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply$mcV$sp(TransactionStateManager.scala:394)
at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259)
at 
kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1(TransactionStateManager.scala:392)
at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$loadTransactionsForTxnTopicPartition$2.apply$mcV$sp(TransactionStateManager.scala:426)
at 
kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:114)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java)
at java.lang.Thread.run(Thread.java:748)
{code}

> Transaction coordinator could be left in unknown state after ZK session 
> timeout
> ---
>
> Key: KAFKA-9307
> URL: https://issues.apache.org/jira/browse/KAFKA-9307
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> We observed a case where the transaction coordinator could not load 
> transaction state from __transaction-state topic partition. Clients would 
> continue seeing COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker 
> hosting the coordinator is restarted.
> This is the sequence of events that leads to the issue:
>  # The broker is the leader of one (or more) transaction state topic 
> partitions.
>  # The broker loses its ZK session due to a network issue.
>  # Broker reestablishes session with ZK, though there are still transient 
> network issues.
>  # Broker is made follower of the transaction state topic partition it was 
> leading earlier.
>  # During the become-follower transition, the broker loses its ZK session 
> again.
>  # The become-follower transition for this broker fails in-between, leaving 
> us in a partial leader / partial follower state for the transaction topic. 
> This meant that we could not unload the transaction metadata. However, the 
> broker successfully caches the leader epoch of associated with the 
> LeaderAndIsrRequest.
>  # Later, when the ZK session is finally established successfully, the broker 
> ignores the become-follower transition as the leader epoch was same as the 
> one it had cached. This prevented the transaction metadata from being 
> unloaded.
>  # Because this partition was a partial follower, we had setup replica 
> fetchers. The partition continued to fetch from the leader until it was made 
> part of the ISR.
>  # Once it was part of the ISR, preferred leader election kicked in and 
> elected this broker as the leader.
>  # When processing the 

[jira] [Commented] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997882#comment-16997882
 ] 

Dhruvil Shah commented on KAFKA-9307:
-

Exception during step 6 that led to partial completion of become-follower 
transition:
{code:java}
 2019-12-12 03:11:56,320] ERROR [Transaction State Manager 3]: The metadata 
cache for txn partition 41 has already exist with epoch 111 and 6 entries while 
trying to add to it; this should not happen 
(kafka.coordinator.transaction.TransactionStateManager) [2019-12-12 
03:11:56,320] ERROR Uncaught exception in scheduled task 
'load-txns-for-partition-__transaction_state-41' (kafka.utils.KafkaScheduler) 
java.lang.IllegalStateException: The metadata cache for txn partition 41 has 
already exist with epoch 111 and 6 entries while trying to add to it; this 
should not happen at 
kafka.coordinator.transaction.TransactionStateManager.addLoadedTransactionsToCache(TransactionStateManager.scala:369)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply$mcV$sp(TransactionStateManager.scala:394)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at 
kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259) at 
kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1(TransactionStateManager.scala:392)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$loadTransactionsForTxnTopicPartition$2.apply$mcV$sp(TransactionStateManager.scala:426)
 at 
kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:114) at 
kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63) at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java) 
at java.lang.Thread.run(Thread.java:748)
{code}

> Transaction coordinator could be left in unknown state after ZK session 
> timeout
> ---
>
> Key: KAFKA-9307
> URL: https://issues.apache.org/jira/browse/KAFKA-9307
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> We observed a case where the transaction coordinator could not load 
> transaction state from __transaction-state topic partition. Clients would 
> continue seeing COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker 
> hosting the coordinator is restarted.
> This is the sequence of events that leads to the issue:
>  # The broker is the leader of one (or more) transaction state topic 
> partitions.
>  # The broker loses its ZK session due to a network issue.
>  # Broker reestablishes session with ZK, though there are still transient 
> network issues.
>  # Broker is made follower of the transaction state topic partition it was 
> leading earlier.
>  # During the become-follower transition, the broker loses its ZK session 
> again.
>  # The become-follower transition for this broker fails in-between, leaving 
> us in a partial leader / partial follower state for the transaction topic. 
> This meant that we could not unload the transaction metadata. However, the 
> broker successfully caches the leader epoch of associated with the 
> LeaderAndIsrRequest.
>  # Later, when the ZK session is finally established successfully, the broker 
> ignores the become-follower transition as the leader epoch was same as the 
> one it had cached. This prevented the transaction metadata from being 
> unloaded.
>  # Because this partition was a partial follower, we had setup replica 
> fetchers. The partition continued to fetch from the leader until it was made 
> part of the ISR.
>  # Once it was part of the ISR, preferred leader election kicked in and 
> elected this broker as the leader.
>  # When processing the become-leader transition, the operation failed as we 
> already had transaction metadata loaded at a previous epoch.
>  # This meant that 

[jira] [Updated] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-9307:

Description: 
We observed a case where the transaction coordinator could not load transaction 
state from __transaction-state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 # The broker is the leader of one (or more) transaction state topic partitions.
 # The broker loses its ZK session due to a network issue.
 # Broker reestablishes session with ZK, though there are still transient 
network issues.
 # Broker is made follower of the transaction state topic partition it was 
leading earlier.
 # During the become-follower transition, the broker loses its ZK session again.
 # The become-follower transition for this broker fails in-between, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch of associated with the LeaderAndIsrRequest.
 # Later, when the ZK session is finally established successfully, the broker 
ignores the become-follower transition as the leader epoch was same as the one 
it had cached. This prevented the transaction metadata from being unloaded.
 # Because this partition was a partial follower, we had setup replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.
 # Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.
 # When processing the become-leader transition, the operation failed as we 
already had transaction metadata loaded at a previous epoch.
 # This meant that this partition was left in the "loading" state and we thus 
returned COORDINATOR_LOAD_IN_PROGRESS errors.
 # Broker restart fixed this partial in-memory state and we were able to resume 
processing for transactions.

  was:
We observed a case where the transaction coordinator could not load transaction 
state from __transaction-state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 * The broker is the leader of one (or more) transaction state topic partitions.
 * The broker loses its ZK session due to a network issue.

 * Broker reestablishes session with ZK, though there are still transient 
network issues.
 * Broker is made follower of the transaction state topic partition it was 
leading earlier.

 * During the become-follower transition, the broker loses its ZK session again.
 * The become-follower transition for this broker fails in-between, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch of associated with the LeaderAndIsrRequest.

 
 * Later, when the ZK session is finally established successfully, the broker 
ignores the become-follower transition as the leader epoch was same as the one 
it had cached. This prevented the transaction metadata from being unloaded.

 * Because this partition was a partial follower, we had setup replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.

 * Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.

 * When processing the become-leader transition, the operation failed as we 
already had transaction metadata loaded at a previous epoch.

```
 2019-12-12 03:11:56,320] ERROR [Transaction State Manager 3]: The metadata 
cache for txn partition 41 has already exist with epoch 111 and 6 entries while 
trying to add to it; this should not happen 
(kafka.coordinator.transaction.TransactionStateManager) [2019-12-12 
03:11:56,320] ERROR Uncaught exception in scheduled task 
'load-txns-for-partition-__transaction_state-41' (kafka.utils.KafkaScheduler) 
java.lang.IllegalStateException: The metadata cache for txn partition 41 has 
already exist with epoch 111 and 6 entries while trying to add to it; this 
should not happen at 
kafka.coordinator.transaction.TransactionStateManager.addLoadedTransactionsToCache(TransactionStateManager.scala:369)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply$mcV$sp(TransactionStateManager.scala:394)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
 at 

[jira] [Updated] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-9307:

Description: 
We observed a case where the transaction coordinator could not load transaction 
state from __transaction-state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 * The broker is the leader of one (or more) transaction state topic partitions.
 * The broker loses its ZK session due to a network issue.

 * Broker reestablishes session with ZK, though there are still transient 
network issues.
 * Broker is made follower of the transaction state topic partition it was 
leading earlier.

 * During the become-follower transition, the broker loses its ZK session again.
 * The become-follower transition for this broker fails in-between, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch of associated with the LeaderAndIsrRequest.

 
 * Later, when the ZK session is finally established successfully, the broker 
ignores the become-follower transition as the leader epoch was same as the one 
it had cached. This prevented the transaction metadata from being unloaded.

 * Because this partition was a partial follower, we had setup replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.

 * Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.

 * When processing the become-leader transition, the operation failed as we 
already had transaction metadata loaded at a previous epoch.

```
 2019-12-12 03:11:56,320] ERROR [Transaction State Manager 3]: The metadata 
cache for txn partition 41 has already exist with epoch 111 and 6 entries while 
trying to add to it; this should not happen 
(kafka.coordinator.transaction.TransactionStateManager) [2019-12-12 
03:11:56,320] ERROR Uncaught exception in scheduled task 
'load-txns-for-partition-__transaction_state-41' (kafka.utils.KafkaScheduler) 
java.lang.IllegalStateException: The metadata cache for txn partition 41 has 
already exist with epoch 111 and 6 entries while trying to add to it; this 
should not happen at 
kafka.coordinator.transaction.TransactionStateManager.addLoadedTransactionsToCache(TransactionStateManager.scala:369)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply$mcV$sp(TransactionStateManager.scala:394)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at 
kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259) at 
kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1(TransactionStateManager.scala:392)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$loadTransactionsForTxnTopicPartition$2.apply$mcV$sp(TransactionStateManager.scala:426)
 at 
kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:114) at 
kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63) at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java) 
at java.lang.Thread.run(Thread.java:748)
 ```
 * This meant that this partition was left in the "loading" state and we thus 
returned COORDINATOR_LOAD_IN_PROGRESS errors.

 * Broker restart fixed this partial in-memory state and we were able to resume 
processing for transactions.

  was:
We observed a case where the transaction coordinator could not load transaction 
state from __transaction-state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 * The broker is the leader of one (or more) transaction state topic partitions.
 * The broker loses 

[jira] [Updated] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-9307:

Description: 
We observed a case where the transaction coordinator could not load transaction 
state from __transaction-state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 * The broker is the leader of one (or more) transaction state topic partitions.
 * The broker loses its ZK session due to a network issue.

 * Broker reestablishes session with ZK, though there are still transient 
network issues.
 * Broker is made follower of the transaction state topic partition it was 
leading earlier.

 * During the become-follower transition, the broker loses its ZK session again.
 * The become-follower transition for this broker fails in-between, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch of associated with the LeaderAndIsrRequest.

```
 [2019-12-12 03:08:17,864] ERROR [KafkaApi-3] Error when handling request: 
clientId=2, correlationId=1, api=LEADER_AND_ISR, ... 
{topic=__transaction_state,partition_states=[{...

{partition=41,controller_epoch=16,leader=4,leader_epoch=112,isr=[2,4,1],zk_version=208,replicas=[3,4,2,1],is_new=false}

... org.apache.zookeeper.KeeperException$SessionExpiredException: 
KeeperErrorCode = Session expired for /brokers/topics/__transaction_state at 
org.apache.zookeeper.KeeperException.create(KeeperException.java:130) at 
org.apache.zookeeper.KeeperException.create(KeeperException.java:54) at 
kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:537) at 
kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:579)
 at 
kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:574)
 at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
 at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at 
scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at 
kafka.zk.KafkaZkClient.getReplicaAssignmentForTopics(KafkaZkClient.scala:574) 
at kafka.zk.KafkaZkClient.getTopicPartitionCount(KafkaZkClient.scala:624) at 
kafka.coordinator.transaction.TransactionStateManager.getTransactionTopicPartitionCount(TransactionStateManager.scala:279)
 at 
kafka.coordinator.transaction.TransactionStateManager.validateTransactionTopicPartitionCountIsStable(TransactionStateManager.scala:465)
 at 
kafka.coordinator.transaction.TransactionStateManager.removeTransactionsForTxnTopicPartition(TransactionStateManager.scala:434)
 at 
kafka.coordinator.transaction.TransactionCoordinator.handleTxnEmigration(TransactionCoordinator.scala:282)
 at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:190)
 at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:186)
 at scala.collection.mutable.HashSet.foreach(HashSet.scala:78) at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$onLeadershipChange$1(KafkaApis.scala:186)
 at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202) at 
kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202) at 
kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1153) 
at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:202) 
 ```
 * Later, when the ZK session is finally established successfully, the broker 
ignores the become-follower transition as the leader epoch was same as the one 
it had cached. This prevented the transaction metadata from being unloaded.

 * Because this partition was a partial follower, we had setup replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.

 * Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.

 * When processing the become-leader transition, the operation failed as we 
already had transaction metadata loaded at a previous epoch.

```
2019-12-12 03:11:56,320] ERROR [Transaction State Manager 3]: The metadata 
cache for txn partition 41 has already exist with epoch 111 and 6 entries while 
trying to add to it; this should not happen 
(kafka.coordinator.transaction.TransactionStateManager) [2019-12-12 
03:11:56,320] ERROR Uncaught exception in scheduled task 
'load-txns-for-partition-__transaction_state-41' 

[jira] [Assigned] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-9307:
---

Assignee: Dhruvil Shah

> Transaction coordinator could be left in unknown state after ZK session 
> timeout
> ---
>
> Key: KAFKA-9307
> URL: https://issues.apache.org/jira/browse/KAFKA-9307
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> We observed a case where the transaction coordinator could not load 
> transaction state from __transaction-state topic partition. Clients would 
> continue seeing COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker 
> hosting the coordinator is restarted.
> This is the sequence of events that leads to the issue:
>  * The broker is the leader of one (or more) transaction state topic 
> partitions.
>  * The broker loses its ZK session due to a network issue.
>  * Broker reestablishes session with ZK, though there are still transient 
> network issues.
>  * Broker is made follower of the transaction state topic partition it was 
> leading earlier.
>  * During the become-follower transition, the broker loses its ZK session 
> again.
>  * The become-follower transition for this broker fails in-between, leaving 
> us in a partial leader / partial follower state for the transaction topic. 
> This meant that we could not unload the transaction metadata. However, the 
> broker successfully caches the leader epoch of associated with the 
> LeaderAndIsrRequest.
> ```
> [2019-12-12 03:08:17,864] ERROR [KafkaApi-3] Error when handling request: 
> clientId=2, correlationId=1, api=LEADER_AND_ISR, ... 
> \{topic=__transaction_state,partition_states=[{... 
> {partition=41,controller_epoch=16,leader=4,leader_epoch=112,isr=[2,4,1],zk_version=208,replicas=[3,4,2,1],is_new=false}
>  ... org.apache.zookeeper.KeeperException$SessionExpiredException: 
> KeeperErrorCode = Session expired for /brokers/topics/__transaction_state at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:130) at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:54) at 
> kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:537) at 
> kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:579)
>  at 
> kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:574)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at 
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at 
> kafka.zk.KafkaZkClient.getReplicaAssignmentForTopics(KafkaZkClient.scala:574) 
> at kafka.zk.KafkaZkClient.getTopicPartitionCount(KafkaZkClient.scala:624) at 
> kafka.coordinator.transaction.TransactionStateManager.getTransactionTopicPartitionCount(TransactionStateManager.scala:279)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.validateTransactionTopicPartitionCountIsStable(TransactionStateManager.scala:465)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.removeTransactionsForTxnTopicPartition(TransactionStateManager.scala:434)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.handleTxnEmigration(TransactionCoordinator.scala:282)
>  at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:190)
>  at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:186)
>  at scala.collection.mutable.HashSet.foreach(HashSet.scala:78) at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$onLeadershipChange$1(KafkaApis.scala:186)
>  at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202) at 
> kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202) at 
> kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1153) 
> at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:202) 
> ```
>  * Later, when the ZK session is finally established successfully, the broker 
> ignores the become-follower transition as the leader epoch was same as the 
> one it had cached. This prevented the transaction metadata from being 
> unloaded.
>  * Because this partition was a partial follower, we had setup replica 
> fetchers. The partition continued to fetch from the leader until it was made 
> part of the ISR.
>  * Once it was part of the ISR, 

[jira] [Created] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)
Dhruvil Shah created KAFKA-9307:
---

 Summary: Transaction coordinator could be left in unknown state 
after ZK session timeout
 Key: KAFKA-9307
 URL: https://issues.apache.org/jira/browse/KAFKA-9307
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Dhruvil Shah


We observed a case where the transaction coordinator could not load transaction 
state from __transaction-state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 * The broker is the leader of one (or more) transaction state topic partitions.
 * The broker loses its ZK session due to a network issue.

 * Broker reestablishes session with ZK, though there are still transient 
network issues.
 * Broker is made follower of the transaction state topic partition it was 
leading earlier.

 * During the become-follower transition, the broker loses its ZK session again.
 * The become-follower transition for this broker fails in-between, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch of associated with the LeaderAndIsrRequest.

```
[2019-12-12 03:08:17,864] ERROR [KafkaApi-3] Error when handling request: 
clientId=2, correlationId=1, api=LEADER_AND_ISR, ... 
\{topic=__transaction_state,partition_states=[{... 
{partition=41,controller_epoch=16,leader=4,leader_epoch=112,isr=[2,4,1],zk_version=208,replicas=[3,4,2,1],is_new=false}
 ... org.apache.zookeeper.KeeperException$SessionExpiredException: 
KeeperErrorCode = Session expired for /brokers/topics/__transaction_state at 
org.apache.zookeeper.KeeperException.create(KeeperException.java:130) at 
org.apache.zookeeper.KeeperException.create(KeeperException.java:54) at 
kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:537) at 
kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:579)
 at 
kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:574)
 at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
 at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at 
scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at 
kafka.zk.KafkaZkClient.getReplicaAssignmentForTopics(KafkaZkClient.scala:574) 
at kafka.zk.KafkaZkClient.getTopicPartitionCount(KafkaZkClient.scala:624) at 
kafka.coordinator.transaction.TransactionStateManager.getTransactionTopicPartitionCount(TransactionStateManager.scala:279)
 at 
kafka.coordinator.transaction.TransactionStateManager.validateTransactionTopicPartitionCountIsStable(TransactionStateManager.scala:465)
 at 
kafka.coordinator.transaction.TransactionStateManager.removeTransactionsForTxnTopicPartition(TransactionStateManager.scala:434)
 at 
kafka.coordinator.transaction.TransactionCoordinator.handleTxnEmigration(TransactionCoordinator.scala:282)
 at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:190)
 at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:186)
 at scala.collection.mutable.HashSet.foreach(HashSet.scala:78) at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$onLeadershipChange$1(KafkaApis.scala:186)
 at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202) at 
kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202) at 
kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1153) 
at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:202) 
```

 * Later, when the ZK session is finally established successfully, the broker 
ignores the become-follower transition as the leader epoch was same as the one 
it had cached. This prevented the transaction metadata from being unloaded.

 * Because this partition was a partial follower, we had setup replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.

 * Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.

 * When processing the become-leader transition, the operation failed as we 
already had transaction metadata loaded at a previous epoch. This meant that 
this partition was left in the "loading" state and we thus returned 
COORDINATOR_LOAD_IN_PROGRESS errors.

 * Broker restart fixed this partial in-memory state and we were able to resume 
processing for 

[jira] [Assigned] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically

2019-12-09 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-7362:
---

Assignee: Dhruvil Shah  (was: xiongqi wu)

> enable kafka broker to remove orphan partitions automatically 
> --
>
> Key: KAFKA-7362
> URL: https://issues.apache.org/jira/browse/KAFKA-7362
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, log
>Reporter: xiongqi wu
>Assignee: Dhruvil Shah
>Priority: Major
>
> When partition reassignment removes topic partitions from a offline broker, 
> those removed partitions become orphan partitions to the broker. When the 
> offline broker comes back online, it is not able to clean up both data and 
> folders that belong to orphan partitions.  Log manager will scan all all dirs 
> during startup, but the time based retention policy on a topic partition will 
> not be kicked out until the broker is either a follower or a leader of the 
> partition.  In addition, we do not have logic to delete folders that belong 
> to orphan partition today. 
> Open this ticket to provide a mechanism (when enabled) to safely remove 
> orphan partitions automatically.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically

2019-12-09 Thread Dhruvil Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16992234#comment-16992234
 ] 

Dhruvil Shah commented on KAFKA-7362:
-

[~Ahuri3] that sounds reasonable, as long as you know which partitions have 
been orphaned.

> enable kafka broker to remove orphan partitions automatically 
> --
>
> Key: KAFKA-7362
> URL: https://issues.apache.org/jira/browse/KAFKA-7362
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, log
>Reporter: xiongqi wu
>Assignee: Dhruvil Shah
>Priority: Major
>
> When partition reassignment removes topic partitions from a offline broker, 
> those removed partitions become orphan partitions to the broker. When the 
> offline broker comes back online, it is not able to clean up both data and 
> folders that belong to orphan partitions.  Log manager will scan all all dirs 
> during startup, but the time based retention policy on a topic partition will 
> not be kicked out until the broker is either a follower or a leader of the 
> partition.  In addition, we do not have logic to delete folders that belong 
> to orphan partition today. 
> Open this ticket to provide a mechanism (when enabled) to safely remove 
> orphan partitions automatically.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9167) Implement a broker to controller request channel

2019-11-16 Thread Dhruvil Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16975789#comment-16975789
 ] 

Dhruvil Shah commented on KAFKA-9167:
-

Thanks for working on this [~viktorsomogyi]. If you already have something 
implemented, it would make sense for you to do the PR. I could help with 
reviewing the PR.

> Implement a broker to controller request channel
> 
>
> Key: KAFKA-9167
> URL: https://issues.apache.org/jira/browse/KAFKA-9167
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller, core
>Reporter: Viktor Somogyi-Vass
>Assignee: Dhruvil Shah
>Priority: Major
>
> In some cases, we will need to create a new API to replace an operation that 
> was formerly done via ZooKeeper.  One example of this is that when the leader 
> of a partition wants to modify the in-sync replica set, it currently modifies 
> ZooKeeper directly  In the post-ZK world, the leader will make an RPC to the 
> active controller instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8125) Check for topic existence in CreateTopicsRequest prior to creating replica assignment

2019-10-15 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah resolved KAFKA-8125.
-
Resolution: Duplicate

> Check for topic existence in CreateTopicsRequest prior to creating replica 
> assignment
> -
>
> Key: KAFKA-8125
> URL: https://issues.apache.org/jira/browse/KAFKA-8125
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.1
>Reporter: Lucas Bradstreet
>Assignee: huxihx
>Priority: Minor
>
> Imagine the following pattern to ensure topic creation in an application:
>  # Attempt to create a topic with # partitions P and replication factor R.
>  #  If topic creation fails with TopicExistsException, continue. If topic 
> creation succeeds, continue, the topic now exists.
> This normally works fine. However, if the topic has already been created, but 
> if the number of live brokers < R, then the topic creation will fail an 
> org.apache.kafka.common.errors.InvalidReplicationFactorException, even though 
> the topic already exists.
> This could be avoided if we check whether the topic exists prior to calling 
> AdminUtils.assignReplicasToBrokers.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8125) Check for topic existence in CreateTopicsRequest prior to creating replica assignment

2019-10-15 Thread Dhruvil Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16952415#comment-16952415
 ] 

Dhruvil Shah commented on KAFKA-8125:
-

Yeah, I'll close it as duplicate as the issue was fixed by KAFKA-8875.

> Check for topic existence in CreateTopicsRequest prior to creating replica 
> assignment
> -
>
> Key: KAFKA-8125
> URL: https://issues.apache.org/jira/browse/KAFKA-8125
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.1
>Reporter: Lucas Bradstreet
>Assignee: huxihx
>Priority: Minor
>
> Imagine the following pattern to ensure topic creation in an application:
>  # Attempt to create a topic with # partitions P and replication factor R.
>  #  If topic creation fails with TopicExistsException, continue. If topic 
> creation succeeds, continue, the topic now exists.
> This normally works fine. However, if the topic has already been created, but 
> if the number of live brokers < R, then the topic creation will fail an 
> org.apache.kafka.common.errors.InvalidReplicationFactorException, even though 
> the topic already exists.
> This could be avoided if we check whether the topic exists prior to calling 
> AdminUtils.assignReplicasToBrokers.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-8962) KafkaAdminClient#describeTopics always goes through the controller

2019-09-30 Thread Dhruvil Shah (Jira)
Dhruvil Shah created KAFKA-8962:
---

 Summary: KafkaAdminClient#describeTopics always goes through the 
controller
 Key: KAFKA-8962
 URL: https://issues.apache.org/jira/browse/KAFKA-8962
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah


KafkaAdminClient#describeTopic makes a MetadataRequest against the controller. 
We should consider routing the request to any broker in the cluster using 
`LeastLoadedNodeProvider` instead, so that we don't overwhelm the controller 
with these requests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8950) KafkaConsumer stops fetching

2019-09-26 Thread Dhruvil Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16939062#comment-16939062
 ] 

Dhruvil Shah commented on KAFKA-8950:
-

Thanks for reporting the issue. If you have consumer logs from around the time 
it stopped making progress, that may be helpful.

> KafkaConsumer stops fetching
> 
>
> Key: KAFKA-8950
> URL: https://issues.apache.org/jira/browse/KAFKA-8950
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.3.0
> Environment: linux
>Reporter: Will James
>Priority: Major
>
> We have a KafkaConsumer consuming from a single partition with 
> enable.auto.commit set to true.
> Very occasionally, the consumer goes into a broken state. It returns no 
> records from the broker with every poll, and from most of the Kafka metrics 
> in the consumer it looks like it is fully caught up to the end of the log. 
> We see that we are long polling for the max poll timeout, and that there is 
> zero lag. In addition, we see that the heartbeat rate stays unchanged from 
> before the issue begins (so the consumer stays a part of the consumer group).
> In addition, from looking at the __consumer_offsets topic, it is possible to 
> see that the consumer is committing the same offset on the auto commit 
> interval, however, the offset does not move, and the lag from the broker's 
> perspective continues to increase.
> The issue is only resolved by restarting our application (which restarts the 
> KafkaConsumer instance).
> From a heap dump of an application in this state, I can see that the Fetcher 
> is in a state where it believes there are nodesWithPendingFetchRequests.
> However, I can see the state of the fetch latency sensor, specifically, the 
> fetch rate, and see that the samples were not updated for a long period of 
> time (actually, precisely the amount of time that the problem in our 
> application was occurring, around 50 hours - we have alerting on other 
> metrics but not the fetch rate, so we didn't notice the problem until a 
> customer complained).
> In this example, the consumer was processing around 40 messages per second, 
> with an average size of about 10kb, although most of the other examples of 
> this have happened with higher volume (250 messages / second, around 23kb per 
> message on average).
> I have spent some time investigating the issue on our end, and will continue 
> to do so as time allows, however I wanted to raise this as an issue because 
> it may be affecting other people.
> Please let me know if you have any questions or need additional information. 
> I doubt I can provide heap dumps unfortunately, but I can provide further 
> information as needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8522) Tombstones can survive forever

2019-08-19 Thread Dhruvil Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16910791#comment-16910791
 ] 

Dhruvil Shah commented on KAFKA-8522:
-

[~Yohan123] yes, the idea would be to have one checkpoint file per partition 
stored alongside the log data. Each log directory can contain multiple 
partitions like you said.

> Tombstones can survive forever
> --
>
> Key: KAFKA-8522
> URL: https://issues.apache.org/jira/browse/KAFKA-8522
> Project: Kafka
>  Issue Type: Improvement
>  Components: log cleaner
>Reporter: Evelyn Bayes
>Priority: Minor
>
> This is a bit grey zone as to whether it's a "bug" but it is certainly 
> unintended behaviour.
>  
> Under specific conditions tombstones effectively survive forever:
>  * Small amount of throughput;
>  * min.cleanable.dirty.ratio near or at 0; and
>  * Other parameters at default.
> What  happens is all the data continuously gets cycled into the oldest 
> segment. Old records get compacted away, but the new records continuously 
> update the timestamp of the oldest segment reseting the countdown for 
> deleting tombstones.
> So tombstones build up in the oldest segment forever.
>  
> While you could "fix" this by reducing the segment size, this can be 
> undesirable as a sudden change in throughput could cause a dangerous number 
> of segments to be created.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8242) Exception in ReplicaFetcher blocks replication of all other partitions

2019-06-24 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16872002#comment-16872002
 ] 

Dhruvil Shah commented on KAFKA-8242:
-

I believe this is addressed by 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-461+-+Improve+Replica+Fetcher+behavior+at+handling+partition+failure].

> Exception in ReplicaFetcher blocks replication of all other partitions
> --
>
> Key: KAFKA-8242
> URL: https://issues.apache.org/jira/browse/KAFKA-8242
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.1.1
>Reporter: Nevins Bartolomeo
>Priority: Major
>
> We're seeing the following exception in our replication threads. 
> {code:java}
> [2019-04-16 14:14:39,724] ERROR [ReplicaFetcher replicaId=15, leaderId=8, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> testtopic-123 offset 9880379
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: 
> org.apache.kafka.common.errors.TransactionCoordinatorFencedException: Invalid 
> coordinator epoch: 27 (zombie), 31 (current)
> {code}
> While this is an issue itself the larger issue is that this exception kills 
> the replication threads so no other partitions get replicated to this broker. 
> That a single corrupt partition can affect the availability of multiple 
> topics is a great concern to us.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8570) Downconversion could fail when log contains out of order message formats

2019-06-19 Thread Dhruvil Shah (JIRA)
Dhruvil Shah created KAFKA-8570:
---

 Summary: Downconversion could fail when log contains out of order 
message formats
 Key: KAFKA-8570
 URL: https://issues.apache.org/jira/browse/KAFKA-8570
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah
Assignee: Dhruvil Shah


When the log contains out of order message formats (for example a v2 message 
followed by a v1 message), it is possible for down-conversion to fail in 
certain scenarios where batches compressed and greater than 1kB in size. 
Down-conversion fails with a stack like the following:

java.lang.IllegalArgumentException
at java.nio.Buffer.limit(Buffer.java:275)
at 
org.apache.kafka.common.record.FileLogInputStream$FileChannelRecordBatch.writeTo(FileLogInputStream.java:176)
at 
org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:107)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:242)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8359) Reconsider default for leader imbalance percentage

2019-05-13 Thread Dhruvil Shah (JIRA)
Dhruvil Shah created KAFKA-8359:
---

 Summary: Reconsider default for leader imbalance percentage
 Key: KAFKA-8359
 URL: https://issues.apache.org/jira/browse/KAFKA-8359
 Project: Kafka
  Issue Type: Improvement
Reporter: Dhruvil Shah


By default, the leader imbalance ratio is 10%. This means that the controller 
won't trigger preferred leader election for a broker unless the ratio of the 
number of partitions a broker is the current leader of and the number of 
partitions it is the preferred leader of is off by more than 10%. The problem 
is when a broker is catching up after a restart, the smallest topics tend to 
catch up first and the largest ones later, so the 10% remaining difference may 
not be proportional to the broker's load. To keep better balance in the 
cluster, we should consider setting `leader.imbalance.per.broker.percentage=0` 
by default so that the preferred leaders are always elected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8322) Flaky test: SslTransportLayerTest.testListenerConfigOverride

2019-05-03 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-8322:

Description: 
java.lang.AssertionError: expected: but 
was: at org.junit.Assert.fail(Assert.java:89) at 
org.junit.Assert.failNotEquals(Assert.java:835) at 
org.junit.Assert.assertEquals(Assert.java:120) at 
org.junit.Assert.assertEquals(Assert.java:146) at 
org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:111)

[https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4250/testReport/junit/org.apache.kafka.common.network/SslTransportLayerTest/testListenerConfigOverride/]

  was:
java.lang.AssertionError: expected: but 
was: at org.junit.Assert.fail(Assert.java:89) at 
org.junit.Assert.failNotEquals(Assert.java:835) at 
org.junit.Assert.assertEquals(Assert.java:120) at 
org.junit.Assert.assertEquals(Assert.java:146) at 
org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:111)
 at 
org.apache.kafka.common.network.SslTransportLayerTest.testListenerConfigOverride(SslTransportLayerTest.java:319)

 

[https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4250/testReport/junit/org.apache.kafka.common.network/SslTransportLayerTest/testListenerConfigOverride/]


> Flaky test: SslTransportLayerTest.testListenerConfigOverride
> 
>
> Key: KAFKA-8322
> URL: https://issues.apache.org/jira/browse/KAFKA-8322
> Project: Kafka
>  Issue Type: Test
>  Components: core, unit tests
>Reporter: Dhruvil Shah
>Priority: Major
>
> java.lang.AssertionError: expected: but 
> was: at org.junit.Assert.fail(Assert.java:89) at 
> org.junit.Assert.failNotEquals(Assert.java:835) at 
> org.junit.Assert.assertEquals(Assert.java:120) at 
> org.junit.Assert.assertEquals(Assert.java:146) at 
> org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:111)
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4250/testReport/junit/org.apache.kafka.common.network/SslTransportLayerTest/testListenerConfigOverride/]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8322) Flaky test: SslTransportLayerTest.testListenerConfigOverride

2019-05-03 Thread Dhruvil Shah (JIRA)
Dhruvil Shah created KAFKA-8322:
---

 Summary: Flaky test: 
SslTransportLayerTest.testListenerConfigOverride
 Key: KAFKA-8322
 URL: https://issues.apache.org/jira/browse/KAFKA-8322
 Project: Kafka
  Issue Type: Test
  Components: core, unit tests
Reporter: Dhruvil Shah


java.lang.AssertionError: expected: but 
was: at org.junit.Assert.fail(Assert.java:89) at 
org.junit.Assert.failNotEquals(Assert.java:835) at 
org.junit.Assert.assertEquals(Assert.java:120) at 
org.junit.Assert.assertEquals(Assert.java:146) at 
org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:111)
 at 
org.apache.kafka.common.network.SslTransportLayerTest.testListenerConfigOverride(SslTransportLayerTest.java:319)

 

[https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4250/testReport/junit/org.apache.kafka.common.network/SslTransportLayerTest/testListenerConfigOverride/]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8185) Controller becomes stale and not able to failover the leadership for the partitions

2019-04-17 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16820573#comment-16820573
 ] 

Dhruvil Shah edited comment on KAFKA-8185 at 4/17/19 11:45 PM:
---

I investigated this issue with Kang and after looking through older logs, the 
most logical explanation is that the controller got into an invalid state after 
the topic znode was deleted directly from ZK. This is not an expected scenario 
as we expect users to use the `delete-topics` command, AdminClient or the 
DeleteTopicsRequest to delete a topic.


was (Author: dhruvilshah):
I investigated this issue with Kang and after looking through older logs, the 
most logical explanation is that the controller got into an invalid state after 
the topic znode was deleted from ZK. This is not an expected scenario as we 
expect users to use the `delete-topics` command, AdminClient or the 
DeleteTopicsRequest to delete a topic.

> Controller becomes stale and not able to failover the leadership for the 
> partitions
> ---
>
> Key: KAFKA-8185
> URL: https://issues.apache.org/jira/browse/KAFKA-8185
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 1.1.1
>Reporter: Kang H Lee
>Priority: Critical
> Attachments: broker12.zip, broker9.zip, zookeeper.zip
>
>
> Description:
> After broker 9 went offline, all partitions led by it went offline. The 
> controller attempted to move leadership but ran into an exception while doing 
> so:
> {code:java}
> // [2019-03-26 01:23:34,114] ERROR [PartitionStateMachine controllerId=12] 
> Error while moving some partitions to OnlinePartition state 
> (kafka.controller.PartitionStateMachine)
> java.util.NoSuchElementException: key not found: me-test-1
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:59)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
> at 
> kafka.controller.PartitionStateMachine$$anonfun$14.apply(PartitionStateMachine.scala:202)
> at 
> kafka.controller.PartitionStateMachine$$anonfun$14.apply(PartitionStateMachine.scala:202)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> kafka.controller.PartitionStateMachine.initializeLeaderAndIsrForPartitions(PartitionStateMachine.scala:202)
> at 
> kafka.controller.PartitionStateMachine.doHandleStateChanges(PartitionStateMachine.scala:167)
> at 
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:116)
> at 
> kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:106)
> at 
> kafka.controller.KafkaController.kafka$controller$KafkaController$$onReplicasBecomeOffline(KafkaController.scala:437)
> at 
> kafka.controller.KafkaController.kafka$controller$KafkaController$$onBrokerFailure(KafkaController.scala:405)
> at 
> kafka.controller.KafkaController$BrokerChange$.process(KafkaController.scala:1246)
> at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
> at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
> at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
> at 
> kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:68)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> The controller was unable to move leadership of partitions led by broker 9 as 
> a result. It's worth noting that the controller ran into the same exception 
> when the broker came back up online. The controller thinks `me-test-1` is a 
> new partition and when attempting to transition it to an online partition, it 
> is unable to retrieve its replica assignment from 
> ControllerContext#partitionReplicaAssignment. I need to look through the code 
> to figure out if there's a race condition or situations where we remove the 
> partition from ControllerContext#partitionReplicaAssignment but might still 
> leave it in PartitionStateMachine#partitionState.
> They had to change the controller to recover from the offline status.
> Sequential event:

[jira] [Comment Edited] (KAFKA-8185) Controller becomes stale and not able to failover the leadership for the partitions

2019-04-17 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16820573#comment-16820573
 ] 

Dhruvil Shah edited comment on KAFKA-8185 at 4/17/19 11:42 PM:
---

I investigated this issue with Kang and after looking through older logs, the 
most logical explanation is that the controller got into an invalid state after 
the topic znode was deleted from ZK. This is not an expected scenario as we 
expect users to use the `delete-topics` command, AdminClient or the 
DeleteTopicsRequest to delete a topic.


was (Author: dhruvilshah):
This is not a typically expected scenario and would only happen when the topic 
znode is deleted directly from ZK.

> Controller becomes stale and not able to failover the leadership for the 
> partitions
> ---
>
> Key: KAFKA-8185
> URL: https://issues.apache.org/jira/browse/KAFKA-8185
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 1.1.1
>Reporter: Kang H Lee
>Priority: Critical
> Attachments: broker12.zip, broker9.zip, zookeeper.zip
>
>
> Description:
> After broker 9 went offline, all partitions led by it went offline. The 
> controller attempted to move leadership but ran into an exception while doing 
> so:
> {code:java}
> // [2019-03-26 01:23:34,114] ERROR [PartitionStateMachine controllerId=12] 
> Error while moving some partitions to OnlinePartition state 
> (kafka.controller.PartitionStateMachine)
> java.util.NoSuchElementException: key not found: me-test-1
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:59)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
> at 
> kafka.controller.PartitionStateMachine$$anonfun$14.apply(PartitionStateMachine.scala:202)
> at 
> kafka.controller.PartitionStateMachine$$anonfun$14.apply(PartitionStateMachine.scala:202)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> kafka.controller.PartitionStateMachine.initializeLeaderAndIsrForPartitions(PartitionStateMachine.scala:202)
> at 
> kafka.controller.PartitionStateMachine.doHandleStateChanges(PartitionStateMachine.scala:167)
> at 
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:116)
> at 
> kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:106)
> at 
> kafka.controller.KafkaController.kafka$controller$KafkaController$$onReplicasBecomeOffline(KafkaController.scala:437)
> at 
> kafka.controller.KafkaController.kafka$controller$KafkaController$$onBrokerFailure(KafkaController.scala:405)
> at 
> kafka.controller.KafkaController$BrokerChange$.process(KafkaController.scala:1246)
> at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
> at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
> at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
> at 
> kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:68)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> The controller was unable to move leadership of partitions led by broker 9 as 
> a result. It's worth noting that the controller ran into the same exception 
> when the broker came back up online. The controller thinks `me-test-1` is a 
> new partition and when attempting to transition it to an online partition, it 
> is unable to retrieve its replica assignment from 
> ControllerContext#partitionReplicaAssignment. I need to look through the code 
> to figure out if there's a race condition or situations where we remove the 
> partition from ControllerContext#partitionReplicaAssignment but might still 
> leave it in PartitionStateMachine#partitionState.
> They had to change the controller to recover from the offline status.
> Sequential event:
> * Broker 9 got restated in between : 2019-03-26 01:22:54,236 - 2019-03-26 
> 01:27:30,967: This was unclean shutdown.
> * From 2019-03-26 01:27:30,967, broker 9 was rebuilding indexes. Broker 9 
> wasn't able to process data at this 

[jira] [Resolved] (KAFKA-8185) Controller becomes stale and not able to failover the leadership for the partitions

2019-04-17 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah resolved KAFKA-8185.
-
Resolution: Not A Problem

This is not a typically expected scenario and would only happen when the topic 
znode is deleted directly from ZK.

> Controller becomes stale and not able to failover the leadership for the 
> partitions
> ---
>
> Key: KAFKA-8185
> URL: https://issues.apache.org/jira/browse/KAFKA-8185
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 1.1.1
>Reporter: Kang H Lee
>Priority: Critical
> Attachments: broker12.zip, broker9.zip, zookeeper.zip
>
>
> Description:
> After broker 9 went offline, all partitions led by it went offline. The 
> controller attempted to move leadership but ran into an exception while doing 
> so:
> {code:java}
> // [2019-03-26 01:23:34,114] ERROR [PartitionStateMachine controllerId=12] 
> Error while moving some partitions to OnlinePartition state 
> (kafka.controller.PartitionStateMachine)
> java.util.NoSuchElementException: key not found: me-test-1
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:59)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
> at 
> kafka.controller.PartitionStateMachine$$anonfun$14.apply(PartitionStateMachine.scala:202)
> at 
> kafka.controller.PartitionStateMachine$$anonfun$14.apply(PartitionStateMachine.scala:202)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at 
> kafka.controller.PartitionStateMachine.initializeLeaderAndIsrForPartitions(PartitionStateMachine.scala:202)
> at 
> kafka.controller.PartitionStateMachine.doHandleStateChanges(PartitionStateMachine.scala:167)
> at 
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:116)
> at 
> kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:106)
> at 
> kafka.controller.KafkaController.kafka$controller$KafkaController$$onReplicasBecomeOffline(KafkaController.scala:437)
> at 
> kafka.controller.KafkaController.kafka$controller$KafkaController$$onBrokerFailure(KafkaController.scala:405)
> at 
> kafka.controller.KafkaController$BrokerChange$.process(KafkaController.scala:1246)
> at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
> at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
> at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
> at 
> kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:68)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> The controller was unable to move leadership of partitions led by broker 9 as 
> a result. It's worth noting that the controller ran into the same exception 
> when the broker came back up online. The controller thinks `me-test-1` is a 
> new partition and when attempting to transition it to an online partition, it 
> is unable to retrieve its replica assignment from 
> ControllerContext#partitionReplicaAssignment. I need to look through the code 
> to figure out if there's a race condition or situations where we remove the 
> partition from ControllerContext#partitionReplicaAssignment but might still 
> leave it in PartitionStateMachine#partitionState.
> They had to change the controller to recover from the offline status.
> Sequential event:
> * Broker 9 got restated in between : 2019-03-26 01:22:54,236 - 2019-03-26 
> 01:27:30,967: This was unclean shutdown.
> * From 2019-03-26 01:27:30,967, broker 9 was rebuilding indexes. Broker 9 
> wasn't able to process data at this moment.
> * At 2019-03-26 01:29:36,741, broker 9 was starting to load replica.
> * [2019-03-26 01:29:36,202] ERROR [KafkaApi-9] Number of alive brokers '0' 
> does not meet the required replication factor '3' for the offsets topic 
> (configured via 'offsets.topic.replication.factor'). This error can be 
> ignored if the cluster is starting up and not all brokers are up yet. 
> (kafka.server.KafkaApis)
> * At 2019-03-26 

[jira] [Comment Edited] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically

2019-04-09 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813990#comment-16813990
 ] 

Dhruvil Shah edited comment on KAFKA-7362 at 4/10/19 2:12 AM:
--

[~xiongqiwu] thank you for working on this! I agree that we need to figure out 
the appropriate time to initiate cleanup of orphaned partitions. Perhaps we 
could discuss more about the implementation after you open the PR for review.


was (Author: dhruvilshah):
[~xiongqiwu] from my understanding, we could only have orphan partitions when 
an offline broker comes back online, so doing this cleanup once on startup 
should be sufficient. We could talk more about the implementation when you open 
the PR for review. Thank you for working on this!

> enable kafka broker to remove orphan partitions automatically 
> --
>
> Key: KAFKA-7362
> URL: https://issues.apache.org/jira/browse/KAFKA-7362
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> When partition reassignment removes topic partitions from a offline broker, 
> those removed partitions become orphan partitions to the broker. When the 
> offline broker comes back online, it is not able to clean up both data and 
> folders that belong to orphan partitions.  Log manager will scan all all dirs 
> during startup, but the time based retention policy on a topic partition will 
> not be kicked out until the broker is either a follower or a leader of the 
> partition.  In addition, we do not have logic to delete folders that belong 
> to orphan partition today. 
> Open this ticket to provide a mechanism (when enabled) to safely remove 
> orphan partitions automatically.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically

2019-04-09 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16813990#comment-16813990
 ] 

Dhruvil Shah commented on KAFKA-7362:
-

[~xiongqiwu] from my understanding, we could only have orphan partitions when 
an offline broker comes back online, so doing this cleanup once on startup 
should be sufficient. We could talk more about the implementation when you open 
the PR for review. Thank you for working on this!

> enable kafka broker to remove orphan partitions automatically 
> --
>
> Key: KAFKA-7362
> URL: https://issues.apache.org/jira/browse/KAFKA-7362
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> When partition reassignment removes topic partitions from a offline broker, 
> those removed partitions become orphan partitions to the broker. When the 
> offline broker comes back online, it is not able to clean up both data and 
> folders that belong to orphan partitions.  Log manager will scan all all dirs 
> during startup, but the time based retention policy on a topic partition will 
> not be kicked out until the broker is either a follower or a leader of the 
> partition.  In addition, we do not have logic to delete folders that belong 
> to orphan partition today. 
> Open this ticket to provide a mechanism (when enabled) to safely remove 
> orphan partitions automatically.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically

2019-03-29 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805258#comment-16805258
 ] 

Dhruvil Shah edited comment on KAFKA-7362 at 3/29/19 6:38 PM:
--

Thanks [~xiongqiwu]. It will be interesting to hear your proposal on how to 
make progress on topic deletion in spite of having offline brokers. I think the 
main challenge there is to figure out what happens on topic recreation, i.e. 
when you delete a topic and a topic with the same name is recreated.

I think it would make sense to decouple both of these issues though. Cleaning 
up orphaned partitions is useful to reclaim the disk space, regardless of 
whether we solve the topic deletion problem or not. I am also not sure if we 
need a KIP to implement cleanup of orphaned partitions, as this does not change 
user experience in any way. What do you think?


was (Author: dhruvilshah):
Thanks [~xiongqiwu]. It will be interesting to hear your proposal on how to 
make progress on topic deletion in spite of having offline brokers. I think the 
main challenge there is to figure out what happens on topic recreation, i.e. 
when you delete a topic and a topic with the same name is recreated.

 

I think it would make sense to decouple both of these issues though. Cleaning 
up orphaned partitions is useful to reclaim the disk space, regardless of 
whether we solve the topic deletion problem or not. I am also not sure if we 
need a KIP to implement cleanup of orphaned partitions, as this does not change 
user experience in any way. What do you think?

> enable kafka broker to remove orphan partitions automatically 
> --
>
> Key: KAFKA-7362
> URL: https://issues.apache.org/jira/browse/KAFKA-7362
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> When partition reassignment removes topic partitions from a offline broker, 
> those removed partitions become orphan partitions to the broker. When the 
> offline broker comes back online, it is not able to clean up both data and 
> folders that belong to orphan partitions.  Log manager will scan all all dirs 
> during startup, but the time based retention policy on a topic partition will 
> not be kicked out until the broker is either a follower or a leader of the 
> partition.  In addition, we do not have logic to delete folders that belong 
> to orphan partition today. 
> Open this ticket to provide a mechanism (when enabled) to safely remove 
> orphan partitions automatically.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically

2019-03-29 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16805258#comment-16805258
 ] 

Dhruvil Shah commented on KAFKA-7362:
-

Thanks [~xiongqiwu]. It will be interesting to hear your proposal on how to 
make progress on topic deletion in spite of having offline brokers. I think the 
main challenge there is to figure out what happens on topic recreation, i.e. 
when you delete a topic and a topic with the same name is recreated.

 

I think it would make sense to decouple both of these issues though. Cleaning 
up orphaned partitions is useful to reclaim the disk space, regardless of 
whether we solve the topic deletion problem or not. I am also not sure if we 
need a KIP to implement cleanup of orphaned partitions, as this does not change 
user experience in any way. What do you think?

> enable kafka broker to remove orphan partitions automatically 
> --
>
> Key: KAFKA-7362
> URL: https://issues.apache.org/jira/browse/KAFKA-7362
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> When partition reassignment removes topic partitions from a offline broker, 
> those removed partitions become orphan partitions to the broker. When the 
> offline broker comes back online, it is not able to clean up both data and 
> folders that belong to orphan partitions.  Log manager will scan all all dirs 
> during startup, but the time based retention policy on a topic partition will 
> not be kicked out until the broker is either a follower or a leader of the 
> partition.  In addition, we do not have logic to delete folders that belong 
> to orphan partition today. 
> Open this ticket to provide a mechanism (when enabled) to safely remove 
> orphan partitions automatically.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically

2019-03-28 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804470#comment-16804470
 ] 

Dhruvil Shah commented on KAFKA-7362:
-

[~xiongqiwu] are you planning to work on this JIRA? If not, I could take a stab 
at fixing it.

> enable kafka broker to remove orphan partitions automatically 
> --
>
> Key: KAFKA-7362
> URL: https://issues.apache.org/jira/browse/KAFKA-7362
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> When partition reassignment removes topic partitions from a offline broker, 
> those removed partitions become orphan partitions to the broker. When the 
> offline broker comes back online, it is not able to clean up both data and 
> folders that belong to orphan partitions.  Log manager will scan all all dirs 
> during startup, but the time based retention policy on a topic partition will 
> not be kicked out until the broker is either a follower or a leader of the 
> partition.  In addition, we do not have logic to delete folders that belong 
> to orphan partition today. 
> Open this ticket to provide a mechanism (when enabled) to safely remove 
> orphan partitions automatically.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8134) ProducerConfig.LINGER_MS_CONFIG undocumented breaking change in kafka-clients 2.1

2019-03-25 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-8134:
---

Assignee: Dhruvil Shah

> ProducerConfig.LINGER_MS_CONFIG undocumented breaking change in kafka-clients 
> 2.1
> -
>
> Key: KAFKA-8134
> URL: https://issues.apache.org/jira/browse/KAFKA-8134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Sam Lendle
>Assignee: Dhruvil Shah
>Priority: Major
>
> Prior to 2.1, the type of the "linger.ms" config was Long, but was changed to 
> Integer in 2.1.0 ([https://github.com/apache/kafka/pull/5270]) A config using 
> a Long value for that parameter which works with kafka-clients < 2.1 will 
> cause a ConfigException to be thrown when constructing a KafkaProducer if 
> kafka-clients is upgraded to >= 2.1.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6098) Delete and Re-create topic operation could result in race condition

2019-01-31 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-6098:
---

Assignee: (was: Dhruvil Shah)

> Delete and Re-create topic operation could result in race condition
> ---
>
> Key: KAFKA-6098
> URL: https://issues.apache.org/jira/browse/KAFKA-6098
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: reliability
>
> Here is the following process to re-produce this issue:
> 1. Delete a topic using the delete topic request.
> 2. Confirm the topic is deleted using the list topics request.
> 3. Create the topic using the create topic request.
> In step 3) a race condition can happen that the response returns a 
> {{TOPIC_ALREADY_EXISTS}} error code, indicating the topic has already existed.
> The root cause of the above issue is in the {{TopicDeletionManager}} class:
> {code}
> controller.partitionStateMachine.handleStateChanges(partitionsForDeletedTopic.toSeq,
>  OfflinePartition)
> controller.partitionStateMachine.handleStateChanges(partitionsForDeletedTopic.toSeq,
>  NonExistentPartition)
> topicsToBeDeleted -= topic
> partitionsToBeDeleted.retain(_.topic != topic)
> kafkaControllerZkUtils.deleteTopicZNode(topic)
> kafkaControllerZkUtils.deleteTopicConfigs(Seq(topic))
> kafkaControllerZkUtils.deleteTopicDeletions(Seq(topic))
> controllerContext.removeTopic(topic)
> {code}
> I.e. it first update the broker's metadata cache through the ISR and metadata 
> update request, then delete the topic zk path, and then delete the 
> topic-deletion zk path. However, upon handling the create topic request, the 
> broker will simply try to write to the topic zk path directly. Hence there is 
> a race condition that between brokers update their metadata cache (hence list 
> topic request not returning this topic anymore) and zk path for the topic be 
> deleted (hence the create topic succeed).
> The reason this problem could be exposed, is through current handling logic 
> of the create topic response, most of which takes {{TOPIC_ALREADY_EXISTS}} as 
> "OK" and moves on, and the zk path will be deleted later, hence leaving the 
> topic to be not created at all:
> https://github.com/apache/kafka/blob/249e398bf84cdd475af6529e163e78486b43c570/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java#L221
> https://github.com/apache/kafka/blob/1a653c813c842c0b67f26fb119d7727e272cf834/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java#L232
> Looking at the code history, it seems this race condition always exist, but 
> testing on trunk / 1.0 with the above steps it is more likely to happen than 
> before. I wonder if the ZK async calls have an effect here. cc [~junrao] 
> [~onurkaraman]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-3886) Consumer should handle wakeups while rebalancing more gracefully

2019-01-31 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-3886:
---

Assignee: (was: Dhruvil Shah)

> Consumer should handle wakeups while rebalancing more gracefully
> 
>
> Key: KAFKA-3886
> URL: https://issues.apache.org/jira/browse/KAFKA-3886
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.0.0
>Reporter: Jason Gustafson
>Priority: Major
>
> If the user calls wakeup() while a rebalance in progress, we currently lose 
> track of the state of that rebalance. In the worst case, this can result in 
> an additional unneeded rebalance when the user calls poll() again. 
> The other thing that can happen is that the rebalance could complete inside 
> another blocking call (e.g. {{commitSync()}}). There may be scenarios where 
> this can cause us to commit offsets outside the generation an assignment is 
> valid for. For example: 
> 1. Consumer is initially assigned partition A
> 2. The consumer starts rebalancing, but is interrupted with a call to 
> wakeup().
> 3. User calls commitSync with offsets (A, 5)
> 4. Before offset commit is sent, an interrupted rebalance completes and 
> changes the assignment to include only partition B.
> 5. Now we proceed with the unsafe offset commit on partition A.
> In this case, we should probably ensure that it is not possible to commit 
> offsets after an assignment has been revoked. Other cases, such as 
> position(), may be handled similarly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7837) maybeShrinkIsr may not reflect OfflinePartitions immediately

2019-01-23 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750805#comment-16750805
 ] 

Dhruvil Shah commented on KAFKA-7837:
-

[~junrao] [~lindong] having looked a bit into this, it seems this should 
already work as expected. We obtain the offline partitions iterator using:

```

private def nonOfflinePartitionsIterator: Iterator[Partition] =
   allPartitions.values.iterator.filter(_ ne ReplicaManager.OfflinePartition)

```

As far as I can tell, the iterator returned is a view backed by the map, and 
the `filter` should be evaluated lazily as we iterate through the elements.

We mark partitions offline in `handleLogDirFailure`. The only operation before 
marking partitions offline is to ensure fetchers are removed:

```
 replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions)
 replicaAlterLogDirsManager.removeFetcherForPartitions(newOfflinePartitions ++ 
partitionsWithOfflineFutureReplica.map(_.topicPartition))

partitionsWithOfflineFutureReplica.foreach(partition => 
partition.removeFutureLocalReplica(deleteFromLogDir = false))
 newOfflinePartitions.foreach

{ topicPartition => val partition = allPartitions.put(topicPartition, 
ReplicaManager.OfflinePartition) partition.removePartitionMetrics() }

```

> maybeShrinkIsr may not reflect OfflinePartitions immediately
> 
>
> Key: KAFKA-7837
> URL: https://issues.apache.org/jira/browse/KAFKA-7837
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Dhruvil Shah
>Priority: Major
>
> When a partition is marked offline due to a failed disk, the leader is 
> supposed to not shrink its ISR any more. In ReplicaManager.maybeShrinkIsr(), 
> we iterate through all non-offline partitions to shrink the ISR. If an ISR 
> needs to shrink, we need to write the new ISR to ZK, which can take a bit of 
> time. In this window, some partitions could now be marked as offline, but may 
> not be picked up by the iterator since it only reflects the state at that 
> point. This can cause all in-sync followers to be dropped out of ISR 
> unnecessarily and prevents a clean leader election.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7837) maybeShrinkIsr may not reflect OfflinePartitions immediately

2019-01-23 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16750805#comment-16750805
 ] 

Dhruvil Shah edited comment on KAFKA-7837 at 1/24/19 7:14 AM:
--

[~junrao] [~lindong] having looked a bit into this, it seems this should 
already work as expected. We obtain the offline partitions iterator using:

{{private def nonOfflinePartitionsIterator: Iterator[Partition] =}}
{{   allPartitions.values.iterator.filter(_ ne 
ReplicaManager.OfflinePartition)}}

As far as I can tell, the iterator returned is a view backed by the map, and 
the `filter` should be evaluated lazily as we iterate through the elements.

We mark partitions offline in `handleLogDirFailure`. The only operation before 
marking partitions offline is to ensure fetchers are removed:

{{replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions)}}
{{replicaAlterLogDirsManager.removeFetcherForPartitions(newOfflinePartitions ++ 
partitionsWithOfflineFutureReplica.map(_.topicPartition))}}

{{partitionsWithOfflineFutureReplica.foreach(partition => 
partition.removeFutureLocalReplica(deleteFromLogDir = false))}}
{{newOfflinePartitions.foreach { topicPartition =>}}
{{ val partition = allPartitions.put(topicPartition, 
ReplicaManager.OfflinePartition)}}
{{ partition.removePartitionMetrics()}}
{{}}}


was (Author: dhruvilshah):
[~junrao] [~lindong] having looked a bit into this, it seems this should 
already work as expected. We obtain the offline partitions iterator using:

```

private def nonOfflinePartitionsIterator: Iterator[Partition] =
   allPartitions.values.iterator.filter(_ ne ReplicaManager.OfflinePartition)

```

As far as I can tell, the iterator returned is a view backed by the map, and 
the `filter` should be evaluated lazily as we iterate through the elements.

We mark partitions offline in `handleLogDirFailure`. The only operation before 
marking partitions offline is to ensure fetchers are removed:

```
 replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions)
 replicaAlterLogDirsManager.removeFetcherForPartitions(newOfflinePartitions ++ 
partitionsWithOfflineFutureReplica.map(_.topicPartition))

partitionsWithOfflineFutureReplica.foreach(partition => 
partition.removeFutureLocalReplica(deleteFromLogDir = false))
 newOfflinePartitions.foreach

{ topicPartition => val partition = allPartitions.put(topicPartition, 
ReplicaManager.OfflinePartition) partition.removePartitionMetrics() }

```

> maybeShrinkIsr may not reflect OfflinePartitions immediately
> 
>
> Key: KAFKA-7837
> URL: https://issues.apache.org/jira/browse/KAFKA-7837
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Dhruvil Shah
>Priority: Major
>
> When a partition is marked offline due to a failed disk, the leader is 
> supposed to not shrink its ISR any more. In ReplicaManager.maybeShrinkIsr(), 
> we iterate through all non-offline partitions to shrink the ISR. If an ISR 
> needs to shrink, we need to write the new ISR to ZK, which can take a bit of 
> time. In this window, some partitions could now be marked as offline, but may 
> not be picked up by the iterator since it only reflects the state at that 
> point. This can cause all in-sync followers to be dropped out of ISR 
> unnecessarily and prevents a clean leader election.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required

2019-01-22 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16749277#comment-16749277
 ] 

Dhruvil Shah commented on KAFKA-7045:
-

Unassigning as I don't have time to work on this at the moment.

> Consumer may not be able to consume all messages when down-conversion is 
> required
> -
>
> Key: KAFKA-7045
> URL: https://issues.apache.org/jira/browse/KAFKA-7045
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2, 1.0.0, 1.0.1, 1.1.0, 2.0.0
>Reporter: Dhruvil Shah
>Priority: Major
> Fix For: 2.2.0, 2.1.1
>
> Attachments: log-cleaner-test.zip
>
>
> When down-conversion is required, the consumer might fail consuming messages 
> under certain conditions. Couple such cases are outlined below:
> (1) When consuming from a compacted topic, it is possible that the consumer 
> wants to fetch messages that fall in the middle of a batch but the messages 
> have been compacted by the cleaner. For example, let's say we have the 
> following two segments. The brackets indicate a single batch of messages and 
> the numbers within are the message offsets.
> Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
>  Segment #2: [9, 10, 11], [12, 13, 14]
> If the cleaner were to come in now and clean up messages with offsets 7 and 
> 8, the segments would look like the following:
> Segment #1: [0, 1, 2], [3, 4, 5], [6]
>  Segment #2: [9, 10, 11], [12, 13, 14]
> A consumer attempting to fetch messages at offset 7 will start reading the 
> batch starting at offset 6. During down-conversion, we will drop the record 
> starting at 6 it is less than the current fetch start offset. However, there 
> are no messages in the log following offset 6. In such cases, we return the 
> `FileRecords` itself which would cause the consumer to throw an exception 
> because it does not understand the stored message format.
> (2) When consuming from a topic with transactional messages, down-conversion 
> usually drops control batches because these do not exist in V0 and V1 message 
> formats. If there are no message batches following the control batch in the 
> particular segment (or if we are at the end of the log), we would again get 
> no records after down-conversion and will return the `FileRecords`. Because 
> the consumer is not able to interpret control batches, it will again throw an 
> exception.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required

2019-01-22 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-7045:
---

Assignee: (was: Dhruvil Shah)

> Consumer may not be able to consume all messages when down-conversion is 
> required
> -
>
> Key: KAFKA-7045
> URL: https://issues.apache.org/jira/browse/KAFKA-7045
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2, 1.0.0, 1.0.1, 1.1.0, 2.0.0
>Reporter: Dhruvil Shah
>Priority: Major
> Fix For: 2.2.0, 2.1.1
>
> Attachments: log-cleaner-test.zip
>
>
> When down-conversion is required, the consumer might fail consuming messages 
> under certain conditions. Couple such cases are outlined below:
> (1) When consuming from a compacted topic, it is possible that the consumer 
> wants to fetch messages that fall in the middle of a batch but the messages 
> have been compacted by the cleaner. For example, let's say we have the 
> following two segments. The brackets indicate a single batch of messages and 
> the numbers within are the message offsets.
> Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
>  Segment #2: [9, 10, 11], [12, 13, 14]
> If the cleaner were to come in now and clean up messages with offsets 7 and 
> 8, the segments would look like the following:
> Segment #1: [0, 1, 2], [3, 4, 5], [6]
>  Segment #2: [9, 10, 11], [12, 13, 14]
> A consumer attempting to fetch messages at offset 7 will start reading the 
> batch starting at offset 6. During down-conversion, we will drop the record 
> starting at 6 it is less than the current fetch start offset. However, there 
> are no messages in the log following offset 6. In such cases, we return the 
> `FileRecords` itself which would cause the consumer to throw an exception 
> because it does not understand the stored message format.
> (2) When consuming from a topic with transactional messages, down-conversion 
> usually drops control batches because these do not exist in V0 and V1 message 
> formats. If there are no message batches following the control batch in the 
> particular segment (or if we are at the end of the log), we would again get 
> no records after down-conversion and will return the `FileRecords`. Because 
> the consumer is not able to interpret control batches, it will again throw an 
> exception.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7837) maybeShrinkIsr may not reflect OfflinePartitions immediately

2019-01-18 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16746588#comment-16746588
 ] 

Dhruvil Shah commented on KAFKA-7837:
-

I could take a stab at fixing this.

> maybeShrinkIsr may not reflect OfflinePartitions immediately
> 
>
> Key: KAFKA-7837
> URL: https://issues.apache.org/jira/browse/KAFKA-7837
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Dhruvil Shah
>Priority: Major
>
> When a partition is marked offline due to a failed disk, the leader is 
> supposed to not shrink its ISR any more. In ReplicaManager.maybeShrinkIsr(), 
> we iterate through all non-offline partitions to shrink the ISR. If an ISR 
> needs to shrink, we need to write the new ISR to ZK, which can take a bit of 
> time. In this window, some partitions could now be marked as offline, but may 
> not be picked up by the iterator since it only reflects the state at that 
> point. This can cause all in-sync followers to be dropped out of ISR 
> unnecessarily and prevents a clean leader election.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7837) maybeShrinkIsr may not reflect OfflinePartitions immediately

2019-01-18 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-7837:
---

Assignee: Dhruvil Shah

> maybeShrinkIsr may not reflect OfflinePartitions immediately
> 
>
> Key: KAFKA-7837
> URL: https://issues.apache.org/jira/browse/KAFKA-7837
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Dhruvil Shah
>Priority: Major
>
> When a partition is marked offline due to a failed disk, the leader is 
> supposed to not shrink its ISR any more. In ReplicaManager.maybeShrinkIsr(), 
> we iterate through all non-offline partitions to shrink the ISR. If an ISR 
> needs to shrink, we need to write the new ISR to ZK, which can take a bit of 
> time. In this window, some partitions could now be marked as offline, but may 
> not be picked up by the iterator since it only reflects the state at that 
> point. This can cause all in-sync followers to be dropped out of ISR 
> unnecessarily and prevents a clean leader election.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7838) improve logging in Partition.maybeShrinkIsr()

2019-01-17 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-7838:
---

Assignee: Dhruvil Shah

> improve logging in Partition.maybeShrinkIsr()
> -
>
> Key: KAFKA-7838
> URL: https://issues.apache.org/jira/browse/KAFKA-7838
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Dhruvil Shah
>Priority: Major
>
> When we take a replica out of ISR, it would be useful to further log the 
> fetch offset of the out of sync replica and the leader's HW at the point. 
> This could be useful when the admin needs to manually enable unclean leader 
> election.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7832) Use automatic RPC generation in CreateTopics

2019-01-17 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745598#comment-16745598
 ] 

Dhruvil Shah commented on KAFKA-7832:
-

Putting the link here so the PR is easy to find: 
https://github.com/apache/kafka/pull/5972

> Use automatic RPC generation in CreateTopics
> 
>
> Key: KAFKA-7832
> URL: https://issues.apache.org/jira/browse/KAFKA-7832
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Major
>
> Use automatic RPC generation for the CreateTopics RPC.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7827) Consumer stops receiving messages when FIRST leader started is killed

2019-01-15 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16743707#comment-16743707
 ] 

Dhruvil Shah commented on KAFKA-7827:
-

This is unexpected. How many replicas does the topic being consumed have?

> Consumer stops receiving messages when FIRST leader started is killed
> -
>
> Key: KAFKA-7827
> URL: https://issues.apache.org/jira/browse/KAFKA-7827
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
> Environment: CentOS linux 7, Kafka 1.1.0
>Reporter: Somnath Choudhuri
>Priority: Major
>
> The Kafka consumer stops receiving messages IF the first broker that was 
> physically brought up when the cluster was started is killed. The consumer 
> receives the "missing" messages once that broker is brought back up. Bringing 
> down other brokers in the cluster does not impact the consumer so long as 
> quorum is maintained.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7814) Broker shut down while cleaning up log file

2019-01-15 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16743699#comment-16743699
 ] 

Dhruvil Shah commented on KAFKA-7814:
-

[~Ever600686] could you also upload your docker-compose file? I wonder what 
`/kafka/logs` maps to. It seems like files are being deleted from underneath 
while Kafka also tries to delete them.

> Broker shut down while cleaning up log file
> ---
>
> Key: KAFKA-7814
> URL: https://issues.apache.org/jira/browse/KAFKA-7814
> Project: Kafka
>  Issue Type: Bug
>  Components: log, offset manager
>Affects Versions: 1.1.0, 2.1.0
> Environment: os: aliYun, centos7
> docker image:wurstmeister/kafka:2.12-2.1.0
>Reporter: EverZhang
>Priority: Critical
> Attachments: broker1.log-cleaner.log.2019-01-11-17, 
> broker1.server.log.2019-01-11-17, broker2.log-cleaner.log.2019-01-11-17, 
> broker2.server.log.2019-01-11-17, broker3.log-cleaner.log.2019-01-11-17, 
> broker3.server.log.2019-01-11-17
>
>
> Kafka cluster with 3 brokers(version:1.1.0) and is well running for over 6 
> months.
> Then we modified partitions from 3 to 48 for every topic after 2018/12/12,  
> then the brokers shutdown every 5-10 days.
> Then we upgraded the broker from 1.1.0 to 2.1.0,  but the brokers still keep 
> shutting down every 5-10 days.
> Each time, one broker shut down after the following error log,  then several 
> minutes later, the other 2 brokers shut down too, with the same error but 
> other partition log files.
> {code:bash}
> [2019-01-11 17:16:36,572] INFO [ProducerStateManager 
> partition=__transaction_state-11] Writing producer snapshot at offset 807760 
> (kafka.log.ProducerStateManager)
> [2019-01-11 17:16:36,572] INFO [Log partition=__transaction_state-11, 
> dir=/kafka/logs] Rolled new log segment at offset 807760 in 4 ms. 
> (kafka.log.Log)
> [2019-01-11 17:16:46,150] WARN Resetting first dirty offset of 
> __transaction_state-35 to log start offset 194404 since the checkpointed 
> offset 194345 is invalid. (kafka.log.LogCleanerManager$)
> [2019-01-11 17:16:46,239] ERROR Failed to clean up log for 
> __transaction_state-11 in dir /kafka/logs due to IOException 
> (kafka.server.LogDirFailureChannel)
> java.nio.file.NoSuchFileException: 
> /kafka/logs/__transaction_state-11/00807727.log
> at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
> at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)
> at 
> sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
> at java.nio.file.Files.move(Files.java:1395)
> at 
> org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:809)
> at 
> org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:222)
> at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:488)
> at kafka.log.Log.asyncDeleteSegment(Log.scala:1838)
> at kafka.log.Log.$anonfun$replaceSegments$6(Log.scala:1901)
> at kafka.log.Log.$anonfun$replaceSegments$6$adapted(Log.scala:1896)
> at scala.collection.immutable.List.foreach(List.scala:388)
> at kafka.log.Log.replaceSegments(Log.scala:1896)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:583)
> at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:515)
> at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:514)
> at scala.collection.immutable.List.foreach(List.scala:388)
> at kafka.log.Cleaner.doClean(LogCleaner.scala:514)
> at kafka.log.Cleaner.clean(LogCleaner.scala:492)
> at kafka.log.LogCleaner$CleanerThread.cleanLog(LogCleaner.scala:353)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:319)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:300)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Suppressed: java.nio.file.NoSuchFileException: 
> /kafka/logs/__transaction_state-11/00807727.log -> 
> /kafka/logs/__transaction_state-11/00807727.log.deleted
> at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:396)
> at 
> sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
> at java.nio.file.Files.move(Files.java:1395)
> at 
> 

[jira] [Comment Edited] (KAFKA-7814) Broker shut down while cleaning up log file

2019-01-11 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16740570#comment-16740570
 ] 

Dhruvil Shah edited comment on KAFKA-7814 at 1/11/19 8:01 PM:
--

[~Ever600686] thanks for reporting the issue. It looks like the segment was 
deleted while log cleaner was trying to clean it. Could you please upload 
broker and log cleaner logs from around the time of the incidence?


was (Author: dhruvilshah):
[~Ever600686] thanks for reporting the issue. It looks like the segment was 
deleted while log cleaner was trying to clean it. What is the topic's 
`cleanup.policy` set to? Could you also upload broker and log cleaner logs from 
around the time of the incidence?

> Broker shut down while cleaning up log file
> ---
>
> Key: KAFKA-7814
> URL: https://issues.apache.org/jira/browse/KAFKA-7814
> Project: Kafka
>  Issue Type: Bug
>  Components: log, offset manager
>Affects Versions: 1.1.0, 2.1.0
> Environment: os: aliYun, centos7
> docker image:wurstmeister/kafka:2.12-2.1.0
>Reporter: EverZhang
>Priority: Critical
>
> Kafka cluster with 3 brokers(version:1.1.0) and is well running for over 6 
> months.
> Then we modified partitions from 3 to 48 for every topic after 2018/12/12,  
> then the brokers shutdown every 5-10 days.
> Then we upgraded the broker from 1.1.0 to 2.1.0,  but the brokers still keep 
> shutting down every 5-10 days.
> Each time, one broker shut down after the following error log,  then several 
> minutes later, the other 2 brokers shut down too, with the same error but 
> other partition log files.
> {code:bash}
> [2019-01-11 17:16:36,572] INFO [ProducerStateManager 
> partition=__transaction_state-11] Writing producer snapshot at offset 807760 
> (kafka.log.ProducerStateManager)
> [2019-01-11 17:16:36,572] INFO [Log partition=__transaction_state-11, 
> dir=/kafka/logs] Rolled new log segment at offset 807760 in 4 ms. 
> (kafka.log.Log)
> [2019-01-11 17:16:46,150] WARN Resetting first dirty offset of 
> __transaction_state-35 to log start offset 194404 since the checkpointed 
> offset 194345 is invalid. (kafka.log.LogCleanerManager$)
> [2019-01-11 17:16:46,239] ERROR Failed to clean up log for 
> __transaction_state-11 in dir /kafka/logs due to IOException 
> (kafka.server.LogDirFailureChannel)
> java.nio.file.NoSuchFileException: 
> /kafka/logs/__transaction_state-11/00807727.log
> at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
> at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)
> at 
> sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
> at java.nio.file.Files.move(Files.java:1395)
> at 
> org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:809)
> at 
> org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:222)
> at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:488)
> at kafka.log.Log.asyncDeleteSegment(Log.scala:1838)
> at kafka.log.Log.$anonfun$replaceSegments$6(Log.scala:1901)
> at kafka.log.Log.$anonfun$replaceSegments$6$adapted(Log.scala:1896)
> at scala.collection.immutable.List.foreach(List.scala:388)
> at kafka.log.Log.replaceSegments(Log.scala:1896)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:583)
> at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:515)
> at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:514)
> at scala.collection.immutable.List.foreach(List.scala:388)
> at kafka.log.Cleaner.doClean(LogCleaner.scala:514)
> at kafka.log.Cleaner.clean(LogCleaner.scala:492)
> at kafka.log.LogCleaner$CleanerThread.cleanLog(LogCleaner.scala:353)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:319)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:300)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Suppressed: java.nio.file.NoSuchFileException: 
> /kafka/logs/__transaction_state-11/00807727.log -> 
> /kafka/logs/__transaction_state-11/00807727.log.deleted
> at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:396)
> at 
> sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
> at 

[jira] [Commented] (KAFKA-7814) Broker shut down while cleaning up log file

2019-01-11 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16740570#comment-16740570
 ] 

Dhruvil Shah commented on KAFKA-7814:
-

[~Ever600686] thanks for reporting the issue. It looks like the segment was 
deleted while log cleaner was trying to clean it. What is the topic's 
`cleanup.policy` set to? Could you also upload broker and log cleaner logs from 
around the time of the incidence?

> Broker shut down while cleaning up log file
> ---
>
> Key: KAFKA-7814
> URL: https://issues.apache.org/jira/browse/KAFKA-7814
> Project: Kafka
>  Issue Type: Bug
>  Components: log, offset manager
>Affects Versions: 1.1.0, 2.1.0
> Environment: os: aliYun, centos7
> docker image:wurstmeister/kafka:2.12-2.1.0
>Reporter: EverZhang
>Priority: Critical
>
> Kafka cluster with 3 brokers(version:1.1.0) and is well running for over 6 
> months.
> Then we modified partitions from 3 to 48 for every topic after 2018/12/12,  
> then the brokers shutdown every 5-10 days.
> Then we upgraded the broker from 1.1.0 to 2.1.0,  but the brokers still keep 
> shutting down every 5-10 days.
> Each time, one broker shut down after the following error log,  then several 
> minutes later, the other 2 brokers shut down too, with the same error but 
> other partition log files.
> {code:bash}
> [2019-01-11 17:16:36,572] INFO [ProducerStateManager 
> partition=__transaction_state-11] Writing producer snapshot at offset 807760 
> (kafka.log.ProducerStateManager)
> [2019-01-11 17:16:36,572] INFO [Log partition=__transaction_state-11, 
> dir=/kafka/logs] Rolled new log segment at offset 807760 in 4 ms. 
> (kafka.log.Log)
> [2019-01-11 17:16:46,150] WARN Resetting first dirty offset of 
> __transaction_state-35 to log start offset 194404 since the checkpointed 
> offset 194345 is invalid. (kafka.log.LogCleanerManager$)
> [2019-01-11 17:16:46,239] ERROR Failed to clean up log for 
> __transaction_state-11 in dir /kafka/logs due to IOException 
> (kafka.server.LogDirFailureChannel)
> java.nio.file.NoSuchFileException: 
> /kafka/logs/__transaction_state-11/00807727.log
> at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
> at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)
> at 
> sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
> at java.nio.file.Files.move(Files.java:1395)
> at 
> org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:809)
> at 
> org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:222)
> at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:488)
> at kafka.log.Log.asyncDeleteSegment(Log.scala:1838)
> at kafka.log.Log.$anonfun$replaceSegments$6(Log.scala:1901)
> at kafka.log.Log.$anonfun$replaceSegments$6$adapted(Log.scala:1896)
> at scala.collection.immutable.List.foreach(List.scala:388)
> at kafka.log.Log.replaceSegments(Log.scala:1896)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:583)
> at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:515)
> at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:514)
> at scala.collection.immutable.List.foreach(List.scala:388)
> at kafka.log.Cleaner.doClean(LogCleaner.scala:514)
> at kafka.log.Cleaner.clean(LogCleaner.scala:492)
> at kafka.log.LogCleaner$CleanerThread.cleanLog(LogCleaner.scala:353)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:319)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:300)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Suppressed: java.nio.file.NoSuchFileException: 
> /kafka/logs/__transaction_state-11/00807727.log -> 
> /kafka/logs/__transaction_state-11/00807727.log.deleted
> at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:396)
> at 
> sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
> at java.nio.file.Files.move(Files.java:1395)
> at 
> org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:806)
> ... 17 more
> [2019-01-11 17:16:46,245] INFO [ReplicaManager broker=2] Stopping serving 
> replicas in dir /kafka/logs (kafka.server.ReplicaManager)
> [2019-01-11 

[jira] [Commented] (KAFKA-7764) Authentication exceptions during consumer metadata updates may not get propagated

2018-12-21 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16726966#comment-16726966
 ] 

Dhruvil Shah commented on KAFKA-7764:
-

I had been looking into this codepath for 
[https://github.com/apache/kafka/pull/5542]. I can take a stab at fixing this 
issue as well.

> Authentication exceptions during consumer metadata updates may not get 
> propagated
> -
>
> Key: KAFKA-7764
> URL: https://issues.apache.org/jira/browse/KAFKA-7764
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Dhruvil Shah
>Priority: Major
>
> The consumer should propagate authentication errors to the user. We handle 
> the common case in ConsumerNetworkClient when the exception occurs in 
> response to an explicitly provided request. However, we are missing the logic 
> to propagate exceptions during metadata updates, which are handled internally 
> by NetworkClient. This logic exists in 
> ConsumerNetworkClient.awaitMetadataUpdate, but metadata updates can occur 
> outside of this path. Probably we just need to move that logic into 
> ConsumerNetworkClient.poll() so that errors are always checked.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7764) Authentication exceptions during consumer metadata updates may not get propagated

2018-12-21 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-7764:
---

Assignee: Dhruvil Shah

> Authentication exceptions during consumer metadata updates may not get 
> propagated
> -
>
> Key: KAFKA-7764
> URL: https://issues.apache.org/jira/browse/KAFKA-7764
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Dhruvil Shah
>Priority: Major
>
> The consumer should propagate authentication errors to the user. We handle 
> the common case in ConsumerNetworkClient when the exception occurs in 
> response to an explicitly provided request. However, we are missing the logic 
> to propagate exceptions during metadata updates, which are handled internally 
> by NetworkClient. This logic exists in 
> ConsumerNetworkClient.awaitMetadataUpdate, but metadata updates can occur 
> outside of this path. Probably we just need to move that logic into 
> ConsumerNetworkClient.poll() so that errors are always checked.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-11-27 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-7678:

Component/s: streams

> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jonathan Santilli
>Priority: Major
>
> This occurs when the group is rebalancing in a Kafka Stream application and 
> the process (the Kafka Stream application) receives a *SIGTERM* to stop it 
> gracefully.
>  
>  
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] 
> Failed to close producer due to the following error:
> java.lang.NullPointerException
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
>  
>  
> Although I have checked the code and the method 
> `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` 
> class is expecting any kind of error to happen since is catching 
> `*Throwable*`.
>  
>  
>  
> {noformat}
> try {
>  recordCollector.close();
> } catch (final Throwable e) {
>  log.error("Failed to close producer due to the following error:", e);
> } finally {
>  producer = null;
> }{noformat}
>  
> Should we consider this a bug?
> In my opinion, we could check for the `*null*` possibility at 
> `*RecordCollectorImpl*.*java*` class:
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  producer.close();
>  producer = null;
>  checkForException();
> }{noformat}
>  
> Change it for:
>  
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  if ( Objects.nonNull(producer) ) {
> producer.close();
> producer = null;
>  }
>  checkForException();
> }{noformat}
>  
> How does that sound?
>  
> Kafka Brokers running 2.0.0
> Kafka Stream and client 2.1.0
> OpenJDK 8
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-19 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-7519:
---

Assignee: (was: Dhruvil Shah)

> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> --
>
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.0.0
>Reporter: Bridger Howell
>Priority: Critical
> Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, changing the applicationId for that streams process, it was 
> able to process with no problems.
>  * Every hour,  every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
> transaction state transition to Dead while it already a pending sta
> te Dead
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator
> .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$a
> nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
> a:151)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
> nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at
>  
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionSt
> ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
> ala:150)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
> nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
> Like.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.map(List.scala:296)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
> ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
> eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
>     at scala.collection.Traversabl
> eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
> scala:241)
>     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anon
> fun$foreach$1.apply(HashMap.scala:130)
>     at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collec
> tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at scala.collecti
> on.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     a
> t 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Tr
> ansactionStateManager.scala:142)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$a
> 

[jira] [Commented] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-19 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657056#comment-16657056
 ] 

Dhruvil Shah commented on KAFKA-7519:
-

[~howellbridger] thanks for the patch! Do you want to open a PR? I could help 
reviewing the code and look into any failures if needed.

> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> --
>
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.0.0
>Reporter: Bridger Howell
>Assignee: Dhruvil Shah
>Priority: Critical
> Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, changing the applicationId for that streams process, it was 
> able to process with no problems.
>  * Every hour,  every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
> transaction state transition to Dead while it already a pending sta
> te Dead
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator
> .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$a
> nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
> a:151)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
> nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at
>  
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionSt
> ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
> ala:150)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
> nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
> Like.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.map(List.scala:296)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
> ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
> eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
>     at scala.collection.Traversabl
> eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
> scala:241)
>     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anon
> fun$foreach$1.apply(HashMap.scala:130)
>     at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collec
> tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at scala.collecti
> on.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     a
> t 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Tr
> 

[jira] [Updated] (KAFKA-7320) Provide ability to disable auto topic creation in KafkaConsumer

2018-10-02 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-7320:

Fix Version/s: 2.1.0

> Provide ability to disable auto topic creation in KafkaConsumer
> ---
>
> Key: KAFKA-7320
> URL: https://issues.apache.org/jira/browse/KAFKA-7320
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
> Fix For: 2.1.0
>
>
> Consumers should have a configuration to control whether subscribing to 
> non-existent topics should automatically create the topic or not.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7320) Provide ability to disable auto topic creation in KafkaConsumer

2018-10-01 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634780#comment-16634780
 ] 

Dhruvil Shah commented on KAFKA-7320:
-

PR is ready for review: https://github.com/apache/kafka/pull/5542

> Provide ability to disable auto topic creation in KafkaConsumer
> ---
>
> Key: KAFKA-7320
> URL: https://issues.apache.org/jira/browse/KAFKA-7320
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> Consumers should have a configuration to control whether subscribing to 
> non-existent topics should automatically create the topic or not.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7398) Transient test failure: SaslAuthenticatorFailureDelayTest.testInvalidPasswordSaslPlain

2018-09-11 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16610900#comment-16610900
 ] 

Dhruvil Shah commented on KAFKA-7398:
-

Thanks for reporting this, [~omkreddy]. I have a theory for why this could 
happen - I will submit a PR later this week.

> Transient test failure: 
> SaslAuthenticatorFailureDelayTest.testInvalidPasswordSaslPlain
> --
>
> Key: KAFKA-7398
> URL: https://issues.apache.org/jira/browse/KAFKA-7398
> Project: Kafka
>  Issue Type: Task
>  Components: unit tests
>Reporter: Manikumar
>Assignee: Dhruvil Shah
>Priority: Minor
>
> Observed below failure in one of the Jenkins job:
> {code}
>  java.lang.IllegalArgumentException: Setting the time to 1536609116303 while 
> current time 1536609116553 is newer; this is not allowed
>  Stacktrace
>  java.lang.IllegalArgumentException: Setting the time to 1536609116303 while 
> current time 1536609116553 is newer; this is not allowed
>  at org.apache.kafka.common.utils.MockTime.setCurrentTimeMs(MockTime.java:91)
>  at 
> org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:100)
>  at 
> org.apache.kafka.common.security.authenticator.SaslAuthenticatorFailureDelayTest.createAndCheckClientConnectionFailure(SaslAuthenticatorFailureDelayTest.java:223)
>  at 
> org.apache.kafka.common.security.authenticator.SaslAuthenticatorFailureDelayTest.createAndCheckClientAuthenticationFailure(SaslAuthenticatorFailureDelayTest.java:212)
>  at 
> org.apache.kafka.common.security.authenticator.SaslAuthenticatorFailureDelayTest.testInvalidPasswordSaslPlain(SaslAuthenticatorFailureDelayTest.java:115)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7398) Transient test failure: SaslAuthenticatorFailureDelayTest.testInvalidPasswordSaslPlain

2018-09-11 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-7398:
---

Assignee: Dhruvil Shah

> Transient test failure: 
> SaslAuthenticatorFailureDelayTest.testInvalidPasswordSaslPlain
> --
>
> Key: KAFKA-7398
> URL: https://issues.apache.org/jira/browse/KAFKA-7398
> Project: Kafka
>  Issue Type: Task
>  Components: unit tests
>Reporter: Manikumar
>Assignee: Dhruvil Shah
>Priority: Minor
>
> Observed below failure in one of the Jenkins job:
> {code}
>  java.lang.IllegalArgumentException: Setting the time to 1536609116303 while 
> current time 1536609116553 is newer; this is not allowed
>  Stacktrace
>  java.lang.IllegalArgumentException: Setting the time to 1536609116303 while 
> current time 1536609116553 is newer; this is not allowed
>  at org.apache.kafka.common.utils.MockTime.setCurrentTimeMs(MockTime.java:91)
>  at 
> org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:100)
>  at 
> org.apache.kafka.common.security.authenticator.SaslAuthenticatorFailureDelayTest.createAndCheckClientConnectionFailure(SaslAuthenticatorFailureDelayTest.java:223)
>  at 
> org.apache.kafka.common.security.authenticator.SaslAuthenticatorFailureDelayTest.createAndCheckClientAuthenticationFailure(SaslAuthenticatorFailureDelayTest.java:212)
>  at 
> org.apache.kafka.common.security.authenticator.SaslAuthenticatorFailureDelayTest.testInvalidPasswordSaslPlain(SaslAuthenticatorFailureDelayTest.java:115)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7385) Log cleaner crashes when empty batches are retained with idempotent or transactional producers

2018-09-07 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-7385:

Summary: Log cleaner crashes when empty batches are retained with 
idempotent or transactional producers  (was: Log cleaner crashes when empty 
headers are retained with idempotent or transactional producers)

> Log cleaner crashes when empty batches are retained with idempotent or 
> transactional producers
> --
>
> Key: KAFKA-7385
> URL: https://issues.apache.org/jira/browse/KAFKA-7385
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> During log compaction, we retain an empty header if the batch contains the 
> last sequence number for a particular producer. When such headers are the 
> only messages retained, we do not update state such as `maxOffset` in 
> `MemoryRecords#filterTo` causing us to append these into the cleaned segment 
> with `largestOffset` = -1. This throws a `LogSegmentOffsetOverflowException` 
> for a segment that does not actually have an overflow. When we attempt to 
> split the segment, the log cleaner dies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7385) Log cleaner crashes when empty headers are retained with idempotent or transactional producers

2018-09-07 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-7385:

Summary: Log cleaner crashes when empty headers are retained with 
idempotent or transactional producers  (was: Log compactor crashes when empty 
headers are retained with idempotent or transactional producers)

> Log cleaner crashes when empty headers are retained with idempotent or 
> transactional producers
> --
>
> Key: KAFKA-7385
> URL: https://issues.apache.org/jira/browse/KAFKA-7385
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> During log compaction, we retain an empty header if the batch contains the 
> last sequence number for a particular producer. When such headers are the 
> only messages retained, we do not update state such as `maxOffset` in 
> `MemoryRecords#filterTo` causing us to append these into the cleaned segment 
> with `largestOffset` = -1. This throws a `LogSegmentOffsetOverflowException` 
> for a segment that does not actually have an overflow. When we attempt to 
> split the segment, the log cleaner dies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7385) Log compactor crashes when empty headers are retained with idempotent or transactional producers

2018-09-07 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-7385:
---

Assignee: Dhruvil Shah

> Log compactor crashes when empty headers are retained with idempotent or 
> transactional producers
> 
>
> Key: KAFKA-7385
> URL: https://issues.apache.org/jira/browse/KAFKA-7385
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> During log compaction, we retain an empty header if the batch contains the 
> last sequence number for a particular producer. When such headers are the 
> only messages retained, we do not update state such as `maxOffset` in 
> `MemoryRecords#filterTo` causing us to append these into the cleaned segment 
> with `largestOffset` = -1. This throws a `LogSegmentOffsetOverflowException` 
> for a segment that does not actually have an overflow. When we attempt to 
> split the segment, the log cleaner dies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7385) Log compactor crashes when empty headers are retained with idempotent / transaction producers

2018-09-07 Thread Dhruvil Shah (JIRA)
Dhruvil Shah created KAFKA-7385:
---

 Summary: Log compactor crashes when empty headers are retained 
with idempotent / transaction producers
 Key: KAFKA-7385
 URL: https://issues.apache.org/jira/browse/KAFKA-7385
 Project: Kafka
  Issue Type: Bug
Reporter: Dhruvil Shah


During log compaction, we retain an empty header if the batch contains the last 
sequence number for a particular producer. When such headers are the only 
messages retained, we do not update state such as `maxOffset` in 
`MemoryRecords#filterTo` causing us to append these into the cleaned segment 
with `largestOffset` = -1. This throws a `LogSegmentOffsetOverflowException` 
for a segment that does not actually have an overflow. When we attempt to split 
the segment, the log cleaner dies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7385) Log compactor crashes when empty headers are retained with idempotent or transactional producers

2018-09-07 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7385?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-7385:

Summary: Log compactor crashes when empty headers are retained with 
idempotent or transactional producers  (was: Log compactor crashes when empty 
headers are retained with idempotent / transaction producers)

> Log compactor crashes when empty headers are retained with idempotent or 
> transactional producers
> 
>
> Key: KAFKA-7385
> URL: https://issues.apache.org/jira/browse/KAFKA-7385
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dhruvil Shah
>Priority: Major
>
> During log compaction, we retain an empty header if the batch contains the 
> last sequence number for a particular producer. When such headers are the 
> only messages retained, we do not update state such as `maxOffset` in 
> `MemoryRecords#filterTo` causing us to append these into the cleaned segment 
> with `largestOffset` = -1. This throws a `LogSegmentOffsetOverflowException` 
> for a segment that does not actually have an overflow. When we attempt to 
> split the segment, the log cleaner dies.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7322) race between compaction thread and retention thread when changing topic cleanup policy

2018-08-22 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16589213#comment-16589213
 ] 

Dhruvil Shah commented on KAFKA-7322:
-

I think that makes sense, thanks for the confirmation.

> race between compaction thread and retention thread when changing topic 
> cleanup policy
> --
>
> Key: KAFKA-7322
> URL: https://issues.apache.org/jira/browse/KAFKA-7322
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> The deletion thread will grab the log.lock when it tries to rename log 
> segment and schedule for actual deletion.
> The compaction thread only grabs the log.lock when it tries to replace the 
> original segments with the cleaned segment. The compaction thread doesn't 
> grab the log when it reads records from the original segments to build 
> offsetmap and new segments. As a result, if both deletion and compaction 
> threads work on the same log partition. We have a race condition. 
> This race happens when the topic cleanup policy is updated on the fly.  
> One case to hit this race condition:
> 1: topic clean up policy is "compact" initially 
> 2: log cleaner (compaction) thread picks up the partition for compaction and 
> still in progress
> 3: the topic clean up policy has been updated to "deletion"
> 4: retention thread pick up the topic partition and delete some old segments.
> 5: log cleaner thread reads from the deleted log and raise an IO exception. 
>  
> The proposed solution is to use "inprogress" map that cleaner manager has to 
> protect such a race.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7322) race between compaction thread and retention thread when changing topic cleanup policy

2018-08-22 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588450#comment-16588450
 ] 

Dhruvil Shah commented on KAFKA-7322:
-

Does this issue still exist after KAFKA-7278 was fixed?

> race between compaction thread and retention thread when changing topic 
> cleanup policy
> --
>
> Key: KAFKA-7322
> URL: https://issues.apache.org/jira/browse/KAFKA-7322
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> The deletion thread will grab the log.lock when it tries to rename log 
> segment and schedule for actual deletion.
> The compaction thread only grabs the log.lock when it tries to replace the 
> original segments with the cleaned segment. The compaction thread doesn't 
> grab the log when it reads records from the original segments to build 
> offsetmap and new segments. As a result, if both deletion and compaction 
> threads work on the same log partition. We have a race condition. 
> This race happens when the topic cleanup policy is updated on the fly.  
> One case to hit this race condition:
> 1: topic clean up policy is "compact" initially 
> 2: log cleaner (compaction) thread picks up the partition for compaction and 
> still in progress
> 3: the topic clean up policy has been updated to "deletion"
> 4: retention thread pick up the topic partition and delete some old segments.
> 5: log cleaner thread reads from the deleted log and raise an IO exception. 
>  
> The proposed solution is to use "inprogress" map that cleaner manager has to 
> protect such a race.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7282) Failed to read `log header` from file channel

2018-08-14 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579268#comment-16579268
 ] 

Dhruvil Shah edited comment on KAFKA-7282 at 8/14/18 5:48 PM:
--

For the EOFException, could you upload the particular affected log segment?


was (Author: dhruvilshah):
For the EOFException, could you upload the particular affected log segment? You 
can use the DumpLogSegments tool for this.

> Failed to read `log header` from file channel
> -
>
> Key: KAFKA-7282
> URL: https://issues.apache.org/jira/browse/KAFKA-7282
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.2, 1.1.1, 2.0.0
> Environment: Linux
>Reporter: Alastair Munro
>Priority: Major
>
> Full stack trace:
> {code:java}
> [2018-08-13 11:22:01,635] ERROR [ReplicaManager broker=2] Error processing 
> fetch operation on partition segmenter-evt-v1-14, offset 96745 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.KafkaException: java.io.EOFException: Failed to read 
> `log header` from file channel `sun.nio.ch.FileChannelImpl@6e6d8ddd`. 
> Expected to read 17 bytes, but reached end of file after reading 0 bytes. 
> Started read from position 25935.
> at 
> org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:40)
> at 
> org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:24)
> at 
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> at 
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> at 
> org.apache.kafka.common.record.FileRecords.searchForOffsetWithSize(FileRecords.java:286)
> at kafka.log.LogSegment.translateOffset(LogSegment.scala:254)
> at kafka.log.LogSegment.read(LogSegment.scala:277)
> at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)
> at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1837)
> at kafka.log.Log.read(Log.scala:1114)
> at 
> kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)
> at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)
> at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)
> at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)
> at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)
> at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:678)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:107)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7282) Failed to read `log header` from file channel

2018-08-13 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579268#comment-16579268
 ] 

Dhruvil Shah commented on KAFKA-7282:
-

For the EOFException, could you upload the particular affected log segment? You 
can use the DumpLogSegments tool for this.

> Failed to read `log header` from file channel
> -
>
> Key: KAFKA-7282
> URL: https://issues.apache.org/jira/browse/KAFKA-7282
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.2, 1.1.1, 2.0.0
> Environment: Linux
>Reporter: Alastair Munro
>Priority: Major
>
> Full stack trace:
> {code:java}
> [2018-08-13 11:22:01,635] ERROR [ReplicaManager broker=2] Error processing 
> fetch operation on partition segmenter-evt-v1-14, offset 96745 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.KafkaException: java.io.EOFException: Failed to read 
> `log header` from file channel `sun.nio.ch.FileChannelImpl@6e6d8ddd`. 
> Expected to read 17 bytes, but reached end of file after reading 0 bytes. 
> Started read from position 25935.
> at 
> org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:40)
> at 
> org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:24)
> at 
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> at 
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> at 
> org.apache.kafka.common.record.FileRecords.searchForOffsetWithSize(FileRecords.java:286)
> at kafka.log.LogSegment.translateOffset(LogSegment.scala:254)
> at kafka.log.LogSegment.read(LogSegment.scala:277)
> at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)
> at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1837)
> at kafka.log.Log.read(Log.scala:1114)
> at 
> kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)
> at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)
> at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)
> at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)
> at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)
> at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:678)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:107)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7282) Failed to read `log header` from file channel

2018-08-13 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579265#comment-16579265
 ] 

Dhruvil Shah commented on KAFKA-7282:
-

[~amunro] the UnknownServerException on broker looks a bit vexing. Is it 
possible that you have brokers running an older Kafka version while some others 
running a new one? If so, `inter.broker.protocol.version` must be set to the 
lowest version broker in your cluster until all brokers are upgraded, more 
details in the [upgrade 
documentation|https://kafka.apache.org/documentation/#upgrade].

> Failed to read `log header` from file channel
> -
>
> Key: KAFKA-7282
> URL: https://issues.apache.org/jira/browse/KAFKA-7282
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.2, 1.1.1, 2.0.0
> Environment: Linux
>Reporter: Alastair Munro
>Priority: Major
>
> Full stack trace:
> {code:java}
> [2018-08-13 11:22:01,635] ERROR [ReplicaManager broker=2] Error processing 
> fetch operation on partition segmenter-evt-v1-14, offset 96745 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.KafkaException: java.io.EOFException: Failed to read 
> `log header` from file channel `sun.nio.ch.FileChannelImpl@6e6d8ddd`. 
> Expected to read 17 bytes, but reached end of file after reading 0 bytes. 
> Started read from position 25935.
> at 
> org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:40)
> at 
> org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:24)
> at 
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> at 
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> at 
> org.apache.kafka.common.record.FileRecords.searchForOffsetWithSize(FileRecords.java:286)
> at kafka.log.LogSegment.translateOffset(LogSegment.scala:254)
> at kafka.log.LogSegment.read(LogSegment.scala:277)
> at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)
> at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1837)
> at kafka.log.Log.read(Log.scala:1114)
> at 
> kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)
> at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)
> at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)
> at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)
> at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)
> at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:678)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:107)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
> at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6639) Follower may have sparse index if catching up

2018-07-25 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-6639:
---

Assignee: (was: Dhruvil Shah)

> Follower may have sparse index if catching up
> -
>
> Key: KAFKA-6639
> URL: https://issues.apache.org/jira/browse/KAFKA-6639
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Major
>
> When a follower is catching up, it may fetch considerably more data than the 
> size of the offset index interval. Since we only write to the index once for 
> every append, this could lead to a sparsely populated index, which may have a 
> performance impact if the follower ever becomes leader.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7185) getMatchingAcls throws StringIndexOutOfBoundsException for empty resource name

2018-07-19 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-7185:

Description: 
KIP-290 introduced a way to match ACLs based on prefix. Certain resource names 
like that for group id can be empty strings. When an empty string is passed 
into `getMatchingAcls`, it would throw a `StringIndexOutOfBoundsException` 
because of the following logic:
{noformat}
val prefixed = aclCache.range(
 Resource(resourceType, resourceName, PatternType.PREFIXED),
 Resource(resourceType, resourceName.substring(0, Math.min(1, 
resourceName.length)), PatternType.PREFIXED)
 )
 .filterKeys(resource => resourceName.startsWith(resource.name))
 .flatMap { case (resource, versionedAcls) => versionedAcls.acls }
 .toSet{noformat}
This is a regression introduced in 2.0.

  was:
KIP-290 introduced a way to match ACLs based on prefix. Certain resource names 
like that for group id can be empty strings. When an empty string is passed 
into `getMatchingAcls`, it would throw a `StringIndexOutOfBoundsException` 
because of the following logic:

```

val prefixed = aclCache.range(
 Resource(resourceType, resourceName, PatternType.PREFIXED),
 Resource(resourceType, resourceName.substring(0, Math.min(1, 
resourceName.length)), PatternType.PREFIXED)
)
 .filterKeys(resource => resourceName.startsWith(resource.name))
 .flatMap \{ case (resource, versionedAcls) => versionedAcls.acls }
 .toSet

```

This is a regression introduced in 2.0.


> getMatchingAcls throws StringIndexOutOfBoundsException for empty resource name
> --
>
> Key: KAFKA-7185
> URL: https://issues.apache.org/jira/browse/KAFKA-7185
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Blocker
>
> KIP-290 introduced a way to match ACLs based on prefix. Certain resource 
> names like that for group id can be empty strings. When an empty string is 
> passed into `getMatchingAcls`, it would throw a 
> `StringIndexOutOfBoundsException` because of the following logic:
> {noformat}
> val prefixed = aclCache.range(
>  Resource(resourceType, resourceName, PatternType.PREFIXED),
>  Resource(resourceType, resourceName.substring(0, Math.min(1, 
> resourceName.length)), PatternType.PREFIXED)
>  )
>  .filterKeys(resource => resourceName.startsWith(resource.name))
>  .flatMap { case (resource, versionedAcls) => versionedAcls.acls }
>  .toSet{noformat}
> This is a regression introduced in 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7185) getMatchingAcls throws StringIndexOutOfBoundsException for empty resource name

2018-07-19 Thread Dhruvil Shah (JIRA)
Dhruvil Shah created KAFKA-7185:
---

 Summary: getMatchingAcls throws StringIndexOutOfBoundsException 
for empty resource name
 Key: KAFKA-7185
 URL: https://issues.apache.org/jira/browse/KAFKA-7185
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.0.0
Reporter: Dhruvil Shah
Assignee: Dhruvil Shah


KIP-290 introduced a way to match ACLs based on prefix. Certain resource names 
like that for group id can be empty strings. When an empty string is passed 
into `getMatchingAcls`, it would throw a `StringIndexOutOfBoundsException` 
because of the following logic:

```

val prefixed = aclCache.range(
 Resource(resourceType, resourceName, PatternType.PREFIXED),
 Resource(resourceType, resourceName.substring(0, Math.min(1, 
resourceName.length)), PatternType.PREFIXED)
)
 .filterKeys(resource => resourceName.startsWith(resource.name))
 .flatMap \{ case (resource, versionedAcls) => versionedAcls.acls }
 .toSet

```

This is a regression introduced in 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7184) Kafka is going down with issue ERROR Failed to clean up log for __consumer_offsets-0 in dir /tmp/kafkadev2-logs due to IOException (kafka.server.LogDirFailureChannel)

2018-07-19 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16549336#comment-16549336
 ] 

Dhruvil Shah commented on KAFKA-7184:
-

[~smuddamsetty] after reaching the retention point, Kafka will try to delete 
segments which are past retention. It is possible that these files no longer 
exist because the /tmp directory was cleaned out before. Kafka requires a 
persistent store be used for the log directory so you should test with that.

> Kafka is going down with issue ERROR Failed to clean up log for 
> __consumer_offsets-0 in dir /tmp/kafkadev2-logs due to IOException 
> (kafka.server.LogDirFailureChannel)
> --
>
> Key: KAFKA-7184
> URL: https://issues.apache.org/jira/browse/KAFKA-7184
> Project: Kafka
>  Issue Type: Test
>  Components: admin, log
>Affects Versions: 1.1.0
>Reporter: Sandeep Muddamsetty
>Priority: Blocker
> Attachments: log-cleaner.log, server.log.2018-07-18-15
>
>
> Kafka is going down with issue ERROR Failed to clean up log for 
> __consumer_offsets-0 in dir /tmp/kafkadev2-logs due to IOException 
> (kafka.server.LogDirFailureChannel).
> This  error we are seeing very frequently for every  168 hours(7 days) where 
> the defualt value of log retention in kafka configuration . After modifying 
> it to 240 hours this thing is happening again after 240 hours . I ahve gone 
> thorugh some google groups some artiicles and have observed this is happening 
> in windows system but here i am facing this issue in linux system . Below are 
> my configuration details . 
> kafka_2.11-1.0.0
> OS: OEL 7.1



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7184) Kafka is going down with issue ERROR Failed to clean up log for __consumer_offsets-0 in dir /tmp/kafkadev2-logs due to IOException (kafka.server.LogDirFailureChannel)

2018-07-19 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah resolved KAFKA-7184.
-
Resolution: Not A Problem

> Kafka is going down with issue ERROR Failed to clean up log for 
> __consumer_offsets-0 in dir /tmp/kafkadev2-logs due to IOException 
> (kafka.server.LogDirFailureChannel)
> --
>
> Key: KAFKA-7184
> URL: https://issues.apache.org/jira/browse/KAFKA-7184
> Project: Kafka
>  Issue Type: Test
>  Components: admin, log
>Affects Versions: 1.1.0
>Reporter: Sandeep Muddamsetty
>Priority: Blocker
> Attachments: log-cleaner.log, server.log.2018-07-18-15
>
>
> Kafka is going down with issue ERROR Failed to clean up log for 
> __consumer_offsets-0 in dir /tmp/kafkadev2-logs due to IOException 
> (kafka.server.LogDirFailureChannel).
> This  error we are seeing very frequently for every  168 hours(7 days) where 
> the defualt value of log retention in kafka configuration . After modifying 
> it to 240 hours this thing is happening again after 240 hours . I ahve gone 
> thorugh some google groups some artiicles and have observed this is happening 
> in windows system but here i am facing this issue in linux system . Below are 
> my configuration details . 
> kafka_2.11-1.0.0
> OS: OEL 7.1



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7184) Kafka is going down with issue ERROR Failed to clean up log for __consumer_offsets-0 in dir /tmp/kafkadev2-logs due to IOException (kafka.server.LogDirFailureChannel)

2018-07-19 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16548907#comment-16548907
 ] 

Dhruvil Shah commented on KAFKA-7184:
-

What kind of storage are you using? Could you please upload server.log and 
log_cleaner.log from around the time of the error.

> Kafka is going down with issue ERROR Failed to clean up log for 
> __consumer_offsets-0 in dir /tmp/kafkadev2-logs due to IOException 
> (kafka.server.LogDirFailureChannel)
> --
>
> Key: KAFKA-7184
> URL: https://issues.apache.org/jira/browse/KAFKA-7184
> Project: Kafka
>  Issue Type: Test
>  Components: admin, log
>Affects Versions: 1.1.0
>Reporter: Sandeep Muddamsetty
>Priority: Blocker
>
> Kafka is going down with issue ERROR Failed to clean up log for 
> __consumer_offsets-0 in dir /tmp/kafkadev2-logs due to IOException 
> (kafka.server.LogDirFailureChannel).
> This  error we are seeing very frequently for every  168 hours(7 days) where 
> the defualt value of log retention in kafka configuration . After modifying 
> it to 240 hours this thing is happening again after 240 hours . I ahve gone 
> thorugh some google groups some artiicles and have observed this is happening 
> in windows system but here i am facing this issue in linux system . Below are 
> my configuration details . 
> kafka_2.11-1.0.0
> OS: OEL 7.1



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7130) EOFException after rolling log segment

2018-07-06 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16535102#comment-16535102
 ] 

Dhruvil Shah commented on KAFKA-7130:
-

Yes, you could use a smaller segment size. I would recommend using the default 
of 1GB unless there is a good reason to change it.

I think we should include the fix in 1.1.1. [~hachikuji], what do you think?

> EOFException after rolling log segment
> --
>
> Key: KAFKA-7130
> URL: https://issues.apache.org/jira/browse/KAFKA-7130
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.1.0
>Reporter: Karsten Schnitter
>Priority: Major
> Attachments: dump-001311940075.index.bz2, 
> dump-001311940075.log.bz2
>
>
> When rolling a log segment one of our Kafka cluster got an immediate read 
> error on the same partition. This lead to a flood of log messages containing 
> the corresponding stacktraces. Data was still appended to the partition but 
> consumers were unable to read from that partition. Reason for the exception 
> is unclear.
> {noformat}
> [2018-07-02 23:53:32,732] INFO [Log partition=ingestion-3, 
> dir=/var/vcap/store/kafka] Rolled new log segment at offset 971865991 in 1 
> ms. (kafka.log.Log)
> [2018-07-02 23:53:32,739] INFO [ProducerStateManager partition=ingestion-3] 
> Writing producer snapshot at offset 971865991 (kafka.log.ProducerStateManager)
> [2018-07-02 23:53:32,739] INFO [Log partition=ingestion-3, 
> dir=/var/vcap/store/kafka] Rolled new log segment at offset 971865991 in 1 
> ms. (kafka.log.Log)
> [2018-07-02 23:53:32,750] ERROR [ReplicaManager broker=1] Error processing 
> fetch operation on partition ingestion-3, offset 971865977 
> (kafka.server.ReplicaManager)
> Caused by: java.io.EOFException: Failed to read `log header` from file 
> channel `sun.nio.ch.FileChannelImpl@2e0e8810`. Expected to read 17 bytes, but 
> reached end of file after reading 0 bytes. Started read from position 
> 2147483643.
> {noformat}
> We mitigated the issue by stopping the affected node and deleting the 
> corresponding directory. Once the partition was recreated for the replica (we 
> use replication-factor 2) the other replica experienced the same problem. We 
> mitigated likewise.
> To us it is unclear, what caused this issue. Can you help us in finding the 
> root cause of this problem?
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7130) EOFException after rolling log segment

2018-07-05 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533997#comment-16533997
 ] 

Dhruvil Shah commented on KAFKA-7130:
-

Thanks for the information, [~kschnitter]. There was an issue with potential 
overflow in the logic to read messages from a segment which could cause us to 
fall outside the maximum allowable segment size. 
[KAFKA-6292|https://issues.apache.org/jira/browse/KAFKA-6292] fixed this issue, 
and I think what you are running into is very similar to that.

The interesting part though is how the segment got to be this big in the first 
place. Is either of `log.segment.bytes` or `segment.bytes` set to a non-default 
value for the broker / topic? Is compaction enabled for this topic?

> EOFException after rolling log segment
> --
>
> Key: KAFKA-7130
> URL: https://issues.apache.org/jira/browse/KAFKA-7130
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.1.0
>Reporter: Karsten Schnitter
>Priority: Major
> Attachments: dump-001311940075.index.bz2, 
> dump-001311940075.log.bz2
>
>
> When rolling a log segment one of our Kafka cluster got an immediate read 
> error on the same partition. This lead to a flood of log messages containing 
> the corresponding stacktraces. Data was still appended to the partition but 
> consumers were unable to read from that partition. Reason for the exception 
> is unclear.
> {noformat}
> [2018-07-02 23:53:32,732] INFO [Log partition=ingestion-3, 
> dir=/var/vcap/store/kafka] Rolled new log segment at offset 971865991 in 1 
> ms. (kafka.log.Log)
> [2018-07-02 23:53:32,739] INFO [ProducerStateManager partition=ingestion-3] 
> Writing producer snapshot at offset 971865991 (kafka.log.ProducerStateManager)
> [2018-07-02 23:53:32,739] INFO [Log partition=ingestion-3, 
> dir=/var/vcap/store/kafka] Rolled new log segment at offset 971865991 in 1 
> ms. (kafka.log.Log)
> [2018-07-02 23:53:32,750] ERROR [ReplicaManager broker=1] Error processing 
> fetch operation on partition ingestion-3, offset 971865977 
> (kafka.server.ReplicaManager)
> Caused by: java.io.EOFException: Failed to read `log header` from file 
> channel `sun.nio.ch.FileChannelImpl@2e0e8810`. Expected to read 17 bytes, but 
> reached end of file after reading 0 bytes. Started read from position 
> 2147483643.
> {noformat}
> We mitigated the issue by stopping the affected node and deleting the 
> corresponding directory. Once the partition was recreated for the replica (we 
> use replication-factor 2) the other replica experienced the same problem. We 
> mitigated likewise.
> To us it is unclear, what caused this issue. Can you help us in finding the 
> root cause of this problem?
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7130) EOFException after rolling log segment

2018-07-04 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533035#comment-16533035
 ] 

Dhruvil Shah commented on KAFKA-7130:
-

[~kschnitter] it would be useful if you could save the affected partition (or 
at least the segment) while we investigate this.

As a first step, could you please share information about all segments for this 
partition, including the size of each of them? `ls -l` output of the directory 
should be sufficient.

Would you be willing to share metadata dump of the affected segment? This will 
tell us what the segment state is like. You could use `bin/kafka-run-class.sh 
kafka.tools.DumpLogSegments`.

> EOFException after rolling log segment
> --
>
> Key: KAFKA-7130
> URL: https://issues.apache.org/jira/browse/KAFKA-7130
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.1.0
>Reporter: Karsten Schnitter
>Priority: Major
>
> When rolling a log segment one of our Kafka cluster got an immediate read 
> error on the same partition. This lead to a flood of log messages containing 
> the corresponding stacktraces. Data was still appended to the partition but 
> consumers were unable to read from that partition. Reason for the exception 
> is unclear.
> {noformat}
> [2018-07-02 23:53:32,732] INFO [Log partition=ingestion-3, 
> dir=/var/vcap/store/kafka] Rolled new log segment at offset 971865991 in 1 
> ms. (kafka.log.Log)
> [2018-07-02 23:53:32,739] INFO [ProducerStateManager partition=ingestion-3] 
> Writing producer snapshot at offset 971865991 (kafka.log.ProducerStateManager)
> [2018-07-02 23:53:32,739] INFO [Log partition=ingestion-3, 
> dir=/var/vcap/store/kafka] Rolled new log segment at offset 971865991 in 1 
> ms. (kafka.log.Log)
> [2018-07-02 23:53:32,750] ERROR [ReplicaManager broker=1] Error processing 
> fetch operation on partition ingestion-3, offset 971865977 
> (kafka.server.ReplicaManager)
> Caused by: java.io.EOFException: Failed to read `log header` from file 
> channel `sun.nio.ch.FileChannelImpl@2e0e8810`. Expected to read 17 bytes, but 
> reached end of file after reading 0 bytes. Started read from position 
> 2147483643.
> {noformat}
> We mitigated the issue by stopping the affected node and deleting the 
> corresponding directory. Once the partition was recreated for the replica (we 
> use replication-factor 2) the other replica experienced the same problem. We 
> mitigated likewise.
> To us it is unclear, what caused this issue. Can you help us in finding the 
> root cause of this problem?
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-3886) Consumer should handle wakeups while rebalancing more gracefully

2018-07-03 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-3886:
---

Assignee: Dhruvil Shah  (was: Jason Gustafson)

> Consumer should handle wakeups while rebalancing more gracefully
> 
>
> Key: KAFKA-3886
> URL: https://issues.apache.org/jira/browse/KAFKA-3886
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.0.0
>Reporter: Jason Gustafson
>Assignee: Dhruvil Shah
>Priority: Major
>
> If the user calls wakeup() while a rebalance in progress, we currently lose 
> track of the state of that rebalance. In the worst case, this can result in 
> an additional unneeded rebalance when the user calls poll() again. 
> The other thing that can happen is that the rebalance could complete inside 
> another blocking call (e.g. {{commitSync()}}). There may be scenarios where 
> this can cause us to commit offsets outside the generation an assignment is 
> valid for. For example: 
> 1. Consumer is initially assigned partition A
> 2. The consumer starts rebalancing, but is interrupted with a call to 
> wakeup().
> 3. User calls commitSync with offsets (A, 5)
> 4. Before offset commit is sent, an interrupted rebalance completes and 
> changes the assignment to include only partition B.
> 5. Now we proceed with the unsafe offset commit on partition A.
> In this case, we should probably ensure that it is not possible to commit 
> offsets after an assignment has been revoked. Other cases, such as 
> position(), may be handled similarly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6098) Delete and Re-create topic operation could result in race condition

2018-06-25 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16522943#comment-16522943
 ] 

Dhruvil Shah commented on KAFKA-6098:
-

I don't think we can provide any formal guarantees for these APIs if topics are 
being created and deleted concurrently. From the discussion so far, it looks 
like we want to be able to define the semantics for what happens in a single 
threaded application trying to delete, list, create topics, is this correct?

One way to fix this problem could be to have deleteTopics return success only 
after the topic has been completely deleted (i.e. the topic znode has been 
deleted). listTopics could continue returning the topic information for this 
duration. Would this address the issue?

> Delete and Re-create topic operation could result in race condition
> ---
>
> Key: KAFKA-6098
> URL: https://issues.apache.org/jira/browse/KAFKA-6098
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Dhruvil Shah
>Priority: Major
>  Labels: reliability
>
> Here is the following process to re-produce this issue:
> 1. Delete a topic using the delete topic request.
> 2. Confirm the topic is deleted using the list topics request.
> 3. Create the topic using the create topic request.
> In step 3) a race condition can happen that the response returns a 
> {{TOPIC_ALREADY_EXISTS}} error code, indicating the topic has already existed.
> The root cause of the above issue is in the {{TopicDeletionManager}} class:
> {code}
> controller.partitionStateMachine.handleStateChanges(partitionsForDeletedTopic.toSeq,
>  OfflinePartition)
> controller.partitionStateMachine.handleStateChanges(partitionsForDeletedTopic.toSeq,
>  NonExistentPartition)
> topicsToBeDeleted -= topic
> partitionsToBeDeleted.retain(_.topic != topic)
> kafkaControllerZkUtils.deleteTopicZNode(topic)
> kafkaControllerZkUtils.deleteTopicConfigs(Seq(topic))
> kafkaControllerZkUtils.deleteTopicDeletions(Seq(topic))
> controllerContext.removeTopic(topic)
> {code}
> I.e. it first update the broker's metadata cache through the ISR and metadata 
> update request, then delete the topic zk path, and then delete the 
> topic-deletion zk path. However, upon handling the create topic request, the 
> broker will simply try to write to the topic zk path directly. Hence there is 
> a race condition that between brokers update their metadata cache (hence list 
> topic request not returning this topic anymore) and zk path for the topic be 
> deleted (hence the create topic succeed).
> The reason this problem could be exposed, is through current handling logic 
> of the create topic response, most of which takes {{TOPIC_ALREADY_EXISTS}} as 
> "OK" and moves on, and the zk path will be deleted later, hence leaving the 
> topic to be not created at all:
> https://github.com/apache/kafka/blob/249e398bf84cdd475af6529e163e78486b43c570/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java#L221
> https://github.com/apache/kafka/blob/1a653c813c842c0b67f26fb119d7727e272cf834/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java#L232
> Looking at the code history, it seems this race condition always exist, but 
> testing on trunk / 1.0 with the above steps it is more likely to happen than 
> before. I wonder if the ZK async calls have an effect here. cc [~junrao] 
> [~onurkaraman]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


  1   2   >