[jira] [Commented] (KAFKA-12713) Report "REAL" follower/consumer fetch latency

2021-07-29 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17390068#comment-17390068
 ] 

Ming Liu commented on KAFKA-12713:
--

Certainly, [~kaihuang], please take ownership of the KIP and the PR.

> Report "REAL" follower/consumer fetch latency
> -
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Priority: Major
>
> Fetch latency is an important metric to monitor for cluster performance. With 
> acks=all, produce latency is affected primarily by broker fetch latency.
> However, the currently reported fetch latency does not reflect the true fetch 
> latency, because a fetch sometimes has to wait in purgatory for up to 
> replica.fetch.wait.max.ms when data is not available. This greatly skews the 
> real P50, P99, etc.
> I would like to propose a KIP to track the real fetch latency for both the 
> broker follower and the consumer.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12713) Report "REAL" follower/consumer fetch latency

2021-05-11 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17342778#comment-17342778
 ] 

Ming Liu commented on KAFKA-12713:
--

Here is the PR [https://github.com/apache/kafka/pull/10674/files] for the KIP. 
Please comment.

> Report "REAL" follower/consumer fetch latency
> -
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Priority: Major
>
> Fetch latency is an important metric to monitor for cluster performance. With 
> acks=all, produce latency is affected primarily by broker fetch latency.
> However, the currently reported fetch latency does not reflect the true fetch 
> latency, because a fetch sometimes has to wait in purgatory for up to 
> replica.fetch.wait.max.ms when data is not available. This greatly skews the 
> real P50, P99, etc.
> I would like to propose a KIP to track the real fetch latency for both the 
> broker follower and the consumer.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12713) Report "REAL" follower/consumer fetch latency

2021-04-26 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331590#comment-17331590
 ] 

Ming Liu edited comment on KAFKA-12713 at 4/27/21, 3:25 AM:


The idea is:
 # Add waitTimeMs to FetchResponse.
 # In processResponseCallback() of handleFetchRequest, set waitTimeMs to the 
time spent in purgatory.
 # In FetcherStats, add a new meter to track the fetch latency by deducting 
waitTimeMs from the latency.

Also, for fetch latency we should report a time called TotalEffectiveTime = 
TotalTime - RemoteTime.

Created KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-736%3A+Track+the+real+fetch+latency
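
Below is a minimal illustrative sketch of the metric this proposal is after (class and method names are hypothetical, not the KIP-736 or broker code): the effective fetch latency is the request's total time minus the time it intentionally sat in purgatory, as carried back in the proposed waitTimeMs field.
{code:java}
public class EffectiveFetchLatency {

    // waitTimeMs would come from the proposed FetchResponse field;
    // totalTimeMs is the usual total request time.
    static long effectiveLatencyMs(long totalTimeMs, long waitTimeMs) {
        // Guard against reporting a negative latency if the bookkeeping disagrees slightly.
        return Math.max(0L, totalTimeMs - waitTimeMs);
    }

    public static void main(String[] args) {
        // A 510 ms round trip where 500 ms was spent waiting for
        // replica.fetch.wait.max.ms to expire is really a 10 ms fetch.
        System.out.println(effectiveLatencyMs(510, 500)); // prints 10
    }
}
{code}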


was (Author: mingaliu):
The idea is:
 # Add waitTimeMs to FetchResponse.
 # In processResponseCallback() of handleFetchRequest, set waitTimeMs to the 
time spent in purgatory.
 # In FetcherStats, add a new meter to track the fetch latency by deducting 
waitTimeMs from the latency.

Also, for fetch latency we should report a time called TotalEffectiveTime = 
TotalTime - RemoteTime.

Let me know if you have any suggestions or feedback. I would like to propose a 
KIP for this change.

> Report "REAL" follower/consumer fetch latency
> -
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Priority: Major
>
> Fetch latency is an important metric to monitor for cluster performance. With 
> acks=all, produce latency is affected primarily by broker fetch latency.
> However, the currently reported fetch latency does not reflect the true fetch 
> latency, because a fetch sometimes has to wait in purgatory for up to 
> replica.fetch.wait.max.ms when data is not available. This greatly skews the 
> real P50, P99, etc.
> I would like to propose a KIP to track the real fetch latency for both the 
> broker follower and the consumer.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12713) Report "REAL" follower/consumer fetch latency

2021-04-26 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331590#comment-17331590
 ] 

Ming Liu edited comment on KAFKA-12713 at 4/27/21, 12:17 AM:
-

The idea is:
 # Add waitTimeMs to FetchResponse.
 # In processResponseCallback() of handleFetchRequest, set waitTimeMs to the 
time spent in purgatory.
 # In FetcherStats, add a new meter to track the fetch latency by deducting 
waitTimeMs from the latency.

Also, for fetch latency we should report a time called TotalEffectiveTime = 
TotalTime - RemoteTime.

Let me know if you have any suggestions or feedback. I would like to propose a 
KIP for this change.


was (Author: mingaliu):
The idea is:

0. Add waitTimeMs to Request().

1. In the DelayedFetch delayed operation class, add code to track the actual 
wait time.

2. In processResponseCallback() of handleFetchRequest, add an additional 
waitTimeMs parameter supplied by DelayedFetch. It will set request.waitTimeMs.

3. In updateRequestMetrics(), if waitTimeMs is not zero, deduct it from 
RemoteTime and TotalTime.

Let me know if you have any suggestions or feedback. I would like to propose a 
KIP for this change.

> Report "REAL" follower/consumer fetch latency
> -
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Priority: Major
>
> Fetch latency is an important metric to monitor for cluster performance. With 
> acks=all, produce latency is affected primarily by broker fetch latency.
> However, the currently reported fetch latency does not reflect the true fetch 
> latency, because a fetch sometimes has to wait in purgatory for up to 
> replica.fetch.wait.max.ms when data is not available. This greatly skews the 
> real P50, P99, etc.
> I would like to propose a KIP to track the real fetch latency for both the 
> broker follower and the consumer.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12713) Report "REAL" follower/consumer fetch latency

2021-04-26 Thread Ming Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ming Liu updated KAFKA-12713:
-
Summary: Report "REAL" follower/consumer fetch latency  (was: Report "REAL" 
broker/consumer fetch latency)

> Report "REAL" follower/consumer fetch latency
> -
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Priority: Major
>
> Fetch latency is an important metric to monitor for cluster performance. With 
> acks=all, produce latency is affected primarily by broker fetch latency.
> However, the currently reported fetch latency does not reflect the true fetch 
> latency, because a fetch sometimes has to wait in purgatory for up to 
> replica.fetch.wait.max.ms when data is not available. This greatly skews the 
> real P50, P99, etc.
> I would like to propose a KIP to track the real fetch latency for both the 
> broker follower and the consumer.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12713) Report "REAL" broker/consumer fetch latency

2021-04-25 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331590#comment-17331590
 ] 

Ming Liu edited comment on KAFKA-12713 at 4/25/21, 11:16 PM:
-

The idea is:

0. Add waitTimeMs to Request().

1. In the DelayedFetch delayed operation class, add code to track the actual 
wait time.

2. In processResponseCallback() of handleFetchRequest, add an additional 
waitTimeMs parameter supplied by DelayedFetch. It will set request.waitTimeMs.

3. In updateRequestMetrics(), if waitTimeMs is not zero, deduct it from 
RemoteTime and TotalTime.

Let me know if you have any suggestions or feedback. I would like to propose a 
KIP for this change.
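
As a rough sketch of step 1 (illustrative names only; the real DelayedFetch is a Scala DelayedOperation inside the broker), the wait time can be captured by recording when the delayed fetch is created and computing the elapsed time when it completes, then handing that value to the response callback:
{code:java}
public class DelayedFetchWaitSketch {

    interface FetchCallback {
        void onComplete(long waitTimeMs);
    }

    static class DelayedFetch {
        private final long enqueueTimeMs = System.currentTimeMillis();
        private final FetchCallback callback;

        DelayedFetch(FetchCallback callback) {
            this.callback = callback;
        }

        // Invoked when enough data arrives or replica.fetch.wait.max.ms expires.
        void complete() {
            long waitTimeMs = System.currentTimeMillis() - enqueueTimeMs;
            callback.onComplete(waitTimeMs);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayedFetch fetch = new DelayedFetch(waitMs ->
                System.out.println("time spent in purgatory: " + waitMs + " ms"));
        Thread.sleep(50); // stand-in for the time spent waiting in purgatory
        fetch.complete();
    }
}
{code}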


was (Author: mingaliu):
The idea I am trying right now is:
 1. Add waitTimeMS to FetchResponse.
 2. If the fetch has to wait in purgatory due to either 
replica.fetch.wait.max.ms or fetch.min.bytes, it will fill in waitTimeMS in the 
FetchResponse.
 3. In updateRequestMetrics(), special-case the Fetch response and subtract 
waitTimeMS from RemoteTime and TotalTime.

Let me know if you have any suggestions or feedback. I would like to propose a 
KIP for this change.

> Report "REAL" broker/consumer fetch latency
> ---
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Priority: Major
>
> Fetch latency is an important metric to monitor for cluster performance. With 
> acks=all, produce latency is affected primarily by broker fetch latency.
> However, the currently reported fetch latency does not reflect the true fetch 
> latency, because a fetch sometimes has to wait in purgatory for up to 
> replica.fetch.wait.max.ms when data is not available. This greatly skews the 
> real P50, P99, etc.
> I would like to propose a KIP to track the real fetch latency for both the 
> broker follower and the consumer.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12713) Report "REAL" broker/consumer fetch latency

2021-04-25 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331590#comment-17331590
 ] 

Ming Liu edited comment on KAFKA-12713 at 4/25/21, 5:33 PM:


The idea I am trying right now is:
 1. Add waitTimeMS to FetchResponse.
 2. If the fetch has to wait in purgatory due to either 
replica.fetch.wait.max.ms or fetch.min.bytes, it will fill in waitTimeMS in the 
FetchResponse.
 3. In updateRequestMetrics(), special-case the Fetch response and subtract 
waitTimeMS from RemoteTime and TotalTime.

Let me know if you have any suggestions or feedback. I would like to propose a 
KIP for this change.


was (Author: mingaliu):
The idea is like this; we can:
1. Add waitTimeMS to FetchResponse.
2. If the fetch has to wait in purgatory due to either 
replica.fetch.wait.max.ms or fetch.min.bytes, it will fill in waitTimeMS in the 
FetchResponse.
3. In updateRequestMetrics(), special-case the Fetch response and subtract 
waitTimeMS from RemoteTime and TotalTime.

> Report "REAL" broker/consumer fetch latency
> ---
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Priority: Major
>
> Fetch latency is an important metric to monitor for cluster performance. With 
> acks=all, produce latency is affected primarily by broker fetch latency.
> However, the currently reported fetch latency does not reflect the true fetch 
> latency, because a fetch sometimes has to wait in purgatory for up to 
> replica.fetch.wait.max.ms when data is not available. This greatly skews the 
> real P50, P99, etc.
> I would like to propose a KIP to track the real fetch latency for both the 
> broker follower and the consumer.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12713) Report "REAL" broker/consumer fetch latency

2021-04-25 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17331590#comment-17331590
 ] 

Ming Liu commented on KAFKA-12713:
--

The idea is like this; we can:
1. Add waitTimeMS to FetchResponse.
2. If the fetch has to wait in purgatory due to either 
replica.fetch.wait.max.ms or fetch.min.bytes, it will fill in waitTimeMS in the 
FetchResponse.
3. In updateRequestMetrics(), special-case the Fetch response and subtract 
waitTimeMS from RemoteTime and TotalTime.

> Report "REAL" broker/consumer fetch latency
> ---
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Priority: Major
>
> Fetch latency is an important metric to monitor for cluster performance. With 
> acks=all, produce latency is affected primarily by broker fetch latency.
> However, the currently reported fetch latency does not reflect the true fetch 
> latency, because a fetch sometimes has to wait in purgatory for up to 
> replica.fetch.wait.max.ms when data is not available. This greatly skews the 
> real P50, P99, etc.
> I would like to propose a KIP to track the real fetch latency for both the 
> broker follower and the consumer.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12713) Report "REAL" broker/consumer fetch latency

2021-04-24 Thread Ming Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ming Liu updated KAFKA-12713:
-
Summary: Report "REAL" broker/consumer fetch latency  (was: Report 
broker/consumer fetch latency more accurately)

> Report "REAL" broker/consumer fetch latency
> ---
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Priority: Major
>
> Fetch latency is an important metric to monitor for cluster performance. With 
> acks=all, produce latency is affected primarily by broker fetch latency.
> However, the currently reported fetch latency does not reflect the true fetch 
> latency, because a fetch sometimes has to wait in purgatory for up to 
> replica.fetch.wait.max.ms when data is not available. This greatly skews the 
> real P50, P99, etc.
> I would like to propose a KIP to track the real fetch latency for both the 
> broker follower and the consumer.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12713) Report broker/consumer fetch latency more accurately

2021-04-24 Thread Ming Liu (Jira)
Ming Liu created KAFKA-12713:


 Summary: Report broker/consumer fetch latency more accurately
 Key: KAFKA-12713
 URL: https://issues.apache.org/jira/browse/KAFKA-12713
 Project: Kafka
  Issue Type: Bug
Reporter: Ming Liu


Fetch latency is an important metric to monitor for cluster performance. With 
acks=all, produce latency is affected primarily by broker fetch latency.

However, the currently reported fetch latency does not reflect the true fetch 
latency, because a fetch sometimes has to wait in purgatory for up to 
replica.fetch.wait.max.ms when data is not available. This greatly skews the 
real P50, P99, etc.

I would like to propose a KIP to track the real fetch latency for both the 
broker follower and the consumer.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2020-11-30 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17240908#comment-17240908
 ] 

Ming Liu commented on KAFKA-7918:
-

We also have code that optimizes the changelog to support this. We only push to 
the changelog at a certain interval (and collapse records with the same key to 
save the serialization cost).

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.3.0
>
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters

2020-11-23 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17237861#comment-17237861
 ] 

Ming Liu commented on KAFKA-7918:
-

[~ableegoldman], [~guozhang], [~mjsax], one good benefit of a generics-based 
in-memory store is saving the serialization cost (one great benefit of using an 
in-memory store in the first place). As a matter of fact, we do have 
applications relying on that capability to improve performance. With this 
change (when we tried upgrading from 2.2 to 2.5), a few of our applications 
can't keep up anymore.

We ended up writing code to create InMemoryKeyValueStore directly (bypassing 
Stores.inMemoryKeyValueStore). I think we should expose a K, V based in-memory 
store directly. Thoughts?
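
For context, here is a minimal sketch of the public API path being discussed (store name and types are illustrative): the bytes-backed supplier returned by Stores.inMemoryKeyValueStore() is wrapped with serdes, so every put/get pays the serialization cost mentioned above.
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class InMemoryStoreExample {
    public static void main(String[] args) {
        // The public route: a bytes-backed in-memory store wrapped with serdes.
        // Keys and values are (de)serialized on every access, which is the
        // overhead a K/V-typed in-memory store would avoid.
        StoreBuilder<KeyValueStore<String, Long>> builder =
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore("counts"), // hypothetical store name
                        Serdes.String(),
                        Serdes.Long());
        System.out.println("store name: " + builder.name());
    }
}
{code}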

> Streams store cleanup: inline byte-store generic parameters
> ---
>
> Key: KAFKA-7918
> URL: https://issues.apache.org/jira/browse/KAFKA-7918
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: A. Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.3.0
>
>
> Currently, the fundamental layer of stores in Streams is the "bytes store".
> The easiest way to identify this is in 
> `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a 
> `XXBytesStoreSupplier`. 
> We provide several implementations of these bytes stores, typically an 
> in-memory one and a persistent one (aka RocksDB).
> Inside these bytes stores, the key is always `Bytes` and the value is always 
> `byte[]` (serialization happens at a higher level). However, the store 
> implementations are generically typed, just `K` and `V`.
> This is good for flexibility, but it makes the code a little harder to 
> understand. I think that we used to do serialization at a lower level, so the 
> generics are a hold-over from that.
> It would simplify the code if we just inlined the actual k/v types and maybe 
> even renamed the classes from (e.g.) `InMemoryKeyValueStore` to 
> `InMemoryKeyValueBytesStore`, and so forth.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2020-10-24 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17220139#comment-17220139
 ] 

Ming Liu commented on KAFKA-8733:
-

One observation is that after moving to 2.5 (so _replica.lag.time.max.ms_ 
changed from 10 seconds to 30 seconds), offline partitions when a leader's disk 
goes bad occur much less frequently.

> Offline partitions occur when leader's disk is slow in reads while responding 
> to follower fetch requests.
> -
>
> Key: KAFKA-8733
> URL: https://issues.apache.org/jira/browse/KAFKA-8733
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.2, 2.4.0
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Critical
> Attachments: weighted-io-time-2.png, wio-time.png
>
>
> We found the offline partitions issue multiple times on some of the hosts in our 
> clusters. After going through the broker logs and the hosts' disk stats, it 
> looks like this issue occurs whenever the read/write operations take more 
> time on that disk. In a particular case where read time is more than the 
> replica.lag.time.max.ms, follower replicas will be out of sync as their 
> earlier fetch requests are stuck while reading the local log and their fetch 
> status is not yet updated as mentioned in the below code of `ReplicaManager`. 
> If there is an issue in reading the data from the log for a duration more 
> than replica.lag.time.max.ms then all the replicas will be out of sync and 
> partition becomes offline if min.isr.replicas > 1 and unclean.leader.election 
> is false.
>  
> {code:java}
> def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
>   val result = readFromLocalLog( // this call took more than 
> `replica.lag.time.max.ms`
>   replicaId = replicaId,
>   fetchOnlyFromLeader = fetchOnlyFromLeader,
>   readOnlyCommitted = fetchOnlyCommitted,
>   fetchMaxBytes = fetchMaxBytes,
>   hardMaxBytesLimit = hardMaxBytesLimit,
>   readPartitionInfo = fetchInfos,
>   quota = quota,
>   isolationLevel = isolationLevel)
>   if (isFromFollower) updateFollowerLogReadResults(replicaId, result). // 
> fetch time gets updated here, but mayBeShrinkIsr should have been already 
> called and the replica is removed from isr
>  else result
>  }
> val logReadResults = readFromLog()
> {code}
> Attached the graphs of disk weighted io time stats when this issue occurred.
> I will raise [KIP-501|https://s.apache.org/jhbpn] describing options on how 
> to handle this scenario.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10398) Intra-broker disk move failed with onPartitionFenced()

2020-08-13 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177132#comment-17177132
 ] 

Ming Liu commented on KAFKA-10398:
--

[~lukech], that is another problem blocking the disk move. I wonder whether you 
are aware of it.

> Intra-broker disk move failed with onPartitionFenced()
> --
>
> Key: KAFKA-10398
> URL: https://issues.apache.org/jira/browse/KAFKA-10398
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.0
>Reporter: Ming Liu
>Priority: Major
>
> When I tried the intra-broker disk move on 2.5.0, it always failed quickly with an 
> onPartitionFenced() failure. This is the entire log from ReplicaAlterLogDirsManager:
> {code:java}
>  [2020-06-03 04:52:17,541] INFO [ReplicaAlterLogDirsManager on broker 5] 
> Added fetcher to broker BrokerEndPoint(id=5, host=localhost:-1) for 
> partitions Map(author_id_enrichment_changelog_staging-302 -> (offset=0, 
> leaderEpoch=45)) (kafka.server.ReplicaAlterLogDirsManager)
>  [2020-06-03 04:52:17,546] INFO [ReplicaAlterLogDirsThread-5]: Starting 
> (kafka.server.ReplicaAlterLogDirsThread)
>  [2020-06-03 04:52:17,546] INFO [ReplicaAlterLogDirsThread-5]: Truncating 
> partition author_id_enrichment_changelog_staging-302 to local high watermark 
> 0 (kafka.server.ReplicaAlterLogDirsThread)
>  [2020-06-03 04:52:17,547] INFO [ReplicaAlterLogDirsThread-5]: 
> Beginning/resuming copy of partition 
> author_id_enrichment_changelog_staging-302 from offset 0. Including this 
> partition, there are 1 remaining partitions to copy by this thread. 
> (kafka.server.ReplicaAlterLogDirsThread)
>  [2020-06-03 04:52:17,547] WARN [ReplicaAlterLogDirsThread-5]: Reset fetch 
> offset for partition author_id_enrichment_changelog_staging-302 from 0 to 
> current leader's start offset 1656927679 
> (kafka.server.ReplicaAlterLogDirsThread)
>  [2020-06-03 04:52:17,550] INFO [ReplicaAlterLogDirsThread-5]: Current offset 
> 0 for partition author_id_enrichment_changelog_staging-302 is out of range, 
> which typically implies a leader change. Reset fetch offset to 1656927679 
> (kafka.server.ReplicaAlterLogDirsThread)
>  [2020-06-03 04:52:17,653] INFO [ReplicaAlterLogDirsThread-5]: Partition 
> author_id_enrichment_changelog_staging-302 has an older epoch (45) than the 
> current leader. Will await the new LeaderAndIsr state before resuming 
> fetching. (kafka.server.ReplicaAlterLogDirsThread)
>  [2020-06-03 04:52:17,653] WARN [ReplicaAlterLogDirsThread-5]: Partition 
> author_id_enrichment_changelog_staging-302 marked as failed 
> (kafka.server.ReplicaAlterLogDirsThread)
>  [2020-06-03 04:52:17,657] INFO [ReplicaAlterLogDirsThread-5]: Shutting down 
> (kafka.server.ReplicaAlterLogDirsThread)
>  [2020-06-03 04:52:17,661] INFO [ReplicaAlterLogDirsThread-5]: Stopped 
> (kafka.server.ReplicaAlterLogDirsThread)
>  [2020-06-03 04:52:17,661] INFO [ReplicaAlterLogDirsThread-5]: Shutdown 
> completed (kafka.server.ReplicaAlterLogDirsThread){code}
>  Only after restarting the broker did the disk move succeed. The offset and epoch 
> numbers look better.
> {code:java}
>  [2020-06-03 05:20:12,597] INFO [ReplicaAlterLogDirsManager on broker 5] 
> Added fetcher to broker BrokerEndPoint(id=5, host=localhost:-1) for 
> partitions Map(author_id_enrichment_changelog_staging-302 -> 
> (offset=166346, leaderEpoch=47)) (kafka.server.ReplicaAlterLogDirsManager)
>  [2020-06-03 05:20:12,606] INFO [ReplicaAlterLogDirsThread-5]: Starting 
> (kafka.server.ReplicaAlterLogDirsThread)
>  [2020-06-03 05:20:12,618] INFO [ReplicaAlterLogDirsThread-5]: 
> Beginning/resuming copy of partition 
> author_id_enrichment_changelog_staging-302 from offset 1657605964. Including 
> this partition, there are 1 remaining partitions to copy by this thread. 
> (kafka.server.ReplicaAlterLogDirsThread)
>  [2020-06-03 05:20:20,992] INFO [ReplicaAlterLogDirsThread-5]: Shutting down 
> (kafka.server.ReplicaAlterLogDirsThread)
>  [2020-06-03 05:20:20,994] INFO [ReplicaAlterLogDirsThread-5]: Shutdown 
> completed (kafka.server.ReplicaAlterLogDirsThread)
>  [2020-06-03 05:20:20,994] INFO [ReplicaAlterLogDirsThread-5]: Stopped 
> (kafka.server.ReplicaAlterLogDirsThread)
>   {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10398) Intra-broker disk move failed with onPartitionFenced()

2020-08-13 Thread Ming Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ming Liu updated KAFKA-10398:
-
Description: 
When I tried the intra-broker disk move on 2.5.0, it always failed quickly with an 
onPartitionFenced() failure. This is the entire log from ReplicaAlterLogDirsManager:
{code:java}

 [2020-06-03 04:52:17,541] INFO [ReplicaAlterLogDirsManager on broker 5] Added 
fetcher to broker BrokerEndPoint(id=5, host=localhost:-1) for partitions 
Map(author_id_enrichment_changelog_staging-302 -> (offset=0, leaderEpoch=45)) 
(kafka.server.ReplicaAlterLogDirsManager)
 [2020-06-03 04:52:17,546] INFO [ReplicaAlterLogDirsThread-5]: Starting 
(kafka.server.ReplicaAlterLogDirsThread)
 [2020-06-03 04:52:17,546] INFO [ReplicaAlterLogDirsThread-5]: Truncating 
partition author_id_enrichment_changelog_staging-302 to local high watermark 0 
(kafka.server.ReplicaAlterLogDirsThread)
 [2020-06-03 04:52:17,547] INFO [ReplicaAlterLogDirsThread-5]: 
Beginning/resuming copy of partition author_id_enrichment_changelog_staging-302 
from offset 0. Including this partition, there are 1 remaining partitions to 
copy by this thread. (kafka.server.ReplicaAlterLogDirsThread)
 [2020-06-03 04:52:17,547] WARN [ReplicaAlterLogDirsThread-5]: Reset fetch 
offset for partition author_id_enrichment_changelog_staging-302 from 0 to 
current leader's start offset 1656927679 
(kafka.server.ReplicaAlterLogDirsThread)
 [2020-06-03 04:52:17,550] INFO [ReplicaAlterLogDirsThread-5]: Current offset 0 
for partition author_id_enrichment_changelog_staging-302 is out of range, which 
typically implies a leader change. Reset fetch offset to 1656927679 
(kafka.server.ReplicaAlterLogDirsThread)
 [2020-06-03 04:52:17,653] INFO [ReplicaAlterLogDirsThread-5]: Partition 
author_id_enrichment_changelog_staging-302 has an older epoch (45) than the 
current leader. Will await the new LeaderAndIsr state before resuming fetching. 
(kafka.server.ReplicaAlterLogDirsThread)
 [2020-06-03 04:52:17,653] WARN [ReplicaAlterLogDirsThread-5]: Partition 
author_id_enrichment_changelog_staging-302 marked as failed 
(kafka.server.ReplicaAlterLogDirsThread)
 [2020-06-03 04:52:17,657] INFO [ReplicaAlterLogDirsThread-5]: Shutting down 
(kafka.server.ReplicaAlterLogDirsThread)
 [2020-06-03 04:52:17,661] INFO [ReplicaAlterLogDirsThread-5]: Stopped 
(kafka.server.ReplicaAlterLogDirsThread)
 [2020-06-03 04:52:17,661] INFO [ReplicaAlterLogDirsThread-5]: Shutdown 
completed (kafka.server.ReplicaAlterLogDirsThread){code}

 Only after restarting the broker did the disk move succeed. The offset and epoch 
numbers look better.
{code:java}

 [2020-06-03 05:20:12,597] INFO [ReplicaAlterLogDirsManager on broker 5] Added 
fetcher to broker BrokerEndPoint(id=5, host=localhost:-1) for partitions 
Map(author_id_enrichment_changelog_staging-302 -> (offset=166346, 
leaderEpoch=47)) (kafka.server.ReplicaAlterLogDirsManager)
 [2020-06-03 05:20:12,606] INFO [ReplicaAlterLogDirsThread-5]: Starting 
(kafka.server.ReplicaAlterLogDirsThread)
 [2020-06-03 05:20:12,618] INFO [ReplicaAlterLogDirsThread-5]: 
Beginning/resuming copy of partition author_id_enrichment_changelog_staging-302 
from offset 1657605964. Including this partition, there are 1 remaining 
partitions to copy by this thread. (kafka.server.ReplicaAlterLogDirsThread)
 [2020-06-03 05:20:20,992] INFO [ReplicaAlterLogDirsThread-5]: Shutting down 
(kafka.server.ReplicaAlterLogDirsThread)
 [2020-06-03 05:20:20,994] INFO [ReplicaAlterLogDirsThread-5]: Shutdown 
completed (kafka.server.ReplicaAlterLogDirsThread)
 [2020-06-03 05:20:20,994] INFO [ReplicaAlterLogDirsThread-5]: Stopped 
(kafka.server.ReplicaAlterLogDirsThread)
  {code}
 

  was:
When I tried the intra-broker disk move on 2.5.0, it always failed quickly with an 
onPartitionFenced() failure. This is the entire log from ReplicaAlterLogDirsManager:
[2020-06-03 04:52:17,541] INFO [ReplicaAlterLogDirsManager on broker 5] Added 
fetcher to broker BrokerEndPoint(id=5, host=localhost:-1) for partitions 
Map(author_id_enrichment_changelog_staging-302 -> (offset=0, leaderEpoch=45)) 
(kafka.server.ReplicaAlterLogDirsManager)
[2020-06-03 04:52:17,546] INFO [ReplicaAlterLogDirsThread-5]: Starting 
(kafka.server.ReplicaAlterLogDirsThread)
[2020-06-03 04:52:17,546] INFO [ReplicaAlterLogDirsThread-5]: Truncating 
partition author_id_enrichment_changelog_staging-302 to local high watermark 0 
(kafka.server.ReplicaAlterLogDirsThread)
[2020-06-03 04:52:17,547] INFO [ReplicaAlterLogDirsThread-5]: 
Beginning/resuming copy of partition author_id_enrichment_changelog_staging-302 
from offset 0. Including this partition, there are 1 remaining partitions to 
copy by this thread. (kafka.server.ReplicaAlterLogDirsThread)
[2020-06-03 04:52:17,547] WARN [ReplicaAlterLogDirsThread-5]: Reset fetch 
offset for partition author_id_enrichment_changelog_staging-302 from 0 to 
current leader's start offset 1656927679 
(kafka.server.ReplicaAlterLogDirsThread)

[jira] [Created] (KAFKA-10398) Intra-broker disk move failed with onPartitionFenced()

2020-08-13 Thread Ming Liu (Jira)
Ming Liu created KAFKA-10398:


 Summary: Intra-broker disk move failed with onPartitionFenced()
 Key: KAFKA-10398
 URL: https://issues.apache.org/jira/browse/KAFKA-10398
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.5.0
Reporter: Ming Liu


When I tried the intra-broker disk move on 2.5.0, it always failed quickly with an 
onPartitionFenced() failure. This is the entire log from ReplicaAlterLogDirsManager:
[2020-06-03 04:52:17,541] INFO [ReplicaAlterLogDirsManager on broker 5] Added 
fetcher to broker BrokerEndPoint(id=5, host=localhost:-1) for partitions 
Map(author_id_enrichment_changelog_staging-302 -> (offset=0, leaderEpoch=45)) 
(kafka.server.ReplicaAlterLogDirsManager)
[2020-06-03 04:52:17,546] INFO [ReplicaAlterLogDirsThread-5]: Starting 
(kafka.server.ReplicaAlterLogDirsThread)
[2020-06-03 04:52:17,546] INFO [ReplicaAlterLogDirsThread-5]: Truncating 
partition author_id_enrichment_changelog_staging-302 to local high watermark 0 
(kafka.server.ReplicaAlterLogDirsThread)
[2020-06-03 04:52:17,547] INFO [ReplicaAlterLogDirsThread-5]: 
Beginning/resuming copy of partition author_id_enrichment_changelog_staging-302 
from offset 0. Including this partition, there are 1 remaining partitions to 
copy by this thread. (kafka.server.ReplicaAlterLogDirsThread)
[2020-06-03 04:52:17,547] WARN [ReplicaAlterLogDirsThread-5]: Reset fetch 
offset for partition author_id_enrichment_changelog_staging-302 from 0 to 
current leader's start offset 1656927679 
(kafka.server.ReplicaAlterLogDirsThread)
[2020-06-03 04:52:17,550] INFO [ReplicaAlterLogDirsThread-5]: Current offset 0 
for partition author_id_enrichment_changelog_staging-302 is out of range, which 
typically implies a leader change. Reset fetch offset to 1656927679 
(kafka.server.ReplicaAlterLogDirsThread)
[2020-06-03 04:52:17,653] INFO [ReplicaAlterLogDirsThread-5]: Partition 
author_id_enrichment_changelog_staging-302 has an older epoch (45) than the 
current leader. Will await the new LeaderAndIsr state before resuming fetching. 
(kafka.server.ReplicaAlterLogDirsThread)
[2020-06-03 04:52:17,653] WARN [ReplicaAlterLogDirsThread-5]: Partition 
author_id_enrichment_changelog_staging-302 marked as failed 
(kafka.server.ReplicaAlterLogDirsThread)
[2020-06-03 04:52:17,657] INFO [ReplicaAlterLogDirsThread-5]: Shutting down 
(kafka.server.ReplicaAlterLogDirsThread)
[2020-06-03 04:52:17,661] INFO [ReplicaAlterLogDirsThread-5]: Stopped 
(kafka.server.ReplicaAlterLogDirsThread)
[2020-06-03 04:52:17,661] INFO [ReplicaAlterLogDirsThread-5]: Shutdown 
completed (kafka.server.ReplicaAlterLogDirsThread)
Only after restarting the broker did the disk move succeed. The offset and epoch 
numbers look better.
[2020-06-03 05:20:12,597] INFO [ReplicaAlterLogDirsManager on broker 5] Added 
fetcher to broker BrokerEndPoint(id=5, host=localhost:-1) for partitions 
Map(author_id_enrichment_changelog_staging-302 -> (offset=166346, 
leaderEpoch=47)) (kafka.server.ReplicaAlterLogDirsManager)
[2020-06-03 05:20:12,606] INFO [ReplicaAlterLogDirsThread-5]: Starting 
(kafka.server.ReplicaAlterLogDirsThread)
[2020-06-03 05:20:12,618] INFO [ReplicaAlterLogDirsThread-5]: 
Beginning/resuming copy of partition author_id_enrichment_changelog_staging-302 
from offset 1657605964. Including this partition, there are 1 remaining 
partitions to copy by this thread. (kafka.server.ReplicaAlterLogDirsThread)
[2020-06-03 05:20:20,992] INFO [ReplicaAlterLogDirsThread-5]: Shutting down 
(kafka.server.ReplicaAlterLogDirsThread)
[2020-06-03 05:20:20,994] INFO [ReplicaAlterLogDirsThread-5]: Shutdown 
completed (kafka.server.ReplicaAlterLogDirsThread)
[2020-06-03 05:20:20,994] INFO [ReplicaAlterLogDirsThread-5]: Stopped 
(kafka.server.ReplicaAlterLogDirsThread)
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8362) LogCleaner gets stuck after partition move between log directories

2020-08-13 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177123#comment-17177123
 ] 

Ming Liu commented on KAFKA-8362:
-

Awesome, what is the patch link?

> LogCleaner gets stuck after partition move between log directories
> --
>
> Key: KAFKA-8362
> URL: https://issues.apache.org/jira/browse/KAFKA-8362
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod, log cleaner
>Reporter: Julio Ng
>Assignee: Luke Chen
>Priority: Major
>
> When a partition is moved from one directory to another, its checkpoint 
> entry in the cleaner-offset-checkpoint file is not removed from the source 
> directory.
> As a consequence, when we read the last firstDirtyOffset, we might get a stale 
> value from the old checkpoint file.
> Basically, we need to clean up the entry from the checkpoint file in the 
> source directory when the move is completed.
> The current issue is that the code in LogCleanerManager:
> {noformat}
> /**
>  * @return the position processed for all logs.
>  */
> def allCleanerCheckpoints: Map[TopicPartition, Long] = {
>   inLock(lock) {
> checkpoints.values.flatMap(checkpoint => {
>   try {
> checkpoint.read()
>   } catch {
> case e: KafkaStorageException =>
>   error(s"Failed to access checkpoint file ${checkpoint.file.getName} 
> in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
>   Map.empty[TopicPartition, Long]
>   }
> }).toMap
>   }
> }{noformat}
> collapses the offsets when multiple entries exist for the topicPartition



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8362) LogCleaner gets stuck after partition move between log directories

2020-08-12 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17176476#comment-17176476
 ] 

Ming Liu commented on KAFKA-8362:
-

In that case, we need to modify the *allCleanerCheckpoints* implementation. 
Otherwise, the log cleaner is completely broken.

> LogCleaner gets stuck after partition move between log directories
> --
>
> Key: KAFKA-8362
> URL: https://issues.apache.org/jira/browse/KAFKA-8362
> Project: Kafka
>  Issue Type: Bug
>  Components: jbod, log cleaner
>Reporter: Julio Ng
>Assignee: Luke Chen
>Priority: Major
>
> When a partition is moved from one directory to another, its checkpoint 
> entry in the cleaner-offset-checkpoint file is not removed from the source 
> directory.
> As a consequence, when we read the last firstDirtyOffset, we might get a stale 
> value from the old checkpoint file.
> Basically, we need to clean up the entry from the checkpoint file in the 
> source directory when the move is completed.
> The current issue is that the code in LogCleanerManager:
> {noformat}
> /**
>  * @return the position processed for all logs.
>  */
> def allCleanerCheckpoints: Map[TopicPartition, Long] = {
>   inLock(lock) {
> checkpoints.values.flatMap(checkpoint => {
>   try {
> checkpoint.read()
>   } catch {
> case e: KafkaStorageException =>
>   error(s"Failed to access checkpoint file ${checkpoint.file.getName} 
> in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
>   Map.empty[TopicPartition, Long]
>   }
> }).toMap
>   }
> }{noformat}
> collapses the offsets when multiple entries exist for the topicPartition



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2020-07-25 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098264#comment-17098264
 ] 

Ming Liu edited comment on KAFKA-8733 at 7/25/20, 3:03 PM:
---

We (at Twitter) also saw this issue almost every month. Given it is an 
availability loss, we have to react very fast and enable unclean leader 
election.

First, is it possible to add a metric that monitors for exactly this scenario 
before it happens?

One observation, given we are using min-ISR=2, is that the last follower 
falling out of the ISR should have the same HW as the leader before it goes 
offline. So when you enable unclean leader election, there is a 50% chance you 
incur data loss (if the election doesn't select that replica).

 

 


was (Author: mingaliu):
We (at Twitter) also saw this issue almost every month, and it is annoying. 
Given it is an availability loss, we have to react very fast and enable unclean 
leader election.

When this happens, we should also start the disk/host swap operation. The 
low-level system metrics or fetch latency metrics seem like good metrics for 
monitoring this scenario. Should we add a metric that monitors for exactly this 
scenario?

Another observation: given we are using min-ISR=2, the last replica kicked out 
of the ISR should have the same HW as the eventual offline leader. So when you 
enable unclean leader election, there is a 50% chance you incur data loss (if 
the election doesn't select that replica).

 

 

> Offline partitions occur when leader's disk is slow in reads while responding 
> to follower fetch requests.
> -
>
> Key: KAFKA-8733
> URL: https://issues.apache.org/jira/browse/KAFKA-8733
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.2, 2.4.0
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Critical
> Attachments: weighted-io-time-2.png, wio-time.png
>
>
> We found the offline partitions issue multiple times on some of the hosts in our 
> clusters. After going through the broker logs and the hosts' disk stats, it 
> looks like this issue occurs whenever the read/write operations take more 
> time on that disk. In a particular case where read time is more than the 
> replica.lag.time.max.ms, follower replicas will be out of sync as their 
> earlier fetch requests are stuck while reading the local log and their fetch 
> status is not yet updated as mentioned in the below code of `ReplicaManager`. 
> If there is an issue in reading the data from the log for a duration more 
> than replica.lag.time.max.ms then all the replicas will be out of sync and 
> partition becomes offline if min.isr.replicas > 1 and unclean.leader.election 
> is false.
>  
> {code:java}
> def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
>   val result = readFromLocalLog( // this call took more than 
> `replica.lag.time.max.ms`
>   replicaId = replicaId,
>   fetchOnlyFromLeader = fetchOnlyFromLeader,
>   readOnlyCommitted = fetchOnlyCommitted,
>   fetchMaxBytes = fetchMaxBytes,
>   hardMaxBytesLimit = hardMaxBytesLimit,
>   readPartitionInfo = fetchInfos,
>   quota = quota,
>   isolationLevel = isolationLevel)
>   if (isFromFollower) updateFollowerLogReadResults(replicaId, result). // 
> fetch time gets updated here, but mayBeShrinkIsr should have been already 
> called and the replica is removed from isr
>  else result
>  }
> val logReadResults = readFromLog()
> {code}
> Attached the graphs of disk weighted io time stats when this issue occurred.
> I will raise [KIP-501|https://s.apache.org/jhbpn] describing options on how 
> to handle this scenario.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10242) Adding metrics to track the total count of idempotent producers that Broker need to track

2020-07-07 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17153091#comment-17153091
 ] 

Ming Liu commented on KAFKA-10242:
--

Yes, correct, then it should be 2.7.0 instead of 2.6.0 

> Adding metrics to track the total count of idempotent producers that Broker 
> need to track
> -
>
> Key: KAFKA-10242
> URL: https://issues.apache.org/jira/browse/KAFKA-10242
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 2.5.0
>Reporter: Ming Liu
>Priority: Critical
>  Labels: needs-kip
> Fix For: 2.7.0
>
>
> We found it very useful to track the total number of idempotent producers 
> that a broker is tracking.
> In our production environment, we have many idempotent producers per 
> cluster, and sometimes that number increases to a very high value that 
> requires some attention to mitigate.
> This is especially true for clients (< 2.4), where client retries might 
> generate too many distinct idempotent producers, which can trigger broker GC.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10242) Adding metrics to track the total count of idempotent producers that Broker need to track

2020-07-06 Thread Ming Liu (Jira)
Ming Liu created KAFKA-10242:


 Summary: Adding metrics to track the total count of idempotent 
producers that Broker need to track
 Key: KAFKA-10242
 URL: https://issues.apache.org/jira/browse/KAFKA-10242
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 2.5.0
Reporter: Ming Liu
 Fix For: 2.6.0


We found it very useful to track the total number of idempotent producers 
that a broker is tracking.
In our production environment, we have many idempotent producers per cluster, 
and sometimes that number increases to a very high value that requires some 
attention to mitigate.
This is especially true for clients (< 2.4), where client retries might 
generate too many distinct idempotent producers, which can trigger broker GC.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10242) Adding metrics to track the total count of idempotent producers that Broker need to track

2020-07-06 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17152494#comment-17152494
 ] 

Ming Liu commented on KAFKA-10242:
--

The PR is [https://github.com/apache/kafka/pull/8982]

> Adding metrics to track the total count of idempotent producers that Broker 
> need to track
> -
>
> Key: KAFKA-10242
> URL: https://issues.apache.org/jira/browse/KAFKA-10242
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 2.5.0
>Reporter: Ming Liu
>Priority: Critical
> Fix For: 2.6.0
>
>
> We found it very useful to track the total number of idempotent producers 
> that a broker is tracking.
> In our production environment, we have many idempotent producers per 
> cluster, and sometimes that number increases to a very high value that 
> requires some attention to mitigate.
> This is especially true for clients (< 2.4), where client retries might 
> generate too many distinct idempotent producers, which can trigger broker GC.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8362) LogCleaner gets stuck after partition move between log directories

2020-06-14 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17135400#comment-17135400
 ] 

Ming Liu commented on KAFKA-8362:
-

In Kafka 2.5, this still reproduces easily for me.

With a compacted topic, say topic Test1 partition 100 is on disk 1; in that 
disk's "cleaner-offset-checkpoint" you can see

Test1 100 2344432

Then if you move that partition to disk 2, you will see the same thing in its 
"cleaner-offset-checkpoint", but the disk 1 "cleaner-offset-checkpoint" still 
contains that partition.

Test1 100 2344543

This actually causes the problem during log cleaning, due to how 
allCleanerCheckpoints is implemented.
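
To illustrate why the leftover entry matters, here is a small standalone sketch (using the partition and offsets from the example above) of how merging the per-directory checkpoint files into one map collapses duplicate keys, so one directory's value silently masks the other's, which is effectively what the flatMap(...).toMap in allCleanerCheckpoints does:
{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

public class CheckpointCollapseDemo {
    public static void main(String[] args) {
        // Stale entry left behind in the source directory after the move.
        Map<String, Long> disk1 = Map.of("Test1-100", 2_344_432L);
        // Entry maintained in the destination directory after the move.
        Map<String, Long> disk2 = Map.of("Test1-100", 2_344_543L);

        // Reading all checkpoint files and collapsing them into one map keeps only
        // one value per partition; whichever file happens to be read last wins.
        Map<String, Long> all = new LinkedHashMap<>(disk1);
        all.putAll(disk2);
        System.out.println(all); // {Test1-100=2344543}

        // Read in the other order, the stale disk1 offset wins instead, which is
        // how the log cleaner can pick up a stale firstDirtyOffset.
        Map<String, Long> allReversed = new LinkedHashMap<>(disk2);
        allReversed.putAll(disk1);
        System.out.println(allReversed); // {Test1-100=2344432}
    }
}
{code}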

> LogCleaner gets stuck after partition move between log directories
> --
>
> Key: KAFKA-8362
> URL: https://issues.apache.org/jira/browse/KAFKA-8362
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Reporter: Julio Ng
>Priority: Major
>
> When a partition is moved from one directory to another, its checkpoint 
> entry in the cleaner-offset-checkpoint file is not removed from the source 
> directory.
> As a consequence, when we read the last firstDirtyOffset, we might get a stale 
> value from the old checkpoint file.
> Basically, we need to clean up the entry from the checkpoint file in the 
> source directory when the move is completed.
> The current issue is that the code in LogCleanerManager:
> {noformat}
> /**
>  * @return the position processed for all logs.
>  */
> def allCleanerCheckpoints: Map[TopicPartition, Long] = {
>   inLock(lock) {
> checkpoints.values.flatMap(checkpoint => {
>   try {
> checkpoint.read()
>   } catch {
> case e: KafkaStorageException =>
>   error(s"Failed to access checkpoint file ${checkpoint.file.getName} 
> in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e)
>   Map.empty[TopicPartition, Long]
>   }
> }).toMap
>   }
> }{noformat}
> collapses the offsets when multiple entries exist for the topicPartition



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8733) Offline partitions occur when leader's disk is slow in reads while responding to follower fetch requests.

2020-05-03 Thread Ming Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098264#comment-17098264
 ] 

Ming Liu commented on KAFKA-8733:
-

We (at Twitter) also saw this issue almost every month, and it is annoying. 
Given it is an availability loss, we have to react very fast and enable unclean 
leader election.

When this happens, we should also start the disk/host swap operation. The 
low-level system metrics or fetch latency metrics seem like good metrics for 
monitoring this scenario. Should we add a metric that monitors for exactly this 
scenario?

Another observation: given we are using min-ISR=2, the last replica kicked out 
of the ISR should have the same HW as the eventual offline leader. So when you 
enable unclean leader election, there is a 50% chance you incur data loss (if 
the election doesn't select that replica).

 

 

> Offline partitions occur when leader's disk is slow in reads while responding 
> to follower fetch requests.
> -
>
> Key: KAFKA-8733
> URL: https://issues.apache.org/jira/browse/KAFKA-8733
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.2, 2.4.0
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Critical
> Attachments: weighted-io-time-2.png, wio-time.png
>
>
> We found the offline partitions issue multiple times on some of the hosts in our 
> clusters. After going through the broker logs and the hosts' disk stats, it 
> looks like this issue occurs whenever the read/write operations take more 
> time on that disk. In a particular case where read time is more than the 
> replica.lag.time.max.ms, follower replicas will be out of sync as their 
> earlier fetch requests are stuck while reading the local log and their fetch 
> status is not yet updated as mentioned in the below code of `ReplicaManager`. 
> If there is an issue in reading the data from the log for a duration more 
> than replica.lag.time.max.ms then all the replicas will be out of sync and 
> partition becomes offline if min.isr.replicas > 1 and unclean.leader.election 
> is false.
>  
> {code:java}
> def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
>   val result = readFromLocalLog( // this call took more than 
> `replica.lag.time.max.ms`
>   replicaId = replicaId,
>   fetchOnlyFromLeader = fetchOnlyFromLeader,
>   readOnlyCommitted = fetchOnlyCommitted,
>   fetchMaxBytes = fetchMaxBytes,
>   hardMaxBytesLimit = hardMaxBytesLimit,
>   readPartitionInfo = fetchInfos,
>   quota = quota,
>   isolationLevel = isolationLevel)
>   if (isFromFollower) updateFollowerLogReadResults(replicaId, result). // 
> fetch time gets updated here, but mayBeShrinkIsr should have been already 
> called and the replica is removed from isr
>  else result
>  }
> val logReadResults = readFromLog()
> {code}
> Attached the graphs of disk weighted io time stats when this issue occurred.
> I will raise [KIP-501|https://s.apache.org/jhbpn] describing options on how 
> to handle this scenario.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8325) Remove from the incomplete set failed. This should be impossible

2019-05-13 Thread Ming Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16838718#comment-16838718
 ] 

Ming Liu commented on KAFKA-8325:
-

We also ran into this problem after we upgraded to Kafka 2.2.

> Remove from the incomplete set failed. This should be impossible
> 
>
> Key: KAFKA-8325
> URL: https://issues.apache.org/jira/browse/KAFKA-8325
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.1.0
>Reporter: Mattia Barbon
>Priority: Major
>
> I got this error when using the Kafka producer. So far it happened twice, 
> with an interval of about 1 week.
> {{ERROR [2019-05-05 08:43:07,505] 
> org.apache.kafka.clients.producer.internals.Sender: [Producer 
> clientId=, transactionalId=] Uncaught error in kafka 
> producer I/O thread:}}
> {{ ! java.lang.IllegalStateException: Remove from the incomplete set failed. 
> This should be impossible.}}
> {{ ! at 
> org.apache.kafka.clients.producer.internals.IncompleteBatches.remove(IncompleteBatches.java:44)}}
> {{ ! at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.deallocate(RecordAccumulator.java:645)}}
> {{ ! at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:717)}}
> {{ ! at 
> org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:365)}}
> {{ ! at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:308)}}
> {{ ! at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233)}}
> {{ ! at java.lang.Thread.run(Thread.java:748)}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7698) Kafka Broker fail to start: ProducerFencedException thrown from producerstatemanager.scala!checkProducerEpoch

2018-12-03 Thread Ming Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707621#comment-16707621
 ] 

Ming Liu commented on KAFKA-7698:
-

The problem seems to come from appendEndTxnMarker() in 
ProducerStateManager.scala.

ProducerAppendInfo() has a ValidationType passed in. During initial 
bootstrapping, ValidationType is set to None (instead of Client or All).

So, for append(), maybeValidateAppend() is called and the check is skipped 
during the loading phase.

But for appendEndTxnMarker(), it seems we should also skip the check.
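
A tiny standalone sketch of the gating described above (names, epochs, and the enum are illustrative, not the actual Scala code in ProducerStateManager): appends replayed while loading the log carry ValidationType None and skip the producer-epoch check that fences client appends, and the suggestion is that end-transaction markers replayed at load time should pass through the same gate.
{code:java}
public class EpochValidationSketch {

    enum ValidationType { NONE, CLIENT, ALL }

    static class ProducerAppendInfo {
        private final ValidationType validation;
        private final int currentEpoch;

        ProducerAppendInfo(ValidationType validation, int currentEpoch) {
            this.validation = validation;
            this.currentEpoch = currentEpoch;
        }

        void maybeValidateEpoch(int requestEpoch) {
            if (validation == ValidationType.NONE)
                return; // log loading: accept whatever is already on disk
            if (requestEpoch < currentEpoch)
                throw new IllegalStateException("ProducerFenced: request epoch "
                        + requestEpoch + " < server epoch " + currentEpoch);
        }
    }

    public static void main(String[] args) {
        // Replaying a marker with epoch 63 against server epoch 66 during load: no error.
        new ProducerAppendInfo(ValidationType.NONE, 66).maybeValidateEpoch(63);

        // The same epoch from a live client would be fenced.
        try {
            new ProducerAppendInfo(ValidationType.CLIENT, 66).maybeValidateEpoch(63);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}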

> Kafka Broker fail to start: ProducerFencedException thrown from 
> producerstatemanager.scala!checkProducerEpoch 
> --
>
> Key: KAFKA-7698
> URL: https://issues.apache.org/jira/browse/KAFKA-7698
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Priority: Major
>
> During our operation of Kafka, we frequently saw this failure: 
>    There was an error in one of the threads during logs loading: 
> org.apache.kafka.common.errors.ProducerFencedException:
> {code:java}
> [06:57:09,697] INFO [ProducerStateManager partition=interaction_events-127] 
> Loading producer state from snapshot file 
> '/data/disk5/kafka/interaction_events-127/092130764817.snapshot' 
> (kafka.log.ProducerStateManager)
> [06:57:09,698] INFO [Log partition=interaction_events-127, 
> dir=/data/disk5/kafka] Completed load of log with 11 segments, log start 
> offset 91975003024 and log end offset 92130764817 in 12701 ms (kafka.log.Log)
> [06:57:09,701] ERROR There was an error in one of the threads during logs 
> loading: org.apache.kafka.common.errors.ProducerFencedException: Producer's 
> epoch is no longer valid. There is probably another producer with a newer 
> epoch. 63 (request epoch), 66 (server epoch) (kafka.log.LogManager)
> [06:57:09,705] INFO [ProducerStateManager 
> partition=client_interaction_events_authorid_enrichment-20] Writing producer 
> snapshot at offset 92418754384 (kafka.log.ProducerStateManager)
> [06:57:09,707] ERROR [KafkaServer id=2] Fatal error during KafkaServer 
> startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 63 
> (request epoch), 66 (server epoch)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7698) Kafka Broker fail to start: ProducerFencedException thrown from producerstatemanager.scala!checkProducerEpoch

2018-12-03 Thread Ming Liu (JIRA)
Ming Liu created KAFKA-7698:
---

 Summary: Kafka Broker fail to start: ProducerFencedException 
thrown from producerstatemanager.scala!checkProducerEpoch 
 Key: KAFKA-7698
 URL: https://issues.apache.org/jira/browse/KAFKA-7698
 Project: Kafka
  Issue Type: Bug
Reporter: Ming Liu


During our operation of Kafka, we frequently saw this failure: 

   There was an error in one of the threads during logs loading: 
org.apache.kafka.common.errors.ProducerFencedException:

{code:java}

[06:57:09,697] INFO [ProducerStateManager partition=interaction_events-127] 
Loading producer state from snapshot file 
'/data/disk5/kafka/interaction_events-127/092130764817.snapshot' 
(kafka.log.ProducerStateManager)
[06:57:09,698] INFO [Log partition=interaction_events-127, 
dir=/data/disk5/kafka] Completed load of log with 11 segments, log start offset 
91975003024 and log end offset 92130764817 in 12701 ms (kafka.log.Log)
[06:57:09,701] ERROR There was an error in one of the threads during logs 
loading: org.apache.kafka.common.errors.ProducerFencedException: Producer's 
epoch is no longer valid. There is probably another producer with a newer 
epoch. 63 (request epoch), 66 (server epoch) (kafka.log.LogManager)
[06:57:09,705] INFO [ProducerStateManager 
partition=client_interaction_events_authorid_enrichment-20] Writing producer 
snapshot at offset 92418754384 (kafka.log.ProducerStateManager)
[06:57:09,707] ERROR [KafkaServer id=2] Fatal error during KafkaServer startup. 
Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no 
longer valid. There is probably another producer with a newer epoch. 63 
(request epoch), 66 (server epoch)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7693) "IllegalArgumentException: Invalid negative sequence number used" in Kafka Client

2018-12-02 Thread Ming Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706463#comment-16706463
 ] 

Ming Liu commented on KAFKA-7693:
-

With some further debugging, the bug appears to be in the client's 
TransactionManager.incrementSequenceNumber().

Instead of

 currentSequenceNumber += increment;

which can overflow since the sequence number is an int, it should be

currentSequenceNumber = 
DefaultRecordBatch.incrementSequence(currentSequenceNumber, increment);

I am preparing the pull request for the fix.
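
A self-contained sketch of the failure mode, assuming a local incrementSequence helper that mirrors the wrap-around semantics referred to above (the helper is illustrative, not the Kafka implementation): plain int addition near Int.MaxValue overflows to a negative sequence, which is exactly what the "Invalid negative sequence number used" check rejects, while the wrapping increment stays in [0, Int.MaxValue].

{code:java}
// Illustrative sketch only -- shows the overflow, not the actual producer internals.
public class SequenceWrapDemo {

    // Hypothetical helper mirroring wrap-around semantics: sequences wrap back to 0 past Integer.MAX_VALUE.
    static int incrementSequence(int sequence, int increment) {
        if (sequence > Integer.MAX_VALUE - increment)
            return increment - (Integer.MAX_VALUE - sequence) - 1;
        return sequence + increment;
    }

    public static void main(String[] args) {
        int currentSequenceNumber = Integer.MAX_VALUE - 2;
        int increment = 5;

        // Plain "+=" overflows: prints -2147483646, a negative sequence number.
        System.out.println(currentSequenceNumber + increment);

        // Wrap-around increment: prints 2 (MAX-2 -> MAX-1 -> MAX -> 0 -> 1 -> 2).
        System.out.println(incrementSequence(currentSequenceNumber, increment));
    }
}
{code}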

 

> "IllegalArgumentException: Invalid negative sequence number used" in Kafka 
> Client
> -
>
> Key: KAFKA-7693
> URL: https://issues.apache.org/jira/browse/KAFKA-7693
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Ming Liu
>Priority: Major
>
> When operating Kafka, we have seen Kafka client logging this kind of 
> exception:
> {noformat}
> org.apache.kafka.clients.producer.internals.Sender  - 
>  [Producer 
> clientId=interaction-counter-service-2-dev-by-tweet-id-counting-dest-producer,
>  
> transactionalId=interaction-counter-service-2-dev-by-tweet-id-counting-81-transactional-id-gen-1]
>  Uncaught error in kafka producer I/O thread:
> java.lang.IllegalArgumentException: Invalid negative sequence number used
>     at 
> org.apache.kafka.common.record.MemoryRecordsBuilder.validateProducerState(MemoryRecordsBuilder.java:331)
>     at 
> org.apache.kafka.common.record.MemoryRecordsBuilder.close(MemoryRecordsBuilder.java:302)
>     at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.close(ProducerBatch.java:407)
>     at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:572)
>     at 
> org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:270)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7693) "IllegalArgumentException: Invalid negative sequence number used" in Kafka Client

2018-12-02 Thread Ming Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ming Liu updated KAFKA-7693:

Description: 
When operating Kafka, we have seen Kafka client logging this kind of exception:

{noformat}

org.apache.kafka.clients.producer.internals.Sender  -   
   [Producer 
clientId=interaction-counter-service-2-dev-by-tweet-id-counting-dest-producer, 
transactionalId=interaction-counter-service-2-dev-by-tweet-id-counting-81-transactional-id-gen-1]
 Uncaught error in kafka producer I/O thread:

java.lang.IllegalArgumentException: Invalid negative sequence number used

    at 
org.apache.kafka.common.record.MemoryRecordsBuilder.validateProducerState(MemoryRecordsBuilder.java:331)

    at 
org.apache.kafka.common.record.MemoryRecordsBuilder.close(MemoryRecordsBuilder.java:302)

    at 
org.apache.kafka.clients.producer.internals.ProducerBatch.close(ProducerBatch.java:407)

    at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:572)

    at 
org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:270)

    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)

    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)

    at java.lang.Thread.run(Thread.java:748)

{noformat}

  was:
When operating Kafka, we have seen Kafka client logging this kind of exception:

org.apache.kafka.clients.producer.internals.Sender  -   
   [Producer 
clientId=interaction-counter-service-2-dev-by-tweet-id-counting-dest-producer, 
transactionalId=interaction-counter-service-2-dev-by-tweet-id-counting-81-transactional-id-gen-1]
 Uncaught error in kafka producer I/O thread:

java.lang.IllegalArgumentException: Invalid negative sequence number used

    at 
org.apache.kafka.common.record.MemoryRecordsBuilder.validateProducerState(MemoryRecordsBuilder.java:331)

    at 
org.apache.kafka.common.record.MemoryRecordsBuilder.close(MemoryRecordsBuilder.java:302)

    at 
org.apache.kafka.clients.producer.internals.ProducerBatch.close(ProducerBatch.java:407)

    at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:572)

    at 
org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:270)

    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)

    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)

    at java.lang.Thread.run(Thread.java:748)


> "IllegalArgumentException: Invalid negative sequence number used" in Kafka 
> Client
> -
>
> Key: KAFKA-7693
> URL: https://issues.apache.org/jira/browse/KAFKA-7693
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Ming Liu
>Priority: Major
>
> When operating Kafka, we have seen Kafka client logging this kind of 
> exception:
> {noformat}
> org.apache.kafka.clients.producer.internals.Sender  - 
>  [Producer 
> clientId=interaction-counter-service-2-dev-by-tweet-id-counting-dest-producer,
>  
> transactionalId=interaction-counter-service-2-dev-by-tweet-id-counting-81-transactional-id-gen-1]
>  Uncaught error in kafka producer I/O thread:
> java.lang.IllegalArgumentException: Invalid negative sequence number used
>     at 
> org.apache.kafka.common.record.MemoryRecordsBuilder.validateProducerState(MemoryRecordsBuilder.java:331)
>     at 
> org.apache.kafka.common.record.MemoryRecordsBuilder.close(MemoryRecordsBuilder.java:302)
>     at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.close(ProducerBatch.java:407)
>     at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:572)
>     at 
> org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:270)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7692) updateFirstUnstableOffset NPE due to sequenceId overflow in ProducerStateManager.append

2018-12-02 Thread Ming Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706462#comment-16706462
 ] 

Ming Liu edited comment on KAFKA-7692 at 12/2/18 8:33 PM:
--

Upon further analysis, the bug is in ProducerStateManager.append, when it calls:

   updatedEntry.addBatch(epoch, lastSeq, lastOffset, lastSeq - firstSeq, 
lastTimestamp)

Because the sequenceId is an int, it should wrap around once it goes past 
Int.MaxValue, as is done, for example, in DefaultRecordBatch.incrementSequence(). 

I am preparing the pull request for the fix. 


was (Author: mingaliu):
Upon the close analysis, the bug is in ProducerStateManager.append, when it 
call:

   updatedEntry.addBatch(epoch, lastSeq, lastOffset, lastSeq - firstSeq, 
lastTimestamp)

because the sequenceId is int, the SequenceId should wrap around it is over 
Int.MaxValue, for example, as it is defined in 
defaultRecordBatch!IncrementSequence(). 

I am preparing the pull request for the fix. 

> updateFirstUnstableOffset NPE due to sequenceId overflow in 
> ProducerStateManager.append
> ---
>
> Key: KAFKA-7692
> URL: https://issues.apache.org/jira/browse/KAFKA-7692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.0
>Reporter: Ming Liu
>Priority: Major
> Fix For: 2.2.0
>
>
> When operating Kafka, we frequently saw this exception on Kafka server log, 
> {noformat}
> ERROR [KafkaServer id=19] Fatal error during KafkaServer startup. Prepare to 
> shutdown (kafka.server.KafkaServer)
>  java.lang.NullPointerException
>      at kafka.log.Log.kafka$log$Log$$updateFirstUnstableOffset(Log.scala:792)
>      at kafka.log.Log.kafka$log$Log$$loadProducerState(Log.scala:518)
>      at kafka.log.Log.(Log.scala:228)
>      at kafka.log.Log$.apply(Log.scala:1747)
>      at 
> kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:255)
>      at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$11$$anonfun$apply$15$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:335)
>      at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
>      at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>      at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>      at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>      at java.lang.Thread.run(Thread.java:748)
> {noformat} 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7693) "IllegalArgumentException: Invalid negative sequence number used" in Kafka Client

2018-12-02 Thread Ming Liu (JIRA)
Ming Liu created KAFKA-7693:
---

 Summary: "IllegalArgumentException: Invalid negative sequence 
number used" in Kafka Client
 Key: KAFKA-7693
 URL: https://issues.apache.org/jira/browse/KAFKA-7693
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: Ming Liu


When operating Kafka, we have seen Kafka client logging this kind of exception:

org.apache.kafka.clients.producer.internals.Sender  -   
   [Producer 
clientId=interaction-counter-service-2-dev-by-tweet-id-counting-dest-producer, 
transactionalId=interaction-counter-service-2-dev-by-tweet-id-counting-81-transactional-id-gen-1]
 Uncaught error in kafka producer I/O thread:

java.lang.IllegalArgumentException: Invalid negative sequence number used

    at 
org.apache.kafka.common.record.MemoryRecordsBuilder.validateProducerState(MemoryRecordsBuilder.java:331)

    at 
org.apache.kafka.common.record.MemoryRecordsBuilder.close(MemoryRecordsBuilder.java:302)

    at 
org.apache.kafka.clients.producer.internals.ProducerBatch.close(ProducerBatch.java:407)

    at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:572)

    at 
org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:270)

    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)

    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)

    at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7692) updateFirstUnstableOffset NPE due to sequenceId overflow in ProducerStateManager.append

2018-12-02 Thread Ming Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ming Liu updated KAFKA-7692:

Description: 
When operating Kafka, we frequently saw this exception on Kafka server log, 

{noformat}

ERROR [KafkaServer id=19] Fatal error during KafkaServer startup. Prepare to 
shutdown (kafka.server.KafkaServer)
 java.lang.NullPointerException
     at kafka.log.Log.kafka$log$Log$$updateFirstUnstableOffset(Log.scala:792)
     at kafka.log.Log.kafka$log$Log$$loadProducerState(Log.scala:518)
     at kafka.log.Log.(Log.scala:228)
     at kafka.log.Log$.apply(Log.scala:1747)
     at kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:255)
     at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$11$$anonfun$apply$15$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:335)
     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
     at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)

{noformat} 

 

  was:
When operating Kafka, we frequently saw this exception on Kafka server log, 

Exception: 

[2018-06-04 20:44:53,789] ERROR [KafkaServer id=19] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.NullPointerException
    at kafka.log.Log.kafka$log$Log$$updateFirstUnstableOffset(Log.scala:792)
    at kafka.log.Log.kafka$log$Log$$loadProducerState(Log.scala:518)
    at kafka.log.Log.(Log.scala:228)
    at kafka.log.Log$.apply(Log.scala:1747)
    at kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:255)
    at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$11$$anonfun$apply$15$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:335)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

 

 


> updateFirstUnstableOffset NPE due to sequenceId overflow in 
> ProducerStateManager.append
> ---
>
> Key: KAFKA-7692
> URL: https://issues.apache.org/jira/browse/KAFKA-7692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.0
>Reporter: Ming Liu
>Priority: Major
> Fix For: 2.2.0
>
>
> When operating Kafka, we frequently saw this exception on Kafka server log, 
> {noformat}
> ERROR [KafkaServer id=19] Fatal error during KafkaServer startup. Prepare to 
> shutdown (kafka.server.KafkaServer)
>  java.lang.NullPointerException
>      at kafka.log.Log.kafka$log$Log$$updateFirstUnstableOffset(Log.scala:792)
>      at kafka.log.Log.kafka$log$Log$$loadProducerState(Log.scala:518)
>      at kafka.log.Log.(Log.scala:228)
>      at kafka.log.Log$.apply(Log.scala:1747)
>      at 
> kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:255)
>      at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$11$$anonfun$apply$15$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:335)
>      at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
>      at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>      at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>      at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>      at java.lang.Thread.run(Thread.java:748)
> {noformat} 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7692) updateFirstUnstableOffset NPE due to sequenceId overflow in ProducerStateManager.append

2018-12-02 Thread Ming Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706462#comment-16706462
 ] 

Ming Liu commented on KAFKA-7692:
-

Upon closer analysis, the bug is in ProducerStateManager.append, when it 
calls:

   updatedEntry.addBatch(epoch, lastSeq, lastOffset, lastSeq - firstSeq, 
lastTimestamp)

Because the sequenceId is an int, it should wrap around once it goes past 
Int.MaxValue, as is done, for example, in DefaultRecordBatch.incrementSequence(). 

I am preparing the pull request for the fix. 
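
As a rough illustration of why the delta passed to addBatch goes wrong without wrap-around handling (self-contained sketch; sequenceDelta is a hypothetical helper, not Kafka code): once a batch's sequence range wraps past Int.MaxValue, lastSeq is smaller than firstSeq, and a naive subtraction gives a large negative value instead of the intended record-count delta.

{code:java}
// Illustrative sketch only.
public class OffsetDeltaDemo {

    // Hypothetical helper: delta implied by a (possibly wrapped) sequence range.
    static int sequenceDelta(int firstSeq, int lastSeq) {
        if (lastSeq >= firstSeq)
            return lastSeq - firstSeq;
        // Wrapped range: steps from firstSeq up to MAX_VALUE, plus the MAX_VALUE -> 0 step, plus 0..lastSeq.
        return (Integer.MAX_VALUE - firstSeq) + 1 + lastSeq;
    }

    public static void main(String[] args) {
        int firstSeq = Integer.MAX_VALUE - 1; // batch covering sequences MAX-1, MAX, 0, 1
        int lastSeq = 1;

        System.out.println(lastSeq - firstSeq);               // -2147483645: nonsense delta
        System.out.println(sequenceDelta(firstSeq, lastSeq)); // 3: last minus first across the wrap
    }
}
{code}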

> updateFirstUnstableOffset NPE due to sequenceId overflow in 
> ProducerStateManager.append
> ---
>
> Key: KAFKA-7692
> URL: https://issues.apache.org/jira/browse/KAFKA-7692
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.0
>Reporter: Ming Liu
>Priority: Major
> Fix For: 2.2.0
>
>
> When operating Kafka, we frequently saw this exception on Kafka server log, 
> Exception: 
> [2018-06-04 20:44:53,789] ERROR [KafkaServer id=19] Fatal error during 
> KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
> java.lang.NullPointerException
>     at kafka.log.Log.kafka$log$Log$$updateFirstUnstableOffset(Log.scala:792)
>     at kafka.log.Log.kafka$log$Log$$loadProducerState(Log.scala:518)
>     at kafka.log.Log.(Log.scala:228)
>     at kafka.log.Log$.apply(Log.scala:1747)
>     at 
> kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:255)
>     at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$11$$anonfun$apply$15$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:335)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7692) updateFirstUnstableOffset NPE due to sequenceId overflow in ProducerStateManager.append

2018-12-02 Thread Ming Liu (JIRA)
Ming Liu created KAFKA-7692:
---

 Summary: updateFirstUnstableOffset NPE due to sequenceId overflow 
in ProducerStateManager.append
 Key: KAFKA-7692
 URL: https://issues.apache.org/jira/browse/KAFKA-7692
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.0.0
Reporter: Ming Liu
 Fix For: 2.2.0


When operating Kafka, we frequently saw this exception on Kafka server log, 

Exception: 

[2018-06-04 20:44:53,789] ERROR [KafkaServer id=19] Fatal error during 
KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.NullPointerException
    at kafka.log.Log.kafka$log$Log$$updateFirstUnstableOffset(Log.scala:792)
    at kafka.log.Log.kafka$log$Log$$loadProducerState(Log.scala:518)
    at kafka.log.Log.(Log.scala:228)
    at kafka.log.Log$.apply(Log.scala:1747)
    at kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:255)
    at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$11$$anonfun$apply$15$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:335)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)