[jira] [Commented] (KAFKA-6881) Kafka 1.1 Broker version crashes when deleting log

2018-06-11 Thread xiaojing zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509191#comment-16509191
 ] 

xiaojing zhou commented on KAFKA-6881:
--

I have the same issue; the __consumer_offsets-7/019668089841.log file does 
exist in our NAS folder. My Kafka version is 1.0.1.

{code}

[2018-06-11 00:04:23,282] ERROR Failed to clean up log for __consumer_offsets-7 
in dir /nas/kafka_logs/lvsp01hkf001 due to IOException 
(kafka.server.LogDirFailureChannel)

java.nio.file.NoSuchFileException: 
/nas/kafka_logs/lvsp01hkf001/__consumer_offsets-7/019668089841.log

--

java.nio.file.NoSuchFileException: 
/nas/kafka_logs/lvsp01hkf001/__consumer_offsets-7/019668089841.log

 at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)

 at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)

 at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)

 at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)

 at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)

 at java.nio.file.Files.move(Files.java:1395)

 at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:682)

 at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)

 at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:398)

 at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:1592)

 at kafka.log.Log$$anonfun$replaceSegments$1.apply(Log.scala:1644)

 at kafka.log.Log$$anonfun$replaceSegments$1.apply(Log.scala:1639)

 at scala.collection.immutable.List.foreach(List.scala:392)

{code}
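
For context on the failure above: the rename that throws here goes through 
`Utils.atomicMoveWithFallback`. Below is a minimal, hedged sketch of that 
move-with-fallback pattern (not the exact Kafka implementation), showing how a 
source file that has already disappeared (as on the NAS mount above) surfaces 
as a NoSuchFileException and fails the log directory.

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public final class AtomicMoveSketch {
    // Hedged sketch of the move-with-fallback pattern; not the exact Kafka code.
    static void atomicMoveWithFallback(Path source, Path target) throws IOException {
        try {
            // Preferred path: an atomic rename (e.g. *.log -> *.log.deleted).
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException outer) {
            try {
                // Fallback for filesystems without atomic rename support.
                Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException inner) {
                // If the source is already gone, both attempts fail with
                // NoSuchFileException and the caller marks the log dir offline.
                inner.addSuppressed(outer);
                throw inner;
            }
        }
    }
}
{code}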

> Kafka 1.1 Broker version crashes when deleting log
> --
>
> Key: KAFKA-6881
> URL: https://issues.apache.org/jira/browse/KAFKA-6881
> Project: Kafka
>  Issue Type: Bug
> Environment: Linux
>Reporter: K B Parthasarathy
>Priority: Critical
>
> Hello
> We have been running Kafka version 1.1 on Linux for the past 3 weeks. Today Kafka 
> crashed. When we checked the server.log file, the following log was found:
> [2018-05-07 16:53:06,721] ERROR Failed to clean up log for 
> __consumer_offsets-24 in dir /tmp/kafka-logs due to IOException 
> (kafka.server.LogDirFailureChannel)
>  java.nio.file.NoSuchFileException: 
> /tmp/kafka-logs/__consumer_offsets-24/.log
>  at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>  at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>  at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>  at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)
>  at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
>  at java.nio.file.Files.move(Files.java:1395)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
>  at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415)
>  at kafka.log.Log.asyncDeleteSegment(Log.scala:1601)
>  at kafka.log.Log.$anonfun$replaceSegments$1(Log.scala:1653)
>  at kafka.log.Log.$anonfun$replaceSegments$1$adapted(Log.scala:1648)
>  at scala.collection.immutable.List.foreach(List.scala:389)
>  at kafka.log.Log.replaceSegments(Log.scala:1648)
>  at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:535)
>  at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:462)
>  at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:461)
>  at scala.collection.immutable.List.foreach(List.scala:389)
>  at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
>  at kafka.log.Cleaner.clean(LogCleaner.scala:438)
>  at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
>  at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>  Suppressed: java.nio.file.NoSuchFileException: 
> /tmp/kafka-logs/__consumer_offsets-24/.log -> 
> /tmp/kafka-logs/__consumer_offsets-24/.log.deleted
>  at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>  at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>  at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:396)
>  at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
>  at java.nio.file.Files.move(Files.java:1395)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:694)
>  ... 16 more
>  [2018-05-07 16:53:06,725] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir /tmp/kafka-logs (kafka.server.ReplicaManager)
>  [2018-05-07 16:53:06,762] INFO Stopping serving logs in dir /tmp/kafka-logs 
> (kafka.log.LogManager)
>  [2018-05-07 16:53:07,032] ERROR Shutdown broker because all log dirs in 
> /tmp/kafka-logs have failed 

[jira] [Commented] (KAFKA-7043) Connect isolation whitelist does not include new primitive converters (KIP-305)

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509082#comment-16509082
 ] 

ASF GitHub Bot commented on KAFKA-7043:
---

rhauch opened a new pull request #5198: KAFKA-7043: Modified plugin isolation 
whitelist with recently added converters (KIP-305)
URL: https://github.com/apache/kafka/pull/5198
 
 
   Several recently-added converters are included in the plugin isolation 
whitelist, similarly to the `StringConverter`. This is a change in the 
implementation, and does not affect the approved KIP. Several unit tests were 
added to verify they are being loaded in isolation, again similarly to 
`StringConverter`.
   
   These changes should be applied only to `trunk`, since these converters were 
added as part of KIP-305 for AK 2.0.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Connect isolation whitelist does not include new primitive converters 
> (KIP-305)
> ---
>
> Key: KAFKA-7043
> URL: https://issues.apache.org/jira/browse/KAFKA-7043
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Blocker
> Fix For: 2.0.0
>
>
> KIP-305 added several new primitive converters, but the PR did not add them 
> to the plugin isolation whitelist.
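
As a rough illustration (not taken from the KIP or the PR), the sketch below 
exercises one of the new primitive converters directly; the class and package 
name `org.apache.kafka.connect.converters.IntegerConverter` is assumed from 
KIP-305 and should be verified against the 2.0 release. The whitelist fix is 
what allows such classes to be loaded from isolated plugin paths.

{code:java}
import java.util.Collections;

import org.apache.kafka.connect.converters.IntegerConverter;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;

public class PrimitiveConverterSketch {
    public static void main(String[] args) {
        IntegerConverter converter = new IntegerConverter();
        converter.configure(Collections.emptyMap(), false); // configure as a value converter

        // Round-trip an INT32 value the way the Connect runtime would.
        byte[] serialized = converter.fromConnectData("topic", Schema.INT32_SCHEMA, 42);
        SchemaAndValue roundTrip = converter.toConnectData("topic", serialized);
        System.out.println(roundTrip.value()); // expected: 42
    }
}
{code}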



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required

2018-06-11 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509076#comment-16509076
 ] 

Dhruvil Shah commented on KAFKA-7045:
-

Attached topic data that reproduces case (1) described in the JIRA, where 
messages in the first segment starting at offset 70 have been cleaned out. These 
messages were part of the batch starting at offset 69.

Credit to [~omkreddy] for finding this issue and providing a reproducible test 
case.

> Consumer may not be able to consume all messages when down-conversion is 
> required
> -
>
> Key: KAFKA-7045
> URL: https://issues.apache.org/jira/browse/KAFKA-7045
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0, 0.11.0.2, 1.1.0, 2.0.0, 1.0.1
>Reporter: Dhruvil Shah
>Priority: Major
> Fix For: 2.1.0
>
> Attachments: log-cleaner-test.zip
>
>
> When down-conversion is required, the consumer might fail consuming messages 
> under certain conditions. Couple such cases are outlined below:
> (1) When consuming from a compacted topic, it is possible that the consumer 
> wants to fetch messages that fall in the middle of a batch but the messages 
> have been compacted by the cleaner. For example, let's say we have the 
> following two segments. The brackets indicate a single batch of messages and 
> the numbers within are the message offsets.
> Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
>  Segment #2: [9, 10, 11], [12, 13, 14]
> If the cleaner were to come in now and clean up messages with offsets 7 and 
> 8, the segments would look like the following:
> Segment #1: [0, 1, 2], [3, 4, 5], [6]
>  Segment #2: [9, 10, 11], [12, 13, 14]
> A consumer attempting to fetch messages at offset 7 will start reading the 
> batch starting at offset 6. During down-conversion, we will drop the record 
> starting at 6 because it is less than the current fetch start offset. However, there 
> are no messages in the log following offset 6. In such cases, we return the 
> `FileRecords` itself which would cause the consumer to throw an exception 
> because it does not understand the stored message format.
> (2) When consuming from a topic with transactional messages, down-conversion 
> usually drops control batches because these do not exist in V0 and V1 message 
> formats. If there are no message batches following the control batch in the 
> particular segment (or if we are at the end of the log), we would again get 
> no records after down-conversion and will return the `FileRecords`. Because 
> the consumer is not able to interpret control batches, it will again throw an 
> exception.
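
To make case (1) concrete, here is a minimal, hypothetical sketch (plain Java, 
not broker code) of why a fetch at offset 7 ends up with zero converted records 
once the cleaner has removed offsets 7 and 8, which then triggers the raw 
`FileRecords` fallback quoted later in this thread.

{code:java}
import java.util.List;
import java.util.stream.Collectors;

public class DownConversionCase1Sketch {
    // Keep only records at or beyond the fetch start offset, mirroring the
    // record-dropping behaviour described above (purely illustrative).
    static List<Long> downConvertedOffsets(List<Long> batchOffsets, long fetchStartOffset) {
        return batchOffsets.stream()
                .filter(offset -> offset >= fetchStartOffset)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Segment #1 after cleaning: the batch [6, 7, 8] now holds only offset 6.
        List<Long> cleanedBatch = List.of(6L);
        // A fetch at offset 7 converts zero records, so the broker falls back to
        // returning the stored-format FileRecords, which old consumers cannot parse.
        System.out.println(downConvertedOffsets(cleanedBatch, 7L)); // prints []
    }
}
{code}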



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6966) Extend `TopologyDescription.Sink` to return `TopicNameExtractor`

2018-06-11 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509072#comment-16509072
 ] 

Matthias J. Sax commented on KAFKA-6966:


Thanks for your interest in contributing to Kafka! I added you to the list of 
contributors and assigned the ticket to you. You can also self-assign tickets 
now. Please prepare a KIP for this – you can also work on a PR in parallel – 
whatever works best for you. The KIP must be accepted before we can accept the 
PR. If you don't have a wiki account yet, please create one and provide your 
wiki ID so we can grant you write access to the wiki (otherwise you cannot 
create a KIP page).

> Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
> 
>
> Key: KAFKA-6966
> URL: https://issues.apache.org/jira/browse/KAFKA-6966
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Matthias J. Sax
>Assignee: Nishanth Pradeep
>Priority: Major
>  Labels: beginner, needs-kip, newbie
>
> With KIP-303, a dynamic routing feature was added and 
> `TopologyDescription.Sink#topic()` returns `null` if this feature is used.
> It would be useful to get the actually used `TopicNameExtractor` class from 
> the `TopologyDescription`.
> We suggest adding `Class<? extends TopicNameExtractor> 
> TopologyDescription.Sink#topicNameExtractor()` and let it return `null` if 
> the dynamic routing feature is not used.
> This is a public API change and requires a KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
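
For illustration only, a rough sketch of what the proposed accessor might look 
like; the final name and signature would be settled in the KIP, the interface 
name below is hypothetical, and the `TopicNameExtractor` import path is assumed 
from KIP-303.

{code:java}
import org.apache.kafka.streams.processor.TopicNameExtractor;

// Hypothetical stand-in for TopologyDescription.Sink with the proposed addition.
public interface SinkDescriptionSketch {
    // Existing accessor: returns null when dynamic routing (KIP-303) is in use.
    String topic();

    // Proposed accessor: the extractor class when dynamic routing is in use, else null.
    Class<? extends TopicNameExtractor> topicNameExtractor();
}
{code}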



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required

2018-06-11 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-7045:

Attachment: log-cleaner-test.zip

> Consumer may not be able to consume all messages when down-conversion is 
> required
> -
>
> Key: KAFKA-7045
> URL: https://issues.apache.org/jira/browse/KAFKA-7045
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0, 0.11.0.2, 1.1.0, 2.0.0, 1.0.1
>Reporter: Dhruvil Shah
>Priority: Major
> Fix For: 2.1.0
>
> Attachments: log-cleaner-test.zip
>
>
> When down-conversion is required, the consumer might fail consuming messages 
> under certain conditions. Couple such cases are outlined below:
> (1) When consuming from a compacted topic, it is possible that the consumer 
> wants to fetch messages that fall in the middle of a batch but the messages 
> have been compacted by the cleaner. For example, let's say we have the 
> following two segments. The brackets indicate a single batch of messages and 
> the numbers within are the message offsets.
> Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
>  Segment #2: [9, 10, 11], [12, 13, 14]
> If the cleaner were to come in now and clean up messages with offsets 7 and 
> 8, the segments would look like the following:
> Segment #1: [0, 1, 2], [3, 4, 5], [6]
>  Segment #2: [9, 10, 11], [12, 13, 14]
> A consumer attempting to fetch messages at offset 7 will start reading the 
> batch starting at offset 6. During down-conversion, we will drop the record 
> starting at 6 because it is less than the current fetch start offset. However, there 
> are no messages in the log following offset 6. In such cases, we return the 
> `FileRecords` itself which would cause the consumer to throw an exception 
> because it does not understand the stored message format.
> (2) When consuming from a topic with transactional messages, down-conversion 
> usually drops control batches because these do not exist in V0 and V1 message 
> formats. If there are no message batches following the control batch in the 
> particular segment (or if we are at the end of the log), we would again get 
> no records after down-conversion and will return the `FileRecords`. Because 
> the consumer is not able to interpret control batches, it will again throw an 
> exception.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required

2018-06-11 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-7045:

Description: 
When down-conversion is required, the consumer might fail consuming messages 
under certain conditions. Couple such cases are outlined below:

(1) When consuming from a compacted topic, it is possible that the consumer 
wants to fetch messages that fall in the middle of a batch but the messages 
have been compacted by the cleaner. For example, let's say we have the 
following two segments. The brackets indicate a single batch of messages and 
the numbers within are the message offsets.

Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
 Segment #2: [9, 10, 11], [12, 13, 14]

If the cleaner were to come in now and clean up messages with offsets 7 and 8, 
the segments would look like the following:

Segment #1: [0, 1, 2], [3, 4, 5], [6]
 Segment #2: [9, 10, 11], [12, 13, 14]

A consumer attempting to fetch messages at offset 7 will start reading the 
batch starting at offset 6. During down-conversion, we will drop the record 
starting at 6 because it is less than the current fetch start offset. However, there 
are no messages in the log following offset 6. In such cases, we return the 
`FileRecords` itself which would cause the consumer to throw an exception 
because it does not understand the stored message format.

(2) When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these do not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.

  was:
When down-conversion is required, the consumer might fail consuming messages 
under certain conditions. Couple such cases are outlined below:

 

(1) When consuming from a compacted topic, it is possible that the consumer 
wants to fetch messages that fall in the middle of a batch but the messages 
have been compacted by the cleaner. For example, let's say we have the 
following two segments. The brackets indicate a single batch of messages and 
the numbers within are the message offsets.

Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
 Segment #2: [9, 10, 11], [12, 13, 14]

If the cleaner were to come in now and clean up messages with offsets 7 and 8, 
the segments would look like the following:

Segment #1: [0, 1, 2], [3, 4, 5], [6]
 Segment #2: [9, 10, 11], [12, 13, 14]

A consumer attempting to fetch messages at offset 7 will start reading the 
batch starting at offset 6. During down-conversion, we will drop the record 
starting at 6 because it is less than the current fetch start offset. However, there 
are no messages in the log following offset 6. In such cases, we return the 
`FileRecords` itself which would cause the consumer to throw an exception 
because it does not understand the stored message format.

 

(2) When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.


> Consumer may not be able to consume all messages when down-conversion is 
> required
> -
>
> Key: KAFKA-7045
> URL: https://issues.apache.org/jira/browse/KAFKA-7045
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0, 0.11.0.2, 1.1.0, 2.0.0, 1.0.1
>Reporter: Dhruvil Shah
>Priority: Major
> Fix For: 2.1.0
>
>
> When down-conversion is required, the consumer might fail consuming messages 
> under certain conditions. Couple such cases are outlined below:
> (1) When consuming from a compacted topic, it is possible that the consumer 
> wants to fetch messages that fall in the middle of a batch but the messages 
> have been compacted by the cleaner. For example, let's say we have the 
> following two segments. The brackets indicate a single batch of messages and 
> the numbers within are the message offsets.
> Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
>  Segment #2: [9, 10, 11], [12, 13, 14]
> If the cleaner were to come in now and clean up messages with offsets 7 and 
> 8, the segments would look like the following:
> Segment #1: [0, 1, 2], [3, 4, 5], [6]
>  Segment #2: [9, 10, 11], [12, 13, 14]
> A consumer 

[jira] [Assigned] (KAFKA-6966) Extend `TopologyDescription.Sink` to return `TopicNameExtractor`

2018-06-11 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-6966:
--

Assignee: Nishanth Pradeep

> Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
> 
>
> Key: KAFKA-6966
> URL: https://issues.apache.org/jira/browse/KAFKA-6966
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Matthias J. Sax
>Assignee: Nishanth Pradeep
>Priority: Major
>  Labels: beginner, needs-kip, newbie
>
> With KIP-303, a dynamic routing feature was added and 
> `TopologyDescription.Sink#topic()` returns `null` if this feature is used.
> It would be useful to get the actually used `TopicNameExtractor` class from 
> the `TopologyDescription`.
> We suggest adding `Class<? extends TopicNameExtractor> 
> TopologyDescription.Sink#topicNameExtractor()` and let it return `null` if 
> the dynamic routing feature is not used.
> This is a public API change and requires a KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required

2018-06-11 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-7045:

Description: 
When down-conversion is required, the consumer might fail consuming messages 
under certain conditions. Couple such cases are outlined below:

 

(1) When consuming from a compacted topic, it is possible that the consumer 
wants to fetch messages that fall in the middle of a batch but the messages 
have been compacted by the cleaner. For example, let's say we have the 
following two segments. The brackets indicate a single batch of messages and 
the numbers within are the message offsets.

Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
 Segment #2: [9, 10, 11], [12, 13, 14]

If the cleaner were to come in now and clean up messages with offsets 7 and 8, 
the segments would look like the following:

Segment #1: [0, 1, 2], [3, 4, 5], [6]
 Segment #2: [9, 10, 11], [12, 13, 14]

A consumer attempting to fetch messages at offset 7 will start reading the 
batch starting at offset 6. During down-conversion, we will drop the record 
starting at 6 because it is less than the current fetch start offset. However, there 
are no messages in the log following offset 6. In such cases, we return the 
`FileRecords` itself which would cause the consumer to throw an exception 
because it does not understand the stored message format.

 

(2) When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.

  was:
When down-conversion is required, the consumer might fail consuming messages 
under certain conditions. Couple such cases are outlined below:
 # When consuming from a compacted topic, it is possible that the consumer 
wants to fetch messages that fall in the middle of a batch but the messages 
have been compacted by the cleaner. For example, let's say we have the 
following two segments. The brackets indicate a single batch of messages and 
the numbers within are the message offsets.

Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
 Segment #2: [9, 10, 11], [12, 13, 14]

If the cleaner were to come in now and clean up messages with offsets 7 and 8, 
the segments would look like the following:

Segment #1: [0, 1, 2], [3, 4, 5], [6]
 Segment #2: [9, 10, 11], [12, 13, 14]

A consumer attempting to fetch messages at offset 7 will start reading the 
batch starting at offset 6. During down-conversion, we will drop the record 
starting at 6 because it is less than the current fetch start offset. However, there 
are no messages in the log following offset 6. In such cases, we return the 
`FileRecords` itself which would cause the consumer to throw an exception 
because it does not understand the stored message format.

(1) When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.

(2) When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.


> Consumer may not be able to consume all messages when down-conversion is 
> required
> -
>
> Key: KAFKA-7045
> URL: https://issues.apache.org/jira/browse/KAFKA-7045
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0, 0.11.0.2, 1.1.0, 2.0.0, 1.0.1
>Reporter: Dhruvil Shah
>Priority: Major
> Fix For: 2.1.0
>
>
> When down-conversion is required, the consumer might fail consuming messages 
> under certain conditions. Couple such cases are outlined below:
>  
> (1) When consuming from a compacted topic, it is possible that the consumer 
> wants to fetch messages that fall in the middle of a batch but the messages 
> have been compacted by the 

[jira] [Updated] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required

2018-06-11 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-7045:

Description: 
When down-conversion is required, the consumer might fail consuming messages 
under certain conditions. Couple such cases are outlined below:
 # When consuming from a compacted topic, it is possible that the consumer 
wants to fetch messages that fall in the middle of a batch but the messages 
have been compacted by the cleaner. For example, let's say we have the 
following two segments. The brackets indicate a single batch of messages and 
the numbers within are the message offsets.

Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
 Segment #2: [9, 10, 11], [12, 13, 14]

If the cleaner were to come in now and clean up messages with offsets 7 and 8, 
the segments would look like the following:

Segment #1: [0, 1, 2], [3, 4, 5], [6]
 Segment #2: [9, 10, 11], [12, 13, 14]

A consumer attempting to fetch messages at offset 7 will start reading the 
batch starting at offset 6. During down-conversion, we will drop the record 
starting at 6 because it is less than the current fetch start offset. However, there 
are no messages in the log following offset 6. In such cases, we return the 
`FileRecords` itself which would cause the consumer to throw an exception 
because it does not understand the stored message format.
 # When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.


 # When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.

  was:
When down-conversion is required, the consumer might fail consuming messages 
under certain conditions. Couple such cases are outlined below:
 # When consuming from a compacted topic, it is possible that the consumer 
wants to fetch messages that fall in the middle of a batch but the messages 
have been compacted by the cleaner. For example, let's say we have the 
following two segments. The brackets indicate a single batch of messages and 
the numbers within are the message offsets.

Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
 Segment #2: [9, 10, 11], [12, 13, 14]

If the cleaner were to come in now and clean up messages with offsets 7 and 8, 
the segments would look like the following:

Segment #1: [0, 1, 2], [3, 4, 5], [6]
 Segment #2: [9, 10, 11], [12, 13, 14]

A consumer attempting to fetch messages at offset 7 will start reading the 
batch starting at offset 6. During down-conversion, we will drop the record 
starting at 6 because it is less than the current fetch start offset. However, there 
are no messages in the log following offset 6. In such cases, we return the 
`FileRecords` itself which would cause the consumer to throw an exception 
because it does not understand the stored message format.
 # When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.

Relevant code from 1.x release that sends `FileRecords` when we are not able to 
down-convert any messages:
{quote}public ConvertedRecords downConvert(byte toMagic, 
long firstOffset, Time time) {
 ConvertedRecords convertedRecords = downConvert(batches, 
toMagic, firstOffset, time);
 if (convertedRecords.recordsProcessingStats().numRecordsConverted() == 0) {
 // This indicates that the message is too large, which means that the buffer 
is not large
 // enough to hold a full record batch. We just return all the bytes in this 
instance.
 // Even though the record batch does not have the right format version, we 
expect old clients
 // to raise an error to the user after reading the record batch size and 
seeing that there
 // are not enough available bytes in the response to read it fully. Note that 
this is
 // only possible prior to KIP-74, after which the broker was changed to always 

[jira] [Updated] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required

2018-06-11 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-7045:

Description: 
When down-conversion is required, the consumer might fail consuming messages 
under certain conditions. Couple such cases are outlined below:
 # When consuming from a compacted topic, it is possible that the consumer 
wants to fetch messages that fall in the middle of a batch but the messages 
have been compacted by the cleaner. For example, let's say we have the 
following two segments. The brackets indicate a single batch of messages and 
the numbers within are the message offsets.

Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
 Segment #2: [9, 10, 11], [12, 13, 14]

If the cleaner were to come in now and clean up messages with offsets 7 and 8, 
the segments would look like the following:

Segment #1: [0, 1, 2], [3, 4, 5], [6]
 Segment #2: [9, 10, 11], [12, 13, 14]

A consumer attempting to fetch messages at offset 7 will start reading the 
batch starting at offset 6. During down-conversion, we will drop the record 
starting at 6 because it is less than the current fetch start offset. However, there 
are no messages in the log following offset 6. In such cases, we return the 
`FileRecords` itself which would cause the consumer to throw an exception 
because it does not understand the stored message format.

(1) When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.

(2) When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.

  was:
When down-conversion is required, the consumer might fail consuming messages 
under certain conditions. Couple such cases are outlined below:
 # When consuming from a compacted topic, it is possible that the consumer 
wants to fetch messages that fall in the middle of a batch but the messages 
have been compacted by the cleaner. For example, let's say we have the 
following two segments. The brackets indicate a single batch of messages and 
the numbers within are the message offsets.

Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
 Segment #2: [9, 10, 11], [12, 13, 14]

If the cleaner were to come in now and clean up messages with offsets 7 and 8, 
the segments would look like the following:

Segment #1: [0, 1, 2], [3, 4, 5], [6]
 Segment #2: [9, 10, 11], [12, 13, 14]

A consumer attempting to fetch messages at offset 7 will start reading the 
batch starting at offset 6. During down-conversion, we will drop the record 
starting at 6 because it is less than the current fetch start offset. However, there 
are no messages in the log following offset 6. In such cases, we return the 
`FileRecords` itself which would cause the consumer to throw an exception 
because it does not understand the stored message format.
 # When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.


 # When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.


> Consumer may not be able to consume all messages when down-conversion is 
> required
> -
>
> Key: KAFKA-7045
> URL: https://issues.apache.org/jira/browse/KAFKA-7045
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Affects Versions: 0.11.0.0, 0.11.0.1, 

[jira] [Updated] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required

2018-06-11 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-7045:

Description: 
When down-conversion is required, the consumer might fail consuming messages 
under certain conditions. Couple such cases are outlined below:
 # When consuming from a compacted topic, it is possible that the consumer 
wants to fetch messages that fall in the middle of a batch but the messages 
have been compacted by the cleaner. For example, let's say we have the 
following two segments. The brackets indicate a single batch of messages and 
the numbers within are the message offsets.

Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
 Segment #2: [9, 10, 11], [12, 13, 14]

If the cleaner were to come in now and clean up messages with offsets 7 and 8, 
the segments would look like the following:

Segment #1: [0, 1, 2], [3, 4, 5], [6]
 Segment #2: [9, 10, 11], [12, 13, 14]

A consumer attempting to fetch messages at offset 7 will start reading the 
batch starting at offset 6. During down-conversion, we will drop the record 
starting at 6 because it is less than the current fetch start offset. However, there 
are no messages in the log following offset 6. In such cases, we return the 
`FileRecords` itself which would cause the consumer to throw an exception 
because it does not understand the stored message format.
 # When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.

Relevant code from 1.x release that sends `FileRecords` when we are not able to 
down-convert any messages:
{quote}public ConvertedRecords<? extends Records> downConvert(byte toMagic, long firstOffset, Time time) {
    ConvertedRecords<MemoryRecords> convertedRecords = downConvert(batches, toMagic, firstOffset, time);
    if (convertedRecords.recordsProcessingStats().numRecordsConverted() == 0) {
        // This indicates that the message is too large, which means that the buffer is not large
        // enough to hold a full record batch. We just return all the bytes in this instance.
        // Even though the record batch does not have the right format version, we expect old clients
        // to raise an error to the user after reading the record batch size and seeing that there
        // are not enough available bytes in the response to read it fully. Note that this is
        // only possible prior to KIP-74, after which the broker was changed to always return at least
        // one full record batch, even if it requires exceeding the max fetch size requested by the client.
        return new ConvertedRecords<>(this, RecordsProcessingStats.EMPTY);
    } else {
        return convertedRecords;
    }
}
{quote}

  was:
When down-conversion is required, the consumer might fail consuming messages 
under certain conditions. Couple such cases are outlined below:
 # When consuming from a compacted topic, it is possible that the consumer 
wants to fetch messages that fall in the middle of a batch but the messages 
have been compacted by the cleaner. For example, let's say we have the 
following two segments. The brackets indicate a single batch of messages and 
the numbers within are the message offsets.

Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
 Segment #2: [9, 10, 11], [12, 13, 14]

If the cleaner were to come in now and clean up messages with offsets 7 and 8, 
the segments would look like the following:

Segment #1: [0, 1, 2], [3, 4, 5], [6]
 Segment #2: [9, 10, 11], [12, 13, 14]

A consumer attempting to fetch messages at offset 7 will start reading the 
batch starting at offset 6. During down-conversion, we will drop the record 
starting at 6 because it is less than the current fetch start offset. However, there 
are no messages in the log following offset 6. In such cases, we return the 
`FileRecords` itself which would cause the consumer to throw an exception 
because it does not understand the stored message format.
 # When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.

Relevant code from 1.x release that sends `FileRecords` when we are not able to 
down-convert any messages:



{{public ConvertedRecords downConvert(byte toMagic, long 
firstOffset, Time time) {}}
{{ ConvertedRecords convertedRecords = 

[jira] [Updated] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required

2018-06-11 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-7045:

Description: 
When down-conversion is required, the consumer might fail consuming messages 
under certain conditions. Couple such cases are outlined below:
 # When consuming from a compacted topic, it is possible that the consumer 
wants to fetch messages that fall in the middle of a batch but the messages 
have been compacted by the cleaner. For example, let's say we have the 
following two segments. The brackets indicate a single batch of messages and 
the numbers within are the message offsets.

Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
 Segment #2: [9, 10, 11], [12, 13, 14]

If the cleaner were to come in now and clean up messages with offsets 7 and 8, 
the segments would look like the following:

Segment #1: [0, 1, 2], [3, 4, 5], [6]
 Segment #2: [9, 10, 11], [12, 13, 14]

A consumer attempting to fetch messages at offset 7 will start reading the 
batch starting at offset 6. During down-conversion, we will drop the record 
starting at 6 because it is less than the current fetch start offset. However, there 
are no messages in the log following offset 6. In such cases, we return the 
`FileRecords` itself which would cause the consumer to throw an exception 
because it does not understand the stored message format.
 # When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.

Relevant code from 1.x release that sends `FileRecords` when we are not able to 
down-convert any messages:



{quote}public ConvertedRecords<? extends Records> downConvert(byte toMagic, long firstOffset, Time time) {
    ConvertedRecords<MemoryRecords> convertedRecords = downConvert(batches, toMagic, firstOffset, time);
    if (convertedRecords.recordsProcessingStats().numRecordsConverted() == 0) {
        // This indicates that the message is too large, which means that the buffer is not large
        // enough to hold a full record batch. We just return all the bytes in this instance.
        // Even though the record batch does not have the right format version, we expect old clients
        // to raise an error to the user after reading the record batch size and seeing that there
        // are not enough available bytes in the response to read it fully. Note that this is
        // only possible prior to KIP-74, after which the broker was changed to always return at least
        // one full record batch, even if it requires exceeding the max fetch size requested by the client.
        return new ConvertedRecords<>(this, RecordsProcessingStats.EMPTY);
    } else {
        return convertedRecords;
    }
}
{quote}

  was:
When down-conversion is required, the consumer might fail consuming messages 
under certain conditions. Couple such cases are outlined below:
 # When consuming from a compacted topic, it is possible that the consumer 
wants to fetch messages that fall in the middle of a batch but the messages 
have been compacted by the cleaner. For example, let's say we have the 
following two segments. The brackets indicate a single batch of messages and 
the numbers within are the message offsets.

Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
Segment #2: [9, 10, 11], [12, 13, 14]

If the cleaner were to come in now and clean up messages with offsets 7 and 8, 
the segments would look like the following:

Segment #1: [0, 1, 2], [3, 4, 5], [6]
Segment #2: [9, 10, 11], [12, 13, 14]

A consumer attempting to fetch messages at offset 7 will start reading the 
batch starting at offset 6. During down-conversion, we will drop the record 
starting at 6 because it is less than the current fetch start offset. However, there 
are no messages in the log following offset 6. In such cases, we return the 
`FileRecords` itself which would cause the consumer to throw an exception 
because it does not understand the stored message format.


 # When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.

Relevant code from 1.x release that sends `FileRecords` when we are not able to 
down-convert any messages:
```
public ConvertedRecords downConvert(byte toMagic, long 
firstOffset, Time time) {
 

[jira] [Created] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required

2018-06-11 Thread Dhruvil Shah (JIRA)
Dhruvil Shah created KAFKA-7045:
---

 Summary: Consumer may not be able to consume all messages when 
down-conversion is required
 Key: KAFKA-7045
 URL: https://issues.apache.org/jira/browse/KAFKA-7045
 Project: Kafka
  Issue Type: Bug
  Components: consumer, core
Affects Versions: 1.0.1, 1.1.0, 0.11.0.2, 1.0.0, 0.11.0.1, 0.11.0.0, 2.0.0
Reporter: Dhruvil Shah
 Fix For: 2.1.0


When down-conversion is required, the consumer might fail consuming messages 
under certain conditions. Couple such cases are outlined below:
 # When consuming from a compacted topic, it is possible that the consumer 
wants to fetch messages that fall in the middle of a batch but the messages 
have been compacted by the cleaner. For example, let's say we have the 
following two segments. The brackets indicate a single batch of messages and 
the numbers within are the message offsets.

Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
Segment #2: [9, 10, 11], [12, 13, 14]

If the cleaner were to come in now and clean up messages with offsets 7 and 8, 
the segments would look like the following:

Segment #1: [0, 1, 2], [3, 4, 5], [6]
Segment #2: [9, 10, 11], [12, 13, 14]

A consumer attempting to fetch messages at offset 7 will start reading the 
batch starting at offset 6. During down-conversion, we will drop the record 
starting at 6 because it is less than the current fetch start offset. However, there 
are no messages in the log following offset 6. In such cases, we return the 
`FileRecords` itself which would cause the consumer to throw an exception 
because it does not understand the stored message format.


 # When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.

Relevant code from 1.x release that sends `FileRecords` when we are not able to 
down-convert any messages:
```
public ConvertedRecords<? extends Records> downConvert(byte toMagic, long firstOffset, Time time) {
    ConvertedRecords<MemoryRecords> convertedRecords = downConvert(batches, toMagic, firstOffset, time);
    if (convertedRecords.recordsProcessingStats().numRecordsConverted() == 0) {
        // This indicates that the message is too large, which means that the buffer is not large
        // enough to hold a full record batch. We just return all the bytes in this instance.
        // Even though the record batch does not have the right format version, we expect old clients
        // to raise an error to the user after reading the record batch size and seeing that there
        // are not enough available bytes in the response to read it fully. Note that this is
        // only possible prior to KIP-74, after which the broker was changed to always return at least
        // one full record batch, even if it requires exceeding the max fetch size requested by the client.
        return new ConvertedRecords<>(this, RecordsProcessingStats.EMPTY);
    } else {
        return convertedRecords;
    }
}
``` 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6966) Extend `TopologyDescription.Sink` to return `TopicNameExtractor`

2018-06-11 Thread Nishanth Pradeep (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509062#comment-16509062
 ] 

Nishanth Pradeep commented on KAFKA-6966:
-

Hello [~mjsax]. I would like to pick up this ticket as my first contribution to 
Kafka.

> Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
> 
>
> Key: KAFKA-6966
> URL: https://issues.apache.org/jira/browse/KAFKA-6966
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: beginner, needs-kip, newbie
>
> With KIP-303, a dynamic routing feature was added and 
> `TopologyDescription.Sink#topic()` returns `null` if this feature is used.
> It would be useful to get the actually used `TopicNameExtractor` class from 
> the `TopologyDescription`.
> We suggest adding `Class<? extends TopicNameExtractor> 
> TopologyDescription.Sink#topicNameExtractor()` and let it return `null` if 
> the dynamic routing feature is not used.
> This is a public API change and requires a KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7044) kafka-consumer-groups.sh NullPointerException describing round robin or sticky assignors

2018-06-11 Thread Vahid Hashemian (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-7044:
--

Assignee: Vahid Hashemian

> kafka-consumer-groups.sh NullPointerException describing round robin or 
> sticky assignors
> 
>
> Key: KAFKA-7044
> URL: https://issues.apache.org/jira/browse/KAFKA-7044
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 1.1.0
> Environment: CentOS 7.4, Oracle JDK 1.8
>Reporter: Jeff Field
>Assignee: Vahid Hashemian
>Priority: Minor
>
> We've recently moved to using the round robin assignor for one of our 
> consumer groups, and started testing the sticky assignor. In both cases, 
> using Kafka 1.1.0 we get a null pointer exception *unless* the group being 
> described is rebalancing:
> {code:java}
> kafka-consumer-groups --bootstrap-server fqdn:9092 --describe --group 
> groupname-for-consumer
> Error: Executing consumer group command failed due to null
> [2018-06-12 01:32:34,179] DEBUG Exception in consumer group command 
> (kafka.admin.ConsumerGroupCommand$)
> java.lang.NullPointerException
> at scala.Predef$.Long2long(Predef.scala:363)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$getLogEndOffsets$2.apply(ConsumerGroupCommand.scala:612)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$getLogEndOffsets$2.apply(ConsumerGroupCommand.scala:610)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:296)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.getLogEndOffsets(ConsumerGroupCommand.scala:610)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describePartitions(ConsumerGroupCommand.scala:328)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.collectConsumerAssignment(ConsumerGroupCommand.scala:308)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.collectConsumerAssignment(ConsumerGroupCommand.scala:544)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10$$anonfun$13.apply(ConsumerGroupCommand.scala:571)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10$$anonfun$13.apply(ConsumerGroupCommand.scala:565)
> at scala.collection.immutable.List.flatMap(List.scala:338)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10.apply(ConsumerGroupCommand.scala:565)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10.apply(ConsumerGroupCommand.scala:558)
> at scala.Option.map(Option.scala:146)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.collectGroupOffsets(ConsumerGroupCommand.scala:558)
> at 
> kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describeGroup(ConsumerGroupCommand.scala:271)
> at 
> kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:544)
> at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:77)
> at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
> [2018-06-12 01:32:34,255] DEBUG Removed sensor with name connections-closed: 
> (org.apache.kafka.common.metrics.Metrics){code}
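
The top frame, scala.Predef$.Long2long, unboxes a java.lang.Long into a 
primitive, which suggests a missing (null) log end offset being unboxed inside 
getLogEndOffsets. A minimal illustration of that failure mode in plain Java 
(not Kafka code):

{code:java}
public class NullUnboxSketch {
    public static void main(String[] args) {
        Long logEndOffset = null;      // e.g. no end offset available for a partition
        long primitive = logEndOffset; // implicit unboxing throws NullPointerException
        System.out.println(primitive);
    }
}
{code}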



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7044) kafka-consumer-groups.sh NullPointerException describing round robin or sticky assignors

2018-06-11 Thread Jeff Field (JIRA)
Jeff Field created KAFKA-7044:
-

 Summary: kafka-consumer-groups.sh NullPointerException describing 
round robin or sticky assignors
 Key: KAFKA-7044
 URL: https://issues.apache.org/jira/browse/KAFKA-7044
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 1.1.0
 Environment: CentOS 7.4, Oracle JDK 1.8
Reporter: Jeff Field


We've recently moved to using the round robin assignor for one of our consumer 
groups, and started testing the sticky assignor. In both cases, using Kafka 
1.1.0 we get a null pointer exception *unless* the group being described is 
rebalancing:
{code:java}
kafka-consumer-groups --bootstrap-server fqdn:9092 --describe --group 
groupname-for-consumer

Error: Executing consumer group command failed due to null
[2018-06-12 01:32:34,179] DEBUG Exception in consumer group command 
(kafka.admin.ConsumerGroupCommand$)
java.lang.NullPointerException
at scala.Predef$.Long2long(Predef.scala:363)
at 
kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$getLogEndOffsets$2.apply(ConsumerGroupCommand.scala:612)
at 
kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$getLogEndOffsets$2.apply(ConsumerGroupCommand.scala:610)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:296)
at 
kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.getLogEndOffsets(ConsumerGroupCommand.scala:610)
at 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describePartitions(ConsumerGroupCommand.scala:328)
at 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.collectConsumerAssignment(ConsumerGroupCommand.scala:308)
at 
kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.collectConsumerAssignment(ConsumerGroupCommand.scala:544)
at 
kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10$$anonfun$13.apply(ConsumerGroupCommand.scala:571)
at 
kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10$$anonfun$13.apply(ConsumerGroupCommand.scala:565)
at scala.collection.immutable.List.flatMap(List.scala:338)
at 
kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10.apply(ConsumerGroupCommand.scala:565)
at 
kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10.apply(ConsumerGroupCommand.scala:558)
at scala.Option.map(Option.scala:146)
at 
kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.collectGroupOffsets(ConsumerGroupCommand.scala:558)
at 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describeGroup(ConsumerGroupCommand.scala:271)
at 
kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:544)
at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:77)
at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
[2018-06-12 01:32:34,255] DEBUG Removed sensor with name connections-closed: 
(org.apache.kafka.common.metrics.Metrics){code}
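
The failing frame is scala.Predef$.Long2long, which converts a boxed java.lang.Long to a primitive; passing it a null (for example, a missing offset in a map) throws the NPE. A minimal, purely illustrative Java sketch of that failure mode (names and map contents are assumptions, not the actual ConsumerGroupCommand logic):

{code:java}
import java.util.HashMap;
import java.util.Map;

// Illustrative only: shows why unboxing a missing (null) boxed Long throws
// NullPointerException, mirroring the scala.Predef$.Long2long frame above.
public class NullOffsetUnboxing {
    public static void main(String[] args) {
        Map<String, Long> endOffsets = new HashMap<>();
        endOffsets.put("topic-0", 42L);           // offset known for this partition
        Long missing = endOffsets.get("topic-1"); // no entry -> null
        long unboxed = missing;                   // auto-unboxing of null -> NPE
        System.out.println(unboxed);
    }
}
{code}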



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7023) Kafka Streams RocksDB bulk loading config may not be honored with customized RocksDBConfigSetter

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509014#comment-16509014
 ] 

ASF GitHub Bot commented on KAFKA-7023:
---

guozhangwang opened a new pull request #5197: KAFKA-7023: Add unit test
URL: https://github.com/apache/kafka/pull/5197
 
 
   Add a unit test that validates that after restoreStart the options are set 
with the bulk-loading configs, and that after restoreEnd they revert to the 
customized configs.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
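
The quoted issue below refers to a customized RocksDBConfigSetter that sets level0_file_num_compaction_trigger=6. For reference, a minimal sketch of such a setter (the class name is an assumption; only the trigger value mirrors the report):

{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

// Sketch only: a customized RocksDBConfigSetter along the lines described in
// the issue. The bulk-loading defaults applied during restore may override
// options such as this compaction trigger, which is what the new test checks.
public class CustomRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        options.setLevelZeroFileNumCompactionTrigger(6);
    }
}
{code}

Such a setter would typically be registered via props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class) in the application's Streams configuration.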


> Kafka Streams RocksDB bulk loading config may not be honored with customized 
> RocksDBConfigSetter 
> -
>
> Key: KAFKA-7023
> URL: https://issues.apache.org/jira/browse/KAFKA-7023
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Liquan Pei
>Assignee: Liquan Pei
>Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> We observed frequent L0 -> L1 compaction during Kafka Streams state recovery. 
> Some sample log:
> {code:java}
> 2018/06/08-00:04:50.892331 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892298) [db/compaction_picker_universal.cc:270] [default] 
> Universal: sorted runs files(6): files[3 0 0 0 1 1 38] max score 1.00
> 2018/06/08-00:04:50.892336 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892300) [db/compaction_picker_universal.cc:655] [default] 
> Universal: First candidate file 134[0] to reduce size amp.
> 2018/06/08-00:04:50.892338 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892302) [db/compaction_picker_universal.cc:686] [default] 
> Universal: size amp not needed. newer-files-total-size 13023497 
> earliest-file-size 2541530372
> 2018/06/08-00:04:50.892339 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892303) [db/compaction_picker_universal.cc:473] [default] 
> Universal: Possible candidate file 134[0].
> 2018/06/08-00:04:50.892341 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892304) [db/compaction_picker_universal.cc:525] [default] 
> Universal: Skipping file 134[0] with size 1007 (compensated size 1287)
> 2018/06/08-00:04:50.892343 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892306) [db/compaction_picker_universal.cc:473] [default] 
> Universal: Possible candidate file 133[1].
> 2018/06/08-00:04:50.892344 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892307) [db/compaction_picker_universal.cc:525] [default] 
> Universal: Skipping file 133[1] with size 4644 (compensated size 16124)
> 2018/06/08-00:04:50.892346 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892307) [db/compaction_picker_universal.cc:473] [default] 
> Universal: Possible candidate file 126[2].
> 2018/06/08-00:04:50.892348 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892308) [db/compaction_picker_universal.cc:525] [default] 
> Universal: Skipping file 126[2] with size 319764 (compensated size 319764)
> 2018/06/08-00:04:50.892349 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892309) [db/compaction_picker_universal.cc:473] [default] 
> Universal: Possible candidate level 4[3].
> 2018/06/08-00:04:50.892351 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892310) [db/compaction_picker_universal.cc:525] [default] 
> Universal: Skipping level 4[3] with size 2815574 (compensated size 2815574)
> 2018/06/08-00:04:50.892352 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892311) [db/compaction_picker_universal.cc:473] [default] 
> Universal: Possible candidate level 5[4].
> 2018/06/08-00:04:50.892357 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892311) [db/compaction_picker_universal.cc:525] [default] 
> Universal: Skipping level 5[4] with size 9870748 (compensated size 9870748)
> 2018/06/08-00:04:50.892358 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892313) [db/compaction_picker_universal.cc:473] [default] 
> Universal: Possible candidate level 6[5].
> {code}
> In customized RocksDBConfigSetter, we set 
> {code:java}
> level0_file_num_compaction_trigger=6 {code}
> During bulk loading, the following options are set: 
> [https://github.com/facebook/rocksdb/blob/master/options/options.cc] 
> {code:java}
> Options*
> Options::PrepareForBulkLoad()
> {
> // never slowdown ingest.
> level0_file_num_compaction_trigger 

[jira] [Commented] (KAFKA-6906) Kafka Streams does not commit transactions if data is produced via wall-clock punctuation

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508991#comment-16508991
 ] 

ASF GitHub Bot commented on KAFKA-6906:
---

mjsax opened a new pull request #5196: MINOR: code cleanup follow up for 
KAFKA-6906
URL: https://github.com/apache/kafka/pull/5196
 
 
   IntelliJ warned that the condition of the `if` will always be `false` -- 
thinking about it, this makes sense. After the refactoring in KAFKA-6906 that 
moved the EOS part out of the `commitOffsetNeeded` block, L433-440 already 
cover this case, so we can remove the redundant code.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Streams does not commit transactions if data is produced via wall-clock 
> punctuation
> -
>
> Key: KAFKA-6906
> URL: https://issues.apache.org/jira/browse/KAFKA-6906
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Jagadesh Adireddi
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> Committing in Kafka Streams happens at regular intervals. However, a commit 
> only happens if new input records were processed since the last commit (via 
> the flag `commitOffsetNeeded` set within `StreamTask#process()`).
> Data could also be emitted via wall-clock based punctuation calls, though. 
> Especially if EOS is enabled, this is an issue (maybe also for non-EOS) 
> because the currently running transaction is not committed and thus might 
> time out, leading to a fatal error.
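
For context, the wall-clock punctuation pattern referred to above looks roughly like the following sketch (processor name, interval, and forwarded values are assumptions): output is produced from the punctuation callback rather than from process(), so no input record needs to have been processed since the last commit.

{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

// Sketch only: emits records from a wall-clock punctuation, so downstream
// output can be produced even when no new input record was processed and
// commitOffsetNeeded was therefore never set.
public class WallClockEmitter extends AbstractProcessor<String, String> {

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        // Every 30 seconds of wall-clock time, forward a record downstream.
        context.schedule(30_000L, PunctuationType.WALL_CLOCK_TIME,
            timestamp -> context().forward("tick", Long.toString(timestamp)));
    }

    @Override
    public void process(final String key, final String value) {
        // Intentionally produces no output; everything is emitted by the punctuation.
    }
}
{code}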



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7021) Source KTable checkpoint is not correct

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508982#comment-16508982
 ] 

ASF GitHub Bot commented on KAFKA-7021:
---

guozhangwang opened a new pull request #5195: KAFKA-7021: Update upgrade guide 
section for reusing source topic
URL: https://github.com/apache/kafka/pull/5195
 
 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Source KTable checkpoint is not correct
> ---
>
> Key: KAFKA-7021
> URL: https://issues.apache.org/jira/browse/KAFKA-7021
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 0.10.2.2, 2.0.0, 0.11.0.3, 1.0.2, 1.1.1
>
>
> Kafka Streams treats source KTables, i.e., tables created via 
> `builder.table()`, differently. Instead of creating a changelog topic, the 
> original source topic is reused to avoid unnecessary data redundancy.
> However, Kafka Streams does not write a correct local state checkpoint file. 
> This results in an unnecessary state restore after a rebalance. Instead of 
> the latest committed offset, the latest restored offset is written into the 
> checkpoint file in `ProcessorStateManager#close()`.
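
As a point of reference, a "source" KTable of the kind described above is created directly from an input topic with the Streams DSL; a minimal sketch (topic name, store name, and types are assumptions):

{code:java}
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch only: a source KTable built straight from the input topic, so the
// topic itself is reused for recovery instead of a dedicated changelog topic.
public class SourceKTableExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, String> table = builder.table(
            "user-profiles", // assumed topic name, illustrative only
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("user-profiles-store"));
        System.out.println(builder.build().describe());
    }
}
{code}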



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6906) Kafka Streams does not commit transactions if data is produced via wall-clock punctuation

2018-06-11 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6906.

   Resolution: Fixed
Fix Version/s: 1.1.1
   2.0.0

> Kafka Streams does not commit transactions if data is produced via wall-clock 
> punctuation
> -
>
> Key: KAFKA-6906
> URL: https://issues.apache.org/jira/browse/KAFKA-6906
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Jagadesh Adireddi
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> Committing in Kafka Streams happens at regular intervals. However, a commit 
> only happens if new input records were processed since the last commit (via 
> the flag `commitOffsetNeeded` set within `StreamTask#process()`).
> Data could also be emitted via wall-clock based punctuation calls, though. 
> Especially if EOS is enabled, this is an issue (maybe also for non-EOS) 
> because the currently running transaction is not committed and thus might 
> time out, leading to a fatal error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7021) Source KTable checkpoint is not correct

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508927#comment-16508927
 ] 

ASF GitHub Bot commented on KAFKA-7021:
---

guozhangwang closed pull request #5163: KAFKA-7021: Reuse source based on config
URL: https://github.com/apache/kafka/pull/5163
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index d6002ff016b..6a707ff986d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -599,7 +599,7 @@ public KafkaStreams(final Topology topology,
 @Deprecated
 public KafkaStreams(final Topology topology,
 final StreamsConfig config) {
-this(topology.internalTopologyBuilder, config, new 
DefaultKafkaClientSupplier());
+this(topology, config, new DefaultKafkaClientSupplier());
 }
 
 /**
@@ -635,6 +635,10 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
 this.config = config;
 this.time = time;
 
+// adjust the topology if optimization is turned on.
+// TODO: to be removed post 2.0
+internalTopologyBuilder.adjust(config);
+
 // The application ID is a required config and hence should always 
have value
 processId = UUID.randomUUID();
 final String userClientId = 
config.getString(StreamsConfig.CLIENT_ID_CONFIG);
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 517104da323..ae6d44c449e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -302,11 +302,10 @@
 Objects.requireNonNull(materialized, "materialized can't be null");
 final MaterializedInternal> 
materializedInternal = new MaterializedInternal<>(materialized);
 materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, 
topic + "-");
+final ConsumedInternal consumedInternal =
+new 
ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(), 
materializedInternal.valueSerde()));
 
-return internalStreamsBuilder.table(topic,
-new 
ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(),
-   
  materializedInternal.valueSerde())),
-materializedInternal);
+return internalStreamsBuilder.table(topic, consumedInternal, 
materializedInternal);
 }
 
 /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java 
b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 22f6ea8362b..753185c2164 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -776,5 +776,4 @@ public synchronized Topology 
connectProcessorAndStateStores(final String process
 public synchronized TopologyDescription describe() {
 return internalTopologyBuilder.describe();
 }
-
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 0a19b4eb0c0..c7bf2fac8f7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -72,11 +72,7 @@ public InternalStreamsBuilder(final InternalTopologyBuilder 
internalTopologyBuil
 public  KTable table(final String topic,
  final ConsumedInternal consumed,
  final MaterializedInternal> materialized) {
-// explicitly disable logging for source table materialized stores
-materialized.withLoggingDisabled();
-
-final StoreBuilder> storeBuilder = new 
KeyValueStoreMaterializer<>(materialized)
-.materialize();
+final StoreBuilder> storeBuilder = new 
KeyValueStoreMaterializer<>(materialized).materialize();
 
 final String source = newProcessorName(KStreamImpl.SOURCE_NAME);
 final String name = newProcessorName(KTableImpl.SOURCE_NAME);
@@ -88,7 +84,7 @@ public InternalStreamsBuilder(final InternalTopologyBuilder 

[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-06-11 Thread Lucas Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508907#comment-16508907
 ] 

Lucas Wang commented on KAFKA-7040:
---

To be more specific, I think the following sequence of events may cause a 
truncation below HW.

Say both broker0 and broker1 have currently finished processing a LeaderAndISR 
request with broker1 as the leader and leader epoch 10, and both of them have 
100 messages in their respective logs (with the largest offset being 99 and an 
LEO of 100).

1. The replica fetcher thread on broker0 issues a LeaderEpoch request with 
leader epoch 10 to broker1, and broker1 replies with an LEO of 100 since that 
is the latest leader epoch. Before the replica fetcher on broker0 acquires the 
AbstractFetcherThread.partitionMapLock and processes the LeaderEpoch response, 
it goes through steps 2-4 first.
2. A LeaderAndISR request causes broker0 to become the leader for one partition 
t1p0, which in turn will remove the partition t1p0 from the replica fetcher 
thread
3. Broker0 accepts one message at offset 100 from a producer, and the message 
gets replicated to broker1, causing the HW on broker0 to go up to 100.
4. A 2nd LeaderAndISR request causes broker1 to become the leader, and broker0 
to become the follower for partition t1p0. This will cause the partition t1p0 
to be added back to the replica fetcher thread on broker0.
5. The replica fetcher thread on broker0 processes the LeaderEpoch response 
received in step 1, and truncates the message at offset 100 that was accepted 
in step 3.

> The replica fetcher thread may truncate accepted messages during multiple 
> fast leadership transitions
> -
>
> Key: KAFKA-7040
> URL: https://issues.apache.org/jira/browse/KAFKA-7040
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> Consider the scenario where there are two brokers, broker0, and broker1, and 
> there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as 
> the leader and broker0 as the follower. The following sequence of events 
> happened on broker0
> 1. The replica fetcher thread on broker0 issues a LeaderEpoch request to 
> broker1, and waits for the response
> 2. A LeaderAndISR request causes broker0 to become the leader for one 
> partition t1p0, which in turn will remove the partition t1p0 from the replica 
> fetcher thread
> 3. Broker0 accepts some messages from a producer
> 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and 
> broker0 to become the follower for partition t1p0. This will cause the 
> partition t1p0 to be added back to the replica fetcher thread on broker0.
> 5. The replica fetcher thread on broker0 receives a response for the 
> LeaderEpoch request issued in step 1, and truncates the messages accepted in 
> step 3.
> The issue can be reproduced with the test from 
> https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea
> [1] Initially we set up broker0 to be the follower of two partitions instead 
> of just one, to avoid the shutting down of the replica fetcher thread when it 
> becomes idle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data

2018-06-11 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508902#comment-16508902
 ] 

Eugen Feller commented on KAFKA-6977:
-

I think it was the 0.11.0.1 consumer, as I used the kafka-console-consumer CLI 
(from Kafka 0.11.0.1) to do the test. The consumer was running on my local 
machine via VPN. I will give it a try on ECS. I think that topic is being 
consumed by at least one downstream service on ECS.

>  Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 
> while fetching data
> -
>
> Key: KAFKA-6977
> URL: https://issues.apache.org/jira/browse/KAFKA-6977
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>  Labels: streams
>
> We are running Kafka Streams 0.11.0.1 with Kafka Broker 0.10.2.1 and 
> constantly run into the following exception: 
> {code:java}
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> partition assignment took 40 ms.
> current active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 0_15, 
> 0_17, 0_20, 0_21, 0_23, 0_25, 0_28, 0_2, 0_6, 0_7, 0_8, 0_9, 0_11, 0_16, 
> 0_18, 0_19, 0_22, 0_24, 0_26, 0_27, 0_29, 0_30, 0_31]
> current standby tasks: []
> previous active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 
> 0_15, 0_17, 0_20, 0_21, 0_23, 0_25, 0_28]
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> State transition from PARTITIONS_ASSIGNED to RUNNING.
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.KafkaStreams - stream-client 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e] State 
> transition from REBALANCING to RUNNING.
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> ERROR org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> Encountered the following error during processing:
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
> java.lang.IllegalStateException: Unexpected error code 2 while fetching data
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> Shutting down
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> State transition from RUNNING to PENDING_SHUTDOWN.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka 
> producer with timeoutMillis = 9223372036854775807 ms.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> Stream thread shutdown complete
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> State transition from PENDING_SHUTDOWN to DEAD.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.KafkaStreams - stream-client 
> 

[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-06-11 Thread Lucas Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508867#comment-16508867
 ] 

Lucas Wang commented on KAFKA-7040:
---

[~lindong] Thanks for pointing out that the truncation may happen below the 
high watermark.

Also, your suggested change seems to work by remembering that a partition 
state has already been used, so that we can distinguish between different 
generations of the partition state.

> The replica fetcher thread may truncate accepted messages during multiple 
> fast leadership transitions
> -
>
> Key: KAFKA-7040
> URL: https://issues.apache.org/jira/browse/KAFKA-7040
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> Consider the scenario where there are two brokers, broker0, and broker1, and 
> there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as 
> the leader and broker0 as the follower. The following sequence of events 
> happened on broker0
> 1. The replica fetcher thread on broker0 issues a LeaderEpoch request to 
> broker1, and waits for the response
> 2. A LeaderAndISR request causes broker0 to become the leader for one 
> partition t1p0, which in turn will remove the partition t1p0 from the replica 
> fetcher thread
> 3. Broker0 accepts some messages from a producer
> 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and 
> broker0 to become the follower for partition t1p0. This will cause the 
> partition t1p0 to be added back to the replica fetcher thread on broker0.
> 5. The replica fetcher thread on broker0 receives a response for the 
> LeaderEpoch request issued in step 1, and truncates the messages accepted in 
> step 3.
> The issue can be reproduced with the test from 
> https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea
> [1] Initially we set up broker0 to be the follower of two partitions instead 
> of just one, to avoid the shutting down of the replica fetcher thread when it 
> becomes idle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7003) Add headers with error context in messages written to the Connect DeadLetterQueue topic

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508849#comment-16508849
 ] 

ASF GitHub Bot commented on KAFKA-7003:
---

ewencp closed pull request #5159: KAFKA-7003: Set error context in message 
headers (KIP-298)
URL: https://github.com/apache/kafka/pull/5159
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index 6e9bd6b9e71..d9d140b9cdc 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -57,11 +57,19 @@
 public static final short DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT = 3;
 private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY = 
"Dead Letter Queue Topic Replication Factor";
 
+public static final String DLQ_CONTEXT_HEADERS_ENABLE_CONFIG = DLQ_PREFIX 
+ "context.headers.enable";
+public static final boolean DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT = false;
+public static final String DLQ_CONTEXT_HEADERS_ENABLE_DOC = "If true, add 
headers containing error context to the messages " +
+"written to the dead letter queue. To avoid clashing with headers 
from the original record, all error context header " +
+"keys, all error context header keys will start with 
__connect.errors.";
+private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable 
Error Context Headers";
+
 static ConfigDef config = ConnectorConfig.configDef()
 .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, 
ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, 
TOPICS_DISPLAY)
 .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, 
TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, 
TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
 .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, 
DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, 
ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY)
-.define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, 
DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, 
DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, 
ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY);
+.define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, 
DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, 
DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, 
ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY)
+.define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, 
DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, 
DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, 
DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
 
 public static ConfigDef configDef() {
 return config;
@@ -107,4 +115,8 @@ public String dlqTopicName() {
 public short dlqTopicReplicationFactor() {
 return getShort(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG);
 }
+
+public boolean isDlqContextHeadersEnabled() {
+return getBoolean(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG);
+}
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 97e68faa4ca..c794eb8c807 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -530,7 +530,7 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId 
id) {
 // check if topic for dead letter queue exists
 String topic = connConfig.dlqTopicName();
 if (topic != null && !topic.isEmpty()) {
-DeadLetterQueueReporter reporter = 
DeadLetterQueueReporter.createAndSetup(config, connConfig, producerProps);
+DeadLetterQueueReporter reporter = 
DeadLetterQueueReporter.createAndSetup(config, id, connConfig, producerProps);
 reporters.add(reporter);
 }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index 459eeae1ff4..d36ec22ec88 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ 

[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-06-11 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508847#comment-16508847
 ] 

Dong Lin commented on KAFKA-7040:
-

[~luwang] Nice catch. It is indeed problematic (with some probability of losing 
data) that a replica fetcher thread can truncate to an offset smaller than the 
high watermark due to receiving an "outdated" OffsetsForLeaderEpochResponse.

One possible solution is to add one more field, say `initialized`, to 
AbstractFetcherThread.partitionStates. The value is set to false when the 
partition is added to partitionStates, and ReplicaFetcherThread updates 
`initialized` to true the first time it reads this partition. Later, after 
ReplicaFetcherThread receives either a FetchResponse or an 
OffsetsForLeaderEpochResponse, it should discard the partitions in the 
response whose `initialized` is false. This seems to fix the issue here.
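
A rough, illustrative sketch of that idea (all names here are assumptions, not actual Kafka classes): the flag starts out false whenever the partition is (re)added to the fetcher, flips to true on the first read, and any response built before the re-add can then be recognized and dropped.

{code:java}
// Sketch only: per-partition state with an "initialized" marker, as suggested
// above. Names are illustrative and do not correspond to real Kafka classes.
final class TrackedPartitionState {
    private final long fetchOffset;
    private boolean initialized = false; // false until the fetcher first reads it

    TrackedPartitionState(final long fetchOffset) {
        this.fetchOffset = fetchOffset;
    }

    // Called when the fetcher builds a request for this partition.
    long markReadAndGetOffset() {
        initialized = true;
        return fetchOffset;
    }

    // Responses for partitions that were re-added after the request was sent
    // would still see initialized == false and should be discarded.
    boolean isInitialized() {
        return initialized;
    }
}
{code}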

 

 

> The replica fetcher thread may truncate accepted messages during multiple 
> fast leadership transitions
> -
>
> Key: KAFKA-7040
> URL: https://issues.apache.org/jira/browse/KAFKA-7040
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> Consider the scenario where there are two brokers, broker0, and broker1, and 
> there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as 
> the leader and broker0 as the follower. The following sequence of events 
> happened on broker0
> 1. The replica fetcher thread on broker0 issues a LeaderEpoch request to 
> broker1, and waits for the response
> 2. A LeaderAndISR request causes broker0 to become the leader for one 
> partition t1p0, which in turn will remove the partition t1p0 from the replica 
> fetcher thread
> 3. Broker0 accepts some messages from a producer
> 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and 
> broker0 to become the follower for partition t1p0. This will cause the 
> partition t1p0 to be added back to the replica fetcher thread on broker0.
> 5. The replica fetcher thread on broker0 receives a response for the 
> LeaderEpoch request issued in step 1, and truncates the messages accepted in 
> step 3.
> The issue can be reproduced with the test from 
> https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea
> [1] Initially we set up broker0 to be the follower of two partitions instead 
> of just one, to avoid the shutting down of the replica fetcher thread when it 
> becomes idle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7003) Add headers with error context in messages written to the Connect DeadLetterQueue topic

2018-06-11 Thread Ewen Cheslack-Postava (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-7003.
--
   Resolution: Fixed
Fix Version/s: 2.1.0
   2.0.0

Issue resolved by pull request 5159
[https://github.com/apache/kafka/pull/5159]

> Add headers with error context in messages written to the Connect 
> DeadLetterQueue topic
> ---
>
> Key: KAFKA-7003
> URL: https://issues.apache.org/jira/browse/KAFKA-7003
> Project: Kafka
>  Issue Type: Task
>Reporter: Arjun Satish
>Priority: Major
> Fix For: 2.0.0, 2.1.0
>
>
> This was added to the KIP after the feature freeze. 
> If the property {{errors.deadletterqueue.context.headers.enable}} is set 
> to {{*true*}}, the following headers will be added to the produced raw 
> message (only if they don't already exist in the message). All values will be 
> serialized as UTF-8 strings.
> ||Header Name||Description||
> |__connect.errors.topic|Name of the topic that contained the message.|
> |__connect.errors.task.id|The numeric ID of the task that encountered the 
> error (encoded as a UTF-8 string).|
> |__connect.errors.stage|The name of the stage where the error occurred.|
> |__connect.errors.partition|The numeric ID of the partition in the original 
> topic that contained the message (encoded as a UTF-8 string).|
> |__connect.errors.offset|The numeric value of the message offset in the 
> original topic (encoded as a UTF-8 string).|
> |__connect.errors.exception.stacktrace|The stacktrace of the exception.|
> |__connect.errors.exception.message|The message in the exception.|
> |__connect.errors.exception.class.name|The fully qualified classname of the 
> exception that was thrown during the execution.|
> |__connect.errors.connector.name|The name of the connector which encountered 
> the error.|
> |__connect.errors.class.name|The fully qualified name of the class that 
> caused the error.|
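
As an illustration, once the connector sets errors.deadletterqueue.context.headers.enable=true, the error context can be read back from the DLQ records as plain consumer headers; a minimal sketch (topic name, group id, and bootstrap address are assumptions, and the 2.0 consumer API is assumed):

{code:java}
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

// Sketch only: prints the __connect.errors.* headers from a dead letter queue
// topic. All header values are UTF-8 strings per the description above.
public class DlqHeaderReader {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption
        props.put("group.id", "dlq-inspector");           // assumption
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-connector-dlq")); // assumption
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                for (final Header header : record.headers()) {
                    if (header.key().startsWith("__connect.errors.")) {
                        System.out.println(header.key() + " = "
                            + new String(header.value(), StandardCharsets.UTF_8));
                    }
                }
            }
        }
    }
}
{code}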



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7042) Fall back to the old behavior when the broker is too old to recognize LIST_OFFSET versions

2018-06-11 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508845#comment-16508845
 ] 

Ismael Juma commented on KAFKA-7042:


Maybe they should not turn it on for older brokers?

> Fall back to the old behavior when the broker is too old to recognize 
> LIST_OFFSET versions
> --
>
> Key: KAFKA-7042
> URL: https://issues.apache.org/jira/browse/KAFKA-7042
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>
> When READ_COMMITTED is turned on (since 0.11.0), LIST_OFFSET requires 
> min_version to be 2 on the consumer client side. On the other hand, if the 
> broker is no newer than 0.10.2 it can only recognize versions of LIST_OFFSET 
> up to 1. In this case a consumer talking to such an old broker will throw an 
> exception right away.
> What we can improve, though, is that when the consumer realizes the broker 
> does not recognize LIST_OFFSET of at least version 2, it can fall back to the 
> old READ_UNCOMMITTED behavior, since the data on that broker should not have 
> any txn markers anyway. By doing this we would lift the hard restriction that 
> consumers with READ_COMMITTED cannot work with an older broker version 
> (remember we are trying to achieve broker compatibility since 0.10.0).
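
For reference, READ_COMMITTED is enabled on the consumer via the isolation.level setting; a minimal sketch of the client-side configuration the issue refers to (bootstrap address and group id are assumptions):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Sketch only: a consumer configured with read_committed isolation. With this
// setting the client needs LIST_OFFSET v2+, which pre-0.11 brokers lack.
public class ReadCommittedConsumer {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "read-committed-group");    // assumption
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.close();
    }
}
{code}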



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread

2018-06-11 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508843#comment-16508843
 ] 

Eugen Feller commented on KAFKA-6990:
-

Thanks a lot. For the sake of completeness: logs sent via Slack. :)

> CommitFailedException; this task may be no longer owned by the thread
> -
>
> Key: KAFKA-6990
> URL: https://issues.apache.org/jira/browse/KAFKA-6990
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>
> We are seeing a lot of CommitFailedExceptions on one of our Kafka stream 
> apps. Running Kafka Streams 0.11.0.1 and Kafka Broker 0.10.2.1. Full error 
> message:
> {code:java}
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] INFO 
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d] State transition from 
> REBALANCING to RUNNING.
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_0] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-0=OffsetAndMetadata{offset=11, 
> metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-0=OffsetAndMetadata{offset=26,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_0 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_1 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_1] Failed 
> offset commits 
> {mactocontactmappings_contactmapping_byclientmac_0-1=OffsetAndMetadata{offset=38,
>  metadata=''}, sightings_sighting_byclientmac_0-1=OffsetAndMetadata{offset=1, 
> metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_2] Failed 
> offset commits 
> {mactocontactmappings_contactmapping_byclientmac_0-2=OffsetAndMetadata{offset=8,
>  metadata=''}, 
> sightings_sighting_byclientmac_0-2=OffsetAndMetadata{offset=24, metadata=''}} 
> due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_2 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_3] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-3=OffsetAndMetadata{offset=21, 
> metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-3=OffsetAndMetadata{offset=102,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_3 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_4] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-4=OffsetAndMetadata{offset=5, metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-4=OffsetAndMetadata{offset=20,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_4 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_5] Failed 
> offset commits 
> 

[jira] [Commented] (KAFKA-5570) Join request's timeout should be slightly higher than the rebalance timeout

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508828#comment-16508828
 ] 

ASF GitHub Bot commented on KAFKA-5570:
---

ijuma closed pull request #3503: KAFKA-5570: Join request's timeout should be 
slightly higher than the rebalance timeout
URL: https://github.com/apache/kafka/pull/3503
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
index 9646f4a..3d6c2819ae9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -30,6 +30,7 @@
 private final int correlationId;
 private final String clientId;
 private final long createdTimeMs;
+private final int timeoutMs;
 private final boolean expectResponse;
 private final RequestCompletionHandler callback;
 
@@ -39,6 +40,7 @@
  * @param correlationId The correlation id for this client request
  * @param clientId The client ID to use for the header
  * @param createdTimeMs The unix timestamp in milliseconds for the time at 
which this request was created.
+*  @param timeoutMs The request timeout in milliseconds.
  * @param expectResponse Should we expect a response message or is this 
request complete once it is sent?
  * @param callback A callback to execute when the response has been 
received (or null if no callback is necessary)
  */
@@ -47,6 +49,7 @@ public ClientRequest(String destination,
  int correlationId,
  String clientId,
  long createdTimeMs,
+ int timeoutMs,
  boolean expectResponse,
  RequestCompletionHandler callback) {
 this.destination = destination;
@@ -54,6 +57,7 @@ public ClientRequest(String destination,
 this.correlationId = correlationId;
 this.clientId = clientId;
 this.createdTimeMs = createdTimeMs;
+this.timeoutMs = timeoutMs;
 this.expectResponse = expectResponse;
 this.callback = callback;
 }
@@ -66,6 +70,7 @@ public String toString() {
 ", correlationId=" + correlationId +
 ", clientId=" + clientId +
 ", createdTimeMs=" + createdTimeMs +
+", timeoutMs=" + timeoutMs +
 ", requestBuilder=" + requestBuilder +
 ")";
 }
@@ -98,6 +103,10 @@ public long createdTimeMs() {
 return createdTimeMs;
 }
 
+public int timeoutMs() {
+return timeoutMs;
+}
+
 public int correlationId() {
 return correlationId;
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
index f9773297dbb..d58827fcd0f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -17,10 +17,10 @@
 package org.apache.kafka.clients;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -31,6 +31,7 @@
 
 private final int maxInFlightRequestsPerConnection;
 private final Map> requests = 
new HashMap<>();
+private Integer minTimeoutMs;
 
 public InFlightRequests(int maxInFlightRequestsPerConnection) {
 this.maxInFlightRequestsPerConnection = 
maxInFlightRequestsPerConnection;
@@ -46,6 +47,8 @@ public void add(NetworkClient.InFlightRequest request) {
 reqs = new ArrayDeque<>();
 this.requests.put(destination, reqs);
 }
+if (minTimeoutMs != null)
+minTimeoutMs = Math.min(request.timeoutMs, minTimeoutMs);
 reqs.addFirst(request);
 }
 
@@ -60,10 +63,12 @@ public void add(NetworkClient.InFlightRequest request) {
 }
 
 /**
- * Get the oldest request (the one that that will be completed next) for 
the given node
+ * Complete the oldest request (the one that that will be completed next) 
for the given node
  */
 public NetworkClient.InFlightRequest completeNext(String node) {
-return requestQueue(node).pollLast();
+NetworkClient.InFlightRequest request = requestQueue(node).pollLast();
+minTimeoutMs = null;
+return request;
 }
 
 /**
@@ -80,7 +85,9 @@ public void add(NetworkClient.InFlightRequest request) {
  * 

[jira] [Commented] (KAFKA-7042) Fall back to the old behavior when the broker is too old to recognize LIST_OFFSET versions

2018-06-11 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508815#comment-16508815
 ] 

Guozhang Wang commented on KAFKA-7042:
--

System tests.

> Fall back to the old behavior when the broker is too old to recognize 
> LIST_OFFSET versions
> --
>
> Key: KAFKA-7042
> URL: https://issues.apache.org/jira/browse/KAFKA-7042
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>
> When READ_COMMITTED is turned on (since 0.11.0), LIST_OFFSET requires 
> min_version to be 2 on the consumer client side. On the other hand, if the 
> broker is no newer than 0.10.2 it can only recognize versions of LIST_OFFSET 
> up to 1. In this case a consumer talking to such an old broker will throw an 
> exception right away.
> What we can improve, though, is that when the consumer realizes the broker 
> does not recognize LIST_OFFSET of at least version 2, it can fall back to the 
> old READ_UNCOMMITTED behavior, since the data on that broker should not have 
> any txn markers anyway. By doing this we would lift the hard restriction that 
> consumers with READ_COMMITTED cannot work with an older broker version 
> (remember we are trying to achieve broker compatibility since 0.10.0).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6906) Kafka Streams does not commit transactions if data is produced via wall-clock punctuation

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508805#comment-16508805
 ] 

ASF GitHub Bot commented on KAFKA-6906:
---

mjsax closed pull request #5105: KAFKA-6906: Fixed to commit transactions if 
data is produced via wall clock punctuation
URL: https://github.com/apache/kafka/pull/5105
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index e2be3e29172..4cea5280f86 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -424,17 +424,22 @@ private void commitOffsets(final boolean 
startNewTransaction) {
 
 if (eosEnabled) {
 
producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId);
-producer.commitTransaction();
-transactionInFlight = false;
-if (startNewTransaction) {
-producer.beginTransaction();
-transactionInFlight = true;
-}
 } else {
 consumer.commitSync(consumedOffsetsAndMetadata);
 }
 commitOffsetNeeded = false;
-} else if (eosEnabled && !startNewTransaction && 
transactionInFlight) { // need to make sure to commit txn for suspend case
+}
+
+if (eosEnabled) {
+producer.commitTransaction();
+transactionInFlight = false;
+if (startNewTransaction) {
+producer.beginTransaction();
+transactionInFlight = true;
+}
+}
+
+if (eosEnabled && !startNewTransaction && transactionInFlight) { 
// need to make sure to commit txn for suspend case
 producer.commitTransaction();
 transactionInFlight = false;
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 5537335b221..bfbb2a00270 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -35,6 +35,7 @@
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
@@ -913,6 +914,7 @@ public void 
shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() {
 @Test
 public void shouldCloseProducerOnCloseWhenEosEnabled() {
 task = createStatelessTask(createConfig(true));
+task.initializeTopology();
 task.close(true, false);
 task = null;
 
@@ -1028,6 +1030,39 @@ public void 
shouldReturnOffsetsForRepartitionTopicsForPurging() {
 assertThat(map, equalTo(Collections.singletonMap(repartition, 11L)));
 }
 
+@Test
+public void 
shouldThrowOnCleanCloseTaskWhenEosEnabledIfTransactionInFlight() {
+task = createStatelessTask(createConfig(true));
+try {
+task.close(true, false);
+fail("should have throw IllegalStateException");
+} catch (final IllegalStateException expected) {
+// pass
+}
+task = null;
+
+assertTrue(producer.closed());
+}
+
+@Test
+public void shouldAlwaysCommitIfEosEnabled() {
+final RecordCollectorImpl recordCollector =  new 
RecordCollectorImpl(producer, "StreamTask",
+new LogContext("StreamTaskTest "), new 
DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
+
+task = createStatelessTask(createConfig(true));
+task.initializeStateStores();
+task.initializeTopology();
+task.punctuate(processorSystemTime, 5, 
PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+@Override
+public void punctuate(final long timestamp) {
+recordCollector.send("result-topic1", 3, 5, null, 0, 
time.milliseconds(),
+new IntegerSerializer(),  new IntegerSerializer());
+}

[jira] [Created] (KAFKA-7043) Connect isolation whitelist does not include new primitive converters (KIP-305)

2018-06-11 Thread Randall Hauch (JIRA)
Randall Hauch created KAFKA-7043:


 Summary: Connect isolation whitelist does not include new 
primitive converters (KIP-305)
 Key: KAFKA-7043
 URL: https://issues.apache.org/jira/browse/KAFKA-7043
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.0.0
Reporter: Randall Hauch
Assignee: Randall Hauch
 Fix For: 2.0.0


KIP-305 added several new primitive converters, but the PR did not add them to 
the whitelist for plugin isolation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-06-11 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508771#comment-16508771
 ] 

Ismael Juma commented on KAFKA-7040:


cc [~apovzner]

> The replica fetcher thread may truncate accepted messages during multiple 
> fast leadership transitions
> -
>
> Key: KAFKA-7040
> URL: https://issues.apache.org/jira/browse/KAFKA-7040
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> Consider the scenario where there are two brokers, broker0, and broker1, and 
> there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as 
> the leader and broker0 as the follower. The following sequence of events 
> happened on broker0
> 1. The replica fetcher thread on broker0 issues a LeaderEpoch request to 
> broker1, and waits for the response
> 2. A LeaderAndISR request causes broker0 to become the leader for one 
> partition t1p0, which in turn will remove the partition t1p0 from the replica 
> fetcher thread
> 3. Broker0 accepts some messages from a producer
> 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and 
> broker0 to become the follower for partition t1p0. This will cause the 
> partition t1p0 to be added back to the replica fetcher thread on broker0.
> 5. The replica fetcher thread on broker0 receives a response for the 
> LeaderEpoch request issued in step 1, and truncates the messages accepted in 
> step 3.
> The issue can be reproduced with the test from 
> https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea
> [1] Initially we set up broker0 to be the follower of two partitions instead 
> of just one, to avoid the shutting down of the replica fetcher thread when it 
> becomes idle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7042) Fall back to the old behavior when the broker is too old to recognize LIST_OFFSET versions

2018-06-11 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508768#comment-16508768
 ] 

Ismael Juma commented on KAFKA-7042:


Who is turning on READ_COMMITTED in this case?

> Fall back to the old behavior when the broker is too old to recognize 
> LIST_OFFSET versions
> --
>
> Key: KAFKA-7042
> URL: https://issues.apache.org/jira/browse/KAFKA-7042
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>
> When READ_COMMITTED is turned on (since 0.11.0), LIST_OFFSET requires 
> min_version to be 2 on the consumer client side. On the other hand, if the 
> broker is no newer than 0.10.2 it can only recognize versions of LIST_OFFSET 
> up to 1. In this case a consumer talking to such an old broker will throw an 
> exception right away.
> What we can improve, though, is that when the consumer realizes the broker 
> does not recognize LIST_OFFSET of at least version 2, it can fall back to the 
> old READ_UNCOMMITTED behavior, since the data on that broker should not have 
> any txn markers anyway. By doing this we would lift the hard restriction that 
> consumers with READ_COMMITTED cannot work with an older broker version 
> (remember we are trying to achieve broker compatibility since 0.10.0).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-06-11 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508763#comment-16508763
 ] 

Dong Lin edited comment on KAFKA-7040 at 6/11/18 9:15 PM:
--

[~luwang] Say message m0 is truncated in step 5. Is that message acked by 
broker1 before broker0 accepts m0 in step 3? If no, it seems that the message 
is produced with ack=1 and it is within Kafka's contract to truncate it. This 
is a known behavior when we have leadership change in Kafka.

If m0 has been acked by broker1 in step 3, then broker0 should still be able to 
fetch it again from broker1 after step 5, right?


was (Author: lindong):
[~luwang] Say message m0 is truncated in step 5. Is that message acked by 
broker1 in before Broker0 accepts m0 in step 3? If no, it seems that the 
message is produced with ack=1 and it is within Kafka's contract to truncate 
it. This is a known behavior when we have leadership change in Kafka.

If m0 has been acked by broker1 in step 3, then broker0 should still be able to 
fetch it again from broker1 after step 5, right?

> The replica fetcher thread may truncate accepted messages during multiple 
> fast leadership transitions
> -
>
> Key: KAFKA-7040
> URL: https://issues.apache.org/jira/browse/KAFKA-7040
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> Consider the scenario where there are two brokers, broker0, and broker1, and 
> there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as 
> the leader and broker0 as the follower. The following sequence of events 
> happened on broker0
> 1. The replica fetcher thread on broker0 issues a LeaderEpoch request to 
> broker1, and awaits the response
> 2. A LeaderAndISR request causes broker0 to become the leader for one 
> partition t1p0, which in turn will remove the partition t1p0 from the replica 
> fetcher thread
> 3. Broker0 accepts some messages from a producer
> 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and 
> broker0 to become the follower for partition t1p0. This will cause the 
> partition t1p0 to be added back to the replica fetcher thread on broker0.
> 5. The replica fetcher thread on broker0 receives a response for the 
> LeaderEpoch request issued in step 1, and truncates the accepted messages in 
> step 3.
> The issue can be reproduced with the test from 
> https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea
> [1] Initially we set up broker0 to be the follower of two partitions instead 
> of just one, to avoid the shutting down of the replica fetcher thread when it 
> becomes idle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-06-11 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508763#comment-16508763
 ] 

Dong Lin commented on KAFKA-7040:
-

[~luwang] Say message m0 is truncated in step 5. Is that message acked by 
broker1 before broker0 accepts m0 in step 3? If no, it seems that the 
message is produced with ack=1 and it is within Kafka's contract to truncate 
it. This is a known behavior when we have leadership change in Kafka.

 

If m0 has been acked by broker1 in step 3, then broker0 should still be able to 
fetch it again from broker1 after step 5, right?
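
For context, a minimal sketch of the producer setting this question hinges on: with 
acks=1 only the leader has the message when the ack is sent, so a leadership change 
can legitimately discard it, whereas acks=all (together with min.insync.replicas) 
requires the followers to have it as well. The bootstrap address below is illustrative.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class AcksAllProducerSketch {
    public static KafkaProducer<byte[], byte[]> create() {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker0:9092,broker1:9092"); // illustrative
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for the full ISR, not just the leader
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}
{code}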

> The replica fetcher thread may truncate accepted messages during multiple 
> fast leadership transitions
> -
>
> Key: KAFKA-7040
> URL: https://issues.apache.org/jira/browse/KAFKA-7040
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> Consider the scenario where there are two brokers, broker0, and broker1, and 
> there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as 
> the leader and broker0 as the follower. The following sequence of events 
> happened on broker0
> 1. The replica fetcher thread on broker0 issues a LeaderEpoch request to 
> broker1, and awaits the response
> 2. A LeaderAndISR request causes broker0 to become the leader for one 
> partition t1p0, which in turn will remove the partition t1p0 from the replica 
> fetcher thread
> 3. Broker0 accepts some messages from a producer
> 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and 
> broker0 to become the follower for partition t1p0. This will cause the 
> partition t1p0 to be added back to the replica fetcher thread on broker0.
> 5. The replica fetcher thread on broker0 receives a response for the 
> LeaderEpoch request issued in step 1, and truncates the accepted messages in 
> step 3.
> The issue can be reproduced with the test from 
> https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea
> [1] Initially we set up broker0 to be the follower of two partitions instead 
> of just one, to avoid the shutting down of the replica fetcher thread when it 
> becomes idle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-06-11 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508763#comment-16508763
 ] 

Dong Lin edited comment on KAFKA-7040 at 6/11/18 9:14 PM:
--

[~luwang] Say message m0 is truncated in step 5. Is that message acked by 
broker1 before broker0 accepts m0 in step 3? If no, it seems that the 
message is produced with ack=1 and it is within Kafka's contract to truncate 
it. This is a known behavior when we have leadership change in Kafka.

If m0 has been acked by broker1 in step 3, then broker0 should still be able to 
fetch it again from broker1 after step 5, right?


was (Author: lindong):
[~luwang] Say message m0 is truncated in step 5. Is that message acked by 
broker1 in before Broker0 accepts m0 in step 3? If no, it seems that the 
message is produced with ack=1 and it is within Kafka's contract to truncate 
it. This is a known behavior when we have leadership change in Kafka.

 

If m0 has been acked by broker1 in step 3, then broker0 should still be able to 
fetch it again after broker1 after step 5, right?

> The replica fetcher thread may truncate accepted messages during multiple 
> fast leadership transitions
> -
>
> Key: KAFKA-7040
> URL: https://issues.apache.org/jira/browse/KAFKA-7040
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> Consider the scenario where there are two brokers, broker0, and broker1, and 
> there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as 
> the leader and broker0 as the follower. The following sequence of events 
> happened on broker0
> 1. The replica fetcher thread on broker0 issues a LeaderEpoch request to 
> broker1, and awaits the response
> 2. A LeaderAndISR request causes broker0 to become the leader for one 
> partition t1p0, which in turn will remove the partition t1p0 from the replica 
> fetcher thread
> 3. Broker0 accepts some messages from a producer
> 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and 
> broker0 to become the follower for partition t1p0. This will cause the 
> partition t1p0 to be added back to the replica fetcher thread on broker0.
> 5. The replica fetcher thread on broker0 receives a response for the 
> LeaderEpoch request issued in step 1, and truncates the accepted messages in 
> step 3.
> The issue can be reproduced with the test from 
> https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea
> [1] Initially we set up broker0 to be the follower of two partitions instead 
> of just one, to avoid the shutting down of the replica fetcher thread when it 
> becomes idle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-06-11 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508763#comment-16508763
 ] 

Dong Lin edited comment on KAFKA-7040 at 6/11/18 9:14 PM:
--

[~luwang] Say message m0 is truncated in step 5. Is that message acked by 
broker1 before broker0 accepts m0 in step 3? If no, it seems that the 
message is produced with ack=1 and it is within Kafka's contract to truncate 
it. This is a known behavior when we have leadership change in Kafka.

If m0 has been acked by broker1 in step 3, then broker0 should still be able to 
fetch it again from broker1 after step 5, right?


was (Author: lindong):
[~luwang] Say message m0 is truncated in step 5. Is that message acked by 
broker1 in before Broker0 accepts m0 in step 3? If no, it seems that the 
message is produced with ack=1 and it is within Kafka's contract to truncate 
it. This is a known behavior when we have leadership change in Kafka.

If m0 has been acked by broker1 in step 3, then broker0 should still be able to 
fetch it again after broker1 after step 5, right?

> The replica fetcher thread may truncate accepted messages during multiple 
> fast leadership transitions
> -
>
> Key: KAFKA-7040
> URL: https://issues.apache.org/jira/browse/KAFKA-7040
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> Consider the scenario where there are two brokers, broker0, and broker1, and 
> there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as 
> the leader and broker0 as the follower. The following sequence of events 
> happened on broker0
> 1. The replica fetcher thread on broker0 issues a LeaderEpoch request to 
> broker1, and awaits the response
> 2. A LeaderAndISR request causes broker0 to become the leader for one 
> partition t1p0, which in turn will remove the partition t1p0 from the replica 
> fetcher thread
> 3. Broker0 accepts some messages from a producer
> 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and 
> broker0 to become the follower for partition t1p0. This will cause the 
> partition t1p0 to be added back to the replica fetcher thread on broker0.
> 5. The replica fetcher thread on broker0 receives a response for the 
> LeaderEpoch request issued in step 1, and truncates the accepted messages in 
> step 3.
> The issue can be reproduced with the test from 
> https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea
> [1] Initially we set up broker0 to be the follower of two partitions instead 
> of just one, to avoid the shutting down of the replica fetcher thread when it 
> becomes idle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7042) Fall back to the old behavior when the broker is too old to recognize LIST_OFFSET versions

2018-06-11 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508761#comment-16508761
 ] 

Guozhang Wang commented on KAFKA-7042:
--

cc [~hachikuji]

> Fall back to the old behavior when the broker is too old to recognize 
> LIST_OFFSET versions
> --
>
> Key: KAFKA-7042
> URL: https://issues.apache.org/jira/browse/KAFKA-7042
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>
> When READ_COMMITTED is turned on (available since 0.11.0), LIST_OFFSET requires 
> min_version to be 2 on the consumer client side. On the other hand, if the 
> broker is no newer than 0.10.2, it can only recognize LIST_OFFSET versions up 
> to 1. In this case a consumer talking to such an old broker will throw an 
> exception right away.
> What we can improve, though, is that when the consumer realizes the broker does 
> not recognize LIST_OFFSET version 2 or higher, it can fall back to the old 
> READ_UNCOMMITTED behavior, since the data on that broker should not contain any 
> txn markers anyway. By doing this we would lift the hard restriction that 
> consumers with READ_COMMITTED cannot work with an older broker version 
> (remember we are trying to achieve broker compatibility back to 0.10.0).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7042) Fall back to the old behavior when the broker is too old to recognize LIST_OFFSET versions

2018-06-11 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-7042:


 Summary: Fall back to the old behavior when the broker is too old 
to recognize LIST_OFFSET versions
 Key: KAFKA-7042
 URL: https://issues.apache.org/jira/browse/KAFKA-7042
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Guozhang Wang


When READ_COMMITTED is turned on (available since 0.11.0), LIST_OFFSET requires 
min_version to be 2 on the consumer client side. On the other hand, if the broker 
is no newer than 0.10.2, it can only recognize LIST_OFFSET versions up to 1. In 
this case a consumer talking to such an old broker will throw an exception right 
away.

What we can improve, though, is that when the consumer realizes the broker does not 
recognize LIST_OFFSET version 2 or higher, it can fall back to the old 
READ_UNCOMMITTED behavior, since the data on that broker should not contain any txn 
markers anyway. By doing this we would lift the hard restriction that consumers with 
READ_COMMITTED cannot work with an older broker version (remember we are trying to 
achieve broker compatibility back to 0.10.0).
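
A rough sketch of the proposed fallback decision (not actual Fetcher code; in the 
real consumer the broker's maximum supported version would come from the 
ApiVersions handshake):

{code:java}
// Hedged sketch only: illustrates the decision, not the real consumer internals.
public class ListOffsetFallbackSketch {
    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    static short chooseListOffsetVersion(final short brokerMaxVersion, final IsolationLevel isolation) {
        final short desired = (isolation == IsolationLevel.READ_COMMITTED) ? (short) 2 : (short) 0;
        if (brokerMaxVersion < desired) {
            // Broker is 0.10.2 or older: it cannot hold transactional markers, so
            // downgrade to the version it understands instead of throwing.
            return brokerMaxVersion;
        }
        return desired;
    }
}
{code}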



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7024) Rocksdb state directory should be created before opening the DB

2018-06-11 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7024:
-
Labels: user-experience  (was: )

> Rocksdb state directory should be created before opening the DB
> ---
>
> Key: KAFKA-7024
> URL: https://issues.apache.org/jira/browse/KAFKA-7024
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Abhishek Agarwal
>Priority: Minor
>  Labels: user-experience
>
> After enabling RocksDB logging, we continually see these errors in Kafka 
> Streams logs every time a new window segment is created:
> ```
> Error when reading  
> ```
> While it's not a problem in itself, since RocksDB will create the directory 
> internally, it does so only after logging the above error. Unnecessary logging 
> would be avoided if the segment directory were created in advance. Right now, 
> only the parent directories are created for a RocksDB segment. The logging is 
> more prominent when there are many partitions and the segment size is small 
> (a minute or two). 
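
A minimal sketch of the suggested change, assuming the segment path is already 
known when the store is opened (names are illustrative, not the actual 
RocksDBStore code):

{code:java}
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class OpenSegmentSketch {
    static RocksDB openSegment(final File stateDir, final String segmentName, final Options options)
            throws IOException, RocksDBException {
        final File dbDir = new File(stateDir, segmentName);
        // Create the segment directory (and any missing parents) up front, so RocksDB
        // never has to log "Error when reading ..." before creating it itself.
        Files.createDirectories(dbDir.toPath());
        return RocksDB.open(options, dbDir.getAbsolutePath());
    }
}
{code}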



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7041) Using RocksDB bulk loading for StandbyTasks

2018-06-11 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-7041:
--

 Summary: Using RocksDB bulk loading for StandbyTasks
 Key: KAFKA-7041
 URL: https://issues.apache.org/jira/browse/KAFKA-7041
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


In KAFKA-5363 we introduced RocksDB bulk loading to speed up store recovery. We 
could do the same optimization for StandbyTasks to make them more efficient and 
to reduce the likelihood that StandbyTasks lag behind.
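
For reference, a minimal sketch of what the existing restore-time optimization 
looks like at the RocksDB level; the idea here is to apply the same setup while a 
StandbyTask replays its changelog (names are illustrative):

{code:java}
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class BulkLoadSketch {
    static RocksDB openForBulkLoad(final String dbPath) throws RocksDBException {
        final Options options = new Options().setCreateIfMissing(true);
        // Tunes RocksDB for large sequential ingestion (e.g. disables auto-compaction);
        // a normal re-open would follow once the changelog has been caught up.
        options.prepareForBulkLoad();
        return RocksDB.open(options, dbPath);
    }
}
{code}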



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7037) delete topic command replaces '+' from the topic name which leads incorrect topic deletion

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508735#comment-16508735
 ] 

ASF GitHub Bot commented on KAFKA-7037:
---

vahidhashemian opened a new pull request #5193: KAFKA-7037: Escape regex symbol 
in topic name for topic operations
URL: https://github.com/apache/kafka/pull/5193
 
 
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> delete topic command replaces '+' from the topic name which leads incorrect 
> topic deletion
> --
>
> Key: KAFKA-7037
> URL: https://issues.apache.org/jira/browse/KAFKA-7037
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 1.0.0
>Reporter: Sandeep Nemuri
>Assignee: Vahid Hashemian
>Priority: Major
>
> While executing a delete command, the Kafka CLI tool removes the "+" symbol 
> and deletes the incorrect topic. In the case below, if _*test+topic*_ is 
> deleted, Kafka deletes _*testtopic*_.
> {code:java}
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 
> --topic testtopic
> Created topic "testtopic".
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --topic test+topic --delete
> Topic testtopic is marked for deletion.{code}
>  delete topic replaces '+' from the topic name  
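
The behaviour above stems from the topic name being treated as a regular 
expression, where "+" is a quantifier (hence the PR title above). A minimal sketch 
of the kind of escaping such a fix applies, illustrative only and not the actual 
TopicCommand code:

{code:java}
import java.util.regex.Pattern;

public class TopicNameEscapeSketch {
    public static void main(final String[] args) {
        final String requested = "test+topic";
        final String existing = "testtopic";
        System.out.println(existing.matches(requested));                // true  -> the bug
        System.out.println(existing.matches(Pattern.quote(requested))); // false -> escaped literal
    }
}
{code}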



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7023) Kafka Streams RocksDB bulk loading config may not be honored with customized RocksDBConfigSetter

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508694#comment-16508694
 ] 

ASF GitHub Bot commented on KAFKA-7023:
---

guozhangwang closed pull request #5166: KAFKA-7023: Move prepareForBulkLoad() 
call after customized RocksDBConfigSetter
URL: https://github.com/apache/kafka/pull/5166
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index cfef035a4fd..6084ecbf1e0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -130,10 +130,6 @@ public void openDB(final ProcessorContext context) {
 // (this could be a bug in the RocksDB code and their devs have been 
contacted).
 
options.setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(),
 2));
 
-if (prepareForBulkload) {
-options.prepareForBulkLoad();
-}
-
 wOptions = new WriteOptions();
 wOptions.setDisableWAL(true);
 
@@ -148,6 +144,11 @@ public void openDB(final ProcessorContext context) {
 final RocksDBConfigSetter configSetter = 
Utils.newInstance(configSetterClass);
 configSetter.setConfig(name, options, configs);
 }
+
+if (prepareForBulkload) {
+options.prepareForBulkLoad();
+}
+
 this.dbDir = new File(new File(context.stateDir(), parentDir), 
this.name);
 
 try {


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
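
For readers unfamiliar with the hook being reordered in this diff, a minimal 
sketch of a customized RocksDBConfigSetter. With this change, prepareForBulkLoad() 
is applied after setConfig(), so user tuning like the example below can no longer 
accidentally override the bulk-loading setup during state restoration (the 
specific option shown is illustrative):

{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class MyRocksDBConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // Example tuning only; any option touched here is now set before prepareForBulkLoad().
        options.setLevel0FileNumCompactionTrigger(8);
    }
}
{code}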


> Kafka Streams RocksDB bulk loading config may not be honored with customized 
> RocksDBConfigSetter 
> -
>
> Key: KAFKA-7023
> URL: https://issues.apache.org/jira/browse/KAFKA-7023
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Liquan Pei
>Assignee: Liquan Pei
>Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> We observed frequent L0 -> L1 compaction during Kafka Streams state recovery. 
> Some sample log:
> {code:java}
> 2018/06/08-00:04:50.892331 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892298) [db/compaction_picker_universal.cc:270] [default] 
> Universal: sorted runs files(6): files[3 0 0 0 1 1 38] max score 1.00
> 2018/06/08-00:04:50.892336 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892300) [db/compaction_picker_universal.cc:655] [default] 
> Universal: First candidate file 134[0] to reduce size amp.
> 2018/06/08-00:04:50.892338 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892302) [db/compaction_picker_universal.cc:686] [default] 
> Universal: size amp not needed. newer-files-total-size 13023497 
> earliest-file-size 2541530372
> 2018/06/08-00:04:50.892339 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892303) [db/compaction_picker_universal.cc:473] [default] 
> Universal: Possible candidate file 134[0].
> 2018/06/08-00:04:50.892341 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892304) [db/compaction_picker_universal.cc:525] [default] 
> Universal: Skipping file 134[0] with size 1007 (compensated size 1287)
> 2018/06/08-00:04:50.892343 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892306) [db/compaction_picker_universal.cc:473] [default] 
> Universal: Possible candidate file 133[1].
> 2018/06/08-00:04:50.892344 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892307) [db/compaction_picker_universal.cc:525] [default] 
> Universal: Skipping file 133[1] with size 4644 (compensated size 16124)
> 2018/06/08-00:04:50.892346 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892307) [db/compaction_picker_universal.cc:473] [default] 
> Universal: Possible candidate file 126[2].
> 2018/06/08-00:04:50.892348 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892308) [db/compaction_picker_universal.cc:525] [default] 
> Universal: Skipping file 126[2] with size 319764 (compensated size 319764)
> 2018/06/08-00:04:50.892349 7f8a6d7fa700 (Original Log Time 
> 2018/06/08-00:04:50.892309) [db/compaction_picker_universal.cc:473] [default] 
> 

[jira] [Commented] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread

2018-06-11 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508693#comment-16508693
 ] 

Matthias J. Sax commented on KAFKA-6990:


There are two threads, one for heartbeats and one for processing. The heartbeat 
thread is subject to the heartbeat interval and session.timeout.ms – as you have a 
session timeout of 30 seconds and heartbeats are sent every 5 seconds, you 
would need to miss 6 consecutive heartbeats to drop out of the group. For the 
main processing thread, max.poll.interval.ms is the timeout, i.e., poll() must be 
called every 20 minutes in your case – this should actually be sufficient. Even 
if you wait at most 5 seconds for MongoDB to respond, you have at most 50 records 
to process, and thus it should not take longer than 250 seconds until poll() is 
called again... Really hard to say what is happening. Let's see if the Kafka 
Streams/Consumer logs reveal the issue.
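
The knobs referenced above, with the values from this thread, would look roughly 
like this as consumer configs (a sketch only; in Kafka Streams they are normally 
passed through StreamsConfig with the consumer prefix):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ConsumerTimeoutsSketch {
    static Properties props() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000);   // heartbeat thread cadence
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);     // 6 missed heartbeats -> evicted
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1200000); // poll() must be called within 20 min
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);          // bounds per-iteration processing time
        return props;
    }
}
{code}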

> CommitFailedException; this task may be no longer owned by the thread
> -
>
> Key: KAFKA-6990
> URL: https://issues.apache.org/jira/browse/KAFKA-6990
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>
> We are seeing a lot of CommitFailedExceptions on one of our Kafka stream 
> apps. Running Kafka Streams 0.11.0.1 and Kafka Broker 0.10.2.1. Full error 
> message:
> {code:java}
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] INFO 
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d] State transition from 
> REBALANCING to RUNNING.
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_0] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-0=OffsetAndMetadata{offset=11, 
> metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-0=OffsetAndMetadata{offset=26,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_0 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_1 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_1] Failed 
> offset commits 
> {mactocontactmappings_contactmapping_byclientmac_0-1=OffsetAndMetadata{offset=38,
>  metadata=''}, sightings_sighting_byclientmac_0-1=OffsetAndMetadata{offset=1, 
> metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_2] Failed 
> offset commits 
> {mactocontactmappings_contactmapping_byclientmac_0-2=OffsetAndMetadata{offset=8,
>  metadata=''}, 
> sightings_sighting_byclientmac_0-2=OffsetAndMetadata{offset=24, metadata=''}} 
> due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_2 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_3] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-3=OffsetAndMetadata{offset=21, 
> metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-3=OffsetAndMetadata{offset=102,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_3 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_4] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-4=OffsetAndMetadata{offset=5, metadata=''}, 
> 

[jira] [Resolved] (KAFKA-7005) Remove duplicate Java Resource class.

2018-06-11 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7005.

Resolution: Fixed

merged the PR to trunk and 2.0 branch.

> Remove duplicate Java Resource class.
> -
>
> Key: KAFKA-7005
> URL: https://issues.apache.org/jira/browse/KAFKA-7005
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, security
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Major
> Fix For: 2.0.0
>
>
> Relating to one of the outstanding work items in PR 
> [#5117|https://github.com/apache/kafka/pull/5117].
> The o.a.k.c.requests.Resource class could be dropped in favour of 
> o.a.k.c.config.ConfigResource.
> This will remove the duplication of `Resource` classes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7030) Add configuration to disable message down-conversion

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508688#comment-16508688
 ] 

ASF GitHub Bot commented on KAFKA-7030:
---

dhruvilshah3 opened a new pull request #5192: KAFKA-7030: Add configuration to 
disable message down-conversion (KIP-283)
URL: https://github.com/apache/kafka/pull/5192
 
 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add configuration to disable message down-conversion
> 
>
> Key: KAFKA-7030
> URL: https://issues.apache.org/jira/browse/KAFKA-7030
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> Add configuration to disable message down-conversion as described in 
> [KIP-283|https://cwiki.apache.org/confluence/display/KAFKA/KIP-283%3A+Efficient+Memory+Usage+for+Down-Conversion].
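
Assuming the config names from KIP-283 (message.downconversion.enable at the topic 
level, log.message.downconversion.enable as the broker-wide default), disabling 
down-conversion for a single topic could look roughly like this with the admin 
client; the topic name and bootstrap address are illustrative:

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DisableDownConversionSketch {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
        try (AdminClient admin = AdminClient.create(props)) {
            final ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            final Config config = new Config(Collections.singleton(
                    new ConfigEntry("message.downconversion.enable", "false"))); // name per KIP-283
            admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
        }
    }
}
{code}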



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7005) Remove duplicate Java Resource class.

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508684#comment-16508684
 ] 

ASF GitHub Bot commented on KAFKA-7005:
---

junrao closed pull request #5184: KAFKA-7005: Remove duplicate resource class.
URL: https://github.com/apache/kafka/pull/5184
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 450de06fcd1..495095a9276 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -112,8 +112,6 @@
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
 import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
-import org.apache.kafka.common.requests.Resource;
-import org.apache.kafka.common.requests.ResourceType;
 import org.apache.kafka.common.security.token.delegation.DelegationToken;
 import org.apache.kafka.common.security.token.delegation.TokenInformation;
 import org.apache.kafka.common.utils.AppInfoParser;
@@ -1683,19 +1681,19 @@ public DescribeConfigsResult 
describeConfigs(Collection configRe
 
 // The BROKER resources which we want to describe.  We must make a 
separate DescribeConfigs
 // request for every BROKER resource we want to describe.
-final Collection brokerResources = new ArrayList<>();
+final Collection brokerResources = new ArrayList<>();
 
 // The non-BROKER resources which we want to describe.  These 
resources can be described by a
 // single, unified DescribeConfigs request.
-final Collection unifiedRequestResources = new 
ArrayList<>(configResources.size());
+final Collection unifiedRequestResources = new 
ArrayList<>(configResources.size());
 
 for (ConfigResource resource : configResources) {
 if (resource.type() == ConfigResource.Type.BROKER && 
!resource.isDefault()) {
-brokerFutures.put(resource, new KafkaFutureImpl());
-brokerResources.add(configResourceToResource(resource));
+brokerFutures.put(resource, new KafkaFutureImpl<>());
+brokerResources.add(resource);
 } else {
-unifiedRequestFutures.put(resource, new 
KafkaFutureImpl());
-
unifiedRequestResources.add(configResourceToResource(resource));
+unifiedRequestFutures.put(resource, new KafkaFutureImpl<>());
+unifiedRequestResources.add(resource);
 }
 }
 
@@ -1716,7 +1714,7 @@ void handleResponse(AbstractResponse abstractResponse) {
 for (Map.Entry> 
entry : unifiedRequestFutures.entrySet()) {
 ConfigResource configResource = entry.getKey();
 KafkaFutureImpl future = entry.getValue();
-DescribeConfigsResponse.Config config = 
response.config(configResourceToResource(configResource));
+DescribeConfigsResponse.Config config = 
response.config(configResource);
 if (config == null) {
 future.completeExceptionally(new 
UnknownServerException(
 "Malformed broker response: missing config for 
" + configResource));
@@ -1746,7 +1744,7 @@ void handleFailure(Throwable throwable) {
 
 for (Map.Entry> entry : 
brokerFutures.entrySet()) {
 final KafkaFutureImpl brokerFuture = entry.getValue();
-final Resource resource = configResourceToResource(entry.getKey());
+final ConfigResource resource = entry.getKey();
 final int nodeId = Integer.parseInt(resource.name());
 runnable.call(new Call("describeBrokerConfigs", 
calcDeadlineMs(now, options.timeoutMs()),
 new ConstantNodeIdProvider(nodeId)) {
@@ -1792,21 +1790,6 @@ void handleFailure(Throwable throwable) {
 return new DescribeConfigsResult(allFutures);
 }
 
-private Resource configResourceToResource(ConfigResource configResource) {
-ResourceType resourceType;
-switch (configResource.type()) {
-case TOPIC:
-resourceType = ResourceType.TOPIC;
-break;
-case BROKER:
-resourceType = ResourceType.BROKER;
-break;
-default:
-throw new IllegalArgumentException("Unexpected resource type " 
+ configResource.type());
-

[jira] [Commented] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data

2018-06-11 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508683#comment-16508683
 ] 

Matthias J. Sax commented on KAFKA-6977:


Hard to tell where the corruption occurs. It could be on the wire... Maybe 
[~hachikuji] can help out here. It does not seem to be a Kafka Streams issue. 
Did you use a 0.11.0 or 0.10.2 consumer? Did the consumer run in the same 
environment (i.e., AWS ECS as mentioned in the description)? Can you consume the 
data with Kafka Streams in a different environment?

>  Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 
> while fetching data
> -
>
> Key: KAFKA-6977
> URL: https://issues.apache.org/jira/browse/KAFKA-6977
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>  Labels: streams
>
> We are running Kafka Streams 0.11.0.1 with Kafka Broker 0.10.2.1 and 
> constantly run into the following exception: 
> {code:java}
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> partition assignment took 40 ms.
> current active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 0_15, 
> 0_17, 0_20, 0_21, 0_23, 0_25, 0_28, 0_2, 0_6, 0_7, 0_8, 0_9, 0_11, 0_16, 
> 0_18, 0_19, 0_22, 0_24, 0_26, 0_27, 0_29, 0_30, 0_31]
> current standby tasks: []
> previous active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 
> 0_15, 0_17, 0_20, 0_21, 0_23, 0_25, 0_28]
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> State transition from PARTITIONS_ASSIGNED to RUNNING.
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO org.apache.kafka.streams.KafkaStreams - stream-client 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e] State 
> transition from REBALANCING to RUNNING.
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> ERROR org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> Encountered the following error during processing:
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490)
> java.lang.IllegalStateException: Unexpected error code 2 while fetching data
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> Shutting down
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> State transition from RUNNING to PENDING_SHUTDOWN.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka 
> producer with timeoutMillis = 9223372036854775807 ms.
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> Stream thread shutdown complete
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> INFO org.apache.kafka.streams.processor.internals.StreamThread - 
> stream-thread 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] 
> State transition from PENDING_SHUTDOWN to DEAD.
> 

[jira] [Created] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-06-11 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-7040:
-

 Summary: The replica fetcher thread may truncate accepted messages 
during multiple fast leadership transitions
 Key: KAFKA-7040
 URL: https://issues.apache.org/jira/browse/KAFKA-7040
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Wang


Problem Statement:
Consider the scenario where there are two brokers, broker0, and broker1, and 
there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as 
the leader and broker0 as the follower. The following sequence of events 
happened on broker0

1. The replica fetcher thread on broker0 issues a LeaderEpoch request to 
broker1, and awaits the response
2. A LeaderAndISR request causes broker0 to become the leader for one partition 
t1p0, which in turn will remove the partition t1p0 from the replica fetcher 
thread
3. Broker0 accepts some messages from a producer
4. A 2nd LeaderAndISR request causes broker1 to become the leader, and broker0 
to become the follower for partition t1p0. This will cause the partition t1p0 
to be added back to the replica fetcher thread on broker0.
5. The replica fetcher thread on broker0 receives a response for the 
LeaderEpoch request issued in step 1, and truncates the accepted messages in 
step 3.

The issue can be reproduced with the test from 
https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea

[1] Initially we set up broker0 to be the follower of two partitions instead of 
just one, to avoid the shutting down of the replica fetcher thread when it 
becomes idle.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data

2018-06-11 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508661#comment-16508661
 ] 

Eugen Feller commented on KAFKA-6977:
-

Hi [~mjsax]. Interesting. Thanks a lot. I think the topic itself is likely 
consumed by multiple downstream consumers. However, only this job actually 
consumes it for the purposes of writing out to a database. Just tested 
consuming the topic with plain KafkaConsumer and it works. I also went over all 
the stack traces seen in that context and found the following:
{code:java}
KafkaException: Record for partition our_stats_topic_0-17 at offset 217641273 is invalid, cause: Record is corrupt (stored crc = 3302026163, computed crc = 3432237873)
  at maybeEnsureValid (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1002)
  at nextFetchedRecord (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1059)
  at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1090)
  at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944)
  at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567)
  at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528)
  at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086)
  at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043)
  at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536)
  at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490)
  at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480)
  at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457)
KafkaException: Received exception when fetching the next record from our_stats_topic_0-17. If needed, please seek past the record to continue consumption.
  at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1110)
  at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944)
  at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567)
  at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528)
  at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086)
  at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043)
  at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536)
  at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490)
  at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480)
  at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457)
{code}
In this particular instance I think the following stack trace was seen at the 
same time as error code 2:

 
{code:java}
CorruptRecordException: Record size is less than the minimum record overhead (14)
KafkaException: Received exception when fetching the next record from our_stats_topic_0-16. If needed, please seek past the record to continue consumption.
  at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1076)
  at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944)
  at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567)
  at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528)
  at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086)
  at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043)
  at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536)
  at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490)
  at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480)
  at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457)
{code}
I am wondering what could have led to error code 2 and to the above 
CorruptRecordException, and what would be the best way to mitigate them? Do 
the records get corrupted on the wire somehow?
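
As a stop-gap, the "seek past the record" hint from the exception can be followed 
manually, at the cost of skipping that one record. A rough sketch (partition and 
offset hard-coded from the error above; in practice they would be parsed from the 
exception text):

{code:java}
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

public class SkipCorruptRecordSketch {
    static void pollSkippingCorruptRecord(final Consumer<byte[], byte[]> consumer) {
        try {
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
            // ... normal processing of records ...
        } catch (final KafkaException e) {
            final TopicPartition tp = new TopicPartition("our_stats_topic_0", 17); // from the error above
            consumer.seek(tp, 217641274L); // one past the corrupt offset 217641273
        }
    }
}
{code}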

 

>  Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 
> while fetching data
> -
>
> Key: KAFKA-6977
> URL: https://issues.apache.org/jira/browse/KAFKA-6977
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>  Labels: streams
>
> We are running Kafka Streams 0.11.0.1 with Kafka Broker 0.10.2.1 and 
> constantly run into the following exception: 
> {code:java}
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] 
> INFO 

[jira] [Comment Edited] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data

2018-06-11 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508661#comment-16508661
 ] 

Eugen Feller edited comment on KAFKA-6977 at 6/11/18 8:13 PM:
--

Hi [~mjsax]. Interesting. Thanks a lot. I think the topic itself is likely 
consumed by multiple downstream consumers. However, only this job actually 
consumes it for the purposes of writing out to a database. Just tested 
consuming the topic with plain KafkaConsumer and it works. I also went over all 
the stack traces seen in that context and found the following:
{code:java}
KafkaException: Record for partition our_stats_topic_0-17 at offset 217641273 is invalid, cause: Record is corrupt (stored crc = 3302026163, computed crc = 3432237873)
  at maybeEnsureValid (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1002)
  at nextFetchedRecord (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1059)
  at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1090)
  at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944)
  at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567)
  at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528)
  at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086)
  at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043)
  at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536)
  at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490)
  at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480)
  at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457)
KafkaException: Received exception when fetching the next record from our_stats_topic_0-17. If needed, please seek past the record to continue consumption.
  at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1110)
  at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944)
  at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567)
  at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528)
  at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086)
  at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043)
  at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536)
  at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490)
  at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480)
  at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457)
{code}
In this particular instance I think the following stack trace was seen at the 
same time as error code 2:
{code:java}
CorruptRecordException: Record size is less than the minimum record overhead (14)
KafkaException: Received exception when fetching the next record from our_stats_topic_0-16. If needed, please seek past the record to continue consumption.
  at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1076)
  at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944)
  at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567)
  at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528)
  at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086)
  at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043)
  at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536)
  at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490)
  at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480)
  at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457)
{code}
I am wondering what could have led to error code 2 and to the above 
CorruptRecordException, and what would be the best way to mitigate them? Do 
the records get corrupted on the wire somehow?

 


was (Author: efeller):
Hi [~mjsax]. Interesting. Thanks a lot. I think the topic itself is likely 
consumed by multiple downstream consumers. However, only this job actually 
consumes it for the purposes of writing out to a database. Just tested 
consuming the topic with plain KafkaConsumer and it works. I also went over all 
the stack traces seen in that context and found the following:
{code:java}
KafkaException: Record for partition our_stats_topic_0-17 at offset 217641273 is invalid, cause: Record is corrupt (stored crc = 3302026163, computed crc = 3432237873)
  at maybeEnsureValid (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1002)
  at nextFetchedRecord 

[jira] [Comment Edited] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread

2018-06-11 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508566#comment-16508566
 ] 

Eugen Feller edited comment on KAFKA-6990 at 6/11/18 7:11 PM:
--

Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 50,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? This 
job implements the following logic:
{code:java}
stream().foreach(record => {
// Buffered write this record to MongoDB 
// Await.result(record, 5 seconds)
}){code}
Can it be that writes to Mongo take too long and that leads to problems? I was 
under the impression that by setting heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads, one that does heartbeat sending (in order 
for the consumer to be considered alive) and one that actually calls poll(). In 
that case blocking too long inside foreach should not kick us out of the 
consumer group?

I will try to collect DEBUG level logs today.

 


was (Author: efeller):
Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 50,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? This 
job implements the following logic:
{code:java}
stream().foreach(record => {
// Buffered write this record to MongoDB 
// Await.result(record, 5 seconds)
}){code}
Can it be that writes to Mongo take too long and that leads to problems? I was 
under the impression that by setting heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads, one that does heartbeat sending (to 
considered alive) and one that actually calls poll(). In that case blocking too 
long inside foreach should not kick us out of the consumer group?

I will try to collect DEBUG level logs today.

 

> CommitFailedException; this task may be no longer owned by the thread
> -
>
> Key: KAFKA-6990
> URL: https://issues.apache.org/jira/browse/KAFKA-6990
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>
> We are seeing a lot of CommitFailedExceptions on one of our Kafka stream 
> apps. Running Kafka Streams 0.11.0.1 and Kafka Broker 0.10.2.1. Full error 
> message:
> {code:java}
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] INFO 
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d] State transition from 
> REBALANCING to RUNNING.
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_0] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-0=OffsetAndMetadata{offset=11, 
> metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-0=OffsetAndMetadata{offset=26,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_0 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_1 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_1] Failed 
> offset commits 
> {mactocontactmappings_contactmapping_byclientmac_0-1=OffsetAndMetadata{offset=38,

[jira] [Comment Edited] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread

2018-06-11 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508566#comment-16508566
 ] 

Eugen Feller edited comment on KAFKA-6990 at 6/11/18 7:10 PM:
--

Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 50,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? This 
job implements the following logic:
{code:java}
stream().foreach(record => {
// Buffered write this record to MongoDB 
// Await.result(record, 5 seconds)
}){code}
Can it be that writes to Mongo take too long and that leads to problems? I was 
under the impression that by setting the heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads: one that sends heartbeats (so the consumer is 
considered alive) and one that actually calls poll(). In that case, blocking too 
long inside foreach should not kick us out of the consumer group?

I will try to collect DEBUG level logs today.
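
For reference, a minimal illustrative sketch of how I read the two timeouts (this is 
not our actual code; it assumes the Kafka Streams 0.11.x StreamsConfig/ConsumerConfig 
keys): the background heartbeat thread only covers session.timeout.ms / 
heartbeat.interval.ms, while max.poll.interval.ms bounds the time between two poll() 
calls, so a foreach that blocks longer than that would still trigger a rebalance and 
the CommitFailedException.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class TimeoutConfigSketch {
    public static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "visits");            // illustrative app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative brokers

        // Covered by the background heartbeat thread; only detects a dead or partitioned process.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 30_000);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 5_000);

        // Bounds the time between two poll() calls; exceeding it makes the member leave the
        // group, and the next commit then fails with CommitFailedException.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 20 * 60 * 1000);
        // Fewer records per poll() means less blocking work between two poll() calls.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 50);
        return props;
    }
}
{code}
If that reading is right, either raising max.poll.interval.ms further or lowering 
max.poll.records should keep the time spent in foreach between polls under the limit.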

 


was (Author: efeller):
Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 50,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? This 
job implements the following logic:
{code:java}
stream().foreach(record => {
// Buffered write to MongoDB 
// Await.result(5 seconds)
}){code}
Can it be that writes to Mongo take too long and that leads to problems? I was 
under the impression that by setting the heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads: one that sends heartbeats (so the consumer is 
considered alive) and one that actually calls poll(). In that case, blocking too 
long inside foreach should not kick us out of the consumer group?

I will try to collect DEBUG level logs today.

 

> CommitFailedException; this task may be no longer owned by the thread
> -
>
> Key: KAFKA-6990
> URL: https://issues.apache.org/jira/browse/KAFKA-6990
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>
> We are seeing a lot of CommitFailedExceptions on one of our Kafka stream 
> apps. Running Kafka Streams 0.11.0.1 and Kafka Broker 0.10.2.1. Full error 
> message:
> {code:java}
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] INFO 
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d] State transition from 
> REBALANCING to RUNNING.
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_0] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-0=OffsetAndMetadata{offset=11, 
> metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-0=OffsetAndMetadata{offset=26,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_0 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_1 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_1] Failed 
> offset commits 
> {mactocontactmappings_contactmapping_byclientmac_0-1=OffsetAndMetadata{offset=38,
>  metadata=''}, 

[jira] [Comment Edited] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread

2018-06-11 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508566#comment-16508566
 ] 

Eugen Feller edited comment on KAFKA-6990 at 6/11/18 7:09 PM:
--

Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 50,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? This 
job implements the following logic:
{code:java}
stream().foreach(record => {
// Buffered write to MongoDB 
// Await.result(5 seconds)
}){code}
Can it be that writes to Mongo take too long and that leads to problems? I was 
under the impression that by setting the heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads: one that sends heartbeats (so the consumer is 
considered alive) and one that actually calls poll(). In that case, blocking too 
long inside foreach should not kick us out of the consumer group?

I will try to collect DEBUG level logs today.

 


was (Author: efeller):
Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 1000,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? For 
this particular job max poll records is set to 50. This job implements the 
following logic:
{code:java}
stream().foreach(record => {
// Buffered write to MongoDB 
// Await.result(5 seconds)
}){code}
Can it be that writes to Mongo take too long and that leads to problems? I was 
under the impression that by setting the heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads: one that sends heartbeats (so the consumer is 
considered alive) and one that actually calls poll(). In that case, blocking too 
long inside foreach should not kick us out of the consumer group?

I will try to collect DEBUG level logs today.

 

> CommitFailedException; this task may be no longer owned by the thread
> -
>
> Key: KAFKA-6990
> URL: https://issues.apache.org/jira/browse/KAFKA-6990
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>
> We are seeing a lot of CommitFailedExceptions on one of our Kafka stream 
> apps. Running Kafka Streams 0.11.0.1 and Kafka Broker 0.10.2.1. Full error 
> message:
> {code:java}
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] INFO 
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d] State transition from 
> REBALANCING to RUNNING.
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_0] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-0=OffsetAndMetadata{offset=11, 
> metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-0=OffsetAndMetadata{offset=26,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_0 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_1 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_1] Failed 
> offset commits 
> {mactocontactmappings_contactmapping_byclientmac_0-1=OffsetAndMetadata{offset=38,
>  

[jira] [Comment Edited] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread

2018-06-11 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508566#comment-16508566
 ] 

Eugen Feller edited comment on KAFKA-6990 at 6/11/18 7:08 PM:
--

Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 1000,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? For 
this particular job max poll records is set to 50. This job implements the 
following logic:
{code:java}
stream().foreach(record => {
// Buffered write to MongoDB 
// Await.result(5 seconds)
}){code}
Can it be that writes to Mongo take too long and that leads to problems? I was 
under the impression that by setting the heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads: one that sends heartbeats (so the consumer is 
considered alive) and one that actually calls poll(). In that case, blocking too 
long inside foreach should not kick us out of the consumer group?

I will try to collect DEBUG level logs today.

 


was (Author: efeller):
Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 1000,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? For 
this particular job max poll records is set to 50. This job implements the 
following logic:
{code:java}
stream().foreach(record => {
// Buffered write to MongoDB 
// Await.result(5 seconds)
}){code}
 

Can it be that writes to Mongo take too long and that leads to problems? I was 
under the impression that by setting the heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads: one that sends heartbeats (so the consumer is 
considered alive) and one that actually calls poll(). In that case, blocking too 
long inside foreach should not kick us out of the consumer group?

I will try to collect DEBUG level logs today.

 

> CommitFailedException; this task may be no longer owned by the thread
> -
>
> Key: KAFKA-6990
> URL: https://issues.apache.org/jira/browse/KAFKA-6990
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>
> We are seeing a lot of CommitFailedExceptions on one of our Kafka stream 
> apps. Running Kafka Streams 0.11.0.1 and Kafka Broker 0.10.2.1. Full error 
> message:
> {code:java}
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] INFO 
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d] State transition from 
> REBALANCING to RUNNING.
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_0] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-0=OffsetAndMetadata{offset=11, 
> metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-0=OffsetAndMetadata{offset=26,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_0 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_1 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_1] Failed 
> offset commits 
> 

[jira] [Comment Edited] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread

2018-06-11 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508566#comment-16508566
 ] 

Eugen Feller edited comment on KAFKA-6990 at 6/11/18 7:08 PM:
--

Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 1000,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? For 
this particular job max poll records is set to 50. This job implements the 
following logic:
{code:java}
stream().foreach(record => {
// Buffered write to MongoDB 
// Await.result(5 seconds)
}){code}
 

Can it be that writes to Mongo take too long and that leads to problems? I was 
under the impression that by setting the heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads: one that sends heartbeats (so the consumer is 
considered alive) and one that actually calls poll(). In that case, blocking too 
long inside foreach should not kick us out of the consumer group?

I will try to collect DEBUG level logs today.

 


was (Author: efeller):
Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 1000,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? This 
particular job does the following logic:

 
{code:java}
stream().foreach(record => {
// Buffered write to MongoDB 
// Await.result(5 seconds)
}){code}
 

Can it be that writes to Mongo take too long and that leads to problems? I was 
under the impression that by setting the heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads: one that sends heartbeats (so the consumer is 
considered alive) and one that actually calls poll(). In that case, blocking too 
long inside foreach should not kick us out of the consumer group?

I will try to collect DEBUG level logs today.

 

> CommitFailedException; this task may be no longer owned by the thread
> -
>
> Key: KAFKA-6990
> URL: https://issues.apache.org/jira/browse/KAFKA-6990
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>
> We are seeing a lot of CommitFailedExceptions on one of our Kafka stream 
> apps. Running Kafka Streams 0.11.0.1 and Kafka Broker 0.10.2.1. Full error 
> message:
> {code:java}
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] INFO 
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d] State transition from 
> REBALANCING to RUNNING.
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_0] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-0=OffsetAndMetadata{offset=11, 
> metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-0=OffsetAndMetadata{offset=26,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_0 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_1 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_1] Failed 
> offset commits 
> 

[jira] [Commented] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread

2018-06-11 Thread Eugen Feller (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508566#comment-16508566
 ] 

Eugen Feller commented on KAFKA-6990:
-

Hi [~mjsax]. Thank you very much for your inputs. That makes total sense to me. 
My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 1000,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code}
Should I further increase the max.poll.interval.ms (currently 20 minutes)? This 
particular job does the following logic:

 
{code:java}
stream().foreach(record => {
// Buffered write to MongoDB 
// Await.result(5 seconds)
}){code}
 

Can it be that writes to Mongo take too long and that leads to problems? I was 
under the impression that by setting the heartbeat interval to 5 seconds in Kafka 
0.11.0.1, we should have two threads: one that sends heartbeats (so the consumer is 
considered alive) and one that actually calls poll(). In that case, blocking too 
long inside foreach should not kick us out of the consumer group?

I will try to collect DEBUG level logs today.

 

> CommitFailedException; this task may be no longer owned by the thread
> -
>
> Key: KAFKA-6990
> URL: https://issues.apache.org/jira/browse/KAFKA-6990
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>
> We are seeing a lot of CommitFailedExceptions on one of our Kafka stream 
> apps. Running Kafka Streams 0.11.0.1 and Kafka Broker 0.10.2.1. Full error 
> message:
> {code:java}
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] INFO 
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d] State transition from 
> REBALANCING to RUNNING.
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_0] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-0=OffsetAndMetadata{offset=11, 
> metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-0=OffsetAndMetadata{offset=26,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_0 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_1 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_1] Failed 
> offset commits 
> {mactocontactmappings_contactmapping_byclientmac_0-1=OffsetAndMetadata{offset=38,
>  metadata=''}, sightings_sighting_byclientmac_0-1=OffsetAndMetadata{offset=1, 
> metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_2] Failed 
> offset commits 
> {mactocontactmappings_contactmapping_byclientmac_0-2=OffsetAndMetadata{offset=8,
>  metadata=''}, 
> sightings_sighting_byclientmac_0-2=OffsetAndMetadata{offset=24, metadata=''}} 
> due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_2 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_3] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-3=OffsetAndMetadata{offset=21, 
> metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-3=OffsetAndMetadata{offset=102,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> 

[jira] [Commented] (KAFKA-7039) DelegatingClassLoader creates plugin instance even if its not Versioned

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508556#comment-16508556
 ] 

ASF GitHub Bot commented on KAFKA-7039:
---

mageshn opened a new pull request #5191: KAFKA-7039 : Create an instance of the 
plugin only it's a Versioned Plugin
URL: https://github.com/apache/kafka/pull/5191
 
 
   Create an instance of the plugin only if it's a Versioned Plugin.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behavior change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> DelegatingClassLoader creates plugin instance even if its not Versioned
> ---
>
> Key: KAFKA-7039
> URL: https://issues.apache.org/jira/browse/KAFKA-7039
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.0.0
>Reporter: Magesh kumar Nandakumar
>Assignee: Magesh kumar Nandakumar
>Priority: Critical
> Fix For: 2.0.0
>
>
> The versioned interface was introduced as part of 
> [KIP-285|https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin].
>  DelegatingClassLoader is now attempting to create an instance of all the 
> plugins, even if it's not required.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7039) DelegatingClassLoader creates plugin instance even if its not Versioned

2018-06-11 Thread Magesh kumar Nandakumar (JIRA)
Magesh kumar Nandakumar created KAFKA-7039:
--

 Summary: DelegatingClassLoader creates plugin instance even if its 
not Versioned
 Key: KAFKA-7039
 URL: https://issues.apache.org/jira/browse/KAFKA-7039
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.0.0
Reporter: Magesh kumar Nandakumar
Assignee: Magesh kumar Nandakumar
 Fix For: 2.0.0


The versioned interface was introduced as part of 
[KIP-285|https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin].
 DelegatingClassLoader is now attempting to create an instance of all the 
plugins, even if it's not required.
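
A minimal sketch of the intended guard (the interface and constant below are 
illustrative stand-ins, not the actual Connect classes): the plugin is only 
instantiated to ask for its version when its class actually implements the 
Versioned interface.
{code:java}
// Stand-in for Connect's Versioned interface, used only to make this sketch self-contained.
interface Versioned {
    String version();
}

final class PluginVersionSketch {
    static final String UNDEFINED_VERSION = "undefined";   // placeholder default

    static String versionFor(Class<?> pluginClass) throws ReflectiveOperationException {
        if (Versioned.class.isAssignableFrom(pluginClass)) {
            // Only Versioned plugins are instantiated, and only to read their version string.
            return ((Versioned) pluginClass.getDeclaredConstructor().newInstance()).version();
        }
        // Everything else is registered without ever creating an instance.
        return UNDEFINED_VERSION;
    }
}
{code}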



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6546) Add ENDPOINT_NOT_FOUND_ON_LEADER error code for missing listener

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508476#comment-16508476
 ] 

ASF GitHub Bot commented on KAFKA-6546:
---

rajinisivaram opened a new pull request #5189: KAFKA-6546: Use 
LISTENER_NOT_FOUND_ON_LEADER error for missing listener
URL: https://github.com/apache/kafka/pull/5189
 
 
   For metadata request version 6 and above, use a different error code to 
indicate missing listener on leader broker to enable diagnosis of listener 
configuration issues.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add ENDPOINT_NOT_FOUND_ON_LEADER error code for missing listener
> 
>
> Key: KAFKA-6546
> URL: https://issues.apache.org/jira/browse/KAFKA-6546
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.1.0
>
>
> In 1.1, if an endpoint is available on the broker processing a metadata 
> request, but the corresponding listener is not available on the leader of a 
> partition, LEADER_NOT_AVAILABLE is returned (earlier versions returned 
> UNKNOWN_SERVER_ERROR). This could indicate broker misconfiguration where some 
> brokers are not configured with all listeners or it could indicate a 
> transient error when listeners are dynamically added. We want to treat the 
> error as a transient error to process dynamic updates, but we should notify 
> clients of the actual error. This change should be made when MetadataRequest 
> version is updated so that LEADER_NOT_AVAILABLE is returned to older clients.
> See 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration]
>  and  [https://github.com/apache/kafka/pull/4539] for details.
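
A minimal sketch of the version gating described above (the enum and method are 
illustrative, not the broker's actual code): clients sending metadata request 
version 6 or above get the more specific error, while older clients keep seeing 
LEADER_NOT_AVAILABLE.
{code:java}
public class ListenerErrorSketch {
    // Illustrative stand-ins for the protocol error codes discussed in this ticket.
    enum MetadataError { LEADER_NOT_AVAILABLE, LISTENER_NOT_FOUND_ON_LEADER }

    static MetadataError errorForMissingListener(final short metadataRequestVersion) {
        return metadataRequestVersion >= 6
                ? MetadataError.LISTENER_NOT_FOUND_ON_LEADER
                : MetadataError.LEADER_NOT_AVAILABLE;
    }

    public static void main(String[] args) {
        System.out.println(errorForMissingListener((short) 5)); // LEADER_NOT_AVAILABLE
        System.out.println(errorForMissingListener((short) 6)); // LISTENER_NOT_FOUND_ON_LEADER
    }
}
{code}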



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7029) ReplicaVerificationTool should not use the deprecated SimpleConsumer

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508472#comment-16508472
 ] 

ASF GitHub Bot commented on KAFKA-7029:
---

omkreddy opened a new pull request #5188: KAFKA-7029: Update 
ReplicaVerificationTool to use Java Consumer
URL: https://github.com/apache/kafka/pull/5188
 
 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ReplicaVerificationTool should not use the deprecated SimpleConsumer
> 
>
> Key: KAFKA-7029
> URL: https://issues.apache.org/jira/browse/KAFKA-7029
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Manikumar
>Priority: Major
>
> Unless there's a reason not to, the simplest would be to use KafkaConsumer.
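
A minimal sketch of what using the Java consumer could look like for fetching one 
partition during verification (broker address, topic, and partition are 
placeholders; this is not the tool's actual code):
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ReplicaFetchSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            final TopicPartition tp = new TopicPartition("topic-under-verification", 0); // placeholder
            consumer.assign(Collections.singletonList(tp));  // manual assignment, no consumer group needed
            consumer.seekToBeginning(Collections.singletonList(tp));
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(1000L);
            System.out.printf("fetched %d records, position now %d%n", records.count(), consumer.position(tp));
            // A verification tool would compare the fetched batches/offsets across replicas here.
        }
    }
}
{code}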



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6697) JBOD configured broker should not die if log directory is invalid

2018-06-11 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-6697:

Issue Type: Bug  (was: Improvement)

> JBOD configured broker should not die if log directory is invalid
> -
>
> Key: KAFKA-6697
> URL: https://issues.apache.org/jira/browse/KAFKA-6697
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: 2.0.0
>
>
> Currently a JBOD-configured broker will still die on startup if 
> dir.getCanonicalPath() throws an IOException. We should mark such a log directory 
> as offline, and the broker should still run if there is a good disk.
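
A minimal sketch of the intended startup behaviour (directories and variable names 
are illustrative, not the broker's actual code): an IOException from 
getCanonicalPath() marks that directory offline instead of failing the whole broker, 
and startup only aborts if no usable directory remains.
{code:java}
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LogDirStartupSketch {
    public static void main(String[] args) {
        final List<File> configuredLogDirs =
                Arrays.asList(new File("/data/kafka-logs-1"), new File("/data/kafka-logs-2")); // placeholders
        final List<File> liveLogDirs = new ArrayList<>();
        final List<String> offlineLogDirs = new ArrayList<>();

        for (final File dir : configuredLogDirs) {
            try {
                dir.getCanonicalPath();                     // the call that can throw IOException
                liveLogDirs.add(dir);
            } catch (final IOException e) {
                offlineLogDirs.add(dir.getAbsolutePath());  // mark offline, keep the good disks
            }
        }

        if (liveLogDirs.isEmpty()) {
            throw new IllegalStateException("No usable log directories, shutting down");
        }
        System.out.println("live=" + liveLogDirs + " offline=" + offlineLogDirs);
    }
}
{code}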



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7037) delete topic command replaces '+' from the topic name which leads incorrect topic deletion

2018-06-11 Thread Vahid Hashemian (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian reassigned KAFKA-7037:
--

Assignee: Vahid Hashemian

> delete topic command replaces '+' from the topic name which leads incorrect 
> topic deletion
> --
>
> Key: KAFKA-7037
> URL: https://issues.apache.org/jira/browse/KAFKA-7037
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 1.0.0
>Reporter: Sandeep Nemuri
>Assignee: Vahid Hashemian
>Priority: Major
>
> While executing a delete command, the Kafka CLI tool removes the "+" symbol 
> and deletes the incorrect topic. In the case below, if _"*test+topic"*_ is 
> deleted, Kafka deletes _*testtopic*_.
> {code:java}
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 
> --topic testtopic
> Created topic "testtopic".
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --topic test+topic --delete
> Topic testtopic is marked for deletion.{code}
>  delete topic replaces '+' from the topic name  
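
One way the observed behaviour can arise is if the topic argument is interpreted as 
a regular expression somewhere on the delete path (an assumption, not confirmed in 
this ticket): read as a regex, "test+topic" matches "testtopic" because '+' means 
"one or more of the preceding character". A quick, self-contained illustration:
{code:java}
public class PlusTopicRegexSketch {
    public static void main(String[] args) {
        final String requested = "test+topic"; // topic name as typed on the command line
        final String existing = "testtopic";   // topic that actually got deleted

        System.out.println(existing.equals(requested));  // false: the literal names differ
        System.out.println(existing.matches(requested)); // true: regex reads as "tes", one or more 't', then "topic"
    }
}
{code}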



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6946) Keep the session id for incremental fetch when fetch responses are throttled

2018-06-11 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-6946.
-
Resolution: Fixed

> Keep the session id for incremental fetch when fetch responses are throttled 
> -
>
> Key: KAFKA-6946
> URL: https://issues.apache.org/jira/browse/KAFKA-6946
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.0.0
>Reporter: Jon Lee
>Priority: Major
>
> The current patch for KAFKA-6028 (KIP-219) sends a FetchResponse with 
> INVALID_SESSION_ID if the response needs to be throttled due to quota 
> violation. If it is for incremental fetch, this will make the client reset 
> its session and send a full fetch request next time. This is not a 
> correctness issue, but it may affect performance when fetches are throttled.
> In case of incremental fetch, a throttled response should use the same 
> session id as before so that the next unthrottled response can be in the same 
> session. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6860) NPE when reinitializeStateStores with eos enabled

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508424#comment-16508424
 ] 

ASF GitHub Bot commented on KAFKA-6860:
---

mjsax opened a new pull request #5187: KAFKA-6860: Fix NPE in Kafka Streams 
with EOS enabled
URL: https://github.com/apache/kafka/pull/5187
 
 
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> NPE when reinitializeStateStores with eos enabled
> -
>
> Key: KAFKA-6860
> URL: https://issues.apache.org/jira/browse/KAFKA-6860
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
> Environment: mac, kafka1.1
>Reporter: ko byoung kwon
>Assignee: Matthias J. Sax
>Priority: Major
> Fix For: 2.0.0
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> *Symptom*
>  With EOS enabled, reinitializing state stores gets an NPE because the checkpoint 
> is null.
> {code:java}
> 2018-05-02 18:05:17.156 ERROR 60836 --- [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread : stream-thread 
> [kafka-stream-application-d6ec1dfb-9b7f-42dd-8b28-899ff3d1ad98-StreamThread-1]
>  Encountered the following error during processing:
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:66)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:230)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94)
>  ~[kafka-streams-1.1.0.jar:na]
> {code}
> *How to reproduce*
> *configure as*
>  - changelog topic with short `retention.ms` and `delete` policy (just to 
> reproduce the symptom easily)
>  ex) retention.ms=3,cleanup.policy=delete 
>  - exactly-once semantics enabled
>  - no cleanup
> *Step*
>  - two task[0_0],[0,1] , two Spring Boot (assign was#1:task[0_0], 
> was#2:task[0_1])
>  - write some data to each state store (the changelog topic will soon erase those 
> messages, due to the short "retention.ms")
>  - when was#2 is killed, then was#1 will restore task[0_1]'s data on its own 
> rocksDB
>  - In the process, it finds a checkpoint and an error 
> occurs.(AbstractStateManager #66)
> {code:java}
> // My code
> Map<String, String> topicConfiguration = new HashMap<>();
> topicConfiguration.putIfAbsent("cleanup.policy", "delete");
> topicConfiguration.putIfAbsent("file.delete.delay.ms", "0");
> topicConfiguration.putIfAbsent("retention.ms", "3000");
> builder.stream(properties.getSourceTopic(),
>Consumed.with(Serdes.Long(), Serdes.String()))
>.groupByKey()
>.count(Materialized
>   .<Long, Long, KeyValueStore<Bytes, byte[]>>as(ORDER_STORE_NAME)
>   .withKeySerde(Serdes.Long())
>   .withValueSerde(Serdes.Long())
>   .withLoggingEnabled(topicConfiguration));
> {code}
> *Suggestion*
> When EOS is enabled, the checkpoint will be null.
>  I think we need to add some code to create a Checkpoint. 
>  As follows
> {code:java}
> // # At org.apache.kafka.streams.processor.internals.AbstractStateManager #66
> // # suggestion start
> if (checkpoint == null) {
> checkpoint = new OffsetCheckpoint(new File(baseDir, 
> CHECKPOINT_FILE_NAME));
> }
> // # suggestion end
> try {
> checkpoint.write(checkpointableOffsets);
> } catch (final IOException fatalException) {
> log.error("Failed to write offset checkpoint file to {} while 
> re-initializing {}: {}", checkpoint, stateStores, fatalException);
>  throw new StreamsException("Failed to reinitialize global store.", 
> 

[jira] [Commented] (KAFKA-7035) Kafka Processor's init() method sometimes is not called

2018-06-11 Thread Oleksandr Konopko (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508425#comment-16508425
 ] 

Oleksandr Konopko commented on KAFKA-7035:
--

ok, but the processor constructor and processor.init() are not an atomic piece of 
logic, right?
 * data processed by A and B is not reprocessed by C and D
 * will check...

> Kafka Processor's init() method sometimes is not called
> ---
>
> Key: KAFKA-7035
> URL: https://issues.apache.org/jira/browse/KAFKA-7035
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Oleksandr Konopko
>Priority: Critical
> Attachments: TransformProcessor.java
>
>
> Scenario:
> 1. We have processing of a Kafka topic which is implemented with the Processor API
> 2. We want to collect metrics (let's say just count the number of processed 
> entities for simplicity)
> 3. How we tried to organize this
>  * process data with process() method and send it down the stream with context
>  * on each call of process() method update the counter
>  * schedule a punctuate function which will send the metric to a special topic. The metric 
> is built with the counter
> You can find the code (we removed all business sensitive code out of it, so 
> it should be easy to read) in attachment
>  
> Problematic Kafka Streams behaviour that I can see by logging every step:
> 1. We have 80 messages in the input topic
> 2. Kafka Streams creates 4 Processor instances. Let's name them: ProcessorA, 
> ProcessorB, ProcessorC and ProcessorD
> 3. ProcessorA and ProcessorB receive 1-5% of data. Data is processed 
> correctly, results are sent down the stream. Counter is updated
> 4. init() method was not called for ProcessorA and ProcessorB
> 5. ProcessorC and ProcessorD are created and they start to receive all the 
> rest of data. 95-99%
> 6. init() method is called for both ProcessorC and ProcessorD. It initiates 
> punctuation, which causes Metrics message be created and sent down the metric 
> stream periodically
> 7. ProcessorA and ProcessorB are closed. init() was never called for them. So 
> Metric entity was not sent to metrics topic
> 8. Processing is finished.
>  
> In the end:
> Expected:
>  * 80 entities were processed and sent to the Sink
>  * Metrics entities contain counters which sum up to 80
> Actual results:
>  * 80 entities were processed and sent to the Sink
>  * Metrics entities contain counters which sum up to some number 3-6% less 
> than 80, for example 786543
>  
> Problem:
>  * init() method call is not guaranteed
>  * there is no way to guarantee that all work was done by punctuate method 
> before close()
>  
>  
>  
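
A minimal, hypothetical sketch of the pattern under discussion (this is not the 
attached TransformProcessor): punctuation is scheduled in init(), the counter is 
emitted from the Punctuator, and close() only logs whatever was never emitted, 
since forwarding from close() is generally not guaranteed to reach downstream. That 
is exactly the window in which counts can be lost if init() never runs or the task 
is closed before a punctuation fires.
{code:java}
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class CountingProcessor implements Processor<String, String> {
    private ProcessorContext context;
    private long processedCount = 0L;
    private long lastEmittedCount = 0L;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
        // Emit the running count once a minute via wall-clock punctuation (1.0.x Punctuator API).
        context.schedule(60_000L, PunctuationType.WALL_CLOCK_TIME, timestamp -> emitMetric());
    }

    @Override
    public void process(final String key, final String value) {
        processedCount++;
        context.forward(key, value);   // pass the record downstream unchanged
    }

    @Override
    public void punctuate(final long timestamp) {
        // Old-style punctuation (deprecated in 1.0.x); unused because init() registers a Punctuator.
    }

    @Override
    public void close() {
        // Forwarding from close() is not reliable (downstream may already be shut down),
        // so counts that were never emitted by the Punctuator can be lost here.
        System.out.println("unsent count at close: " + (processedCount - lastEmittedCount));
    }

    private void emitMetric() {
        context.forward("processed-count", Long.toString(processedCount)); // illustrative metric record
        lastEmittedCount = processedCount;
    }
}
{code}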



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6946) Keep the session id for incremental fetch when fetch responses are throttled

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508420#comment-16508420
 ] 

ASF GitHub Bot commented on KAFKA-6946:
---

lindong28 closed pull request #5164: KAFKA-6946: Keep the session id for 
incremental fetch when fetch responses are throttled
URL: https://github.com/apache/kafka/pull/5164
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/server/FetchSession.scala 
b/core/src/main/scala/kafka/server/FetchSession.scala
index 7a47780a135..68f79cace38 100644
--- a/core/src/main/scala/kafka/server/FetchSession.scala
+++ b/core/src/main/scala/kafka/server/FetchSession.scala
@@ -290,6 +290,12 @@ trait FetchContext extends Logging {
 
   def partitionsToLogString(partitions: util.Collection[TopicPartition]): 
String =
 FetchSession.partitionsToLogString(partitions, isTraceEnabled)
+
+  /**
+* Return an empty throttled response due to quota violation.
+*/
+  def getThrottledResponse(throttleTimeMs: Int): FetchResponse[Records] =
+new FetchResponse(Errors.NONE, new FetchSession.RESP_MAP, throttleTimeMs, 
INVALID_SESSION_ID)
 }
 
 /**
@@ -474,6 +480,21 @@ class IncrementalFetchContext(private val time: Time,
   }
 }
   }
+
+  override def getThrottledResponse(throttleTimeMs: Int): 
FetchResponse[Records] = {
+session.synchronized {
+  // Check to make sure that the session epoch didn't change in between
+  // creating this fetch context and generating this response.
+  val expectedEpoch = JFetchMetadata.nextEpoch(reqMetadata.epoch())
+  if (session.epoch != expectedEpoch) {
+info(s"Incremental fetch session ${session.id} expected epoch 
$expectedEpoch, but " +
+  s"got ${session.epoch}.  Possible duplicate request.")
+new FetchResponse(Errors.INVALID_FETCH_SESSION_EPOCH, new 
FetchSession.RESP_MAP, throttleTimeMs, session.id)
+  } else {
+new FetchResponse(Errors.NONE, new FetchSession.RESP_MAP, 
throttleTimeMs, session.id)
+  }
+}
+  }
 }
 
 case class LastUsedKey(val lastUsedMs: Long,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6d9e3d115b8..f7e9ec98fdb 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -655,8 +655,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 quotas.request.throttle(request, requestThrottleTimeMs, 
sendResponse)
   }
   // If throttling is required, return an empty response.
-  unconvertedFetchResponse = new FetchResponse(Errors.NONE, new 
util.LinkedHashMap[TopicPartition,
-FetchResponse.PartitionData[Records]](), maxThrottleTimeMs, 
INVALID_SESSION_ID)
+  unconvertedFetchResponse = 
fetchContext.getThrottledResponse(maxThrottleTimeMs)
 } else {
   // Get the actual response. This will update the fetch context.
   unconvertedFetchResponse = 
fetchContext.updateAndGenerateResponseData(partitions)
diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala 
b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
index 84efa6b684d..b79692d69d9 100755
--- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
@@ -201,25 +201,34 @@ class FetchSessionTest {
 assertEquals(Errors.INVALID_FETCH_SESSION_EPOCH,
   context6.updateAndGenerateResponseData(respData2).error())
 
+// Test generating a throttled response for the incremental fetch session
+val reqData7 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
+val context7 = fetchManager.newContext(
+  new JFetchMetadata(resp2.sessionId(), 2), reqData7, EMPTY_PART_LIST, 
false)
+val resp7 = context7.getThrottledResponse(100)
+assertEquals(Errors.NONE, resp7.error())
+assertEquals(resp2.sessionId(), resp7.sessionId())
+assertEquals(100, resp7.throttleTimeMs())
+
 // Close the incremental fetch session.
 val prevSessionId = resp5.sessionId
 var nextSessionId = prevSessionId
 do {
-  val reqData7 = new util.LinkedHashMap[TopicPartition, 
FetchRequest.PartitionData]
-  reqData7.put(new TopicPartition("bar", 0), new 
FetchRequest.PartitionData(0, 0, 100))
-  reqData7.put(new TopicPartition("bar", 1), new 
FetchRequest.PartitionData(10, 0, 100))
-  val context7 = fetchManager.newContext(
-new JFetchMetadata(prevSessionId, FINAL_EPOCH), reqData7, 
EMPTY_PART_LIST, false)
-  assertEquals(classOf[SessionlessFetchContext], context7.getClass)
+  val reqData8 = new 

[jira] [Commented] (KAFKA-7035) Kafka Processor's init() method sometimes is not called

2018-06-11 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508402#comment-16508402
 ] 

Guozhang Wang commented on KAFKA-7035:
--

Hi [~akonopko], processor.init() should always be called when the task is 
initialized for the first time. I've just checked the code in 1.0.0 and 
confirmed it is still the case. 

BTW, could you elaborate on the scenario: why were four processors created, and 
why do ProcessorC and D take over the data from ProcessorA and B? Are there only two 
input partitions available? How many total num.threads exist for this 
application?

> Kafka Processor's init() method sometimes is not called
> ---
>
> Key: KAFKA-7035
> URL: https://issues.apache.org/jira/browse/KAFKA-7035
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Oleksandr Konopko
>Priority: Critical
> Attachments: TransformProcessor.java
>
>
> Scenario:
> 1. We have processing of a Kafka topic which is implemented with the Processor API
> 2. We want to collect metrics (let's say just count the number of processed 
> entities for simplicity)
> 3. How we tried to organize this
>  * process data with process() method and send it down the stream with context
>  * on each call of process() method update the counter
>  * schedule a punctuate function which will send the metric to a special topic. The metric 
> is built with the counter
> You can find the code (we removed all business sensitive code out of it, so 
> it should be easy to read) in attachment
>  
> Problematic Kafka Streams behaviour that I can see by logging every step:
> 1. We have 80 messages in the input topic
> 2. Kafka Streams creates 4 Processor instances. Let's name them: ProcessorA, 
> ProcessorB, ProcessorC and ProcessorD
> 3. ProcessorA and ProcessorB receive 1-5% of data. Data is processed 
> correctly, results are sent down the stream. Counter is updated
> 4. init() method was not called for ProcessorA and ProcessorB
> 5. ProcessorC and ProcessorD are created and they start to receive all the 
> rest of data. 95-99%
> 6. init() method is called for both ProcessorC and ProcessorD. It initiates 
> punctuation, which causes Metrics message be created and sent down the metric 
> stream periodically
> 7. ProcessorA and ProcessorB are closed. init() was never called for them. So 
> Metric entity was not sent to metrics topic
> 8. Processing is finished.
>  
> In the end:
> Expected:
>  * 80 entities were processed and sent to the Sink
>  * Metrics entities contain counters which sum up to 80
> Actual results:
>  * 80 entities were processed and sent to the Sink
>  * Metrics entities contain counters which sum up to some number 3-6% less 
> than 80, for example 786543
>  
> Problem:
>  * init() method call is not guaranteed
>  * there is no way to guarantee that all work was done by punctuate method 
> before close()
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6860) NPE when reinitializeStateStores with eos enabled

2018-06-11 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6860:
---
Fix Version/s: 2.0.0

> NPE when reinitializeStateStores with eos enabled
> -
>
> Key: KAFKA-6860
> URL: https://issues.apache.org/jira/browse/KAFKA-6860
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
> Environment: mac, kafka1.1
>Reporter: ko byoung kwon
>Assignee: Matthias J. Sax
>Priority: Major
> Fix For: 2.0.0
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> *Symptom*
>  With EOS enabled, reinitializing state stores gets an NPE because the checkpoint 
> is null.
> {code:java}
> 2018-05-02 18:05:17.156 ERROR 60836 --- [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread : stream-thread 
> [kafka-stream-application-d6ec1dfb-9b7f-42dd-8b28-899ff3d1ad98-StreamThread-1]
>  Encountered the following error during processing:
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:66)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:230)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94)
>  ~[kafka-streams-1.1.0.jar:na]
> {code}
> *How to reproduce*
> *configure as*
>  - changelog topic with short `retention.ms` and `delete` policy (just to 
> reproduce the symptom easily)
>  ex) retention.ms=3,cleanup.policy=delete 
>  - exactly-once semantics enabled
>  - no cleanup
> *Step*
>  - two task[0_0],[0,1] , two Spring Boot (assign was#1:task[0_0], 
> was#2:task[0_1])
>  - write some data to each state store (the changelog topic will soon erase those 
> messages, due to the short "retention.ms")
>  - when was#2 is killed, then was#1 will restore task[0_1]'s data on its own 
> rocksDB
>  - In the process, it finds a checkpoint and an error 
> occurs.(AbstractStateManager #66)
> {code:java}
> // My code
> Map<String, String> topicConfiguration = new HashMap<>();
> topicConfiguration.putIfAbsent("cleanup.policy", "delete");
> topicConfiguration.putIfAbsent("file.delete.delay.ms", "0");
> topicConfiguration.putIfAbsent("retention.ms", "3000");
> builder.stream(properties.getSourceTopic(),
>Consumed.with(Serdes.Long(), Serdes.String()))
>.groupByKey()
>.count(Materialized
>   .<Long, Long, KeyValueStore<Bytes, byte[]>>as(ORDER_STORE_NAME)
>   .withKeySerde(Serdes.Long())
>   .withValueSerde(Serdes.Long())
>   .withLoggingEnabled(topicConfiguration));
> {code}
> *Suggestion*
> When EOS is enabled, the checkpoint will be null.
>  I think we need to add some code to create a Checkpoint. 
>  As follows
> {code:java}
> // # At org.apache.kafka.streams.processor.internals.AbstractStateManager #66
> // # suggestion start
> if (checkpoint == null) {
> checkpoint = new OffsetCheckpoint(new File(baseDir, 
> CHECKPOINT_FILE_NAME));
> }
> // # suggestion end
> try {
> checkpoint.write(checkpointableOffsets);
> } catch (final IOException fatalException) {
> log.error("Failed to write offset checkpoint file to {} while 
> re-initializing {}: {}", checkpoint, stateStores, fatalException);
>  throw new StreamsException("Failed to reinitialize global store.", 
> fatalException);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6860) NPE when reinitializeStateStores with eos enabled

2018-06-11 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-6860:
--

Assignee: Matthias J. Sax

> NPE when reinitializeStateStores with eos enabled
> -
>
> Key: KAFKA-6860
> URL: https://issues.apache.org/jira/browse/KAFKA-6860
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
> Environment: mac, kafka1.1
>Reporter: ko byoung kwon
>Assignee: Matthias J. Sax
>Priority: Major
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> *Symptom*
>  With EOS enabled, reinitializing state stores gets an NPE because the checkpoint 
> is null.
> {code:java}
> 2018-05-02 18:05:17.156 ERROR 60836 --- [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread : stream-thread 
> [kafka-stream-application-d6ec1dfb-9b7f-42dd-8b28-899ff3d1ad98-StreamThread-1]
>  Encountered the following error during processing:
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:66)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:230)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94)
>  ~[kafka-streams-1.1.0.jar:na]
> {code}
> *How to reproduce*
> *configure as*
>  - changelog topic with short `retention.ms` and `delete` policy (just to 
> reproduce the symptom easily)
>  ex) retention.ms=3,cleanup.policy=delete 
>  - exactly-once semantics enabled
>  - no cleanup
> *Step*
>  - two task[0_0],[0,1] , two Spring Boot (assign was#1:task[0_0], 
> was#2:task[0_1])
>  - write some data to each state store (the changelog topic will soon erase those 
> messages, due to the short "retention.ms")
>  - when was#2 is killed, then was#1 will restore task[0_1]'s data on its own 
> rocksDB
>  - In the process, it finds a checkpoint and an error 
> occurs.(AbstractStateManager #66)
> {code:java}
> // My code
> Map<String, String> topicConfiguration = new HashMap<>();
> topicConfiguration.putIfAbsent("cleanup.policy", "delete");
> topicConfiguration.putIfAbsent("file.delete.delay.ms", "0");
> topicConfiguration.putIfAbsent("retention.ms", "3000");
> builder.stream(properties.getSourceTopic(),
>Consumed.with(Serdes.Long(), Serdes.String()))
>.groupByKey()
>.count(Materialized
>   .<Long, Long, KeyValueStore<Bytes, byte[]>>as(ORDER_STORE_NAME)
>   .withKeySerde(Serdes.Long())
>   .withValueSerde(Serdes.Long())
>   .withLoggingEnabled(topicConfiguration));
> {code}
> *Suggestion*
> When EOS is enabled, the checkpoint will be null.
>  I think we need to add some code to create a Checkpoint. 
>  As follows
> {code:java}
> // # At org.apache.kafka.streams.processor.internals.AbstractStateManager #66
> // # suggestion start
> if (checkpoint == null) {
> checkpoint = new OffsetCheckpoint(new File(baseDir, 
> CHECKPOINT_FILE_NAME));
> }
> // # suggestion end
> try {
> checkpoint.write(checkpointableOffsets);
> } catch (final IOException fatalException) {
> log.error("Failed to write offset checkpoint file to {} while 
> re-initializing {}: {}", checkpoint, stateStores, fatalException);
>  throw new StreamsException("Failed to reinitialize global store.", 
> fatalException);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6860) NPE when reinitializeStateStores with eos enabled

2018-06-11 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508317#comment-16508317
 ] 

Guozhang Wang commented on KAFKA-6860:
--

I see. Thanks for the explanation [~mjsax]. And your proposed fix makes sense 
to me. I think a more general solution would involve also fixing the double 
checkpointing for the non-EOS case: today we checkpoint in `suspend` if EOS is 
not turned on, and in `closeSuspended` always. So for EOS we only checkpoint in 
`closeSuspended`, while for non-EOS we checkpoint in both, hence we 
unnecessarily write the checkpoint twice when closing. But for this general 
fix, I think it may be better to do it as part of some refactoring of the 
ProcessorStateManager code, and hence it does not necessarily need to be 
included in this JIRA.
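
A minimal, hypothetical sketch of the lifecycle described above (not the actual 
ProcessorStateManager code), just to show where the duplicate write comes from:

{code:java}
// Hypothetical sketch: "checkpoint on suspend unless EOS, always checkpoint on close".
class CheckpointLifecycleSketch {
    private final boolean eosEnabled;
    private int checkpointWrites = 0;

    CheckpointLifecycleSketch(final boolean eosEnabled) {
        this.eosEnabled = eosEnabled;
    }

    void suspend() {
        if (!eosEnabled) {
            writeCheckpoint();   // non-EOS: first write happens here
        }
    }

    void closeSuspended() {
        writeCheckpoint();       // always: for non-EOS this is the second, redundant write
    }

    private void writeCheckpoint() {
        checkpointWrites++;      // stand-in for OffsetCheckpoint.write(...)
    }
}
{code}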

> NPE when reinitializeStateStores with eos enabled
> -
>
> Key: KAFKA-6860
> URL: https://issues.apache.org/jira/browse/KAFKA-6860
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
> Environment: mac, kafka1.1
>Reporter: ko byoung kwon
>Priority: Major
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> *Symptom*
>  With EOS enabled, reinitializing stateStores gets an NPE because the checkpoint 
> is null.
> {code:java}
> 2018-05-02 18:05:17.156 ERROR 60836 --- [-StreamThread-1] 
> o.a.k.s.p.internals.StreamThread : stream-thread 
> [kafka-stream-application-d6ec1dfb-9b7f-42dd-8b28-899ff3d1ad98-StreamThread-1]
>  Encountered the following error during processing:
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:66)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:230)
>  ~[kafka-streams-1.1.0.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94)
>  ~[kafka-streams-1.1.0.jar:na]
> {code}
> *How to reproduce*
> *Configure as*
>  - a changelog topic with a short `retention.ms` and the `delete` policy (just to 
> reproduce the symptom easily)
>  ex) retention.ms=3,cleanup.policy=delete 
>  - exactly-once semantics enabled
>  - no cleanup
> *Steps*
>  - two tasks [0_0],[0_1], two Spring Boot instances (assignment was#1:task[0_0], 
> was#2:task[0_1])
>  - write some data to each state store (the changelog topic will soon erase those 
> messages because of the short "retention.ms")
>  - when was#2 is killed, was#1 will restore task[0_1]'s data into its own 
> rocksDB
>  - during that restore, the checkpoint is null and an NPE 
> occurs (AbstractStateManager #66)
> {code:java}
> // My code
> Map<String, String> topicConfiguration = new HashMap<>();
> topicConfiguration.putIfAbsent("cleanup.policy", "delete");
> topicConfiguration.putIfAbsent("file.delete.delay.ms", "0");
> topicConfiguration.putIfAbsent("retention.ms", "3000");
> builder.stream(properties.getSourceTopic(),
>                Consumed.with(Serdes.Long(), Serdes.String()))
>        .groupByKey()
>        .count(Materialized
>               .<Long, Long, KeyValueStore<Bytes, byte[]>>as(ORDER_STORE_NAME)
>               .withKeySerde(Serdes.Long())
>               .withValueSerde(Serdes.Long())
>               .withLoggingEnabled(topicConfiguration));
> {code}
> *Suggestion*
> When EOS is enabled, the checkpoint will be null.
>  I think we need to add some code to create a checkpoint, 
>  as follows:
> {code:java}
> // # At org.apache.kafka.streams.processor.internals.AbstractStateManager #66
> // # suggestion start
> if (checkpoint == null) {
>     checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
> }
> // # suggestion end
> try {
>     checkpoint.write(checkpointableOffsets);
> } catch (final IOException fatalException) {
>     log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}",
>               checkpoint, stateStores, fatalException);
>     throw new StreamsException("Failed to reinitialize global store.", fatalException);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6889) Fix Streams system test to only specify used log files

2018-06-11 Thread Stanislav Kozlovski (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-6889:
---
Attachment: (was: Screen Shot 2018-06-11 at 7.15.02 PM.png)

> Fix Streams system test to only specify used log files
> --
>
> Key: KAFKA-6889
> URL: https://issues.apache.org/jira/browse/KAFKA-6889
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, easyfix
>
> In `streams.py` the class `StreamsTestBaseService` lists many log files that 
> are only used in certain tests. For all tests that do not use those log 
> files, WARN messages are produced during the test run:
> {noformat}
> [WARNING - 2018-05-09 13:51:22,065 - test - compress_service_logs - 
> lineno:131]: Error compressing log /mnt/streams/streams.stdout.0-6: service 
>  ['worker7']>: ubuntu@worker7: Command 'cd "$(dirname 
> /mnt/streams/streams.stdout.0-6)" && f="$(basename 
> /mnt/streams/streams.stdout.0-6)" && tar czf "$f.tgz" "$f" && rm -rf 
> /mnt/streams/streams.stdout.0-6' returned non-zero exit status 2. Remote 
> error message: tar: streams.stdout.0-6: Cannot stat: No such file or directory
> tar: Exiting with failure status due to previous errors
> {noformat}
> Those messages spam the output and might be misleading. We should update the 
> Streams system tests accordingly such that each test only specifies the 
> log-file names it actually uses, to avoid the WARN messages.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6889) Fix Streams system test to only specify used log files

2018-06-11 Thread Stanislav Kozlovski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508285#comment-16508285
 ] 

Stanislav Kozlovski commented on KAFKA-6889:


What do you mean the Jenkins has read access? 

I personally cannot open the link sent by Matthias -  https://imgur.com/yt7Su2e

> Fix Streams system test to only specify used log files
> --
>
> Key: KAFKA-6889
> URL: https://issues.apache.org/jira/browse/KAFKA-6889
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, easyfix
>
> In `streams.py` the class `StreamsTestBaseService` lists many log files that 
> are only used in certain tests. For all tests that do not use those log 
> files, WARN messages are produced during the test run:
> {noformat}
> [WARNING - 2018-05-09 13:51:22,065 - test - compress_service_logs - 
> lineno:131]: Error compressing log /mnt/streams/streams.stdout.0-6: service 
>  ['worker7']>: ubuntu@worker7: Command 'cd "$(dirname 
> /mnt/streams/streams.stdout.0-6)" && f="$(basename 
> /mnt/streams/streams.stdout.0-6)" && tar czf "$f.tgz" "$f" && rm -rf 
> /mnt/streams/streams.stdout.0-6' returned non-zero exit status 2. Remote 
> error message: tar: streams.stdout.0-6: Cannot stat: No such file or directory
> tar: Exiting with failure status due to previous errors
> {noformat}
> Those messages spam the output and might be misleading. We should update the 
> Streams system tests accordingly such that each test only specifies the 
> log-file names it actually uses, to avoid the WARN messages.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6889) Fix Streams system test to only specify used log files

2018-06-11 Thread Stanislav Kozlovski (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-6889:
---
Attachment: Screen Shot 2018-06-11 at 7.15.02 PM.png

> Fix Streams system test to only specify used log files
> --
>
> Key: KAFKA-6889
> URL: https://issues.apache.org/jira/browse/KAFKA-6889
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, easyfix
> Attachments: Screen Shot 2018-06-11 at 7.15.02 PM.png
>
>
> In `streams.py` the class `StreamsTestBaseService` lists many log files that 
> are only used in certain tests. For all tests that do not use those log 
> files, WARN messages are produced during the test run:
> {noformat}
> [WARNING - 2018-05-09 13:51:22,065 - test - compress_service_logs - 
> lineno:131]: Error compressing log /mnt/streams/streams.stdout.0-6: service 
>  ['worker7']>: ubuntu@worker7: Command 'cd "$(dirname 
> /mnt/streams/streams.stdout.0-6)" && f="$(basename 
> /mnt/streams/streams.stdout.0-6)" && tar czf "$f.tgz" "$f" && rm -rf 
> /mnt/streams/streams.stdout.0-6' returned non-zero exit status 2. Remote 
> error message: tar: streams.stdout.0-6: Cannot stat: No such file or directory
> tar: Exiting with failure status due to previous errors
> {noformat}
> Those messages spam the output and might be misleading. We should update the 
> Streams system tests accordingly such that each test only specifies the 
> log-file names it actually uses, to avoid the WARN messages.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6889) Fix Streams system test to only specify used log files

2018-06-11 Thread Stanislav Kozlovski (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-6889:
---
Attachment: Screen Shot 2018-06-11 at 7.15.02 PM.png

> Fix Streams system test to only specify used log files
> --
>
> Key: KAFKA-6889
> URL: https://issues.apache.org/jira/browse/KAFKA-6889
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, easyfix
> Attachments: Screen Shot 2018-06-11 at 7.15.02 PM.png
>
>
> In `streams.py` the class `StreamsTestBaseService` lists many log files that 
> are only used in certain tests. For all tests that do not use those log 
> files, WARN messages are produced during the test run:
> {noformat}
> [WARNING - 2018-05-09 13:51:22,065 - test - compress_service_logs - 
> lineno:131]: Error compressing log /mnt/streams/streams.stdout.0-6: service 
>  ['worker7']>: ubuntu@worker7: Command 'cd "$(dirname 
> /mnt/streams/streams.stdout.0-6)" && f="$(basename 
> /mnt/streams/streams.stdout.0-6)" && tar czf "$f.tgz" "$f" && rm -rf 
> /mnt/streams/streams.stdout.0-6' returned non-zero exit status 2. Remote 
> error message: tar: streams.stdout.0-6: Cannot stat: No such file or directory
> tar: Exiting with failure status due to previous errors
> {noformat}
> Those messages spam the output and might be misleading. We should update the 
> Streams system tests accordingly such that each test only specifies the 
> log-file names it actually uses, to avoid the WARN messages.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6889) Fix Streams system test to only specify used log files

2018-06-11 Thread Stanislav Kozlovski (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski updated KAFKA-6889:
---
Attachment: (was: Screen Shot 2018-06-11 at 7.15.02 PM.png)

> Fix Streams system test to only specify used log files
> --
>
> Key: KAFKA-6889
> URL: https://issues.apache.org/jira/browse/KAFKA-6889
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, easyfix
> Attachments: Screen Shot 2018-06-11 at 7.15.02 PM.png
>
>
> In `streams.py` the class `StreamsTestBaseService` lists many log files that 
> are only used in certain tests. For all tests that do not use those log 
> files, WARN messages are produced during the test run:
> {noformat}
> [WARNING - 2018-05-09 13:51:22,065 - test - compress_service_logs - 
> lineno:131]: Error compressing log /mnt/streams/streams.stdout.0-6: service 
>  ['worker7']>: ubuntu@worker7: Command 'cd "$(dirname 
> /mnt/streams/streams.stdout.0-6)" && f="$(basename 
> /mnt/streams/streams.stdout.0-6)" && tar czf "$f.tgz" "$f" && rm -rf 
> /mnt/streams/streams.stdout.0-6' returned non-zero exit status 2. Remote 
> error message: tar: streams.stdout.0-6: Cannot stat: No such file or directory
> tar: Exiting with failure status due to previous errors
> {noformat}
> Those messages spam the output and might be misleading. We should update the 
> Streams system tests accordingly such that each test only specifies the 
> log-file names it actually uses, to avoid the WARN messages.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7038) Support AdminClient Example

2018-06-11 Thread darion yaphet (JIRA)
darion yaphet created KAFKA-7038:


 Summary: Support AdminClient Example
 Key: KAFKA-7038
 URL: https://issues.apache.org/jira/browse/KAFKA-7038
 Project: Kafka
  Issue Type: New Feature
  Components: admin
Reporter: darion yaphet






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6889) Fix Streams system test to only specify used log files

2018-06-11 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508243#comment-16508243
 ] 

Guozhang Wang commented on KAFKA-6889:
--

Confluent jenkins has read access but not write access: i.e. you can check the 
results of a certain job, but cannot create new jobs.

> Fix Streams system test to only specify used log files
> --
>
> Key: KAFKA-6889
> URL: https://issues.apache.org/jira/browse/KAFKA-6889
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: beginner, easyfix
>
> In `streams.py` the class `StreamsTestBaseService` lists many log files that 
> are only used in certain tests. For all tests that do not use those log 
> files, WARN messages are produced during the test run:
> {noformat}
> [WARNING - 2018-05-09 13:51:22,065 - test - compress_service_logs - 
> lineno:131]: Error compressing log /mnt/streams/streams.stdout.0-6: service 
>  ['worker7']>: ubuntu@worker7: Command 'cd "$(dirname 
> /mnt/streams/streams.stdout.0-6)" && f="$(basename 
> /mnt/streams/streams.stdout.0-6)" && tar czf "$f.tgz" "$f" && rm -rf 
> /mnt/streams/streams.stdout.0-6' returned non-zero exit status 2. Remote 
> error message: tar: streams.stdout.0-6: Cannot stat: No such file or directory
> tar: Exiting with failure status due to previous errors
> {noformat}
> Those messages spam the output and might be misleading. We should update the 
> Streams system tests accordingly such that each test only specifies the 
> log-file names it actually uses, to avoid the WARN messages.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7037) delete topic command replaces '+' from the topic name which leads incorrect topic deletion

2018-06-11 Thread Sandeep Nemuri (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sandeep Nemuri updated KAFKA-7037:
--
Description: 
While executing a delete command, the kafka CLI tool removes the "+" symbol and 
deletes the incorrect topic. In the case below, deleting  _"*test+topic"*_ causes 
kafka to delete  _*testtopic*_.
{code:java}
[kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create 
--zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 --topic 
testtopic
Created topic "testtopic".
[kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
--zookeeper `hostname`:2181 --topic test+topic --delete
Topic testtopic is marked for deletion.{code}
 Delete topic replaces '+' from the topic name.

  was:
While executing a delete command, the kafka CLI tool removes the "+" symbol and 
deletes the incorrect topic. In the case below, deleting  _"*test+topic"*_ causes 
kafka to delete  _*testtopic*_.
{code:java}
[kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create 
--zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 --topic 
testtopic
Created topic "testtopic".
[kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
--zookeeper `hostname`:2181 --topic test+topic --delete
Topic testtopic is marked for deletion.{code}
 Delete topic replaces '+' and a few other special characters from the topic name: 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala#L28-L33]
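
For context, the link above points at the whitelist handling: the --topic argument is 
effectively treated as a regular expression, and in a regex '+' is a quantifier rather 
than a literal character, which is enough for "test+topic" to match "testtopic". A small 
illustrative sketch (plain JDK code, not Kafka code):

{code:java}
import java.util.regex.Pattern;

public class PlusTopicRegexDemo {
    public static void main(String[] args) {
        // '+' is interpreted as "one or more of the preceding character"
        System.out.println(Pattern.matches("test+topic", "testtopic"));   // true
        System.out.println(Pattern.matches("test+topic", "test+topic"));  // false
    }
}
{code}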

 


> delete topic command replaces '+' from the topic name which leads incorrect 
> topic deletion
> --
>
> Key: KAFKA-7037
> URL: https://issues.apache.org/jira/browse/KAFKA-7037
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 1.0.0
>Reporter: Sandeep Nemuri
>Priority: Major
>
> While executing a delete command, the kafka CLI tool removes the "+" symbol 
> and deletes the incorrect topic. In the case below, deleting  _"*test+topic"*_ 
> causes kafka to delete  _*testtopic*_.
> {code:java}
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 
> --topic testtopic
> Created topic "testtopic".
> [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
> --zookeeper `hostname`:2181 --topic test+topic --delete
> Topic testtopic is marked for deletion.{code}
>  Delete topic replaces '+' from the topic name.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6788) Grouping consumer requests per consumer coordinator in admin client

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508215#comment-16508215
 ] 

ASF GitHub Bot commented on KAFKA-6788:
---

cyrusv closed pull request #4878: KAFKA-6788: Combine queries for describe and 
delete groups in AdminCl…
URL: https://github.com/apache/kafka/pull/4878
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index fa3f943555b..cd79453d29a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2252,14 +2252,17 @@ public DescribeConsumerGroupsResult 
describeConsumerGroups(final Collection describedGroupIds = new HashSet<>();
+
 for (final Map.Entry> entry : futures.entrySet()) {
 // skip sending request for those futures that already failed.
 if (entry.getValue().isCompletedExceptionally())
 continue;
 
 final String groupId = entry.getKey();
+if (describedGroupIds.contains(groupId)) {
+continue;
+}
 
 final long startFindCoordinatorMs = time.milliseconds();
 final long deadline = calcDeadlineMs(startFindCoordinatorMs, 
options.timeoutMs());
@@ -2274,53 +2277,82 @@ public DescribeConsumerGroupsResult 
describeConsumerGroups(final Collection future = 
futures.get(groupId);
-final DescribeGroupsResponse.GroupMetadata 
groupMetadata = response.groups().get(groupId);
+final Set groupIdsToDescribe = new 
HashSet<>();
+for (ListGroupsResponse.Group group : 
listResponse.groups()) {
+groupIdsToDescribe.add(group.groupId());
+}
 
-final Errors groupError = groupMetadata.error();
-if (groupError != Errors.NONE) {
-// TODO: KAFKA-6789, we can retry based on the 
error code
-
future.completeExceptionally(groupError.exception());
-} else {
-final String protocolType = 
groupMetadata.protocolType();
-if 
(protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) 
{
-final 
List members = groupMetadata.members();
-final List consumers = 
new ArrayList<>(members.size());
-
-for (DescribeGroupsResponse.GroupMember 
groupMember : members) {
-final PartitionAssignor.Assignment 
assignment =
-
ConsumerProtocol.deserializeAssignment(
-
ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment(;
-
-final MemberDescription 
memberDescription =
-new MemberDescription(
-groupMember.memberId(),
-groupMember.clientId(),
-
groupMember.clientHost(),
-new 
MemberAssignment(assignment.partitions()));
-consumers.add(memberDescription);
+runnable.call(new Call("describeConsumerGroups", 
deadline, new ConstantNodeIdProvider(nodeId)) {
+
+@Override
+AbstractRequest.Builder createRequest(int 
timeoutMs) {
+return new 
DescribeGroupsRequest.Builder(Collections.singletonList(groupId));
+}
+
+@Override
+void handleResponse(AbstractResponse 
abstractResponse) {
+final DescribeGroupsResponse response = 
(DescribeGroupsResponse) abstractResponse;
+for (String describedCandidate : 
groupIdsToDescribe) {
+if 
(response.groups().containsKey(describedCandidate)) {
+
describedGroupIds.add(describedCandidate);
+}
+

[jira] [Commented] (KAFKA-7033) Modify AbstractOptions's timeoutMs as Long type

2018-06-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508205#comment-16508205
 ] 

ASF GitHub Bot commented on KAFKA-7033:
---

darionyaphet opened a new pull request #5186: [KAFKA-7033] Modify 
AbstractOptions's timeoutMs as Long type
URL: https://github.com/apache/kafka/pull/5186
 
 
   [KAFKA-7033 | Modify AbstractOptions's timeoutMs as Long 
type](https://issues.apache.org/jira/browse/KAFKA-7033)
   
   Currently AbstractOptions's timeoutMs is an Integer; using a Long to represent 
   the timeout in milliseconds may be better.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Modify AbstractOptions's timeoutMs as Long type
> ---
>
> Key: KAFKA-7033
> URL: https://issues.apache.org/jira/browse/KAFKA-7033
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 1.1.0
>Reporter: darion yaphet
>Priority: Minor
>
> Currently AbstractOptions's timeoutMs is an Integer; using a Long to represent 
> the timeout in milliseconds may be better.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-3042:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Moving this out to 2.1.0 since there is no fix yet.

> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.10.0.0
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
>Assignee: Dong Lin
>Priority: Major
>  Labels: reliability
> Fix For: 2.1.0
>
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> Sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker considers itself the leader when in fact 
> it's a follower.
> So after several failed tries, it needs to find out who the leader is.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active (KIP-211)

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4682:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Moving this out to 2.1.0.

> Committed offsets should not be deleted if a consumer is still active 
> (KIP-211)
> ---
>
> Key: KAFKA-4682
> URL: https://issues.apache.org/jira/browse/KAFKA-4682
> Project: Kafka
>  Issue Type: Bug
>Reporter: James Cheng
>Assignee: Vahid Hashemian
>Priority: Major
>  Labels: kip
> Fix For: 2.1.0
>
>
> Kafka will delete committed offsets that are older than 
> offsets.retention.minutes
> If there is an active consumer on a low traffic partition, it is possible 
> that Kafka will delete the committed offset for that consumer. Once the 
> offset is deleted, a restart or a rebalance of that consumer will cause the 
> consumer to not find any committed offset and start consuming from 
> earliest/latest (depending on auto.offset.reset). I'm not sure, but a broker 
> failover might also cause you to start reading from auto.offset.reset (due to 
> broker restart, or coordinator failover).
> I think that Kafka should only delete offsets for inactive consumers. The 
> timer should only start after a consumer group goes inactive. For example, if 
> a consumer group goes inactive, then after 1 week, delete the offsets for 
> that consumer group. This is a solution that [~junrao] mentioned in 
> https://issues.apache.org/jira/browse/KAFKA-3806?focusedCommentId=15323521=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15323521
> The current workarounds are to:
> # Commit an offset on every partition you own on a regular basis, making sure 
> that it is more frequent than offsets.retention.minutes (a broker-side 
> setting that a consumer might not be aware of)
> or
> # Turn the value of offsets.retention.minutes up really really high. You have 
> to make sure it is higher than any valid low-traffic rate that you want to 
> support. For example, if you want to support a topic where someone produces 
> once a month, you would have to set offsets.retention.minutes to 1 month. 
> or
> # Turn on enable.auto.commit (this is essentially #1, but easier to 
> implement).
> None of these are ideal. 
> #1 can be spammy. It requires that your consumers know something about how the 
> brokers are configured. Sometimes it is out of your control. Mirrormaker, for 
> example, only commits offsets on partitions where it receives data. And it is 
> duplication that you need to put into all of your consumers.
> #2 has disk-space impact on the broker (in __consumer_offsets) as well as 
> memory-size on the broker (to answer OffsetFetch).
> #3 I think has the potential for message loss (the consumer might commit on 
> messages that are not yet fully processed)
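
A minimal sketch of workaround #1 above, assuming an already-configured consumer; 
calling this regularly from the poll loop re-commits the current position on every 
owned partition so the committed offsets are refreshed even on low-traffic partitions:

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetKeepAlive {
    // Call more frequently than offsets.retention.minutes.
    public static void recommitCurrentPositions(final Consumer<?, ?> consumer) {
        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (final TopicPartition tp : consumer.assignment()) {
            offsets.put(tp, new OffsetAndMetadata(consumer.position(tp)));
        }
        consumer.commitSync(offsets); // refreshes the broker-side retention timer
    }
}
{code}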



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5054) ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-5054:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for a long time, moving out to 2.1.0.

> ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized
> 
>
> Key: KAFKA-5054
> URL: https://issues.apache.org/jira/browse/KAFKA-5054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Critical
> Fix For: 2.1.0
>
>
> {{putIfAbsent}} and {{delete}} should be synchronized as they involve at 
> least 2 operations on the underlying store and may result in inconsistent 
> results if someone were to query via IQ
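
A minimal, hypothetical sketch of the proposed fix (not the actual 
ChangeLoggingKeyValueByteStore code); the point is simply that both two-step 
operations are guarded by the same lock, so an interleaved IQ read cannot observe a 
half-applied update:

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.utils.Bytes;

class SynchronizedChangeLoggingStoreSketch {
    // Stand-in for the wrapped inner store; changelogging is omitted for brevity.
    private final Map<Bytes, byte[]> inner = new HashMap<>();

    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
        final byte[] previous = inner.get(key);  // step 1: read
        if (previous == null) {
            inner.put(key, value);               // step 2: write
        }
        return previous;
    }

    public synchronized byte[] delete(final Bytes key) {
        final byte[] previous = inner.get(key);  // step 1: read
        inner.remove(key);                       // step 2: delete
        return previous;
    }
}
{code}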



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3190) KafkaProducer should not invoke callback in send()

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-3190:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on the PR for several months, moving this out 
to 2.1.0.

> KafkaProducer should not invoke callback in send()
> --
>
> Key: KAFKA-3190
> URL: https://issues.apache.org/jira/browse/KAFKA-3190
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.9.0.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Critical
> Fix For: 2.1.0
>
>
> Currently KafkaProducer will invoke callback.onComplete() if it receives an 
> ApiException during send(). This breaks the guarantee that callback will be 
> invoked in order. It seems ApiException in send() only comes from metadata 
> refresh. If so, we can probably simply throw it instead of invoking 
> callback().
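
A small illustration of the ordering concern, assuming a hypothetical topic and 
bootstrap address: callbacks are expected to fire in send order, but an ApiException 
surfaced inside send() can complete a later record's callback before earlier 
in-flight records finish:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CallbackOrderingDemo {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                final int seq = i;
                // If send() hits an ApiException (e.g. from metadata refresh), this
                // callback may run immediately, ahead of callbacks for records 0..seq-1.
                producer.send(new ProducerRecord<>("demo-topic", "key-" + seq, "value-" + seq),
                        (metadata, exception) -> System.out.println("completed #" + seq
                                + (exception == null ? "" : " with " + exception)));
            }
        }
    }
}
{code}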



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4701) Allow kafka brokers to dynamically reload truststore without restarting.

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4701:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

We have added dynamic update of truststore using AdminClient/kafka-configs.sh 
with new truststore files. Moving this out to 2.1.0 to see if we need to 
support updates without filename changes.

> Allow kafka brokers to dynamically reload truststore without restarting.
> 
>
> Key: KAFKA-4701
> URL: https://issues.apache.org/jira/browse/KAFKA-4701
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: Allen Xiang
>Priority: Major
>  Labels: security
> Fix For: 2.1.0
>
>
> Right now in order to add SSL clients(update broker truststores), a rolling 
> restart of all brokers is required. This is very time consuming and 
> unnecessary. A dynamic truststore manager is needed to reload truststore from 
> file system without restarting brokers.
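
A hedged sketch of the dynamic update mentioned above via the AdminClient, assuming a 
listener named EXTERNAL, broker id 0, and a hypothetical truststore path; the exact 
dynamic config names follow KIP-226 and may differ per setup (kafka-configs.sh --alter 
--entity-type brokers is the CLI equivalent):

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TruststoreReloadSketch {
    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            final ConfigResource broker0 = new ConfigResource(ConfigResource.Type.BROKER, "0");
            final Config update = new Config(Collections.singletonList(
                    new ConfigEntry("listener.name.external.ssl.truststore.location",
                                    "/etc/kafka/ssl/truststore.v2.jks")));
            // Point broker 0 at the new truststore file without a restart.
            admin.alterConfigs(Collections.singletonMap(broker0, update)).all().get();
        }
    }
}
{code}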



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4808) send of null key to a compacted topic should throw error back to user

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4808:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on the PR for several months, moving this out 
to 2.1.0.

> send of null key to a compacted topic should throw error back to user
> -
>
> Key: KAFKA-4808
> URL: https://issues.apache.org/jira/browse/KAFKA-4808
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.2.0
>Reporter: Ismael Juma
>Assignee: Mayuresh Gharat
>Priority: Major
> Fix For: 2.1.0
>
>
> If a message with a null key is produced to a compacted topic, the broker 
> returns `CorruptRecordException`, which is a retriable exception. As such, 
> the producer keeps retrying until retries are exhausted or request.timeout.ms 
> expires and eventually throws a TimeoutException. This is confusing and not 
> user-friendly.
> We should throw a meaningful error back to the user. From an implementation 
> perspective, we would have to use a non-retriable error code to avoid this 
> issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4850) RocksDb cannot use Bloom Filters

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4850:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on the PR for several months, moving out to 
2.1.0.

> RocksDb cannot use Bloom Filters
> 
>
> Key: KAFKA-4850
> URL: https://issues.apache.org/jira/browse/KAFKA-4850
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>Assignee: Bharat Viswanadham
>Priority: Major
> Fix For: 2.1.0
>
>
> Bloom Filters would speed up RocksDb lookups. However they currently do not 
> work in RocksDb 5.0.2. This has been fixed in trunk, but we'll have to wait 
> until that is released and tested. 
> Then we can add the line in RocksDbStore.java in openDb:
> tableConfig.setFilter(new BloomFilter(10));
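
Once a RocksDB version with working Bloom filters ships, one way to apply the line 
above from user code would be a custom RocksDBConfigSetter, registered via 
rocksdb.config.setter (a sketch, not the RocksDbStore.java change itself):

{code:java}
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Options;

public class BloomFilterConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setFilter(new BloomFilter(10)); // ~10 bits per key
        options.setTableFormatConfig(tableConfig);
    }
}
{code}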



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3297) More optimally balanced partition assignment strategy (new consumer)

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-3297:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on the PR for several months, moving this out 
to 2.1.0.

> More optimally balanced partition assignment strategy (new consumer)
> 
>
> Key: KAFKA-3297
> URL: https://issues.apache.org/jira/browse/KAFKA-3297
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Major
> Fix For: 2.1.0
>
>
> While the roundrobin partition assignment strategy is an improvement over the 
> range strategy, when the consumer topic subscriptions are not identical 
> (previously disallowed but will be possible as of KAFKA-2172) it can produce 
> heavily skewed assignments. As suggested 
> [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767]
>  it would be nice to have a strategy that attempts to assign an equal number 
> of partitions to each consumer in a group, regardless of how similar their 
> individual topic subscriptions are. We can accomplish this by tracking the 
> number of partitions assigned to each consumer, and having the partition 
> assignment loop assign each partition to a consumer interested in that topic 
> with the least number of partitions assigned. 
> Additionally, we can optimize the distribution fairness by adjusting the 
> partition assignment order:
> * Topics with fewer consumers are assigned first.
> * In the event of a tie for least consumers, the topic with more partitions 
> is assigned first.
> The general idea behind these two rules is to keep the most flexible 
> assignment choices available as long as possible by starting with the most 
> constrained partitions/consumers.
> This JIRA addresses the new consumer. For the original high-level consumer, 
> see KAFKA-2435.
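
A rough, illustrative sketch of the greedy idea described above (not the actual 
assignor; the topic-ordering rules are omitted for brevity): each partition goes to the 
subscribed consumer that currently holds the fewest partitions.

{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LeastLoadedAssignmentSketch {
    // subscriptions: consumerId -> subscribed topics; partitionsPerTopic: topic -> partition count
    public static Map<String, List<String>> assign(final Map<String, List<String>> subscriptions,
                                                   final Map<String, Integer> partitionsPerTopic) {
        final Map<String, List<String>> assignment = new HashMap<>();
        subscriptions.keySet().forEach(c -> assignment.put(c, new ArrayList<>()));

        for (final Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            for (int p = 0; p < topic.getValue(); p++) {
                final String partition = topic.getKey() + "-" + p;
                // Among consumers subscribed to this topic, pick the least-loaded one.
                subscriptions.entrySet().stream()
                        .filter(e -> e.getValue().contains(topic.getKey()))
                        .map(Map.Entry::getKey)
                        .min(Comparator.comparingInt((String c) -> assignment.get(c).size()))
                        .ifPresent(c -> assignment.get(c).add(partition));
            }
        }
        return assignment;
    }
}
{code}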



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5952) Refactor Consumer Fetcher metrics

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-5952:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on the PR for several months, moving this out 
to 2.1.0.

> Refactor Consumer Fetcher metrics
> -
>
> Key: KAFKA-5952
> URL: https://issues.apache.org/jira/browse/KAFKA-5952
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: James Cheng
>Assignee: James Cheng
>Priority: Major
> Fix For: 2.1.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5951) Autogenerate Producer RecordAccumulator metrics

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-5951:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several months, moving this out to 
2.1.0.

> Autogenerate Producer RecordAccumulator metrics
> ---
>
> Key: KAFKA-5951
> URL: https://issues.apache.org/jira/browse/KAFKA-5951
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: James Cheng
>Assignee: James Cheng
>Priority: Major
> Fix For: 2.1.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5403) Transactions system test should dedup consumed messages by offset

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-5403:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several months, moving this out to 
2.1.0.

> Transactions system test should dedup consumed messages by offset
> -
>
> Key: KAFKA-5403
> URL: https://issues.apache.org/jira/browse/KAFKA-5403
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>Priority: Major
> Fix For: 2.1.0
>
>
> In KAFKA-5396, we saw that the consumers which verify the data in multiple 
> topics could read the same offsets multiple times, for instance when a 
> rebalance happens. 
> This would detect spurious duplicates, causing the test to fail. We should 
> dedup the consumed messages by offset and only fail the test if we have 
> duplicate values for a unique set of offsets.
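
A minimal sketch of the dedup idea, assuming string values: records are keyed by 
(topic, partition, offset), and only a different value at an already-seen offset counts 
as a real duplicate.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class OffsetDedupSketch {
    private final Map<String, String> seenByOffset = new HashMap<>();

    // Returns true only if this offset was seen before with a different value;
    // plain re-reads of the same offset (e.g. after a rebalance) are ignored.
    public boolean isConflictingDuplicate(final String topic, final int partition,
                                          final long offset, final String value) {
        final String key = topic + "-" + partition + "@" + offset;
        final String previous = seenByOffset.putIfAbsent(key, value);
        return previous != null && !previous.equals(value);
    }
}
{code}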



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4862) Kafka client connect to a shutdown node will block for a long time

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4862:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

The KIP associated with this JIRA has not been accepted yet, moving out to 2.1.0

> Kafka client connect to a shutdown node will block for a long time
> --
>
> Key: KAFKA-4862
> URL: https://issues.apache.org/jira/browse/KAFKA-4862
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0, 0.10.2.0
>Reporter: Pengwei
>Assignee: Pengwei
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently in our test env, we found that after one of the broker nodes crashes 
> (reboot or OS crash), the client may keep connecting to the crashed node to send 
> metadata or other requests, and it needs several minutes to realize the 
> connection has timed out before trying another node to send the request. The 
> client may therefore remain unaware of the metadata change for several minutes.
> We don't have a connection timeout for the network client; we should add a 
> connection timeout for the client.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4665) Inconsistent handling of non-existing topics in offset fetch handling

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4665:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several months, moving this out to 
2.1.0.

> Inconsistent handling of non-existing topics in offset fetch handling
> -
>
> Key: KAFKA-4665
> URL: https://issues.apache.org/jira/browse/KAFKA-4665
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>Priority: Major
> Fix For: 2.1.0
>
>
> For version 0 of the offset fetch API, the broker returns 
> UNKNOWN_TOPIC_OR_PARTITION for any topics/partitions which do not exist at 
> the time of fetching. In later versions, we skip this check. We do, however, 
> continue to return UNKNOWN_TOPIC_OR_PARTITION for authorization errors (i.e. 
> if the principal does not have Describe access to the corresponding topic). 
> We should probably make this behavior consistent across versions.
> Note also that currently the consumer raises {{KafkaException}} when it 
> encounters an UNKNOWN_TOPIC_OR_PARTITION error in the offset fetch response, 
> which is inconsistent with how we usually handle this error. This probably 
> doesn't cause any problems currently only because of the inconsistency 
> mentioned in the first paragraph above.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4307) Inconsistent parameters between console producer and consumer

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4307:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several months, moving this out to 
2.1.0.

> Inconsistent parameters between console producer and consumer
> -
>
> Key: KAFKA-4307
> URL: https://issues.apache.org/jira/browse/KAFKA-4307
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.1.0
>Reporter: Gwen Shapira
>Assignee: Balint Molnar
>Priority: Major
>  Labels: newbie
> Fix For: 2.1.0
>
>
> kafka-console-producer uses --broker-list while kafka-console-consumer uses 
> --bootstrap-server.
> Let's add --bootstrap-server to the producer for some consistency?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4249) Document how to customize GC logging options for broker

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4249:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several months, moving this out to 
2.1.0.

> Document how to customize GC logging options for broker
> ---
>
> Key: KAFKA-4249
> URL: https://issues.apache.org/jira/browse/KAFKA-4249
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Affects Versions: 0.10.0.1
>Reporter: Jim Hoagland
>Assignee: Tom Bentley
>Priority: Major
> Fix For: 2.1.0
>
>
> We wanted to enable GC logging for the Kafka broker and saw that you can set 
> GC_LOG_ENABLED=true.  However, this didn't do what we wanted.  For example, 
> the GC log will be overwritten every time the broker gets restarted.  It 
> wasn't clear how we could customize this (no documentation of it that I could 
> find), so I did some research by looking at the source code, did some testing, 
> and found that KAFKA_GC_LOG_OPTS could be set with alternate JVM options prior 
> to starting the broker.  I posted my solution to StackOverflow:
>   
> http://stackoverflow.com/questions/39854424/how-to-enable-gc-logging-for-apache-kafka-brokers-while-preventing-log-file-ove
> (feel free to critique)
> That solution is now public, but it seems like the Kafka documentation should 
> say how to do this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3554) Generate actual data with specific compression ratio and add multi-thread support in the ProducerPerformance tool.

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-3554:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several weeks, moving this out to 
2.1.0.

> Generate actual data with specific compression ratio and add multi-thread 
> support in the ProducerPerformance tool.
> --
>
> Key: KAFKA-3554
> URL: https://issues.apache.org/jira/browse/KAFKA-3554
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.1
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently ProducerPerformance always generates the payload with the same 
> bytes. This does not work well for testing compressed data because the 
> payload is extremely compressible no matter how big it is.
> We can make some changes to make it more useful for compressed messages. 
> Currently I am generating a payload containing integers from a given range. 
> By adjusting the range of the integers, we can get different compression 
> ratios. 
> API-wise, we can either let the user specify the integer range or the expected 
> compression ratio (we will do some probing to get the corresponding range for 
> the user).
> Besides that, in many cases it is useful to have multiple producer threads 
> when the producer threads themselves are the bottleneck. Admittedly, people can 
> run multiple ProducerPerformance instances to achieve a similar result, but it 
> is still different from the real case where people actually use the producer.
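
A rough sketch of the payload idea described above (not the actual ProducerPerformance 
code): the payload is built from integers drawn from a configurable range, so a 
narrower range yields a more compressible payload and a wider range a less 
compressible one.

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Random;

public class CompressiblePayloadSketch {
    public static byte[] payload(final int sizeBytes, final int integerRange, final Random random) {
        final StringBuilder sb = new StringBuilder(sizeBytes);
        while (sb.length() < sizeBytes) {
            // Smaller integerRange => more repetition => higher compression ratio.
            sb.append(random.nextInt(integerRange)).append(' ');
        }
        return sb.substring(0, sizeBytes).getBytes(StandardCharsets.UTF_8);
    }
}
{code}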



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4203) Java producer default max message size does not align with broker default

2018-06-11 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-4203:
--
Fix Version/s: (was: 2.0.0)
   2.1.0

Since there has been no activity on this for several months, moving this out to 
2.1.0.

> Java producer default max message size does not align with broker default
> -
>
> Key: KAFKA-4203
> URL: https://issues.apache.org/jira/browse/KAFKA-4203
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.1
>Reporter: Grant Henke
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 2.1.0
>
>
> The Java producer sets max.request.size = 1048576 (the base 2 version of 1 MB 
> (MiB))
> The broker sets max.message.bytes = 1000012 (the base 10 value of 1 MB + 12 
> bytes for overhead)
> This means that by default the producer can try to produce messages larger 
> than the broker will accept resulting in RecordTooLargeExceptions.
> There were no similar issues in the old producer because it sets 
> max.message.size = 1000000 (the base 10 value of 1 MB)
> I propose we increase the broker default for max.message.bytes to 1048588 
> (the base 2 value of 1 MB (MiB) + 12 bytes for overhead) so that any message 
> produced with default configs from either producer does not result in a 
> RecordTooLargeException.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

