[jira] [Commented] (KAFKA-6881) Kafka 1.1 Broker version crashes when deleting log
[ https://issues.apache.org/jira/browse/KAFKA-6881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509191#comment-16509191 ] xiaojing zhou commented on KAFKA-6881: -- I have the same issue; the file __consumer_offsets-7/019668089841.log does exist in our NAS folder. My Kafka version is 1.0.1. {code} [2018-06-11 00:04:23,282] ERROR Failed to clean up log for __consumer_offsets-7 in dir /nas/kafka_logs/lvsp01hkf001 due to IOException (kafka.server.LogDirFailureChannel) java.nio.file.NoSuchFileException: /nas/kafka_logs/lvsp01hkf001/__consumer_offsets-7/019668089841.log -- java.nio.file.NoSuchFileException: /nas/kafka_logs/lvsp01hkf001/__consumer_offsets-7/019668089841.log at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409) at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262) at java.nio.file.Files.move(Files.java:1395) at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:682) at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212) at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:398) at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:1592) at kafka.log.Log$$anonfun$replaceSegments$1.apply(Log.scala:1644) at kafka.log.Log$$anonfun$replaceSegments$1.apply(Log.scala:1639) at scala.collection.immutable.List.foreach(List.scala:392) {code} > Kafka 1.1 Broker version crashes when deleting log > -- > > Key: KAFKA-6881 > URL: https://issues.apache.org/jira/browse/KAFKA-6881 > Project: Kafka > Issue Type: Bug > Environment: Linux >Reporter: K B Parthasarathy >Priority: Critical > > Hello > We have been running Kafka 1.1 on Linux for the past 3 weeks. Today Kafka > crashed.
When we checked the server.log file, we found the following log: > [2018-05-07 16:53:06,721] ERROR Failed to clean up log for > __consumer_offsets-24 in dir /tmp/kafka-logs due to IOException > (kafka.server.LogDirFailureChannel) > java.nio.file.NoSuchFileException: > /tmp/kafka-logs/__consumer_offsets-24/.log > at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) > at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409) > at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262) > at java.nio.file.Files.move(Files.java:1395) > at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697) > at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212) > at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415) > at kafka.log.Log.asyncDeleteSegment(Log.scala:1601) > at kafka.log.Log.$anonfun$replaceSegments$1(Log.scala:1653) > at kafka.log.Log.$anonfun$replaceSegments$1$adapted(Log.scala:1648) > at scala.collection.immutable.List.foreach(List.scala:389) > at kafka.log.Log.replaceSegments(Log.scala:1648) > at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:535) > at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:462) > at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:461) > at scala.collection.immutable.List.foreach(List.scala:389) > at kafka.log.Cleaner.doClean(LogCleaner.scala:461) > at kafka.log.Cleaner.clean(LogCleaner.scala:438) > at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305) > at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > Suppressed: java.nio.file.NoSuchFileException: > /tmp/kafka-logs/__consumer_offsets-24/.log -> > /tmp/kafka-logs/__consumer_offsets-24/.log.deleted > at
sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) > at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) > at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:396) > at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262) > at java.nio.file.Files.move(Files.java:1395) > at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:694) > ... 16 more > [2018-05-07 16:53:06,725] INFO [ReplicaManager broker=0] Stopping serving > replicas in dir /tmp/kafka-logs (kafka.server.ReplicaManager) > [2018-05-07 16:53:06,762] INFO Stopping serving logs in dir /tmp/kafka-logs > (kafka.log.LogManager) > [2018-05-07 16:53:07,032] ERROR Shutdown broker because all log dirs in > /tmp/kafka-logs have failed
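Both traces above fail inside `Utils.atomicMoveWithFallback` while `LogSegment.changeFileSuffixes` renames a segment's `.log` file to `.log.deleted`. The top-level `NoSuchFileException` comes from one `Files.move` call, and the `Suppressed:` exception (whose message reads `source -> target`) comes from an earlier one. Judging only from that shape, the helper appears to try an atomic rename first and then fall back to a plain move. The minimal sketch below is written from scratch with `java.nio.file` under that assumption; it is not Kafka's code, and the class and file names are hypothetical. It reproduces the same exception shape when the source file is already gone.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicMoveSketch {

    // Try an atomic rename first. If it fails, retry with a plain move that may
    // replace the target. If the retry also fails, throw the retry's exception
    // and attach the first failure to it as a suppressed exception.
    public static void atomicMoveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException outer) {
            try {
                Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException inner) {
                inner.addSuppressed(outer);
                throw inner;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("segment-rename");
        // Hypothetical file names. The source is deliberately never created, to mimic
        // a segment file that is already gone when the cleaner tries to rename it.
        Path log = dir.resolve("segment.log");
        Path deleted = dir.resolve("segment.log.deleted");
        try {
            atomicMoveWithFallback(log, deleted);
            System.out.println("renamed");
        } catch (NoSuchFileException e) {
            System.out.println("failed: " + e.getClass().getSimpleName()
                    + ", suppressed=" + e.getSuppressed().length);
        } finally {
            Files.deleteIfExists(dir);
        }
    }
}
```

On a Linux filesystem, the missing source should surface as a `NoSuchFileException` carrying one suppressed exception, which matches the `Suppressed:` section of the 1.1 trace. The sketch says nothing about why the file was missing on the brokers in this issue.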
[jira] [Commented] (KAFKA-7043) Connect isolation whitelist does not include new primitive converters (KIP-305)
[ https://issues.apache.org/jira/browse/KAFKA-7043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509082#comment-16509082 ] ASF GitHub Bot commented on KAFKA-7043: --- rhauch opened a new pull request #5198: KAFKA-7043: Modified plugin isolation whitelist with recently added converters (KIP-305) URL: https://github.com/apache/kafka/pull/5198 Several recently added converters are included in the plugin isolation whitelist, similar to `StringConverter`. This is an implementation change only and does not affect the approved KIP. Several unit tests were added to verify that these converters are loaded in isolation, again similar to `StringConverter`. These changes should be applied only to `trunk`, since these converters were added as part of KIP-305 for AK 2.0. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Connect isolation whitelist does not include new primitive converters > (KIP-305) > --- > > Key: KAFKA-7043 > URL: https://issues.apache.org/jira/browse/KAFKA-7043 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Blocker > Fix For: 2.0.0 > > > KIP-305 added several new primitive converters, but the PR did not add them > to the plugin isolation whitelist. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
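The whitelist fix can be pictured as a two-step check. Classes in the framework's own package are normally loaded by the parent classloader, except for a whitelisted set of bundled plugins (such as `StringConverter`) that are loaded in isolation like third-party plugins. The sketch below is hypothetical. The prefix, the regular expression, and the assumed package `org.apache.kafka.connect.converters` for the KIP-305 converters are illustrative choices, not the exact pattern Kafka Connect uses.

```java
import java.util.regex.Pattern;

public class IsolationWhitelistSketch {

    // Assumed rule: classes under this prefix normally come from the parent
    // (framework) classloader...
    private static final String FRAMEWORK_PREFIX = "org.apache.kafka.";

    // ...except for these bundled plugin classes, which are loaded in isolation.
    // The class list is an assumption for illustration only.
    private static final Pattern ISOLATED_WHITELIST = Pattern.compile(
            "org\\.apache\\.kafka\\.connect\\.storage\\.StringConverter"
            + "|org\\.apache\\.kafka\\.connect\\.converters\\.(ByteArray|Short|Integer|Long|Float|Double)Converter");

    public static boolean shouldLoadInIsolation(String className) {
        if (!className.startsWith(FRAMEWORK_PREFIX)) {
            return true; // third-party plugin classes are always isolated
        }
        return ISOLATED_WHITELIST.matcher(className).matches();
    }

    public static void main(String[] args) {
        String[] names = {
            "org.apache.kafka.connect.storage.StringConverter",
            "org.apache.kafka.connect.converters.LongConverter",
            "org.apache.kafka.connect.data.Schema",
            "com.example.MyConnector"
        };
        for (String n : names) {
            System.out.println(n + " -> " + (shouldLoadInIsolation(n) ? "isolated" : "parent"));
        }
    }
}
```

In this model, the bug described in the ticket corresponds to the second alternative of the regular expression being absent. The new converters would then match the framework prefix but not the whitelist, so they would fall through to the parent classloader.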
[jira] [Commented] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required
[ https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509076#comment-16509076 ] Dhruvil Shah commented on KAFKA-7045: - Attached is topic data that reproduces case (1) described in the JIRA, where messages in the first segment starting at offset 70 have been cleaned out. These messages were part of the batch starting at offset 69. Credit to [~omkreddy] for finding this issue and providing a reproducible test case. > Consumer may not be able to consume all messages when down-conversion is > required > - > > Key: KAFKA-7045 > URL: https://issues.apache.org/jira/browse/KAFKA-7045 > Project: Kafka > Issue Type: Bug > Components: consumer, core >Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0, 0.11.0.2, 1.1.0, 2.0.0, 1.0.1 >Reporter: Dhruvil Shah >Priority: Major > Fix For: 2.1.0 > > Attachments: log-cleaner-test.zip > > > When down-conversion is required, the consumer might fail to consume messages > under certain conditions. A couple of such cases are outlined below: > (1) When consuming from a compacted topic, it is possible that the consumer > wants to fetch messages that fall in the middle of a batch but the messages > have been compacted by the cleaner. For example, let's say we have the > following two segments. The brackets indicate a single batch of messages and > the numbers within are the message offsets. > Segment #1: [0, 1, 2], [3, 4, 5], [6, 7, 8] > Segment #2: [9, 10, 11], [12, 13, 14] > If the cleaner were to come in now and clean up messages with offsets 7 and > 8, the segments would look like the following: > Segment #1: [0, 1, 2], [3, 4, 5], [6] > Segment #2: [9, 10, 11], [12, 13, 14] > A consumer attempting to fetch messages at offset 7 will start reading the > batch starting at offset 6. During down-conversion, we will drop the record > at offset 6 because it is less than the current fetch start offset. However, there > are no messages in the log following offset 6.
In such cases, we return the > `FileRecords` itself, which would cause the consumer to throw an exception > because it does not understand the stored message format. > (2) When consuming from a topic with transactional messages, down-conversion > usually drops control batches because these do not exist in V0 and V1 message > formats. If there are no message batches following the control batch in the > particular segment (or if we are at the end of the log), we would again get > no records after down-conversion and will return the `FileRecords`. Because > the consumer is not able to interpret control batches, it will again throw an > exception.
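Case (1) can be reduced to a small model. Treat each batch as the list of offsets it still holds after compaction, read from the batch containing the fetch offset to the end of that segment, and drop every record below the fetch offset. This is a hypothetical simplification: the class, the method, and the assumption that a read stops at the segment boundary are illustrative, not broker code. Still, it shows why a fetch at offset 7 converts nothing once offsets 7 and 8 are cleaned.

```java
import java.util.ArrayList;
import java.util.List;

public class DownConversionSketch {

    // Hypothetical, simplified model: each inner list is one batch, holding the
    // record offsets that survived compaction. Records below fetchOffset are dropped.
    public static List<Long> downConvert(List<List<Long>> batchesRead, long fetchOffset) {
        List<Long> converted = new ArrayList<>();
        for (List<Long> batch : batchesRead) {
            for (long offset : batch) {
                if (offset >= fetchOffset) {
                    converted.add(offset);
                }
            }
        }
        return converted;
    }

    public static void main(String[] args) {
        // Before compaction, a fetch at 7 reads batch [6, 7, 8] and converts 7 and 8.
        System.out.println("before compaction: " + downConvert(List.of(List.of(6L, 7L, 8L)), 7L));

        // After compaction, Segment #1 ends with batch [6]. The fetch at 7 starts at
        // that batch and, in this model, reading stops at the end of the segment.
        List<Long> converted = downConvert(List.of(List.of(6L)), 7L);
        if (converted.isEmpty()) {
            // Mirrors the behavior described in the JIRA: nothing was converted,
            // so the raw FileRecords are returned to the old consumer.
            System.out.println("after compaction: no records converted -> raw FileRecords returned");
        } else {
            System.out.println("after compaction: " + converted);
        }
    }
}
```

The empty result is what triggers the fallback. The broker cannot tell it apart from the "batch too large" situation that the fallback was originally meant for.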
[jira] [Commented] (KAFKA-6966) Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
[ https://issues.apache.org/jira/browse/KAFKA-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509072#comment-16509072 ] Matthias J. Sax commented on KAFKA-6966: Thanks for your interest in contributing to Kafka! I added you to the list of contributors and assigned the ticket to you. You can also self-assign tickets now. Please prepare a KIP for this (you can also work on a PR in parallel, whichever works best for you). The KIP must be accepted before we can accept the PR. If you don't have a wiki account yet, please create one and provide your wiki ID so we can grant you write access to the wiki (otherwise you cannot create a KIP page). > Extend `TopologyDescription.Sink` to return `TopicNameExtractor` > > > Key: KAFKA-6966 > URL: https://issues.apache.org/jira/browse/KAFKA-6966 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Matthias J. Sax >Assignee: Nishanth Pradeep >Priority: Major > Labels: beginner, needs-kip, newbie > > With KIP-303, a dynamic routing feature was added, and > `TopologyDescription.Sink#topic()` returns `null` if this feature is used. > It would be useful to get the `TopicNameExtractor` class that is actually used from > the `TopologyDescription`. > We suggest adding `Class > TopologyDescription.Sink#topicNameExtractor()` and letting it return `null` if > the dynamic routing feature is not used. > This is a public API change and requires a KIP: > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
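The proposed accessor pair can be sketched with stand-in types. `topic()` returns `null` for a dynamically routed sink, and the proposed `topicNameExtractor()` returns `null` for a sink that writes to a fixed topic, so a caller can branch on whichever one is set. `TopicNameExtractor`, `Sink`, and `KeyRouter` here are simplified, hypothetical stand-ins, not the Kafka Streams classes. Returning a `Class` object follows the ticket's suggestion; the final signature was left for the KIP to decide.

```java
public class SinkDescriptionSketch {

    // Hypothetical stand-in for the KIP-303 extractor, simplified to String keys/values.
    public interface TopicNameExtractor {
        String extract(String key, String value);
    }

    // Hypothetical sink description: exactly one of topic() and topicNameExtractor()
    // is non-null.
    public static final class Sink {
        private final String topic;
        private final Class<? extends TopicNameExtractor> extractorClass;

        private Sink(String topic, Class<? extends TopicNameExtractor> extractorClass) {
            this.topic = topic;
            this.extractorClass = extractorClass;
        }

        public static Sink staticTopic(String topic) {
            return new Sink(topic, null);
        }

        public static Sink dynamic(Class<? extends TopicNameExtractor> extractorClass) {
            return new Sink(null, extractorClass);
        }

        // null when dynamic routing is used
        public String topic() {
            return topic;
        }

        // null when dynamic routing is not used
        public Class<? extends TopicNameExtractor> topicNameExtractor() {
            return extractorClass;
        }
    }

    // Example extractor: routes each record to a topic derived from its key.
    public static final class KeyRouter implements TopicNameExtractor {
        @Override
        public String extract(String key, String value) {
            return "events-" + key;
        }
    }

    public static String describe(Sink sink) {
        return sink.topic() != null
                ? "sink -> topic " + sink.topic()
                : "sink -> dynamic via " + sink.topicNameExtractor().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(describe(Sink.staticTopic("output")));
        System.out.println(describe(Sink.dynamic(KeyRouter.class)));
    }
}
```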
[jira] [Updated] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required
[ https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dhruvil Shah updated KAFKA-7045: Attachment: log-cleaner-test.zip
[jira] [Updated] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required
[ https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dhruvil Shah updated KAFKA-7045: Description: When down-conversion is required, the consumer might fail to consume messages under certain conditions. A couple of such cases are outlined below: (1) When consuming from a compacted topic, it is possible that the consumer wants to fetch messages that fall in the middle of a batch but the messages have been compacted by the cleaner. For example, let's say we have the following two segments. The brackets indicate a single batch of messages and the numbers within are the message offsets. Segment #1: [0, 1, 2], [3, 4, 5], [6, 7, 8] Segment #2: [9, 10, 11], [12, 13, 14] If the cleaner were to come in now and clean up messages with offsets 7 and 8, the segments would look like the following: Segment #1: [0, 1, 2], [3, 4, 5], [6] Segment #2: [9, 10, 11], [12, 13, 14] A consumer attempting to fetch messages at offset 7 will start reading the batch starting at offset 6. During down-conversion, we will drop the record at offset 6 because it is less than the current fetch start offset. However, there are no messages in the log following offset 6. In such cases, we return the `FileRecords` itself, which would cause the consumer to throw an exception because it does not understand the stored message format. (2) When consuming from a topic with transactional messages, down-conversion usually drops control batches because these do not exist in V0 and V1 message formats. If there are no message batches following the control batch in the particular segment (or if we are at the end of the log), we would again get no records after down-conversion and will return the `FileRecords`. Because the consumer is not able to interpret control batches, it will again throw an exception. was: When down-conversion is required, the consumer might fail to consume messages under certain conditions.
A couple of such cases are outlined below: (1) When consuming from a compacted topic, it is possible that the consumer wants to fetch messages that fall in the middle of a batch but the messages have been compacted by the cleaner. For example, let's say we have the following two segments. The brackets indicate a single batch of messages and the numbers within are the message offsets. Segment #1: [0, 1, 2], [3, 4, 5], [6, 7, 8] Segment #2: [9, 10, 11], [12, 13, 14] If the cleaner were to come in now and clean up messages with offsets 7 and 8, the segments would look like the following: Segment #1: [0, 1, 2], [3, 4, 5], [6] Segment #2: [9, 10, 11], [12, 13, 14] A consumer attempting to fetch messages at offset 7 will start reading the batch starting at offset 6. During down-conversion, we will drop the record at offset 6 because it is less than the current fetch start offset. However, there are no messages in the log following offset 6. In such cases, we return the `FileRecords` itself, which would cause the consumer to throw an exception because it does not understand the stored message format. (2) When consuming from a topic with transactional messages, down-conversion usually drops control batches because these did not exist in V0 and V1 message formats. If there are no message batches following the control batch in the particular segment (or if we are at the end of the log), we would again get no records after down-conversion and will return the `FileRecords`. Because the consumer is not able to interpret control batches, it will again throw an exception.
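Case (2) of the KAFKA-7045 description follows the same pattern. V0 and V1 have no representation for control batches, so a read containing only a commit or abort marker converts to an empty result and the raw `FileRecords` fallback kicks in. The `Batch` type below is a hypothetical simplification, and the marker offset 10 is illustrative rather than taken from the issue.

```java
import java.util.ArrayList;
import java.util.List;

public class ControlBatchSketch {

    // Hypothetical simplified batch: either a data batch with record offsets,
    // or a transactional control batch (commit/abort marker).
    public static final class Batch {
        public final boolean isControl;
        public final List<Long> offsets;

        public Batch(boolean isControl, List<Long> offsets) {
            this.isControl = isControl;
            this.offsets = offsets;
        }
    }

    // Down-conversion to V0/V1: control batches have no equivalent in those
    // formats, so they are skipped; data records below fetchOffset are dropped.
    public static List<Long> downConvertToV1(List<Batch> batchesRead, long fetchOffset) {
        List<Long> converted = new ArrayList<>();
        for (Batch b : batchesRead) {
            if (b.isControl) {
                continue;
            }
            for (long offset : b.offsets) {
                if (offset >= fetchOffset) {
                    converted.add(offset);
                }
            }
        }
        return converted;
    }

    public static void main(String[] args) {
        // The fetch lands on a commit marker that is the last batch in the segment.
        List<Batch> tail = List.of(new Batch(true, List.of(10L)));
        List<Long> converted = downConvertToV1(tail, 10L);
        System.out.println(converted.isEmpty()
                ? "only a control batch was read -> raw FileRecords returned"
                : "converted offsets: " + converted);
    }
}
```

If a data batch followed the marker within the same read, the conversion would be non-empty and the old consumer would simply never see the marker. The failure needs the marker to be the last thing read.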
[jira] [Assigned] (KAFKA-6966) Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
[ https://issues.apache.org/jira/browse/KAFKA-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-6966: -- Assignee: Nishanth Pradeep
[jira] [Updated] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required
[ https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dhruvil Shah updated KAFKA-7045: Description: When down-conversion is required, the consumer might fail to consume messages under certain conditions. A couple of such cases are outlined below: (1) When consuming from a compacted topic, it is possible that the consumer wants to fetch messages that fall in the middle of a batch but the messages have been compacted by the cleaner. For example, let's say we have the following two segments. The brackets indicate a single batch of messages and the numbers within are the message offsets. Segment #1: [0, 1, 2], [3, 4, 5], [6, 7, 8] Segment #2: [9, 10, 11], [12, 13, 14] If the cleaner were to come in now and clean up messages with offsets 7 and 8, the segments would look like the following: Segment #1: [0, 1, 2], [3, 4, 5], [6] Segment #2: [9, 10, 11], [12, 13, 14] A consumer attempting to fetch messages at offset 7 will start reading the batch starting at offset 6. During down-conversion, we will drop the record at offset 6 because it is less than the current fetch start offset. However, there are no messages in the log following offset 6. In such cases, we return the `FileRecords` itself, which would cause the consumer to throw an exception because it does not understand the stored message format. (2) When consuming from a topic with transactional messages, down-conversion usually drops control batches because these did not exist in V0 and V1 message formats. If there are no message batches following the control batch in the particular segment (or if we are at the end of the log), we would again get no records after down-conversion and will return the `FileRecords`. Because the consumer is not able to interpret control batches, it will again throw an exception. was: When down-conversion is required, the consumer might fail to consume messages under certain conditions.
A couple of such cases are outlined below: # When consuming from a compacted topic, it is possible that the consumer wants to fetch messages that fall in the middle of a batch but the messages have been compacted by the cleaner. For example, let's say we have the following two segments. The brackets indicate a single batch of messages and the numbers within are the message offsets. Segment #1: [0, 1, 2], [3, 4, 5], [6, 7, 8] Segment #2: [9, 10, 11], [12, 13, 14] If the cleaner were to come in now and clean up messages with offsets 7 and 8, the segments would look like the following: Segment #1: [0, 1, 2], [3, 4, 5], [6] Segment #2: [9, 10, 11], [12, 13, 14] A consumer attempting to fetch messages at offset 7 will start reading the batch starting at offset 6. During down-conversion, we will drop the record at offset 6 because it is less than the current fetch start offset. However, there are no messages in the log following offset 6. In such cases, we return the `FileRecords` itself, which would cause the consumer to throw an exception because it does not understand the stored message format. (1) When consuming from a topic with transactional messages, down-conversion usually drops control batches because these did not exist in V0 and V1 message formats. If there are no message batches following the control batch in the particular segment (or if we are at the end of the log), we would again get no records after down-conversion and will return the `FileRecords`. Because the consumer is not able to interpret control batches, it will again throw an exception. (2) When consuming from a topic with transactional messages, down-conversion usually drops control batches because these did not exist in V0 and V1 message formats. If there are no message batches following the control batch in the particular segment (or if we are at the end of the log), we would again get no records after down-conversion and will return the `FileRecords`.
Because the consumer is not able to interpret control batches, it will again throw an exception.
[jira] [Updated] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required
[ https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dhruvil Shah updated KAFKA-7045: Description: When down-conversion is required, the consumer might fail to consume messages under certain conditions. A couple of such cases are outlined below: # When consuming from a compacted topic, it is possible that the consumer wants to fetch messages that fall in the middle of a batch but the messages have been compacted by the cleaner. For example, let's say we have the following two segments. The brackets indicate a single batch of messages and the numbers within are the message offsets. Segment #1: [0, 1, 2], [3, 4, 5], [6, 7, 8] Segment #2: [9, 10, 11], [12, 13, 14] If the cleaner were to come in now and clean up messages with offsets 7 and 8, the segments would look like the following: Segment #1: [0, 1, 2], [3, 4, 5], [6] Segment #2: [9, 10, 11], [12, 13, 14] A consumer attempting to fetch messages at offset 7 will start reading the batch starting at offset 6. During down-conversion, we will drop the record at offset 6 because it is less than the current fetch start offset. However, there are no messages in the log following offset 6. In such cases, we return the `FileRecords` itself, which would cause the consumer to throw an exception because it does not understand the stored message format. # When consuming from a topic with transactional messages, down-conversion usually drops control batches because these did not exist in V0 and V1 message formats. If there are no message batches following the control batch in the particular segment (or if we are at the end of the log), we would again get no records after down-conversion and will return the `FileRecords`. Because the consumer is not able to interpret control batches, it will again throw an exception. # When consuming from a topic with transactional messages, down-conversion usually drops control batches because these did not exist in V0 and V1 message formats.
If there are no message batches following the control batch in the particular segment (or if we are at the end of the log), we would again get no records after down-conversion and will return the `FileRecords`. Because the consumer is not able to interpret control batches, it will again throw an exception. was: When down-conversion is required, the consumer might fail to consume messages under certain conditions. A couple of such cases are outlined below: # When consuming from a compacted topic, it is possible that the consumer wants to fetch messages that fall in the middle of a batch but the messages have been compacted by the cleaner. For example, let's say we have the following two segments. The brackets indicate a single batch of messages and the numbers within are the message offsets. Segment #1: [0, 1, 2], [3, 4, 5], [6, 7, 8] Segment #2: [9, 10, 11], [12, 13, 14] If the cleaner were to come in now and clean up messages with offsets 7 and 8, the segments would look like the following: Segment #1: [0, 1, 2], [3, 4, 5], [6] Segment #2: [9, 10, 11], [12, 13, 14] A consumer attempting to fetch messages at offset 7 will start reading the batch starting at offset 6. During down-conversion, we will drop the record at offset 6 because it is less than the current fetch start offset. However, there are no messages in the log following offset 6. In such cases, we return the `FileRecords` itself, which would cause the consumer to throw an exception because it does not understand the stored message format. # When consuming from a topic with transactional messages, down-conversion usually drops control batches because these did not exist in V0 and V1 message formats. If there are no message batches following the control batch in the particular segment (or if we are at the end of the log), we would again get no records after down-conversion and will return the `FileRecords`. Because the consumer is not able to interpret control batches, it will again throw an exception.
Relevant code from 1.x release that sends `FileRecords` when we are not able to down-convert any messages: {quote}public ConvertedRecords downConvert(byte toMagic, long firstOffset, Time time) { ConvertedRecords convertedRecords = downConvert(batches, toMagic, firstOffset, time); if (convertedRecords.recordsProcessingStats().numRecordsConverted() == 0) { // This indicates that the message is too large, which means that the buffer is not large // enough to hold a full record batch. We just return all the bytes in this instance. // Even though the record batch does not have the right format version, we expect old clients // to raise an error to the user after reading the record batch size and seeing that there // are not enough available bytes in the response to read it fully. Note that this is // only possible prior to KIP-74, after which the broker was changed to always
[jira] [Updated] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required
[ https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dhruvil Shah updated KAFKA-7045: Description: When down-conversion is required, the consumer might fail to consume messages under certain conditions. A couple of such cases are outlined below: # When consuming from a compacted topic, it is possible that the consumer wants to fetch messages that fall in the middle of a batch but the messages have been compacted by the cleaner. For example, let's say we have the following two segments. The brackets indicate a single batch of messages and the numbers within are the message offsets. Segment #1: [0, 1, 2], [3, 4, 5], [6, 7, 8] Segment #2: [9, 10, 11], [12, 13, 14] If the cleaner were to come in now and clean up messages with offsets 7 and 8, the segments would look like the following: Segment #1: [0, 1, 2], [3, 4, 5], [6] Segment #2: [9, 10, 11], [12, 13, 14] A consumer attempting to fetch messages at offset 7 will start reading the batch starting at offset 6. During down-conversion, we will drop the record at offset 6 because it is less than the current fetch start offset. However, there are no messages in the log following offset 6. In such cases, we return the `FileRecords` itself, which would cause the consumer to throw an exception because it does not understand the stored message format. (1) When consuming from a topic with transactional messages, down-conversion usually drops control batches because these did not exist in V0 and V1 message formats. If there are no message batches following the control batch in the particular segment (or if we are at the end of the log), we would again get no records after down-conversion and will return the `FileRecords`. Because the consumer is not able to interpret control batches, it will again throw an exception.
(2) When consuming from a topic with transactional messages, down-conversion usually drops control batches because these did not exist in V0 and V1 message formats. If there are no message batches following the control batch in the particular segment (or if we are at the end of the log), we would again get no records after down-conversion and will return the `FileRecords`. Because the consumer is not able to interpret control batches, it will again throw an exception. was: When down-conversion is required, the consumer might fail to consume messages under certain conditions. A couple of such cases are outlined below: # When consuming from a compacted topic, it is possible that the consumer wants to fetch messages that fall in the middle of a batch but the messages have been compacted by the cleaner. For example, let's say we have the following two segments. The brackets indicate a single batch of messages and the numbers within are the message offsets. Segment #1: [0, 1, 2], [3, 4, 5], [6, 7, 8] Segment #2: [9, 10, 11], [12, 13, 14] If the cleaner were to come in now and clean up messages with offsets 7 and 8, the segments would look like the following: Segment #1: [0, 1, 2], [3, 4, 5], [6] Segment #2: [9, 10, 11], [12, 13, 14] A consumer attempting to fetch messages at offset 7 will start reading the batch starting at offset 6. During down-conversion, we will drop the record at offset 6 because it is less than the current fetch start offset. However, there are no messages in the log following offset 6. In such cases, we return the `FileRecords` itself, which would cause the consumer to throw an exception because it does not understand the stored message format. # When consuming from a topic with transactional messages, down-conversion usually drops control batches because these did not exist in V0 and V1 message formats.
If there are no message batches following the control batch in the particular segment (or if we are at the end of the log), we would again get no records after down-conversion and will return the `FileRecords`. Because the consumer is not able to interpret control batches, it will again throw an exception. # When consuming from a topic with transactional messages, down-conversion usually drops control batches because these did not exist in V0 and V1 message formats. If there are no message batches following the control batch in the particular segment (or if we are at the end of the log), we would again get no records after down-conversion and will return the `FileRecords`. Because the consumer is not able to interpret control batches, it will again throw an exception. > Consumer may not be able to consume all messages when down-conversion is > required > - > > Key: KAFKA-7045 > URL: https://issues.apache.org/jira/browse/KAFKA-7045 > Project: Kafka > Issue Type: Bug > Components: consumer, core >Affects Versions: 0.11.0.0, 0.11.0.1,
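The compacted-topic case can be illustrated with a small Java model. This is a sketch under stated assumptions, not Kafka's down-conversion code: the hypothetical `OffsetBatch` class holds only the offsets left in a batch after cleaning, and `CompactedFetchSketch.downConvert` merely drops records below the fetch offset, ignoring the actual rewrite into the V0/V1 format.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a stored batch: only the offsets of the records it still
// holds after log cleaning. Not a Kafka class.
final class OffsetBatch {
    final List<Long> offsets;

    OffsetBatch(Long... offsets) {
        this.offsets = Arrays.asList(offsets);
    }
}

final class CompactedFetchSketch {
    // Simplified stand-in for down-conversion: records below the fetch offset are dropped.
    // Real down-conversion also rewrites each batch into the older format; omitted here.
    static List<Long> downConvert(List<OffsetBatch> batchesRead, long fetchOffset) {
        List<Long> converted = new ArrayList<>();
        for (OffsetBatch batch : batchesRead) {
            for (long offset : batch.offsets) {
                if (offset >= fetchOffset) {
                    converted.add(offset);
                }
            }
        }
        return converted;
    }
}
```

With the cleaned layout, a fetch at offset 7 that reads only the batch `[6]` from segment #1 yields no converted records, which is the situation in which the 1.x `downConvert` falls back to returning the unconverted `FileRecords`.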
[jira] [Updated] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required
[ https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dhruvil Shah updated KAFKA-7045: Description: When down-conversion is required, the consumer might fail to consume messages under certain conditions. A couple of such cases are outlined below: # When consuming from a compacted topic, it is possible that the consumer wants to fetch messages that fall in the middle of a batch, but those messages have been compacted away by the cleaner. For example, let's say we have the following two segments. The brackets indicate a single batch of messages and the numbers within are the message offsets. Segment #1: [0, 1, 2], [3, 4, 5], [6, 7, 8] Segment #2: [9, 10, 11], [12, 13, 14] If the cleaner were to come in now and clean up messages with offsets 7 and 8, the segments would look like the following: Segment #1: [0, 1, 2], [3, 4, 5], [6] Segment #2: [9, 10, 11], [12, 13, 14] A consumer attempting to fetch messages at offset 7 will start reading the batch starting at offset 6. During down-conversion, we will drop the record at offset 6 because it is less than the current fetch start offset. However, there are no messages in segment #1 following offset 6. In such cases, we return the `FileRecords` itself, which would cause the consumer to throw an exception because it does not understand the stored message format. # When consuming from a topic with transactional messages, down-conversion usually drops control batches because these did not exist in the V0 and V1 message formats. If there are no message batches following the control batch in the particular segment (or if we are at the end of the log), we would again get no records after down-conversion and will return the `FileRecords`. Because the consumer is not able to interpret control batches, it will again throw an exception. Relevant code from the 1.x release that sends `FileRecords` when we are not able to down-convert any messages:
{quote}public ConvertedRecords downConvert(byte toMagic, long firstOffset, Time time) {
    ConvertedRecords convertedRecords = downConvert(batches, toMagic, firstOffset, time);
    if (convertedRecords.recordsProcessingStats().numRecordsConverted() == 0) {
        // This indicates that the message is too large, which means that the buffer is not large
        // enough to hold a full record batch. We just return all the bytes in this instance.
        // Even though the record batch does not have the right format version, we expect old clients
        // to raise an error to the user after reading the record batch size and seeing that there
        // are not enough available bytes in the response to read it fully. Note that this is
        // only possible prior to KIP-74, after which the broker was changed to always return at least
        // one full record batch, even if it requires exceeding the max fetch size requested by the client.
        return new ConvertedRecords<>(this, RecordsProcessingStats.EMPTY);
    } else {
        return convertedRecords;
    }
}{quote}
was: When down-conversion is required, the consumer might fail to consume messages under certain conditions. A couple of such cases are outlined below: # When consuming from a compacted topic, it is possible that the consumer wants to fetch messages that fall in the middle of a batch, but those messages have been compacted away by the cleaner. For example, let's say we have the following two segments. The brackets indicate a single batch of messages and the numbers within are the message offsets. Segment #1: [0, 1, 2], [3, 4, 5], [6, 7, 8] Segment #2: [9, 10, 11], [12, 13, 14] If the cleaner were to come in now and clean up messages with offsets 7 and 8, the segments would look like the following: Segment #1: [0, 1, 2], [3, 4, 5], [6] Segment #2: [9, 10, 11], [12, 13, 14] A consumer attempting to fetch messages at offset 7 will start reading the batch starting at offset 6. During down-conversion, we will drop the record at offset 6 because it is less than the current fetch start offset. However, there are no messages in segment #1 following offset 6. In such cases, we return the `FileRecords` itself, which would cause the consumer to throw an exception because it does not understand the stored message format. # When consuming from a topic with transactional messages, down-conversion usually drops control batches because these did not exist in the V0 and V1 message formats. If there are no message batches following the control batch in the particular segment (or if we are at the end of the log), we would again get no records after down-conversion and will return the `FileRecords`. Because the consumer is not able to interpret control batches, it will again throw an exception. Relevant code from the 1.x release that sends `FileRecords` when we are not able to down-convert any messages: {{public ConvertedRecords downConvert(byte toMagic, long firstOffset, Time time) {}} {{ ConvertedRecords convertedRecords =
[jira] [Updated] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required
[ https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dhruvil Shah updated KAFKA-7045: Description: When down-conversion is required, the consumer might fail to consume messages under certain conditions. A couple of such cases are outlined below: # When consuming from a compacted topic, it is possible that the consumer wants to fetch messages that fall in the middle of a batch, but those messages have been compacted away by the cleaner. For example, let's say we have the following two segments. The brackets indicate a single batch of messages and the numbers within are the message offsets. Segment #1: [0, 1, 2], [3, 4, 5], [6, 7, 8] Segment #2: [9, 10, 11], [12, 13, 14] If the cleaner were to come in now and clean up messages with offsets 7 and 8, the segments would look like the following: Segment #1: [0, 1, 2], [3, 4, 5], [6] Segment #2: [9, 10, 11], [12, 13, 14] A consumer attempting to fetch messages at offset 7 will start reading the batch starting at offset 6. During down-conversion, we will drop the record at offset 6 because it is less than the current fetch start offset. However, there are no messages in segment #1 following offset 6. In such cases, we return the `FileRecords` itself, which would cause the consumer to throw an exception because it does not understand the stored message format. # When consuming from a topic with transactional messages, down-conversion usually drops control batches because these did not exist in the V0 and V1 message formats. If there are no message batches following the control batch in the particular segment (or if we are at the end of the log), we would again get no records after down-conversion and will return the `FileRecords`. Because the consumer is not able to interpret control batches, it will again throw an exception. Relevant code from the 1.x release that sends `FileRecords` when we are not able to down-convert any messages:
{{public ConvertedRecords downConvert(byte toMagic, long firstOffset, Time time) {}}
{{ ConvertedRecords convertedRecords = downConvert(batches, toMagic, firstOffset, time);}}
{{ if (convertedRecords.recordsProcessingStats().numRecordsConverted() == 0) {}}
{{ // This indicates that the message is too large, which means that the buffer is not large}}
{{ // enough to hold a full record batch. We just return all the bytes in this instance.}}
{{ // Even though the record batch does not have the right format version, we expect old clients}}
{{ // to raise an error to the user after reading the record batch size and seeing that there}}
{{ // are not enough available bytes in the response to read it fully. Note that this is}}
{{ // only possible prior to KIP-74, after which the broker was changed to always return at least}}
{{ // one full record batch, even if it requires exceeding the max fetch size requested by the client.}}
{{ return new ConvertedRecords<>(this, RecordsProcessingStats.EMPTY);}}
{{ } else {}}
{{ return convertedRecords;}}
{{ }}}
{{}}}
was: When down-conversion is required, the consumer might fail to consume messages under certain conditions. A couple of such cases are outlined below: # When consuming from a compacted topic, it is possible that the consumer wants to fetch messages that fall in the middle of a batch, but those messages have been compacted away by the cleaner. For example, let's say we have the following two segments. The brackets indicate a single batch of messages and the numbers within are the message offsets. Segment #1: [0, 1, 2], [3, 4, 5], [6, 7, 8] Segment #2: [9, 10, 11], [12, 13, 14] If the cleaner were to come in now and clean up messages with offsets 7 and 8, the segments would look like the following: Segment #1: [0, 1, 2], [3, 4, 5], [6] Segment #2: [9, 10, 11], [12, 13, 14] A consumer attempting to fetch messages at offset 7 will start reading the batch starting at offset 6. During down-conversion, we will drop the record at offset 6 because it is less than the current fetch start offset. However, there are no messages in segment #1 following offset 6. In such cases, we return the `FileRecords` itself, which would cause the consumer to throw an exception because it does not understand the stored message format. # When consuming from a topic with transactional messages, down-conversion usually drops control batches because these did not exist in the V0 and V1 message formats. If there are no message batches following the control batch in the particular segment (or if we are at the end of the log), we would again get no records after down-conversion and will return the `FileRecords`. Because the consumer is not able to interpret control batches, it will again throw an exception. Relevant code from the 1.x release that sends `FileRecords` when we are not able to down-convert any messages: ``` public ConvertedRecords downConvert(byte toMagic, long firstOffset, Time time) {
[jira] [Created] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required
Dhruvil Shah created KAFKA-7045: --- Summary: Consumer may not be able to consume all messages when down-conversion is required Key: KAFKA-7045 URL: https://issues.apache.org/jira/browse/KAFKA-7045 Project: Kafka Issue Type: Bug Components: consumer, core Affects Versions: 1.0.1, 1.1.0, 0.11.0.2, 1.0.0, 0.11.0.1, 0.11.0.0, 2.0.0 Reporter: Dhruvil Shah Fix For: 2.1.0 When down-conversion is required, the consumer might fail to consume messages under certain conditions. A couple of such cases are outlined below: # When consuming from a compacted topic, it is possible that the consumer wants to fetch messages that fall in the middle of a batch, but those messages have been compacted away by the cleaner. For example, let's say we have the following two segments. The brackets indicate a single batch of messages and the numbers within are the message offsets. Segment #1: [0, 1, 2], [3, 4, 5], [6, 7, 8] Segment #2: [9, 10, 11], [12, 13, 14] If the cleaner were to come in now and clean up messages with offsets 7 and 8, the segments would look like the following: Segment #1: [0, 1, 2], [3, 4, 5], [6] Segment #2: [9, 10, 11], [12, 13, 14] A consumer attempting to fetch messages at offset 7 will start reading the batch starting at offset 6. During down-conversion, we will drop the record at offset 6 because it is less than the current fetch start offset. However, there are no messages in segment #1 following offset 6. In such cases, we return the `FileRecords` itself, which would cause the consumer to throw an exception because it does not understand the stored message format. # When consuming from a topic with transactional messages, down-conversion usually drops control batches because these did not exist in the V0 and V1 message formats. If there are no message batches following the control batch in the particular segment (or if we are at the end of the log), we would again get no records after down-conversion and will return the `FileRecords`. Because the consumer is not able to interpret control batches, it will again throw an exception. Relevant code from the 1.x release that sends `FileRecords` when we are not able to down-convert any messages:
```
public ConvertedRecords downConvert(byte toMagic, long firstOffset, Time time) {
    ConvertedRecords convertedRecords = downConvert(batches, toMagic, firstOffset, time);
    if (convertedRecords.recordsProcessingStats().numRecordsConverted() == 0) {
        // This indicates that the message is too large, which means that the buffer is not large
        // enough to hold a full record batch. We just return all the bytes in this instance.
        // Even though the record batch does not have the right format version, we expect old clients
        // to raise an error to the user after reading the record batch size and seeing that there
        // are not enough available bytes in the response to read it fully. Note that this is
        // only possible prior to KIP-74, after which the broker was changed to always return at least
        // one full record batch, even if it requires exceeding the max fetch size requested by the client.
        return new ConvertedRecords<>(this, RecordsProcessingStats.EMPTY);
    } else {
        return convertedRecords;
    }
}
```
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
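The transactional case, and why the fallback in the quoted `downConvert` misfires, can be sketched the same way. All names here (`StoredBatch`, `FallbackSketch`) are hypothetical and the model is deliberately simplified: control batches are skipped because V0/V1 cannot represent them, a batch that does not fit in the remaining fetch size is not converted, and `returnsRawBytes` mirrors the `numRecordsConverted() == 0` check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a stored batch: whether it is a transaction control batch,
// how many data records it holds, and its size in bytes. Not a Kafka class.
final class StoredBatch {
    final boolean isControl;
    final int recordCount;
    final int sizeInBytes;

    StoredBatch(boolean isControl, int recordCount, int sizeInBytes) {
        this.isControl = isControl;
        this.recordCount = recordCount;
        this.sizeInBytes = sizeInBytes;
    }
}

final class FallbackSketch {
    // Counts the records a V0/V1 down-conversion would emit under this simplified model.
    static int recordsConverted(List<StoredBatch> batches, int maxBytes) {
        int converted = 0;
        int used = 0;
        for (StoredBatch batch : batches) {
            if (batch.isControl) {
                continue; // control batches have no V0/V1 equivalent and are dropped
            }
            if (used + batch.sizeInBytes > maxBytes) {
                break; // batch does not fit in the remaining fetch size: not converted
            }
            used += batch.sizeInBytes;
            converted += batch.recordCount;
        }
        return converted;
    }

    // Mirrors the quoted 1.x check: zero converted records is read as "batch too large",
    // so the raw, unconverted bytes are returned to the old client.
    static boolean returnsRawBytes(List<StoredBatch> batches, int maxBytes) {
        return recordsConverted(batches, maxBytes) == 0;
    }
}
```

In this model, both an oversized batch and a read containing only a control batch produce zero converted records, so the check cannot tell them apart; the comment in the quoted code only anticipates the first case.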
[jira] [Commented] (KAFKA-6966) Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
[ https://issues.apache.org/jira/browse/KAFKA-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509062#comment-16509062 ] Nishanth Pradeep commented on KAFKA-6966: - Hello [~mjsax]. I would like to pick up this ticket as my first contribution to Kafka. > Extend `TopologyDescription.Sink` to return `TopicNameExtractor` > > > Key: KAFKA-6966 > URL: https://issues.apache.org/jira/browse/KAFKA-6966 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Matthias J. Sax >Priority: Major > Labels: beginner, needs-kip, newbie > > With KIP-303, a dynamic routing feature was added and > `TopologyDescription.Sink#topic()` returns `null` if this feature is used. > It would be useful to get the `TopicNameExtractor` class that is actually used from > the `TopologyDescription`. > We suggest adding `Class > TopologyDescription.Sink#topicNameExtractor()` and having it return `null` if > the dynamic routing feature is not used. > This is a public API change and requires a KIP: > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
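A minimal sketch of the contract proposed in this ticket, using simplified stand-ins rather than the Kafka Streams API: `SimpleTopicNameExtractor` and `SinkDescription` are hypothetical, and the real `TopicNameExtractor` is generic and also receives a `RecordContext`. Here `topic()` returns `null` under dynamic routing, and the proposed accessor returns `null` when a static topic is used.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for Kafka Streams' TopicNameExtractor.
interface SimpleTopicNameExtractor {
    String extract(String key, String value);
}

// Hypothetical sink description holding either a static topic or an extractor, never both.
final class SinkDescription {
    private final String topic;
    private final SimpleTopicNameExtractor extractor;

    private SinkDescription(String topic, SimpleTopicNameExtractor extractor) {
        this.topic = topic;
        this.extractor = extractor;
    }

    static SinkDescription forTopic(String topic) {
        return new SinkDescription(Objects.requireNonNull(topic), null);
    }

    static SinkDescription forExtractor(SimpleTopicNameExtractor extractor) {
        return new SinkDescription(null, Objects.requireNonNull(extractor));
    }

    // Existing behavior described in the ticket: null when dynamic routing is used.
    String topic() {
        return topic;
    }

    // Proposed accessor: the extractor's class, or null when a static topic is used.
    Class<? extends SimpleTopicNameExtractor> topicNameExtractor() {
        return extractor == null ? null : extractor.getClass();
    }
}
```

Returning the class rather than the instance follows the wording of the ticket; exposing the instance would be an alternative design.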
[jira] [Assigned] (KAFKA-7044) kafka-consumer-groups.sh NullPointerException describing round robin or sticky assignors
[ https://issues.apache.org/jira/browse/KAFKA-7044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vahid Hashemian reassigned KAFKA-7044: -- Assignee: Vahid Hashemian > kafka-consumer-groups.sh NullPointerException describing round robin or > sticky assignors > > > Key: KAFKA-7044 > URL: https://issues.apache.org/jira/browse/KAFKA-7044 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 1.1.0 > Environment: CentOS 7.4, Oracle JDK 1.8 >Reporter: Jeff Field >Assignee: Vahid Hashemian >Priority: Minor > > We've recently moved to using the round robin assignor for one of our > consumer groups, and started testing the sticky assignor. In both cases, > using Kafka 1.1.0 we get a null pointer exception *unless* the group being > described is rebalancing: > {code:java} > kafka-consumer-groups --bootstrap-server fqdn:9092 --describe --group > groupname-for-consumer > Error: Executing consumer group command failed due to null > [2018-06-12 01:32:34,179] DEBUG Exception in consumer group command > (kafka.admin.ConsumerGroupCommand$) > java.lang.NullPointerException > at scala.Predef$.Long2long(Predef.scala:363) > at > kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$getLogEndOffsets$2.apply(ConsumerGroupCommand.scala:612) > at > kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$getLogEndOffsets$2.apply(ConsumerGroupCommand.scala:610) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.immutable.List.foreach(List.scala:392) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.immutable.List.map(List.scala:296) > at > kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.getLogEndOffsets(ConsumerGroupCommand.scala:610) > at > 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describePartitions(ConsumerGroupCommand.scala:328) > at > kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.collectConsumerAssignment(ConsumerGroupCommand.scala:308) > at > kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.collectConsumerAssignment(ConsumerGroupCommand.scala:544) > at > kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10$$anonfun$13.apply(ConsumerGroupCommand.scala:571) > at > kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10$$anonfun$13.apply(ConsumerGroupCommand.scala:565) > at scala.collection.immutable.List.flatMap(List.scala:338) > at > kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10.apply(ConsumerGroupCommand.scala:565) > at > kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10.apply(ConsumerGroupCommand.scala:558) > at scala.Option.map(Option.scala:146) > at > kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.collectGroupOffsets(ConsumerGroupCommand.scala:558) > at > kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describeGroup(ConsumerGroupCommand.scala:271) > at > kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:544) > at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:77) > at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala) > [2018-06-12 01:32:34,255] DEBUG Removed sensor with name connections-closed: > (org.apache.kafka.common.metrics.Metrics){code}
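The top frame, `scala.Predef$.Long2long`, is Scala's implicit conversion from a boxed `java.lang.Long` to a primitive `Long`, and it throws a `NullPointerException` when the boxed value is `null`. That suggests a `null` offset was unboxed inside `getLogEndOffsets` for at least one partition. The Java sketch below reproduces the same failure mode with plain auto-unboxing and shows a null-safe variant; the map and partition names are hypothetical, and this is neither the `ConsumerGroupCommand` code nor its fix.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

final class EndOffsetSketch {
    // Auto-unboxing a missing (null) offset throws NullPointerException, the same
    // failure mode as Predef.Long2long in the stack trace above.
    static long unsafeEndOffset(Map<String, Long> endOffsets, String partition) {
        return endOffsets.get(partition); // NPE if the key is absent or mapped to null
    }

    // Null-safe variant: report "unknown" instead of crashing the whole command.
    static OptionalLong safeEndOffset(Map<String, Long> endOffsets, String partition) {
        Long offset = endOffsets.get(partition);
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }
}
```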
[jira] [Created] (KAFKA-7044) kafka-consumer-groups.sh NullPointerException describing round robin or sticky assignors
Jeff Field created KAFKA-7044: - Summary: kafka-consumer-groups.sh NullPointerException describing round robin or sticky assignors Key: KAFKA-7044 URL: https://issues.apache.org/jira/browse/KAFKA-7044 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 1.1.0 Environment: CentOS 7.4, Oracle JDK 1.8 Reporter: Jeff Field We've recently moved to using the round robin assignor for one of our consumer groups, and started testing the sticky assignor. In both cases, using Kafka 1.1.0 we get a null pointer exception *unless* the group being described is rebalancing: {code:java} kafka-consumer-groups --bootstrap-server fqdn:9092 --describe --group groupname-for-consumer Error: Executing consumer group command failed due to null [2018-06-12 01:32:34,179] DEBUG Exception in consumer group command (kafka.admin.ConsumerGroupCommand$) java.lang.NullPointerException at scala.Predef$.Long2long(Predef.scala:363) at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$getLogEndOffsets$2.apply(ConsumerGroupCommand.scala:612) at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$getLogEndOffsets$2.apply(ConsumerGroupCommand.scala:610) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:296) at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.getLogEndOffsets(ConsumerGroupCommand.scala:610) at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describePartitions(ConsumerGroupCommand.scala:328) at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.collectConsumerAssignment(ConsumerGroupCommand.scala:308) at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.collectConsumerAssignment(ConsumerGroupCommand.scala:544) 
at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10$$anonfun$13.apply(ConsumerGroupCommand.scala:571) at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10$$anonfun$13.apply(ConsumerGroupCommand.scala:565) at scala.collection.immutable.List.flatMap(List.scala:338) at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10.apply(ConsumerGroupCommand.scala:565) at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10.apply(ConsumerGroupCommand.scala:558) at scala.Option.map(Option.scala:146) at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.collectGroupOffsets(ConsumerGroupCommand.scala:558) at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describeGroup(ConsumerGroupCommand.scala:271) at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:544) at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:77) at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala) [2018-06-12 01:32:34,255] DEBUG Removed sensor with name connections-closed: (org.apache.kafka.common.metrics.Metrics){code}
[jira] [Commented] (KAFKA-7023) Kafka Streams RocksDB bulk loading config may not be honored with customized RocksDBConfigSetter
[ https://issues.apache.org/jira/browse/KAFKA-7023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509014#comment-16509014 ] ASF GitHub Bot commented on KAFKA-7023: --- guozhangwang opened a new pull request #5197: KAFKA-7023: Add unit test URL: https://github.com/apache/kafka/pull/5197 Add a unit test that validates that after restoreStart the options are set to the bulk-loading configs, and that after restoreEnd they revert to the customized configs. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Kafka Streams RocksDB bulk loading config may not be honored with customized > RocksDBConfigSetter > - > > Key: KAFKA-7023 > URL: https://issues.apache.org/jira/browse/KAFKA-7023 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Liquan Pei >Assignee: Liquan Pei >Priority: Major > Original Estimate: 24h > Remaining Estimate: 24h > > We observed frequent L0 -> L1 compaction during Kafka Streams state recovery. > Some sample logs: > {code:java} > 2018/06/08-00:04:50.892331 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892298) [db/compaction_picker_universal.cc:270] [default] > Universal: sorted runs files(6): files[3 0 0 0 1 1 38] max score 1.00 > 2018/06/08-00:04:50.892336 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892300) [db/compaction_picker_universal.cc:655] [default] > Universal: First candidate file 134[0] to reduce size amp. > 2018/06/08-00:04:50.892338 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892302) [db/compaction_picker_universal.cc:686] [default] > Universal: size amp not needed.
newer-files-total-size 13023497 > earliest-file-size 2541530372 > 2018/06/08-00:04:50.892339 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892303) [db/compaction_picker_universal.cc:473] [default] > Universal: Possible candidate file 134[0]. > 2018/06/08-00:04:50.892341 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892304) [db/compaction_picker_universal.cc:525] [default] > Universal: Skipping file 134[0] with size 1007 (compensated size 1287) > 2018/06/08-00:04:50.892343 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892306) [db/compaction_picker_universal.cc:473] [default] > Universal: Possible candidate file 133[1]. > 2018/06/08-00:04:50.892344 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892307) [db/compaction_picker_universal.cc:525] [default] > Universal: Skipping file 133[1] with size 4644 (compensated size 16124) > 2018/06/08-00:04:50.892346 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892307) [db/compaction_picker_universal.cc:473] [default] > Universal: Possible candidate file 126[2]. > 2018/06/08-00:04:50.892348 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892308) [db/compaction_picker_universal.cc:525] [default] > Universal: Skipping file 126[2] with size 319764 (compensated size 319764) > 2018/06/08-00:04:50.892349 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892309) [db/compaction_picker_universal.cc:473] [default] > Universal: Possible candidate level 4[3]. > 2018/06/08-00:04:50.892351 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892310) [db/compaction_picker_universal.cc:525] [default] > Universal: Skipping level 4[3] with size 2815574 (compensated size 2815574) > 2018/06/08-00:04:50.892352 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892311) [db/compaction_picker_universal.cc:473] [default] > Universal: Possible candidate level 5[4]. 
> 2018/06/08-00:04:50.892357 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892311) [db/compaction_picker_universal.cc:525] [default] > Universal: Skipping level 5[4] with size 9870748 (compensated size 9870748) > 2018/06/08-00:04:50.892358 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892313) [db/compaction_picker_universal.cc:473] [default] > Universal: Possible candidate level 6[5]. > {code} > In our customized RocksDBConfigSetter, we set > {code:java} > level0_file_num_compaction_trigger=6 {code} > During bulk loading, the following options are set: > [https://github.com/facebook/rocksdb/blob/master/options/options.cc] > {code:java} > Options* > Options::PrepareForBulkLoad() > { > // never slowdown ingest. > level0_file_num_compaction_trigger
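The behavior that the unit test in PR #5197 is described as checking (bulk-loading settings during restoration, the customized settings afterwards) amounts to a save-and-restore around `restoreStart`/`restoreEnd`. The sketch below models it with a hypothetical `StoreOptions` holder instead of RocksJava's `Options`; the `1 << 30` trigger is an assumed large sentinel standing in for the value `PrepareForBulkLoad` uses, not a quote of RocksDB's source.

```java
// Hypothetical, simplified option holder (not org.rocksdb.Options).
final class StoreOptions {
    int level0FileNumCompactionTrigger;
    boolean disableAutoCompactions;

    StoreOptions(int trigger, boolean disableAutoCompactions) {
        this.level0FileNumCompactionTrigger = trigger;
        this.disableAutoCompactions = disableAutoCompactions;
    }
}

final class BulkLoadToggle {
    // Assumed sentinel meaning "effectively never trigger L0 compaction".
    static final int BULK_LOAD_TRIGGER = 1 << 30;

    private final StoreOptions options;
    private StoreOptions saved; // the customized settings to put back after restoration

    BulkLoadToggle(StoreOptions options) {
        this.options = options;
    }

    // Switch to bulk-loading settings, remembering the user's customized values.
    void restoreStart() {
        saved = new StoreOptions(options.level0FileNumCompactionTrigger, options.disableAutoCompactions);
        options.level0FileNumCompactionTrigger = BULK_LOAD_TRIGGER;
        options.disableAutoCompactions = true;
    }

    // Revert to the customized settings captured in restoreStart.
    void restoreEnd() {
        if (saved == null) {
            return; // restoreStart was never called
        }
        options.level0FileNumCompactionTrigger = saved.level0FileNumCompactionTrigger;
        options.disableAutoCompactions = saved.disableAutoCompactions;
        saved = null;
    }
}
```

Capturing the customized values at `restoreStart` (rather than re-reading defaults at `restoreEnd`) is what lets a setting such as `level0_file_num_compaction_trigger=6` survive the restoration phase in this model.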
[jira] [Commented] (KAFKA-6906) Kafka Streams does not commit transactions if data is produced via wall-clock punctuation
[ https://issues.apache.org/jira/browse/KAFKA-6906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508991#comment-16508991 ] ASF GitHub Bot commented on KAFKA-6906: --- mjsax opened a new pull request #5196: MINOR: code cleanup follow up for KAFKA-6906 URL: https://github.com/apache/kafka/pull/5196 IntelliJ warned that the condition of the `if` is always `false`; thinking about it, this makes sense. After refactoring the code in KAFKA-6906 and moving the EOS part out of the `commitOffsetNeeded` block, L433-440 already cover this case, so the redundant code can be removed. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) > Kafka Streams does not commit transactions if data is produced via wall-clock > punctuation > - > > Key: KAFKA-6906 > URL: https://issues.apache.org/jira/browse/KAFKA-6906 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Matthias J. Sax >Assignee: Jagadesh Adireddi >Priority: Major > Fix For: 2.0.0, 1.1.1 > > > Committing in Kafka Streams happens at regular intervals. However, committing > only happens if new input records were processed since the last commit (via > setting the flag `commitOffsetNeeded` within `StreamTask#process()`). > However, data could also be emitted via wall-clock-based punctuation calls. > Especially if EOS is enabled, this is an issue (maybe also for non-EOS) > because the currently running transaction is not committed and thus might time > out, leading to a fatal error.
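The gap described in this ticket can be sketched with a hypothetical task that keeps a single `commitNeeded` flag. In the problematic shape only `process()` sets the flag; in this sketch a wall-clock punctuation that emits output sets it too, so the next commit interval actually commits (which, under EOS, is what would close the open transaction). `TaskSketch` is illustrative only and does not reproduce `StreamTask`.

```java
import java.util.ArrayList;
import java.util.List;

final class TaskSketch {
    private final List<String> output = new ArrayList<>();
    private boolean commitNeeded = false;
    private int commits = 0;

    // Processing input always requires a later commit.
    void process(String record) {
        output.add(record);
        commitNeeded = true;
    }

    // Wall-clock punctuation may emit records without any new input.
    void punctuate(String emitted) {
        if (emitted != null) {
            output.add(emitted);
            commitNeeded = true; // without this, data produced here would never be committed
        }
    }

    // Called on every commit interval; commits only if something was processed or produced.
    void maybeCommit() {
        if (commitNeeded) {
            commits++;
            commitNeeded = false;
        }
    }

    int commits() {
        return commits;
    }
}
```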
[jira] [Commented] (KAFKA-7021) Source KTable checkpoint is not correct
[ https://issues.apache.org/jira/browse/KAFKA-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508982#comment-16508982 ] ASF GitHub Bot commented on KAFKA-7021: --- guozhangwang opened a new pull request #5195: KAFKA-7021: Update upgrade guide section for reusing source topic URL: https://github.com/apache/kafka/pull/5195 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) > Source KTable checkpoint is not correct > --- > > Key: KAFKA-7021 > URL: https://issues.apache.org/jira/browse/KAFKA-7021 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Matthias J. Sax >Assignee: Guozhang Wang >Priority: Major > Fix For: 0.10.2.2, 2.0.0, 0.11.0.3, 1.0.2, 1.1.1 > > > Kafka Streams treats source KTables, i.e., tables created via `builder.table()`, > differently. Instead of creating a changelog topic, the original source topic > is used to avoid unnecessary data redundancy. > However, Kafka Streams does not write a correct local state checkpoint file. > This results in unnecessary state restoration after a rebalance. Instead of the > latest committed offset, the latest restored offset is written into the > checkpoint file in `ProcessorStateManager#close()`.
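The checkpoint issue comes down to which offset map is written on close. In this hypothetical sketch (not `ProcessorStateManager`), restoring a source KTable may read past the last committed offset of the source topic; following the ticket, `checkpointOnClose` records the latest committed offset, while `restoredOffsetsCheckpoint` shows the behavior the ticket describes as incorrect.

```java
import java.util.HashMap;
import java.util.Map;

final class CheckpointSketch {
    private final Map<String, Long> restoredOffsets = new HashMap<>();
    private final Map<String, Long> committedOffsets = new HashMap<>();

    void onRestore(String partition, long offset) {
        restoredOffsets.put(partition, offset);
    }

    void onCommit(String partition, long offset) {
        committedOffsets.put(partition, offset);
    }

    // Behavior described in the ticket: the latest restored offset is checkpointed.
    Map<String, Long> restoredOffsetsCheckpoint() {
        return new HashMap<>(restoredOffsets);
    }

    // Per the ticket, the latest committed offset should be checkpointed instead.
    Map<String, Long> checkpointOnClose() {
        return new HashMap<>(committedOffsets);
    }
}
```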
[jira] [Resolved] (KAFKA-6906) Kafka Streams does not commit transactions if data is produced via wall-clock punctuation
[ https://issues.apache.org/jira/browse/KAFKA-6906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-6906. Resolution: Fixed Fix Version/s: 1.1.1 2.0.0 > Kafka Streams does not commit transactions if data is produced via wall-clock > punctuation > - > > Key: KAFKA-6906 > URL: https://issues.apache.org/jira/browse/KAFKA-6906 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Matthias J. Sax >Assignee: Jagadesh Adireddi >Priority: Major > Fix For: 2.0.0, 1.1.1 > > > Committing in Kafka Streams happens at regular intervals. However, committing > only happens if new input records were processed since the last commit (via > setting the flag `commitOffsetNeeded` within `StreamTask#process()`). > However, data could also be emitted via wall-clock-based punctuation calls. > Especially if EOS is enabled, this is an issue (maybe also for non-EOS) > because the currently running transaction is not committed and thus might time > out, leading to a fatal error.
[jira] [Commented] (KAFKA-7021) Source KTable checkpoint is not correct
[ https://issues.apache.org/jira/browse/KAFKA-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508927#comment-16508927 ] ASF GitHub Bot commented on KAFKA-7021: --- guozhangwang closed pull request #5163: KAFKA-7021: Reuse source based on config URL: https://github.com/apache/kafka/pull/5163 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index d6002ff016b..6a707ff986d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -599,7 +599,7 @@ public KafkaStreams(final Topology topology, @Deprecated public KafkaStreams(final Topology topology, final StreamsConfig config) { -this(topology.internalTopologyBuilder, config, new DefaultKafkaClientSupplier()); +this(topology, config, new DefaultKafkaClientSupplier()); } /** @@ -635,6 +635,10 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, this.config = config; this.time = time; +// adjust the topology if optimization is turned on. 
+// TODO: to be removed post 2.0 +internalTopologyBuilder.adjust(config); + // The application ID is a required config and hence should always have value processId = UUID.randomUUID(); final String userClientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG); diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index 517104da323..ae6d44c449e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -302,11 +302,10 @@ Objects.requireNonNull(materialized, "materialized can't be null"); final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized); materializedInternal.generateStoreNameIfNeeded(internalStreamsBuilder, topic + "-"); +final ConsumedInternal consumedInternal = +new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(), materializedInternal.valueSerde())); -return internalStreamsBuilder.table(topic, -new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(), - materializedInternal.valueSerde())), -materializedInternal); +return internalStreamsBuilder.table(topic, consumedInternal, materializedInternal); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java index 22f6ea8362b..753185c2164 100644 --- a/streams/src/main/java/org/apache/kafka/streams/Topology.java +++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java @@ -776,5 +776,4 @@ public synchronized Topology connectProcessorAndStateStores(final String process public synchronized TopologyDescription describe() { return internalTopologyBuilder.describe(); } - } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java 
index 0a19b4eb0c0..c7bf2fac8f7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -72,11 +72,7 @@ public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuil public KTable table(final String topic, final ConsumedInternal consumed, final MaterializedInternal> materialized) { -// explicitly disable logging for source table materialized stores -materialized.withLoggingDisabled(); - -final StoreBuilder> storeBuilder = new KeyValueStoreMaterializer<>(materialized) -.materialize(); +final StoreBuilder> storeBuilder = new KeyValueStoreMaterializer<>(materialized).materialize(); final String source = newProcessorName(KStreamImpl.SOURCE_NAME); final String name = newProcessorName(KTableImpl.SOURCE_NAME); @@ -88,7 +84,7 @@ public InternalStreamsBuilder(final InternalTopologyBuilder
[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions
[ https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508907#comment-16508907 ] Lucas Wang commented on KAFKA-7040: --- To be more specific, I think the following sequence of events may cause a truncation below the HW. Say both broker0 and broker1 have finished processing a LeaderAndISR request with broker1 as the leader and leader epoch 10, and both of them have 100 messages in their respective logs (largest offset 99, LEO 100).
1. The replica fetcher thread on broker0 issues a LeaderEpoch request with leader epoch 10 to broker1, and broker1 replies with an LEO of 100, since 10 is the latest leader epoch. Before the replica fetcher on broker0 acquires the AbstractFetcherThread.partitionMapLock and processes the LeaderEpoch response, steps 2-4 happen.
2. A LeaderAndISR request causes broker0 to become the leader for partition t1p0, which in turn removes t1p0 from the replica fetcher thread.
3. Broker0 accepts one message at offset 100 from a producer, and the message is replicated to broker1, causing the HW on broker0 to go up to 100.
4. A second LeaderAndISR request causes broker1 to become the leader, and broker0 to become the follower for partition t1p0. This causes t1p0 to be added back to the replica fetcher thread on broker0.
5. The replica fetcher thread on broker0 processes the LeaderEpoch response received in step 1, and truncates the message at offset 100 that was accepted in step 3.
> The replica fetcher thread may truncate accepted messages during multiple > fast leadership transitions > - > > Key: KAFKA-7040 > URL: https://issues.apache.org/jira/browse/KAFKA-7040 > Project: Kafka > Issue Type: Bug >Reporter: Lucas Wang >Priority: Minor > > Problem Statement: > Consider the scenario where there are two brokers, broker0 and broker1, and > there are two partitions, "t1p0" and "t1p1"[1], both of which have broker1 as > the leader and broker0 as the follower. The following sequence of events > happens on broker0: > 1. The replica fetcher thread on broker0 issues a LeaderEpoch request to > broker1, and waits for the response. > 2. A LeaderAndISR request causes broker0 to become the leader for one > partition, t1p0, which in turn removes the partition t1p0 from the replica > fetcher thread. > 3. Broker0 accepts some messages from a producer. > 4. A second LeaderAndISR request causes broker1 to become the leader, and > broker0 to become the follower for partition t1p0. This causes the > partition t1p0 to be added back to the replica fetcher thread on broker0. > 5. The replica fetcher thread on broker0 receives a response for the > LeaderEpoch request issued in step 1, and truncates the messages accepted in > step 3. > The issue can be reproduced with the test from > https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea > [1] Initially we set up broker0 to be the follower of two partitions instead > of just one, to avoid shutting down the replica fetcher thread when it > becomes idle.
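The interleaving described above can be replayed in a single-threaded, hypothetical model. `StaleEpochTruncationSketch` and its methods are invented for illustration and ignore networking and locking. The model assumes an exclusive high watermark (offsets below it are committed, as in Kafka), so after offset 100 is replicated the model's HW is 101; processing the stale end offset of 100 then truncates a committed message.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of the KAFKA-7040 interleaving on broker0; no real Kafka classes are used.
class StaleEpochTruncationSketch {
    private final List<Long> log = new ArrayList<>();
    private long highWatermark; // exclusive: offsets below it are considered committed

    long logEndOffset() {
        return log.size();
    }

    void append() {
        log.add((long) log.size()); // each entry's value is its own offset
    }

    void truncateTo(long endOffset) {
        while (log.size() > endOffset) {
            log.remove(log.size() - 1);
        }
    }

    /** Returns {LEO, HW} on broker0 after step 5. */
    static long[] replay() {
        StaleEpochTruncationSketch broker0 = new StaleEpochTruncationSketch();
        for (int i = 0; i < 100; i++) {
            broker0.append(); // offsets 0..99, LEO = 100
        }
        broker0.highWatermark = 100;

        // Step 1: the leader answers "epoch 10 ends at offset 100"; the response waits.
        long staleEndOffset = 100;

        // Steps 2-3: broker0 is briefly the leader, appends offset 100, and it is replicated.
        broker0.append();
        broker0.highWatermark = broker0.logEndOffset(); // 101

        // Step 4: broker0 becomes a follower again (partition re-added to the fetcher).
        // Step 5: the stale response from step 1 is processed.
        broker0.truncateTo(staleEndOffset);

        return new long[] {broker0.logEndOffset(), broker0.highWatermark};
    }

    public static void main(String[] args) {
        long[] state = replay();
        System.out.println("LEO=" + state[0] + " HW=" + state[1]
            + " truncatedBelowHW=" + (state[0] < state[1]));
    }
}
```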
[jira] [Commented] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data
[ https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508902#comment-16508902 ] Eugen Feller commented on KAFKA-6977: - I think it was a 0.11.0.1 consumer, as I used the kafka-console-consumer CLI (from Kafka 0.11.0.1) to do the test. The consumer was running on my local machine via VPN. I will give it a try on ECS. I think that topic is being consumed by at least one downstream service on ECS. > Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 > while fetching data > - > > Key: KAFKA-6977 > URL: https://issues.apache.org/jira/browse/KAFKA-6977 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.1 >Reporter: Eugen Feller >Priority: Blocker > Labels: streams > > We are running Kafka Streams 0.11.0.1 with Kafka Broker 0.10.2.1 and > constantly run into the following exception: > {code:java} > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > partition assignment took 40 ms. > current active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 0_15, > 0_17, 0_20, 0_21, 0_23, 0_25, 0_28, 0_2, 0_6, 0_7, 0_8, 0_9, 0_11, 0_16, > 0_18, 0_19, 0_22, 0_24, 0_26, 0_27, 0_29, 0_30, 0_31] > current standby tasks: [] > previous active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, > 0_15, 0_17, 0_20, 0_21, 0_23, 0_25, 0_28] > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > State transition from PARTITIONS_ASSIGNED to RUNNING.
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > INFO org.apache.kafka.streams.KafkaStreams - stream-client > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e] State > transition from REBALANCING to RUNNING. > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > ERROR org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > Encountered the following error during processing: > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > at > org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490) > java.lang.IllegalStateException: Unexpected error code 2 while fetching data > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > Shutting down > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > State transition from RUNNING to PENDING_SHUTDOWN. 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka > producer with timeoutMillis = 9223372036854775807 ms. > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > Stream thread shutdown complete > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > State transition from PENDING_SHUTDOWN to DEAD. > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.streams.KafkaStreams - stream-client >
[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions
[ https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508867#comment-16508867 ] Lucas Wang commented on KAFKA-7040: --- [~lindong] Thanks for pointing out that the truncation may happen below the high watermark. Also, your suggested change seems to work: by remembering whether some partition state has been used, we can distinguish between different generations of the partition state.
[jira] [Commented] (KAFKA-7003) Add headers with error context in messages written to the Connect DeadLetterQueue topic
[ https://issues.apache.org/jira/browse/KAFKA-7003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508849#comment-16508849 ] ASF GitHub Bot commented on KAFKA-7003: --- ewencp closed pull request #5159: KAFKA-7003: Set error context in message headers (KIP-298) URL: https://github.com/apache/kafka/pull/5159 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java index 6e9bd6b9e71..d9d140b9cdc 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java @@ -57,11 +57,19 @@ public static final short DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT = 3; private static final String DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY = "Dead Letter Queue Topic Replication Factor"; +public static final String DLQ_CONTEXT_HEADERS_ENABLE_CONFIG = DLQ_PREFIX + "context.headers.enable"; +public static final boolean DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT = false; +public static final String DLQ_CONTEXT_HEADERS_ENABLE_DOC = "If true, add headers containing error context to the messages " + +"written to the dead letter queue. 
To avoid clashing with headers from the original record, all error context header " + +"keys, all error context header keys will start with __connect.errors."; +private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable Error Context Headers"; + static ConfigDef config = ConnectorConfig.configDef() .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY) .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY) .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY) -.define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY); +.define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY) +.define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY); public static ConfigDef configDef() { return config; @@ -107,4 +115,8 @@ public String dlqTopicName() { public short dlqTopicReplicationFactor() { return getShort(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG); } + +public boolean isDlqContextHeadersEnabled() { +return getBoolean(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG); +} } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 97e68faa4ca..c794eb8c807 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -530,7 +530,7 @@ ErrorHandlingMetrics errorHandlingMetrics(ConnectorTaskId id) { // check if topic for dead letter queue exists String topic = connConfig.dlqTopicName(); if (topic != null && !topic.isEmpty()) { -DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, connConfig, producerProps); +DeadLetterQueueReporter reporter = DeadLetterQueueReporter.createAndSetup(config, id, connConfig, producerProps); reporters.add(reporter); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java index 459eeae1ff4..d36ec22ec88 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java +++
[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions
[ https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508847#comment-16508847 ] Dong Lin commented on KAFKA-7040: - [~luwang] Nice catch. It is indeed problematic (with some probability of losing data) that a replica fetcher thread can truncate to an offset smaller than the high watermark due to receiving an "outdated" OffsetsForLeaderEpochResponse. One possible solution is to add one more field, say `initialized`, to AbstractFetcherThread.partitionStates. The value is set to false after the partition is added to partitionStates. ReplicaFetcherThread will update `initialized` to true the first time it reads this partition. Later, when ReplicaFetcherThread receives either a FetchResponse or an OffsetsForLeaderEpochResponse, it should discard the partitions in the response whose `initialized` is false. This seems to fix the issue here.
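The `initialized` flag proposed above can be sketched in a simplified, hypothetical model. `FetcherPartitionStateSketch`, `markRead`, `shouldApply`, and `staleResponseApplied` are illustrative names, not `AbstractFetcherThread` methods. The sketch assumes, as in the fetcher loop, that a partition has at most one outstanding request at a time, so a freshly re-added state cannot have been read before the stale response arrives.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of generation-aware partition state in a fetcher thread.
class FetcherPartitionStateSketch {
    private static final class PartitionState {
        boolean initialized = false; // false until a request has been built from this state
    }

    private final Map<String, PartitionState> partitionStates = new HashMap<>();

    void addPartition(String tp) {
        // Adding (or re-adding) a partition always creates a fresh state object.
        partitionStates.put(tp, new PartitionState());
    }

    void removePartition(String tp) {
        partitionStates.remove(tp);
    }

    /** Called while building a fetch or OffsetsForLeaderEpoch request for tp. */
    void markRead(String tp) {
        PartitionState state = partitionStates.get(tp);
        if (state != null) {
            state.initialized = true;
        }
    }

    /** A response entry for tp is applied only if the current state has been read. */
    boolean shouldApply(String tp) {
        PartitionState state = partitionStates.get(tp);
        return state != null && state.initialized;
    }

    /** Replays steps 1-5 of KAFKA-7040 and reports whether the stale response is applied. */
    static boolean staleResponseApplied() {
        FetcherPartitionStateSketch fetcher = new FetcherPartitionStateSketch();
        fetcher.addPartition("t1p0");
        fetcher.markRead("t1p0");            // step 1: request built from the old state
        fetcher.removePartition("t1p0");     // step 2: broker0 becomes leader
        fetcher.addPartition("t1p0");        // step 4: broker0 becomes follower again
        return fetcher.shouldApply("t1p0");  // step 5: stale response arrives
    }

    public static void main(String[] args) {
        System.out.println("stale response applied: " + staleResponseApplied());
    }
}
```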
[jira] [Resolved] (KAFKA-7003) Add headers with error context in messages written to the Connect DeadLetterQueue topic
[ https://issues.apache.org/jira/browse/KAFKA-7003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava resolved KAFKA-7003. -- Resolution: Fixed Fix Version/s: 2.1.0 2.0.0 Issue resolved by pull request 5159 [https://github.com/apache/kafka/pull/5159] > Add headers with error context in messages written to the Connect > DeadLetterQueue topic > --- > > Key: KAFKA-7003 > URL: https://issues.apache.org/jira/browse/KAFKA-7003 > Project: Kafka > Issue Type: Task >Reporter: Arjun Satish >Priority: Major > Fix For: 2.0.0, 2.1.0 > > > This was added to the KIP after the feature freeze. > If the property {{errors.deadletterqueue.context.headers.enable}} is set to {{*true*}}, the following headers will be added to the produced raw message (only if they don't already exist in the message). All values will be serialized as UTF-8 strings.
> ||Header Name||Description||
> |__connect.errors.topic|Name of the topic that contained the message.|
> |__connect.errors.task.id|The numeric ID of the task that encountered the error (encoded as a UTF-8 string).|
> |__connect.errors.stage|The name of the stage where the error occurred.|
> |__connect.errors.partition|The numeric ID of the partition in the original topic that contained the message (encoded as a UTF-8 string).|
> |__connect.errors.offset|The numeric value of the message offset in the original topic (encoded as a UTF-8 string).|
> |__connect.errors.exception.stacktrace|The stacktrace of the exception.|
> |__connect.errors.exception.message|The message in the exception.|
> |__connect.errors.exception.class.name|The fully qualified classname of the exception that was thrown during the execution.|
> |__connect.errors.connector.name|The name of the connector which encountered the error.|
> |__connect.errors.class.name|The fully qualified name of the class that caused the error.|
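The header rules in the table above (prefix `__connect.errors.`, add only if absent, every value as a UTF-8 string) can be sketched as follows. This is a hypothetical helper, not the `DeadLetterQueueReporter` code from PR 5159: `DlqContextHeadersSketch` and `withErrorContext` are invented names, and a `Map<String, byte[]>` stands in for the produced record's headers (real Kafka record headers may contain duplicate keys, which a map cannot). The connector name, stage string, and class name in `main` are made-up example values.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of DLQ error-context headers; a Map stands in for record headers.
class DlqContextHeadersSketch {
    static final String PREFIX = "__connect.errors.";

    private static void putIfAbsent(Map<String, byte[]> headers, String suffix, Object value) {
        String key = PREFIX + suffix;
        if (value != null && !headers.containsKey(key)) {
            // Every value, including numeric IDs, is written as a UTF-8 string.
            headers.put(key, String.valueOf(value).getBytes(StandardCharsets.UTF_8));
        }
    }

    static Map<String, byte[]> withErrorContext(Map<String, byte[]> original,
                                                String topic, int partition, long offset,
                                                String connectorName, int taskId,
                                                String stage, String className,
                                                Throwable error) {
        Map<String, byte[]> headers = new LinkedHashMap<>(original); // existing headers win
        putIfAbsent(headers, "topic", topic);
        putIfAbsent(headers, "partition", partition);
        putIfAbsent(headers, "offset", offset);
        putIfAbsent(headers, "connector.name", connectorName);
        putIfAbsent(headers, "task.id", taskId);
        putIfAbsent(headers, "stage", stage);
        putIfAbsent(headers, "class.name", className);
        if (error != null) {
            StringWriter trace = new StringWriter();
            error.printStackTrace(new PrintWriter(trace));
            putIfAbsent(headers, "exception.class.name", error.getClass().getName());
            putIfAbsent(headers, "exception.message", error.getMessage());
            putIfAbsent(headers, "exception.stacktrace", trace.toString());
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, byte[]> original = new LinkedHashMap<>();
        original.put("__connect.errors.topic", "kept".getBytes(StandardCharsets.UTF_8));
        Map<String, byte[]> headers = withErrorContext(original, "orders", 3, 42L,
            "my-sink", 0, "VALUE_CONVERTER", "com.example.MyConverter",
            new IllegalArgumentException("bad value"));
        // Print each header with only the first line of its value.
        headers.forEach((k, v) ->
            System.out.println(k + "=" + new String(v, StandardCharsets.UTF_8).split("\n", 2)[0]));
    }
}
```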
[jira] [Commented] (KAFKA-7042) Fall back to the old behavior when the broker is too old to recognize LIST_OFFSET versions
[ https://issues.apache.org/jira/browse/KAFKA-7042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508845#comment-16508845 ] Ismael Juma commented on KAFKA-7042: Maybe they should not turn it on for older brokers? > Fall back to the old behavior when the broker is too old to recognize > LIST_OFFSET versions > -- > > Key: KAFKA-7042 > URL: https://issues.apache.org/jira/browse/KAFKA-7042 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Guozhang Wang >Priority: Major > > When READ_COMMITTED is turned on (since 0.11.0), LIST_OFFSET requires > min_version to be 2 on the consumer client side. On the other hand, if the broker > is no newer than 0.10.2, it can only recognize LIST_OFFSET versions up > to 1. In this case, a consumer talking to such an old broker will throw an > exception right away. > What we can improve, though, is that when the consumer realizes the broker does > not recognize LIST_OFFSET version 2 or higher, it can fall back to the old behavior > of READ_UNCOMMITTED, since the data on that broker should not have any txn > markers anyway. By doing this we would lift the hard restriction that > consumers with READ_COMMITTED cannot work with an older broker > (remember we are trying to maintain compatibility with brokers back to 0.10.0).
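The fallback proposed in KAFKA-7042 amounts to a small decision rule, sketched below under stated assumptions. `ListOffsetsFallbackSketch`, `effectiveIsolationLevel`, and the local `IsolationLevel` enum are hypothetical and are not the consumer's actual code; the threshold of version 2 comes from the issue description, and the assumption that READ_UNCOMMITTED returns the same records rests on the issue's statement that such brokers hold no transaction markers.

```java
// Hypothetical sketch of the KAFKA-7042 fallback; not the KafkaConsumer implementation.
class ListOffsetsFallbackSketch {
    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    // Per the issue, READ_COMMITTED requires ListOffsets version 2 or higher.
    static final short MIN_LIST_OFFSETS_VERSION_FOR_READ_COMMITTED = 2;

    static IsolationLevel effectiveIsolationLevel(IsolationLevel requested,
                                                  short brokerMaxListOffsetsVersion) {
        if (requested == IsolationLevel.READ_COMMITTED
                && brokerMaxListOffsetsVersion < MIN_LIST_OFFSETS_VERSION_FOR_READ_COMMITTED) {
            // Per the issue, data on such an old broker has no transaction markers,
            // so falling back is assumed to return the same records instead of failing.
            return IsolationLevel.READ_UNCOMMITTED;
        }
        return requested;
    }

    public static void main(String[] args) {
        System.out.println(effectiveIsolationLevel(IsolationLevel.READ_COMMITTED, (short) 1));
        System.out.println(effectiveIsolationLevel(IsolationLevel.READ_COMMITTED, (short) 2));
    }
}
```

A silent fallback trades strictness for compatibility; the comment above suggests the alternative of simply not enabling READ_COMMITTED against older brokers.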
[jira] [Commented] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread
[ https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508843#comment-16508843 ] Eugen Feller commented on KAFKA-6990: - Thanks a lot. For the sake of completeness: the logs were sent via Slack. :) > CommitFailedException; this task may be no longer owned by the thread > - > > Key: KAFKA-6990 > URL: https://issues.apache.org/jira/browse/KAFKA-6990 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.1 >Reporter: Eugen Feller >Priority: Blocker > > We are seeing a lot of CommitFailedExceptions on one of our Kafka stream > apps. Running Kafka Streams 0.11.0.1 and Kafka Broker 0.10.2.1. Full error > message: > {code:java} > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] INFO > org.apache.kafka.streams.KafkaStreams - stream-client > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d] State transition from > REBALANCING to RUNNING. > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.StreamTask - task [0_0] Failed > offset commits > {sightings_sighting_byclientmac_0-0=OffsetAndMetadata{offset=11, > metadata=''}, > mactocontactmappings_contactmapping_byclientmac_0-0=OffsetAndMetadata{offset=26, > metadata=''}} due to CommitFailedException > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit > stream task 0_0 during commit state due to CommitFailedException; this task > may be no longer owned by the thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit > stream task 0_1 during commit state due to CommitFailedException; this task > may be no longer owned by the thread >
[visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.StreamTask - task [0_1] Failed > offset commits > {mactocontactmappings_contactmapping_byclientmac_0-1=OffsetAndMetadata{offset=38, > metadata=''}, sightings_sighting_byclientmac_0-1=OffsetAndMetadata{offset=1, > metadata=''}} due to CommitFailedException > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.StreamTask - task [0_2] Failed > offset commits > {mactocontactmappings_contactmapping_byclientmac_0-2=OffsetAndMetadata{offset=8, > metadata=''}, > sightings_sighting_byclientmac_0-2=OffsetAndMetadata{offset=24, metadata=''}} > due to CommitFailedException > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit > stream task 0_2 during commit state due to CommitFailedException; this task > may be no longer owned by the thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.StreamTask - task [0_3] Failed > offset commits > {sightings_sighting_byclientmac_0-3=OffsetAndMetadata{offset=21, > metadata=''}, > mactocontactmappings_contactmapping_byclientmac_0-3=OffsetAndMetadata{offset=102, > metadata=''}} due to CommitFailedException > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit > stream task 0_3 during commit state due to CommitFailedException; this task > may be no longer owned by the thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.StreamTask - task [0_4] Failed > offset commits > 
{sightings_sighting_byclientmac_0-4=OffsetAndMetadata{offset=5, metadata=''}, > mactocontactmappings_contactmapping_byclientmac_0-4=OffsetAndMetadata{offset=20, > metadata=''}} due to CommitFailedException > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit > stream task 0_4 during commit state due to CommitFailedException; this task > may be no longer owned by the thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.StreamTask - task [0_5] Failed > offset commits >
[jira] [Commented] (KAFKA-5570) Join request's timeout should be slightly higher than the rebalance timeout
[ https://issues.apache.org/jira/browse/KAFKA-5570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508828#comment-16508828 ] ASF GitHub Bot commented on KAFKA-5570: --- ijuma closed pull request #3503: KAFKA-5570: Join request's timeout should be slightly higher than the rebalance timeout URL: https://github.com/apache/kafka/pull/3503 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java index 9646f4a..3d6c2819ae9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java @@ -30,6 +30,7 @@ private final int correlationId; private final String clientId; private final long createdTimeMs; +private final int timeoutMs; private final boolean expectResponse; private final RequestCompletionHandler callback; @@ -39,6 +40,7 @@ * @param correlationId The correlation id for this client request * @param clientId The client ID to use for the header * @param createdTimeMs The unix timestamp in milliseconds for the time at which this request was created. +* @param timeoutMs The request timeout in milliseconds. * @param expectResponse Should we expect a response message or is this request complete once it is sent? 
* @param callback A callback to execute when the response has been received (or null if no callback is necessary) */ @@ -47,6 +49,7 @@ public ClientRequest(String destination, int correlationId, String clientId, long createdTimeMs, + int timeoutMs, boolean expectResponse, RequestCompletionHandler callback) { this.destination = destination; @@ -54,6 +57,7 @@ public ClientRequest(String destination, this.correlationId = correlationId; this.clientId = clientId; this.createdTimeMs = createdTimeMs; +this.timeoutMs = timeoutMs; this.expectResponse = expectResponse; this.callback = callback; } @@ -66,6 +70,7 @@ public String toString() { ", correlationId=" + correlationId + ", clientId=" + clientId + ", createdTimeMs=" + createdTimeMs + +", timeoutMs=" + timeoutMs + ", requestBuilder=" + requestBuilder + ")"; } @@ -98,6 +103,10 @@ public long createdTimeMs() { return createdTimeMs; } +public int timeoutMs() { +return timeoutMs; +} + public int correlationId() { return correlationId; } diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java index f9773297dbb..d58827fcd0f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -17,10 +17,10 @@ package org.apache.kafka.clients; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collections; import java.util.Deque; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -31,6 +31,7 @@ private final int maxInFlightRequestsPerConnection; private final Map> requests = new HashMap<>(); +private Integer minTimeoutMs; public InFlightRequests(int maxInFlightRequestsPerConnection) { this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection; @@ -46,6 +47,8 @@ public void add(NetworkClient.InFlightRequest request) { reqs = new ArrayDeque<>(); 
this.requests.put(destination, reqs); } +if (minTimeoutMs != null) +minTimeoutMs = Math.min(request.timeoutMs, minTimeoutMs); reqs.addFirst(request); } @@ -60,10 +63,12 @@ public void add(NetworkClient.InFlightRequest request) { } /** - * Get the oldest request (the one that that will be completed next) for the given node + * Complete the oldest request (the one that that will be completed next) for the given node */ public NetworkClient.InFlightRequest completeNext(String node) { -return requestQueue(node).pollLast(); +NetworkClient.InFlightRequest request = requestQueue(node).pollLast(); +minTimeoutMs = null; +return request; } /** @@ -80,7 +85,9 @@ public void add(NetworkClient.InFlightRequest request) { *
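The diff threads a per-request `timeoutMs` through `ClientRequest` so that a JoinGroup request is not cut off by the default request timeout while the rebalance is still running. The sketch below is a minimal illustration of that idea, not the code from PR #3503. The class names and the 5-second slack (`JOIN_SLACK_MS`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: per-request timeouts for in-flight requests.
// TimedRequest, JoinTimeouts and JOIN_SLACK_MS are illustrative names, not Kafka APIs.
public class JoinTimeouts {

    // Assumed slack added on top of the rebalance timeout.
    static final int JOIN_SLACK_MS = 5_000;

    static final class TimedRequest {
        final String name;
        final long createdTimeMs;
        final int timeoutMs;

        TimedRequest(String name, long createdTimeMs, int timeoutMs) {
            this.name = name;
            this.createdTimeMs = createdTimeMs;
            this.timeoutMs = timeoutMs;
        }

        // Each request expires according to its own timeout, not a global one.
        boolean isExpired(long nowMs) {
            return nowMs - createdTimeMs > timeoutMs;
        }
    }

    // The JoinGroup request must outlive the rebalance, so give it slightly more
    // time than the rebalance timeout, and never less than the default timeout.
    static int joinGroupTimeoutMs(int defaultRequestTimeoutMs, int rebalanceTimeoutMs) {
        return Math.max(defaultRequestTimeoutMs, rebalanceTimeoutMs + JOIN_SLACK_MS);
    }

    // Returns the names of all in-flight requests whose own timeout has elapsed.
    static List<String> expired(List<TimedRequest> inFlight, long nowMs) {
        List<String> result = new ArrayList<>();
        for (TimedRequest r : inFlight) {
            if (r.isExpired(nowMs)) {
                result.add(r.name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        int joinTimeout = joinGroupTimeoutMs(30_000, 60_000); // 65_000 ms
        List<TimedRequest> inFlight = List.of(
                new TimedRequest("fetch", 0, 30_000),
                new TimedRequest("join", 0, joinTimeout));
        // At t = 40 s the fetch has timed out, but the JoinGroup request is still waiting.
        System.out.println(expired(inFlight, 40_000)); // [fetch]
    }
}
```

Keeping the timeout on each request means a single long JoinGroup request does not require raising the timeout for every other request type.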
[jira] [Commented] (KAFKA-7042) Fall back to the old behavior when the broker is too old to recognize LIST_OFFSET versions
[ https://issues.apache.org/jira/browse/KAFKA-7042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508815#comment-16508815 ] Guozhang Wang commented on KAFKA-7042: -- System tests. > Fall back to the old behavior when the broker is too old to recognize > LIST_OFFSET versions > -- > > Key: KAFKA-7042 > URL: https://issues.apache.org/jira/browse/KAFKA-7042 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Guozhang Wang >Priority: Major > > When READ_COMMITTED is turned on (since 0.11.0), LIST_OFFSET requires > min_version to be 2 on the consumer client side. On the other hand, if the broker > is no newer than 0.10.2, it can only recognize LIST_OFFSET versions up > to 1. In this case, a consumer talking to such an old broker will throw an > exception right away. > What we can improve, though, is that when the consumer realizes the broker does > not recognize LIST_OFFSET version 2 or higher, it can fall back to the old behavior > of READ_UNCOMMITTED, since the data on that broker should not have any txn > markers anyway. By doing this we would lift the hard restriction that > consumers with READ_COMMITTED cannot work with older brokers > (remember we are trying to achieve broker compatibility since 0.10.0). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6906) Kafka Streams does not commit transactions if data is produced via wall-clock punctuation
[ https://issues.apache.org/jira/browse/KAFKA-6906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508805#comment-16508805 ] ASF GitHub Bot commented on KAFKA-6906: --- mjsax closed pull request #5105: KAFKA-6906: Fixed to commit transactions if data is produced via wall clock punctuation URL: https://github.com/apache/kafka/pull/5105 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index e2be3e29172..4cea5280f86 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -424,17 +424,22 @@ private void commitOffsets(final boolean startNewTransaction) { if (eosEnabled) { producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId); -producer.commitTransaction(); -transactionInFlight = false; -if (startNewTransaction) { -producer.beginTransaction(); -transactionInFlight = true; -} } else { consumer.commitSync(consumedOffsetsAndMetadata); } commitOffsetNeeded = false; -} else if (eosEnabled && !startNewTransaction && transactionInFlight) { // need to make sure to commit txn for suspend case +} + +if (eosEnabled) { +producer.commitTransaction(); +transactionInFlight = false; +if (startNewTransaction) { +producer.beginTransaction(); +transactionInFlight = true; +} +} + +if (eosEnabled && !startNewTransaction && transactionInFlight) { // need to make sure to commit txn for suspend case producer.commitTransaction(); transactionInFlight = false; } diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 5537335b221..bfbb2a00270 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; @@ -913,6 +914,7 @@ public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() { @Test public void shouldCloseProducerOnCloseWhenEosEnabled() { task = createStatelessTask(createConfig(true)); +task.initializeTopology(); task.close(true, false); task = null; @@ -1028,6 +1030,39 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() { assertThat(map, equalTo(Collections.singletonMap(repartition, 11L))); } +@Test +public void shouldThrowOnCleanCloseTaskWhenEosEnabledIfTransactionInFlight() { +task = createStatelessTask(createConfig(true)); +try { +task.close(true, false); +fail("should have throw IllegalStateException"); +} catch (final IllegalStateException expected) { +// pass +} +task = null; + +assertTrue(producer.closed()); +} + +@Test +public void shouldAlwaysCommitIfEosEnabled() { +final RecordCollectorImpl recordCollector = new RecordCollectorImpl(producer, "StreamTask", +new LogContext("StreamTaskTest "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records")); + +task = createStatelessTask(createConfig(true)); +task.initializeStateStores(); +task.initializeTopology(); +task.punctuate(processorSystemTime, 5, 
PunctuationType.WALL_CLOCK_TIME, new Punctuator() { +@Override +public void punctuate(final long timestamp) { +recordCollector.send("result-topic1", 3, 5, null, 0, time.milliseconds(), +new IntegerSerializer(), new IntegerSerializer()); +}
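The fix moves `producer.commitTransaction()` out of the branch that only runs when input offsets need committing. With exactly-once enabled, a task whose output came only from a wall-clock punctuator therefore still commits its open transaction. The simplified model below illustrates the patched control flow and is not the actual `StreamTask` code. `TxnProducer` and `RecordingProducer` are hypothetical stand-ins for the transactional producer.

```java
import java.util.ArrayList;
import java.util.List;

public class EosCommitSketch {

    // Hypothetical stand-in for the transactional producer calls used by StreamTask.
    interface TxnProducer {
        void sendOffsetsToTransaction();
        void commitTransaction();
        void beginTransaction();
    }

    // Records the calls so the control flow can be inspected.
    static final class RecordingProducer implements TxnProducer {
        final List<String> calls = new ArrayList<>();
        public void sendOffsetsToTransaction() { calls.add("sendOffsets"); }
        public void commitTransaction() { calls.add("commit"); }
        public void beginTransaction() { calls.add("begin"); }
    }

    static final class Task {
        final TxnProducer producer;
        final boolean eosEnabled;
        boolean commitOffsetNeeded;
        boolean transactionInFlight = true;

        Task(TxnProducer producer, boolean eosEnabled) {
            this.producer = producer;
            this.eosEnabled = eosEnabled;
        }

        // Mirrors the patched logic: offsets are sent only if input was consumed,
        // but with EOS the transaction is committed unconditionally.
        void commit(boolean startNewTransaction) {
            if (commitOffsetNeeded) {
                if (eosEnabled) {
                    producer.sendOffsetsToTransaction();
                }
                // (the non-EOS path would call consumer.commitSync() here)
                commitOffsetNeeded = false;
            }
            if (eosEnabled) {
                producer.commitTransaction();
                transactionInFlight = false;
                if (startNewTransaction) {
                    producer.beginTransaction();
                    transactionInFlight = true;
                }
            }
        }
    }

    public static void main(String[] args) {
        RecordingProducer producer = new RecordingProducer();
        Task task = new Task(producer, true);
        // A wall-clock punctuator wrote output, but no input offsets advanced.
        task.commitOffsetNeeded = false;
        task.commit(true);
        System.out.println(producer.calls); // [commit, begin]
    }
}
```

Before the fix, the same scenario would have produced no `commit` call at all, which left the punctuator's output stuck in an open transaction.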
[jira] [Created] (KAFKA-7043) Connect isolation whitelist does not include new primitive converters (KIP-305)
Randall Hauch created KAFKA-7043: Summary: Connect isolation whitelist does not include new primitive converters (KIP-305) Key: KAFKA-7043 URL: https://issues.apache.org/jira/browse/KAFKA-7043 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.0.0 Reporter: Randall Hauch Assignee: Randall Hauch Fix For: 2.0.0 KIP-305 added several new primitive converters, but the PR did not add them to the plugin isolation whitelist. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
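The failure mode is typical of name-based allowlists: a newly added class is silently excluded until someone updates the rule. The sketch below illustrates this with two hypothetical regexes, one before and one after the new converters are added. Connect's real isolation rules are more involved than this. The sketch also assumes that the KIP-305 converters are named `ShortConverter`, `IntegerConverter`, etc., and that they live in the `org.apache.kafka.connect.converters` package.

```java
import java.util.regex.Pattern;

public class IsolationAllowlistSketch {

    // Hypothetical allowlist before the fix: only an older converter is listed.
    static final Pattern BEFORE = Pattern.compile(
            "org\\.apache\\.kafka\\.connect\\.converters\\.ByteArrayConverter");

    // Hypothetical allowlist after the fix: the new primitive converters are added.
    static final Pattern AFTER = Pattern.compile(
            "org\\.apache\\.kafka\\.connect\\.converters\\."
            + "(ByteArray|Short|Integer|Long|Float|Double)Converter");

    // A class is loaded in isolation only if its full name matches the allowlist.
    static boolean isIsolated(Pattern allowlist, String className) {
        return allowlist.matcher(className).matches();
    }

    public static void main(String[] args) {
        String cls = "org.apache.kafka.connect.converters.IntegerConverter";
        System.out.println(isIsolated(BEFORE, cls)); // false
        System.out.println(isIsolated(AFTER, cls));  // true
    }
}
```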
[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions
[ https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508771#comment-16508771 ] Ismael Juma commented on KAFKA-7040: cc [~apovzner] > The replica fetcher thread may truncate accepted messages during multiple > fast leadership transitions > - > > Key: KAFKA-7040 > URL: https://issues.apache.org/jira/browse/KAFKA-7040 > Project: Kafka > Issue Type: Bug >Reporter: Lucas Wang >Priority: Minor > > Problem Statement: > Consider the scenario where there are two brokers, broker0 and broker1, and > two partitions, "t1p0" and "t1p1"[1], both of which have broker1 as > the leader and broker0 as the follower. The following sequence of events > happened on broker0: > 1. The replica fetcher thread on broker0 issues a LeaderEpoch request to > broker1 and awaits the response. > 2. A LeaderAndISR request causes broker0 to become the leader for > partition t1p0, which in turn removes partition t1p0 from the replica > fetcher thread. > 3. Broker0 accepts some messages from a producer. > 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and > broker0 to become the follower, for partition t1p0. This causes > partition t1p0 to be added back to the replica fetcher thread on broker0. > 5. The replica fetcher thread on broker0 receives the response to the > LeaderEpoch request issued in step 1 and truncates the messages accepted in > step 3. > The issue can be reproduced with the test from > https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea > [1] Initially we set up broker0 to be the follower of two partitions instead > of just one, to avoid shutting down the replica fetcher thread when it > becomes idle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
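Step 5 goes wrong because the LeaderEpoch response refers to a fetch state that stopped existing once t1p0 was removed (step 2) and re-added (step 4). The sketch below shows one possible guard. The fetcher records a per-partition generation when it sends the request, and it ignores the response if the partition has been re-added since then. The generation counter and all names are hypothetical. This illustrates the race and is not the fix that was merged for KAFKA-7040.

```java
import java.util.HashMap;
import java.util.Map;

public class StaleEpochResponseSketch {

    // Hypothetical per-partition generation, assigned fresh each time a partition is (re)added.
    private final Map<String, Integer> generation = new HashMap<>();
    private int nextGeneration = 0;

    void addPartition(String tp) {
        generation.put(tp, nextGeneration++);
    }

    void removePartition(String tp) {
        generation.remove(tp);
    }

    // Captured when the LeaderEpoch request is sent (step 1).
    Integer snapshot(String tp) {
        return generation.get(tp);
    }

    // When the response arrives (step 5), truncate only if the partition is still
    // in the same generation it was in when the request was sent.
    boolean mayTruncate(String tp, Integer generationAtRequest) {
        Integer current = generation.get(tp);
        return current != null && current.equals(generationAtRequest);
    }

    public static void main(String[] args) {
        StaleEpochResponseSketch fetcher = new StaleEpochResponseSketch();
        fetcher.addPartition("t1p0");
        fetcher.addPartition("t1p1");
        Integer sent = fetcher.snapshot("t1p0"); // step 1: request issued
        fetcher.removePartition("t1p0");         // step 2: broker0 becomes leader
        fetcher.addPartition("t1p0");            // step 4: broker0 becomes follower again
        System.out.println(fetcher.mayTruncate("t1p0", sent)); // false: stale response ignored
        System.out.println(fetcher.mayTruncate("t1p1", fetcher.snapshot("t1p1"))); // true
    }
}
```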
[jira] [Commented] (KAFKA-7042) Fall back to the old behavior when the broker is too old to recognize LIST_OFFSET versions
[ https://issues.apache.org/jira/browse/KAFKA-7042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508768#comment-16508768 ] Ismael Juma commented on KAFKA-7042: Who is turning on READ_COMMITTED in this case? > Fall back to the old behavior when the broker is too old to recognize > LIST_OFFSET versions > -- > > Key: KAFKA-7042 > URL: https://issues.apache.org/jira/browse/KAFKA-7042 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Guozhang Wang >Priority: Major > > When READ_COMMITTED is turned on (since 0.11.0), LIST_OFFSET requires > min_version to be 2 on the consumer client side. On the other hand, if the broker > is no newer than 0.10.2, it can only recognize LIST_OFFSET versions up > to 1. In this case, a consumer talking to such an old broker will throw an > exception right away. > What we can improve, though, is that when the consumer realizes the broker does > not recognize LIST_OFFSET version 2 or higher, it can fall back to the old behavior > of READ_UNCOMMITTED, since the data on that broker should not have any txn > markers anyway. By doing this we would lift the hard restriction that > consumers with READ_COMMITTED cannot work with older brokers > (remember we are trying to achieve broker compatibility since 0.10.0). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions
[ https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508763#comment-16508763 ] Dong Lin edited comment on KAFKA-7040 at 6/11/18 9:15 PM: -- [~luwang] Say message m0 is truncated in step 5. Is that message acked by broker1 before broker0 accepts m0 in step 3? If no, it seems that the message is produced with ack=1 and it is within Kafka's contract to truncate it. This is a known behavior when we have leadership change in Kafka. If m0 has been acked by broker1 in step 3, then broker0 should still be able to fetch it again from broker1 after step 5, right? was (Author: lindong): [~luwang] Say message m0 is truncated in step 5. Is that message acked by broker1 in before Broker0 accepts m0 in step 3? If no, it seems that the message is produced with ack=1 and it is within Kafka's contract to truncate it. This is a known behavior when we have leadership change in Kafka. If m0 has been acked by broker1 in step 3, then broker0 should still be able to fetch it again from broker1 after step 5, right? > The replica fetcher thread may truncate accepted messages during multiple > fast leadership transitions > - > > Key: KAFKA-7040 > URL: https://issues.apache.org/jira/browse/KAFKA-7040 > Project: Kafka > Issue Type: Bug >Reporter: Lucas Wang >Priority: Minor > > Problem Statement: > Consider the scenario where there are two brokers, broker0, and broker1, and > there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as > the leader and broker0 as the follower. The following sequence of events > happened on broker0 > 1. The replica fetcher thread on a broker0 issues a LeaderEpoch request to > broker1, and awaits to get the response > 2. A LeaderAndISR request causes broker0 to become the leader for one > partition t1p0, which in turn will remove the partition t1p0 from the replica > fetcher thread > 3. Broker0 accepts some messages from a producer > 4. 
A 2nd LeaderAndISR request causes broker1 to become the leader, and > broker0 to become the follower for partition t1p0. This will cause the > partition t1p0 to be added back to the replica fetcher thread on broker0. > 5. The replica fetcher thread on broker0 receives a response for the > LeaderEpoch request issued in step 1, and truncates the accepted messages in > step3. > The issue can be reproduced with the test from > https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea > [1] Initially we set up broker0 to be the follower of two partitions instead > of just one, to avoid the shutting down of the replica fetcher thread when it > becomes idle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions
[ https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508763#comment-16508763 ] Dong Lin commented on KAFKA-7040: - [~luwang] Say message m0 is truncated in step 5. Is that message acked by broker1 in before Broker0 accepts m0 in step 3? If no, it seems that the message is produced with ack=1 and it is within Kafka's contract to truncate it. This is a known behavior when we have leadership change in Kafka. If m0 has been acked by broker1 in step 3, then broker0 should still be able to fetch it again after broker1 after step 5, right? > The replica fetcher thread may truncate accepted messages during multiple > fast leadership transitions > - > > Key: KAFKA-7040 > URL: https://issues.apache.org/jira/browse/KAFKA-7040 > Project: Kafka > Issue Type: Bug >Reporter: Lucas Wang >Priority: Minor > > Problem Statement: > Consider the scenario where there are two brokers, broker0, and broker1, and > there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as > the leader and broker0 as the follower. The following sequence of events > happened on broker0 > 1. The replica fetcher thread on a broker0 issues a LeaderEpoch request to > broker1, and awaits to get the response > 2. A LeaderAndISR request causes broker0 to become the leader for one > partition t1p0, which in turn will remove the partition t1p0 from the replica > fetcher thread > 3. Broker0 accepts some messages from a producer > 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and > broker0 to become the follower for partition t1p0. This will cause the > partition t1p0 to be added back to the replica fetcher thread on broker0. > 5. The replica fetcher thread on broker0 receives a response for the > LeaderEpoch request issued in step 1, and truncates the accepted messages in > step3. 
> The issue can be reproduced with the test from > https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea > [1] Initially we set up broker0 to be the follower of two partitions instead > of just one, to avoid the shutting down of the replica fetcher thread when it > becomes idle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
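Dong Lin's question turns on the producer's `acks` setting. With `acks=1`, only the leader has to write a record before acknowledging it, so a leadership change can legitimately truncate that record. A producer that needs acknowledged records to survive leadership changes would typically use `acks=all`, usually together with a topic-level `min.insync.replicas` greater than 1. The sketch below builds such a configuration using only `java.util.Properties`. The bootstrap address is a placeholder.

```java
import java.util.Properties;

public class DurableProducerConfig {

    // Builds producer settings that require acknowledgement from all in-sync replicas.
    static Properties durableProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // acks=1: only the leader must write the record before acknowledging,
        // so a leader change can truncate it. acks=all waits for the full ISR.
        props.setProperty("acks", "all");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = durableProducerProps("localhost:9092"); // placeholder address
        System.out.println(props.getProperty("acks")); // all
    }
}
```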
[jira] [Comment Edited] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions
[ https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508763#comment-16508763 ] Dong Lin edited comment on KAFKA-7040 at 6/11/18 9:14 PM: -- [~luwang] Say message m0 is truncated in step 5. Is that message acked by broker1 in before Broker0 accepts m0 in step 3? If no, it seems that the message is produced with ack=1 and it is within Kafka's contract to truncate it. This is a known behavior when we have leadership change in Kafka. If m0 has been acked by broker1 in step 3, then broker0 should still be able to fetch it again after broker1 after step 5, right? was (Author: lindong): [~luwang] Say message m0 is truncated in step 5. Is that message acked by broker1 in before Broker0 accepts m0 in step 3? If no, it seems that the message is produced with ack=1 and it is within Kafka's contract to truncate it. This is a known behavior when we have leadership change in Kafka. If m0 has been acked by broker1 in step 3, then broker0 should still be able to fetch it again after broker1 after step 5, right? > The replica fetcher thread may truncate accepted messages during multiple > fast leadership transitions > - > > Key: KAFKA-7040 > URL: https://issues.apache.org/jira/browse/KAFKA-7040 > Project: Kafka > Issue Type: Bug >Reporter: Lucas Wang >Priority: Minor > > Problem Statement: > Consider the scenario where there are two brokers, broker0, and broker1, and > there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as > the leader and broker0 as the follower. The following sequence of events > happened on broker0 > 1. The replica fetcher thread on a broker0 issues a LeaderEpoch request to > broker1, and awaits to get the response > 2. A LeaderAndISR request causes broker0 to become the leader for one > partition t1p0, which in turn will remove the partition t1p0 from the replica > fetcher thread > 3. Broker0 accepts some messages from a producer > 4. 
A 2nd LeaderAndISR request causes broker1 to become the leader, and > broker0 to become the follower for partition t1p0. This will cause the > partition t1p0 to be added back to the replica fetcher thread on broker0. > 5. The replica fetcher thread on broker0 receives a response for the > LeaderEpoch request issued in step 1, and truncates the accepted messages in > step3. > The issue can be reproduced with the test from > https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea > [1] Initially we set up broker0 to be the follower of two partitions instead > of just one, to avoid the shutting down of the replica fetcher thread when it > becomes idle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions
[ https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508763#comment-16508763 ] Dong Lin edited comment on KAFKA-7040 at 6/11/18 9:14 PM: -- [~luwang] Say message m0 is truncated in step 5. Is that message acked by broker1 in before Broker0 accepts m0 in step 3? If no, it seems that the message is produced with ack=1 and it is within Kafka's contract to truncate it. This is a known behavior when we have leadership change in Kafka. If m0 has been acked by broker1 in step 3, then broker0 should still be able to fetch it again from broker1 after step 5, right? was (Author: lindong): [~luwang] Say message m0 is truncated in step 5. Is that message acked by broker1 in before Broker0 accepts m0 in step 3? If no, it seems that the message is produced with ack=1 and it is within Kafka's contract to truncate it. This is a known behavior when we have leadership change in Kafka. If m0 has been acked by broker1 in step 3, then broker0 should still be able to fetch it again after broker1 after step 5, right? > The replica fetcher thread may truncate accepted messages during multiple > fast leadership transitions > - > > Key: KAFKA-7040 > URL: https://issues.apache.org/jira/browse/KAFKA-7040 > Project: Kafka > Issue Type: Bug >Reporter: Lucas Wang >Priority: Minor > > Problem Statement: > Consider the scenario where there are two brokers, broker0, and broker1, and > there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as > the leader and broker0 as the follower. The following sequence of events > happened on broker0 > 1. The replica fetcher thread on a broker0 issues a LeaderEpoch request to > broker1, and awaits to get the response > 2. A LeaderAndISR request causes broker0 to become the leader for one > partition t1p0, which in turn will remove the partition t1p0 from the replica > fetcher thread > 3. Broker0 accepts some messages from a producer > 4. 
A 2nd LeaderAndISR request causes broker1 to become the leader, and > broker0 to become the follower for partition t1p0. This will cause the > partition t1p0 to be added back to the replica fetcher thread on broker0. > 5. The replica fetcher thread on broker0 receives a response for the > LeaderEpoch request issued in step 1, and truncates the accepted messages in > step3. > The issue can be reproduced with the test from > https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea > [1] Initially we set up broker0 to be the follower of two partitions instead > of just one, to avoid the shutting down of the replica fetcher thread when it > becomes idle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7042) Fall back to the old behavior when the broker is too old to recognize LIST_OFFSET versions
[ https://issues.apache.org/jira/browse/KAFKA-7042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508761#comment-16508761 ] Guozhang Wang commented on KAFKA-7042: -- cc [~hachikuji] > Fall back to the old behavior when the broker is too old to recognize > LIST_OFFSET versions > -- > > Key: KAFKA-7042 > URL: https://issues.apache.org/jira/browse/KAFKA-7042 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Guozhang Wang >Priority: Major > > When READ_COMMITTED is turned on (since 0.11.0), LIST_OFFSET requires > min_version to be 2 on the consumer client side. On the other hand, if the broker > is no newer than 0.10.2, it can only recognize LIST_OFFSET versions up > to 1. In this case, a consumer talking to such an old broker will throw an > exception right away. > What we can improve, though, is that when the consumer realizes the broker does > not recognize LIST_OFFSET version 2 or higher, it can fall back to the old behavior > of READ_UNCOMMITTED, since the data on that broker should not have any txn > markers anyway. By doing this we would lift the hard restriction that > consumers with READ_COMMITTED cannot work with older brokers > (remember we are trying to achieve broker compatibility since 0.10.0). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7042) Fall back to the old behavior when the broker is too old to recognize LIST_OFFSET versions
Guozhang Wang created KAFKA-7042: Summary: Fall back to the old behavior when the broker is too old to recognize LIST_OFFSET versions Key: KAFKA-7042 URL: https://issues.apache.org/jira/browse/KAFKA-7042 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Guozhang Wang When READ_COMMITTED is turned on (since 0.11.0), LIST_OFFSET requires min_version to be 2 on the consumer client side. On the other hand, if the broker is no newer than 0.10.2, it can only recognize LIST_OFFSET versions up to 1. In this case, a consumer talking to such an old broker will throw an exception right away. What we can improve, though, is that when the consumer realizes the broker does not recognize LIST_OFFSET version 2 or higher, it can fall back to the old behavior of READ_UNCOMMITTED, since the data on that broker should not have any txn markers anyway. By doing this we would lift the hard restriction that consumers with READ_COMMITTED cannot work with older brokers (remember we are trying to achieve broker compatibility since 0.10.0). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
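The proposed fallback comes down to a version check before the consumer sends LIST_OFFSET. The sketch below assumes the client already knows the broker's highest supported LIST_OFFSET version, for example from an ApiVersions exchange. It uses a local `IsolationLevel` enum and a hypothetical `effectiveIsolationLevel` helper rather than the consumer's real classes.

```java
public class ListOffsetsFallbackSketch {

    // Local stand-in for the consumer's isolation levels.
    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    // READ_COMMITTED needs LIST_OFFSET v2 or higher; 0.10.2 and older brokers stop at v1.
    static final short MIN_VERSION_FOR_READ_COMMITTED = 2;

    // Proposed behavior: instead of failing, downgrade to READ_UNCOMMITTED when the
    // broker is too old, since such a broker cannot hold transaction markers anyway.
    static IsolationLevel effectiveIsolationLevel(IsolationLevel requested, short brokerMaxListOffsetVersion) {
        if (requested == IsolationLevel.READ_COMMITTED
                && brokerMaxListOffsetVersion < MIN_VERSION_FOR_READ_COMMITTED) {
            return IsolationLevel.READ_UNCOMMITTED;
        }
        return requested;
    }

    public static void main(String[] args) {
        System.out.println(effectiveIsolationLevel(IsolationLevel.READ_COMMITTED, (short) 1)); // READ_UNCOMMITTED
        System.out.println(effectiveIsolationLevel(IsolationLevel.READ_COMMITTED, (short) 2)); // READ_COMMITTED
    }
}
```

Whether the downgrade should happen silently or be logged is left open in the issue. Ismael Juma's follow-up question about who enables READ_COMMITTED bears on that choice.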
[jira] [Updated] (KAFKA-7024) Rocksdb state directory should be created before opening the DB
[ https://issues.apache.org/jira/browse/KAFKA-7024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-7024: - Labels: user-experience (was: ) > Rocksdb state directory should be created before opening the DB > --- > > Key: KAFKA-7024 > URL: https://issues.apache.org/jira/browse/KAFKA-7024 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Abhishek Agarwal >Priority: Minor > Labels: user-experience > > After enabling RocksDB logging, we continually see these errors in Kafka > Streams logs every time a new window segment is created: > ``` > Error when reading > ``` > This is not a problem in itself, since RocksDB creates the > directory internally, but it does so only after logging the above error. Creating the segment directory in advance would > avoid this unnecessary logging. > Right now, only the parent directories are created for a RocksDB segment. > The logging is more prominent when there are many partitions and the segment size is > small (a minute or two). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
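The suggested improvement is to create each segment's own directory before RocksDB opens it. The sketch below does this with `java.nio.file.Files.createDirectories`, which creates any missing parents and is a no-op when the directory already exists. The `openSegmentDir` helper, the `stateDir/storeName/segmentName` layout and the segment name in `main` are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class SegmentDirSketch {

    // Creates stateDir/storeName/segmentName (and any missing parents) before the DB
    // is opened, so RocksDB never has to create it itself after logging an error.
    static Path openSegmentDir(Path stateDir, String storeName, String segmentName) throws IOException {
        Path segmentDir = stateDir.resolve(storeName).resolve(segmentName);
        Files.createDirectories(segmentDir); // no-op if the directory already exists
        // ... opening the RocksDB instance at segmentDir would follow here
        return segmentDir;
    }

    public static void main(String[] args) throws IOException {
        Path stateDir = Files.createTempDirectory("kafka-streams-state");
        Path dir = openSegmentDir(stateDir, "window-store", "window-store.1528675200000");
        System.out.println(Files.isDirectory(dir)); // true
    }
}
```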
[jira] [Created] (KAFKA-7041) Using RocksDB bulk loading for StandbyTasks
Matthias J. Sax created KAFKA-7041: -- Summary: Using RocksDB bulk loading for StandbyTasks Key: KAFKA-7041 URL: https://issues.apache.org/jira/browse/KAFKA-7041 Project: Kafka Issue Type: Improvement Components: streams Reporter: Matthias J. Sax In KAFKA-5363 we introduced RocksDB bulk loading to speed up store recovery. We could do the same optimization for StandbyTasks to make them more efficient and to reduce the likelihood that StandbyTasks lag behind. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7037) delete topic command replaces '+' from the topic name which leads incorrect topic deletion
[ https://issues.apache.org/jira/browse/KAFKA-7037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508735#comment-16508735 ] ASF GitHub Bot commented on KAFKA-7037: --- vahidhashemian opened a new pull request #5193: KAFKA-7037: Escape regex symbol in topic name for topic operations URL: https://github.com/apache/kafka/pull/5193 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > delete topic command replaces '+' from the topic name which leads incorrect > topic deletion > -- > > Key: KAFKA-7037 > URL: https://issues.apache.org/jira/browse/KAFKA-7037 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.0, 1.0.0 >Reporter: Sandeep Nemuri >Assignee: Vahid Hashemian >Priority: Major > > When executing a delete command, the Kafka CLI tool removes the "+" symbol > and deletes the wrong topic. In the case below, when _"*test+topic"*_ is > deleted, Kafka deletes _*testtopic*_. > {code:java} > [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh > --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 > --topic testtopic > Created topic "testtopic". > [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh > --zookeeper `hostname`:2181 --topic test+topic --delete > Topic testtopic is marked for deletion.{code} > The delete topic command strips '+' from the topic name. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
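The PR title suggests that the tool treats the `--topic` argument as a regular expression. In that case `+` is a quantifier, so `test+topic` means "`tes`, then one or more `t`, then `topic`". That pattern matches `testtopic` but not the literal string `test+topic`. The sketch below uses the standard `java.util.regex.Pattern.quote` to show how escaping changes the match. The `matches` helper is illustrative and is not the tool's code.

```java
import java.util.regex.Pattern;

public class TopicRegexSketch {

    // Matches a topic name against the user's argument, optionally escaping regex metacharacters.
    static boolean matches(String topicArg, String topic, boolean escape) {
        String regex = escape ? Pattern.quote(topicArg) : topicArg;
        return Pattern.matches(regex, topic);
    }

    public static void main(String[] args) {
        // Unescaped: '+' acts as a quantifier, so the wrong topic matches.
        System.out.println(matches("test+topic", "testtopic", false));  // true
        System.out.println(matches("test+topic", "test+topic", false)); // false
        // Escaped: only the literal string matches.
        System.out.println(matches("test+topic", "testtopic", true));   // false
        System.out.println(matches("test+topic", "test+topic", true));  // true
    }
}
```

The same applies to `.`, which is a legal topic-name character but matches any character in a regex.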
[jira] [Commented] (KAFKA-7023) Kafka Streams RocksDB bulk loading config may not be honored with customized RocksDBConfigSetter
[ https://issues.apache.org/jira/browse/KAFKA-7023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508694#comment-16508694 ] ASF GitHub Bot commented on KAFKA-7023: --- guozhangwang closed pull request #5166: KAFKA-7023: Move prepareForBulkLoad() call after customized RocksDBConfigSetter URL: https://github.com/apache/kafka/pull/5166 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index cfef035a4fd..6084ecbf1e0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -130,10 +130,6 @@ public void openDB(final ProcessorContext context) { // (this could be a bug in the RocksDB code and their devs have been contacted). options.setIncreaseParallelism(Math.max(Runtime.getRuntime().availableProcessors(), 2)); -if (prepareForBulkload) { -options.prepareForBulkLoad(); -} - wOptions = new WriteOptions(); wOptions.setDisableWAL(true); @@ -148,6 +144,11 @@ public void openDB(final ProcessorContext context) { final RocksDBConfigSetter configSetter = Utils.newInstance(configSetterClass); configSetter.setConfig(name, options, configs); } + +if (prepareForBulkload) { +options.prepareForBulkLoad(); +} + this.dbDir = new File(new File(context.stateDir(), parentDir), this.name); try { This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Kafka Streams RocksDB bulk loading config may not be honored with customized > RocksDBConfigSetter > - > > Key: KAFKA-7023 > URL: https://issues.apache.org/jira/browse/KAFKA-7023 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Liquan Pei >Assignee: Liquan Pei >Priority: Major > Original Estimate: 24h > Remaining Estimate: 24h > > We observed frequent L0 -> L1 compaction during Kafka Streams state recovery. > Some sample log: > {code:java} > 2018/06/08-00:04:50.892331 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892298) [db/compaction_picker_universal.cc:270] [default] > Universal: sorted runs files(6): files[3 0 0 0 1 1 38] max score 1.00 > 2018/06/08-00:04:50.892336 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892300) [db/compaction_picker_universal.cc:655] [default] > Universal: First candidate file 134[0] to reduce size amp. > 2018/06/08-00:04:50.892338 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892302) [db/compaction_picker_universal.cc:686] [default] > Universal: size amp not needed. newer-files-total-size 13023497 > earliest-file-size 2541530372 > 2018/06/08-00:04:50.892339 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892303) [db/compaction_picker_universal.cc:473] [default] > Universal: Possible candidate file 134[0]. > 2018/06/08-00:04:50.892341 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892304) [db/compaction_picker_universal.cc:525] [default] > Universal: Skipping file 134[0] with size 1007 (compensated size 1287) > 2018/06/08-00:04:50.892343 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892306) [db/compaction_picker_universal.cc:473] [default] > Universal: Possible candidate file 133[1]. 
> 2018/06/08-00:04:50.892344 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892307) [db/compaction_picker_universal.cc:525] [default] > Universal: Skipping file 133[1] with size 4644 (compensated size 16124) > 2018/06/08-00:04:50.892346 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892307) [db/compaction_picker_universal.cc:473] [default] > Universal: Possible candidate file 126[2]. > 2018/06/08-00:04:50.892348 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892308) [db/compaction_picker_universal.cc:525] [default] > Universal: Skipping file 126[2] with size 319764 (compensated size 319764) > 2018/06/08-00:04:50.892349 7f8a6d7fa700 (Original Log Time > 2018/06/08-00:04:50.892309) [db/compaction_picker_universal.cc:473] [default] >
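The fix in PR #5166 only changes ordering: whatever is written last to the shared options object wins, so a preset applied before a custom config setter can be silently overridden. The sketch below models that effect under stated assumptions. `SimpleOptions`, its string keys and the example setter are hypothetical stand-ins, not the RocksDB `Options` API or Kafka Streams' `RocksDBConfigSetter`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified model of option ordering; this is NOT the RocksDB Options API.
public class BulkLoadOrdering {

    // Stand-in for a RocksDB options object: a bag of settings where the last write wins.
    public static final class SimpleOptions {
        private final Map<String, Object> settings = new HashMap<>();

        public void set(String key, Object value) {
            settings.put(key, value);
        }

        public Object get(String key) {
            return settings.get(key);
        }

        // Hypothetical bulk-load preset: keep compaction out of the way while restoring.
        public void prepareForBulkLoad() {
            set("disable_auto_compactions", true);
            set("level0_file_num_compaction_trigger", 1 << 30);
        }
    }

    // presetLast = false models the order before the PR (preset, then user setter);
    // presetLast = true models the order after it (user setter, then preset).
    public static SimpleOptions open(boolean prepareForBulkload,
                                     Consumer<SimpleOptions> configSetter,
                                     boolean presetLast) {
        SimpleOptions options = new SimpleOptions();
        if (prepareForBulkload && !presetLast) {
            options.prepareForBulkLoad();
        }
        if (configSetter != null) {
            configSetter.accept(options);
        }
        if (prepareForBulkload && presetLast) {
            options.prepareForBulkLoad();
        }
        return options;
    }

    public static void main(String[] args) {
        // Hypothetical user setter that tunes compaction for steady-state operation.
        Consumer<SimpleOptions> setter = o -> o.set("level0_file_num_compaction_trigger", 4);

        // Prints 4: the setter undid the bulk-load preset.
        System.out.println("before fix: " + open(true, setter, false).get("level0_file_num_compaction_trigger"));
        // Prints 1073741824: the preset is applied last and wins.
        System.out.println("after fix: " + open(true, setter, true).get("level0_file_num_compaction_trigger"));
    }
}
```

In this model, the trade-off of the new ordering is that a custom setter can no longer override the bulk-load values while they are in effect.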
[jira] [Commented] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread
[ https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508693#comment-16508693 ] Matthias J. Sax commented on KAFKA-6990: There are two threads: one for heartbeats and one for processing. The heartbeat thread is governed by heartbeat.interval.ms and session.timeout.ms. With a session timeout of 30 seconds and heartbeats sent every 5 seconds, you would need to miss 6 consecutive heartbeats to drop out of the group. For the main processing thread, max.poll.interval.ms is the timeout, i.e., poll() must be called at least every 20 minutes in your case, which should actually be sufficient. Even if you wait at most 5 seconds for MongoDB to respond, you have at most 50 records to process, so it should not take longer than 250 seconds until poll() is called again... It is really hard to say what is happening. Let's see if the Kafka Streams/Consumer logs reveal the issue. > CommitFailedException; this task may be no longer owned by the thread > - > > Key: KAFKA-6990 > URL: https://issues.apache.org/jira/browse/KAFKA-6990 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.1 >Reporter: Eugen Feller >Priority: Blocker > > We are seeing a lot of CommitFailedExceptions on one of our Kafka stream > apps. Running Kafka Streams 0.11.0.1 and Kafka Broker 0.10.2.1. Full error > message: > {code:java} > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] INFO > org.apache.kafka.streams.KafkaStreams - stream-client > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d] State transition from > REBALANCING to RUNNING.
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.StreamTask - task [0_0] Failed > offset commits > {sightings_sighting_byclientmac_0-0=OffsetAndMetadata{offset=11, > metadata=''}, > mactocontactmappings_contactmapping_byclientmac_0-0=OffsetAndMetadata{offset=26, > metadata=''}} due to CommitFailedException > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit > stream task 0_0 during commit state due to CommitFailedException; this task > may be no longer owned by the thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit > stream task 0_1 during commit state due to CommitFailedException; this task > may be no longer owned by the thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.StreamTask - task [0_1] Failed > offset commits > {mactocontactmappings_contactmapping_byclientmac_0-1=OffsetAndMetadata{offset=38, > metadata=''}, sightings_sighting_byclientmac_0-1=OffsetAndMetadata{offset=1, > metadata=''}} due to CommitFailedException > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.StreamTask - task [0_2] Failed > offset commits > {mactocontactmappings_contactmapping_byclientmac_0-2=OffsetAndMetadata{offset=8, > metadata=''}, > sightings_sighting_byclientmac_0-2=OffsetAndMetadata{offset=24, metadata=''}} > due to CommitFailedException > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread > 
[visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit > stream task 0_2 during commit state due to CommitFailedException; this task > may be no longer owned by the thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.StreamTask - task [0_3] Failed > offset commits > {sightings_sighting_byclientmac_0-3=OffsetAndMetadata{offset=21, > metadata=''}, > mactocontactmappings_contactmapping_byclientmac_0-3=OffsetAndMetadata{offset=102, > metadata=''}} due to CommitFailedException > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit > stream task 0_3 during commit state due to CommitFailedException; this task > may be no longer owned by the thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.StreamTask - task [0_4] Failed > offset commits > {sightings_sighting_byclientmac_0-4=OffsetAndMetadata{offset=5, metadata=''}, >
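The reasoning above combines two independent budgets. The sketch below redoes that arithmetic with the values from the thread: session.timeout.ms = 30 s, heartbeat.interval.ms = 5 s, max.poll.interval.ms = 20 min, max.poll.records = 50, and up to 5 s per MongoDB write. It assumes the blocking write is the only per-record cost and ignores time spent in commits, flushes and rebalances. The class and method names are illustrative only.

```java
import java.time.Duration;

// Back-of-the-envelope check of the two consumer liveness timeouts discussed above.
public class LivenessBudget {

    // Consecutive heartbeats that can be missed before the session expires.
    public static long missedHeartbeatsToExpire(Duration sessionTimeout, Duration heartbeatInterval) {
        return sessionTimeout.toMillis() / heartbeatInterval.toMillis();
    }

    // Worst-case gap between two poll() calls if every record blocks for the full per-record timeout.
    public static Duration worstCaseProcessingTime(long maxPollRecords, Duration perRecordTimeout) {
        return perRecordTimeout.multipliedBy(maxPollRecords);
    }

    // True if the worst-case processing time stays strictly below max.poll.interval.ms.
    public static boolean fitsWithinPollInterval(long maxPollRecords, Duration perRecordTimeout,
                                                 Duration maxPollInterval) {
        return worstCaseProcessingTime(maxPollRecords, perRecordTimeout).compareTo(maxPollInterval) < 0;
    }

    public static void main(String[] args) {
        System.out.println(missedHeartbeatsToExpire(Duration.ofSeconds(30), Duration.ofSeconds(5)));   // 6
        System.out.println(worstCaseProcessingTime(50, Duration.ofSeconds(5)).getSeconds());           // 250
        System.out.println(fitsWithinPollInterval(50, Duration.ofSeconds(5), Duration.ofMinutes(20))); // true
    }
}
```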
[jira] [Resolved] (KAFKA-7005) Remove duplicate Java Resource class.
[ https://issues.apache.org/jira/browse/KAFKA-7005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-7005. Resolution: Fixed Merged the PR to trunk and the 2.0 branch. > Remove duplicate Java Resource class. > - > > Key: KAFKA-7005 > URL: https://issues.apache.org/jira/browse/KAFKA-7005 > Project: Kafka > Issue Type: Sub-task > Components: core, security >Reporter: Andy Coates >Assignee: Andy Coates >Priority: Major > Fix For: 2.0.0 > > > Relating to one of the outstanding work items in PR > [#5117|https://github.com/apache/kafka/pull/5117]: > The o.a.k.c.requests.Resource class could be dropped in favour of > o.a.k.c.config.ConfigResource. > This will remove the duplication of `Resource` classes. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7030) Add configuration to disable message down-conversion
[ https://issues.apache.org/jira/browse/KAFKA-7030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508688#comment-16508688 ] ASF GitHub Bot commented on KAFKA-7030: --- dhruvilshah3 opened a new pull request #5192: KAFKA-7030: Add configuration to disable message down-conversion (KIP-283) URL: https://github.com/apache/kafka/pull/5192 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) > Add configuration to disable message down-conversion > > > Key: KAFKA-7030 > URL: https://issues.apache.org/jira/browse/KAFKA-7030 > Project: Kafka > Issue Type: Sub-task >Reporter: Dhruvil Shah >Assignee: Dhruvil Shah >Priority: Major > > Add configuration to disable message down-conversion as described in > [KIP-283|https://cwiki.apache.org/confluence/display/KAFKA/KIP-283%3A+Efficient+Memory+Usage+for+Down-Conversion].
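In the Kafka 2.0 configuration docs, the switch described in KIP-283 appears as the topic-level setting `message.downconversion.enable`, with the broker-wide default `log.message.downconversion.enable`. When the setting is false, the broker does not down-convert for consumers that expect an older message format. Instead, it answers their fetch requests with an UNSUPPORTED_VERSION error. The toy model below captures only that decision. The class and method names are hypothetical, and comparing magic bytes is a simplification of the broker's actual fetch handling.

```java
// Toy model of the broker-side decision made possible by KIP-283; names are hypothetical.
public class DownConversionPolicy {

    public enum Outcome { SERVE_AS_IS, DOWN_CONVERT, UNSUPPORTED_VERSION }

    // storedMagic: message format version of the data on disk (0, 1 or 2).
    // clientMaxMagic: highest message format version the fetching consumer understands.
    // downConversionEnabled: models message.downconversion.enable for the topic.
    public static Outcome decide(byte storedMagic, byte clientMaxMagic, boolean downConversionEnabled) {
        if (clientMaxMagic >= storedMagic) {
            return Outcome.SERVE_AS_IS; // no conversion needed
        }
        return downConversionEnabled ? Outcome.DOWN_CONVERT : Outcome.UNSUPPORTED_VERSION;
    }

    public static void main(String[] args) {
        System.out.println(decide((byte) 2, (byte) 1, true));  // DOWN_CONVERT
        System.out.println(decide((byte) 2, (byte) 1, false)); // UNSUPPORTED_VERSION
        System.out.println(decide((byte) 2, (byte) 2, false)); // SERVE_AS_IS
    }
}
```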
[jira] [Commented] (KAFKA-7005) Remove duplicate Java Resource class.
[ https://issues.apache.org/jira/browse/KAFKA-7005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508684#comment-16508684 ] ASF GitHub Bot commented on KAFKA-7005:
---
junrao closed pull request #5184: KAFKA-7005: Remove duplicate resource class.
URL: https://github.com/apache/kafka/pull/5184

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 450de06fcd1..495095a9276 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -112,8 +112,6 @@ import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
 import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
-import org.apache.kafka.common.requests.Resource;
-import org.apache.kafka.common.requests.ResourceType;
 import org.apache.kafka.common.security.token.delegation.DelegationToken;
 import org.apache.kafka.common.security.token.delegation.TokenInformation;
 import org.apache.kafka.common.utils.AppInfoParser;
@@ -1683,19 +1681,19 @@ public DescribeConfigsResult describeConfigs(Collection configRe
         // The BROKER resources which we want to describe. We must make a separate DescribeConfigs
         // request for every BROKER resource we want to describe.
-        final Collection brokerResources = new ArrayList<>();
+        final Collection brokerResources = new ArrayList<>();
         // The non-BROKER resources which we want to describe. These resources can be described by a
         // single, unified DescribeConfigs request.
-        final Collection unifiedRequestResources = new ArrayList<>(configResources.size());
+        final Collection unifiedRequestResources = new ArrayList<>(configResources.size());
         for (ConfigResource resource : configResources) {
             if (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) {
-                brokerFutures.put(resource, new KafkaFutureImpl());
-                brokerResources.add(configResourceToResource(resource));
+                brokerFutures.put(resource, new KafkaFutureImpl<>());
+                brokerResources.add(resource);
             } else {
-                unifiedRequestFutures.put(resource, new KafkaFutureImpl());
-                unifiedRequestResources.add(configResourceToResource(resource));
+                unifiedRequestFutures.put(resource, new KafkaFutureImpl<>());
+                unifiedRequestResources.add(resource);
             }
         }
@@ -1716,7 +1714,7 @@ void handleResponse(AbstractResponse abstractResponse) {
                 for (Map.Entry> entry : unifiedRequestFutures.entrySet()) {
                     ConfigResource configResource = entry.getKey();
                     KafkaFutureImpl future = entry.getValue();
-                    DescribeConfigsResponse.Config config = response.config(configResourceToResource(configResource));
+                    DescribeConfigsResponse.Config config = response.config(configResource);
                     if (config == null) {
                         future.completeExceptionally(new UnknownServerException(
                             "Malformed broker response: missing config for " + configResource));
@@ -1746,7 +1744,7 @@ void handleFailure(Throwable throwable) {
         for (Map.Entry> entry : brokerFutures.entrySet()) {
             final KafkaFutureImpl brokerFuture = entry.getValue();
-            final Resource resource = configResourceToResource(entry.getKey());
+            final ConfigResource resource = entry.getKey();
             final int nodeId = Integer.parseInt(resource.name());
             runnable.call(new Call("describeBrokerConfigs", calcDeadlineMs(now, options.timeoutMs()), new ConstantNodeIdProvider(nodeId)) {
@@ -1792,21 +1790,6 @@ void handleFailure(Throwable throwable) {
         return new DescribeConfigsResult(allFutures);
     }
-    private Resource configResourceToResource(ConfigResource configResource) {
-        ResourceType resourceType;
-        switch (configResource.type()) {
-            case TOPIC:
-                resourceType = ResourceType.TOPIC;
-                break;
-            case BROKER:
-                resourceType = ResourceType.BROKER;
-                break;
-            default:
-                throw new IllegalArgumentException("Unexpected resource type " + configResource.type());
-
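After this change, the loop in `describeConfigs()` adds each `ConfigResource` directly to one of two lists, with no conversion step. The sketch below isolates that partitioning rule: a specific (non-default) broker needs its own DescribeConfigs request, and everything else can share one unified request. The nested `Resource` class is a simplified, hypothetical stand-in for `org.apache.kafka.common.config.ConfigResource`. Its `isDefault()` simply treats an empty name as the default resource.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the partitioning rule in describeConfigs(); not the real client code.
public class DescribeConfigsPartition {

    public enum Type { BROKER, TOPIC }

    public static final class Resource {
        final Type type;
        final String name;

        public Resource(Type type, String name) {
            this.type = type;
            this.name = name;
        }

        // Assumption for this sketch: an empty name denotes the default (cluster-wide) resource.
        boolean isDefault() {
            return name.isEmpty();
        }

        @Override
        public String toString() {
            return type + ":" + (name.isEmpty() ? "<default>" : name);
        }
    }

    // Returns [per-broker resources, unified-request resources].
    public static List<List<Resource>> partition(List<Resource> resources) {
        List<Resource> brokerResources = new ArrayList<>();
        List<Resource> unifiedRequestResources = new ArrayList<>();
        for (Resource resource : resources) {
            if (resource.type == Type.BROKER && !resource.isDefault()) {
                brokerResources.add(resource);          // one request per broker id
            } else {
                unifiedRequestResources.add(resource);  // batched into a single request
            }
        }
        List<List<Resource>> result = new ArrayList<>();
        result.add(brokerResources);
        result.add(unifiedRequestResources);
        return result;
    }

    public static void main(String[] args) {
        List<Resource> input = List.of(
            new Resource(Type.BROKER, "1"),
            new Resource(Type.TOPIC, "orders"),
            new Resource(Type.BROKER, ""));
        List<List<Resource>> split = partition(input);
        System.out.println("per-broker requests: " + split.get(0)); // [BROKER:1]
        System.out.println("unified request: " + split.get(1));     // [TOPIC:orders, BROKER:<default>]
    }
}
```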
[jira] [Commented] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data
[ https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508683#comment-16508683 ] Matthias J. Sax commented on KAFKA-6977: It is hard to tell where the corruption occurs. It could be on the wire... Maybe [~hachikuji] can help out here. It does not seem to be a Kafka Streams issue. Did you use a 0.11.0 or 0.10.2 consumer? Did the consumer run in the same environment (i.e., AWS ECS as mentioned in the description)? Can you consume the data with Kafka Streams in a different environment? > Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 > while fetching data > - > > Key: KAFKA-6977 > URL: https://issues.apache.org/jira/browse/KAFKA-6977 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.1 >Reporter: Eugen Feller >Priority: Blocker > Labels: streams > > We are running Kafka Streams 0.11.0.1 with Kafka Broker 0.10.2.1 and > constantly run into the following exception: > {code:java} > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > partition assignment took 40 ms. > current active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 0_15, > 0_17, 0_20, 0_21, 0_23, 0_25, 0_28, 0_2, 0_6, 0_7, 0_8, 0_9, 0_11, 0_16, > 0_18, 0_19, 0_22, 0_24, 0_26, 0_27, 0_29, 0_30, 0_31] > current standby tasks: [] > previous active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, > 0_15, 0_17, 0_20, 0_21, 0_23, 0_25, 0_28] > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > State transition from PARTITIONS_ASSIGNED to RUNNING.
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > INFO org.apache.kafka.streams.KafkaStreams - stream-client > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e] State > transition from REBALANCING to RUNNING. > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > ERROR org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > Encountered the following error during processing: > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > at > org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490) > java.lang.IllegalStateException: Unexpected error code 2 while fetching data > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > Shutting down > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > State transition from RUNNING to PENDING_SHUTDOWN. 
> [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka > producer with timeoutMillis = 9223372036854775807 ms. > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > Stream thread shutdown complete > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > State transition from PENDING_SHUTDOWN to DEAD. >
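In the Kafka protocol, error code 2 is CORRUPT_MESSAGE. So "Unexpected error code 2 while fetching data" suggests that the broker itself flagged the fetched partition data as corrupt (for example, a failed CRC or an invalid size), rather than the Streams layer failing. The lookup below is a hand-copied excerpt of a few protocol error codes for decoding such messages. It is not the client's `org.apache.kafka.common.protocol.Errors` enum, which remains the authoritative list.

```java
import java.util.Map;

// Hand-copied excerpt of Kafka protocol error codes; see the Errors enum for the full list.
public class ProtocolErrorCodes {

    private static final Map<Integer, String> NAMES = Map.of(
        -1, "UNKNOWN_SERVER_ERROR",
        0, "NONE",
        1, "OFFSET_OUT_OF_RANGE",
        2, "CORRUPT_MESSAGE",
        3, "UNKNOWN_TOPIC_OR_PARTITION");

    // Returns the symbolic name, or a marker for codes not in this excerpt.
    public static String name(int code) {
        return NAMES.getOrDefault(code, "UNLISTED(" + code + ")");
    }

    public static void main(String[] args) {
        System.out.println(name(2)); // CORRUPT_MESSAGE
    }
}
```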
[jira] [Created] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions
Lucas Wang created KAFKA-7040:
Summary: The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions
Key: KAFKA-7040
URL: https://issues.apache.org/jira/browse/KAFKA-7040
Project: Kafka
Issue Type: Bug
Reporter: Lucas Wang

Problem Statement:
Consider the scenario where there are two brokers, broker0 and broker1, and two partitions, "t1p0" and "t1p1"[1], both of which have broker1 as the leader and broker0 as the follower. The following sequence of events happened on broker0:
1. The replica fetcher thread on broker0 issues a LeaderEpoch request to broker1 and waits for the response.
2. A LeaderAndISR request causes broker0 to become the leader for partition t1p0, which in turn removes t1p0 from the replica fetcher thread.
3. Broker0 accepts some messages from a producer.
4. A second LeaderAndISR request causes broker1 to become the leader and broker0 to become the follower for partition t1p0. This adds t1p0 back to the replica fetcher thread on broker0.
5. The replica fetcher thread on broker0 receives the response to the LeaderEpoch request issued in step 1 and truncates the messages accepted in step 3.

The issue can be reproduced with the test from https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea

[1] Initially we set up broker0 to be the follower of two partitions instead of just one, to avoid shutting down the replica fetcher thread when it becomes idle.
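The five steps can be replayed in a toy, single-threaded model. It shows why checking whether the partition is currently assigned does not catch the problem: by step 5, t1p0 is assigned again, so the stale response looks valid. The model also includes one conceivable guard, a per-partition generation counter bumped on every re-add. This guard is an assumption for illustration, not necessarily how the issue was resolved. All names and the offsets 100/110 are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy, single-threaded replay of the KAFKA-7040 sequence; all names are hypothetical.
public class StaleEpochResponse {

    private final Map<String, Long> logEndOffset = new HashMap<>();
    private final Set<String> assigned = new HashSet<>();
    // Hypothetical guard: bumped every time a partition is (re)added to the fetcher.
    private final Map<String, Integer> generation = new HashMap<>();

    void addPartition(String tp) {
        assigned.add(tp);
        generation.merge(tp, 1, Integer::sum);
    }

    void removePartition(String tp) {
        assigned.remove(tp);
    }

    // Returns the generation captured when the LeaderEpoch request is sent.
    int sendEpochRequest(String tp) {
        return generation.getOrDefault(tp, 0);
    }

    void onEpochResponse(String tp, int requestGeneration, long truncateTo, boolean guard) {
        if (!assigned.contains(tp)) {
            return; // partition not fetched right now: response ignored
        }
        if (guard && generation.getOrDefault(tp, 0) != requestGeneration) {
            return; // partition was re-added after the request was sent: response is stale
        }
        logEndOffset.computeIfPresent(tp, (k, leo) -> Math.min(leo, truncateTo));
    }

    // Replays steps 1-5 from the issue and returns broker0's final log end offset for t1p0.
    public static long replay(boolean guard) {
        StaleEpochResponse broker0 = new StaleEpochResponse();
        broker0.logEndOffset.put("t1p0", 100L);
        broker0.addPartition("t1p0");
        int gen = broker0.sendEpochRequest("t1p0");        // 1. LeaderEpoch request sent to broker1
        broker0.removePartition("t1p0");                   // 2. broker0 becomes leader of t1p0
        broker0.logEndOffset.put("t1p0", 110L);            // 3. broker0 accepts 10 messages
        broker0.addPartition("t1p0");                      // 4. broker0 becomes follower again
        broker0.onEpochResponse("t1p0", gen, 100L, guard); // 5. the old response arrives
        return broker0.logEndOffset.get("t1p0");
    }

    public static void main(String[] args) {
        System.out.println("without guard: " + replay(false)); // 100: accepted messages truncated
        System.out.println("with guard: " + replay(true));     // 110: stale response dropped
    }
}
```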
[jira] [Commented] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data
[ https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508661#comment-16508661 ] Eugen Feller commented on KAFKA-6977: - Hi [~mjsax]. Interesting. Thanks a lot. I think the topic itself is likely consumed by multiple downstream consumers. However, only this job actually consumes it for the purposes of writing out to a database. Just tested consuming the topic with plain KafkaConsumer and it works. I also went over all the stack traces seen in that context and found the following: {code:java} KafkaException: Record for partition our_stats_topic_0-17 at offset 217641273 is invalid, cause: Record is corrupt (stored crc = 3302026163, computed crc = 3432237873) 1 at maybeEnsureValid (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1002) 2 at nextFetchedRecord (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1059) 3 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1090) 4 at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944) 5 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567) 6 at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528) 7 at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086) 8 at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043) 9 at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536) 10 at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490) 11 at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480) 12 at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457) KafkaException: Received exception when fetching the next record from our_stats_topic_0-17. If needed, please seek past the record to continue consumption. 
1 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1110) 2 at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944) 3 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567) 4 at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528) 5 at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086) 6 at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043) 7 at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536) 8 at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490) 9 at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480) 10 at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457) {code} In this particular instance I think the following stack trace was seen at the same time with error code 2: {code:java} CorruptRecordException: Record size is less than the minimum record overhead (14) KafkaException: Received exception when fetching the next record from our_stats_topic_0-16. If needed, please seek past the record to continue consumption. 
1 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1076) 2 at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944) 3 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567) 4 at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528) 5 at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086) 6 at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043) 7 at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536) 8 at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490) 9 at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480) 10 at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457) {code} I am wondering what could have led to error code 2, and possibly to the CorruptRecordException above, and what the best way to mitigate them would be. Do the records get corrupted on the wire somehow?
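The two consumer-side exceptions above correspond to two simple validity checks. The sketch below reproduces both for the legacy magic v0 record layout, `crc(4) | magic(1) | attributes(1) | keyLength(4) | key | valueLength(4) | value`, with a CRC-32 computed over everything after the crc field. The "minimum record overhead (14)" in the message is exactly that fixed part: 4 + 1 + 1 + 4 + 4 bytes. Newer v2 batches use CRC-32C over the batch instead. The real consumer performs this kind of check when `check.crcs` is enabled (the default). The class and method names are hypothetical. A mismatch only shows that the bytes differ from what was checksummed when the record was written. It does not by itself reveal whether disk, broker, network or client changed them.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Sketch of size and CRC validation for a legacy (magic v0) record; names are hypothetical.
public class LegacyRecordCheck {

    // 4 (crc) + 1 (magic) + 1 (attributes) + 4 (key length) + 4 (value length)
    public static final int RECORD_OVERHEAD_V0 = 4 + 1 + 1 + 4 + 4;

    // Builds a v0 record with a null key; the crc field covers everything after itself.
    public static ByteBuffer build(String value) {
        byte[] v = value.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(RECORD_OVERHEAD_V0 + v.length);
        buf.position(4);     // leave room for the crc
        buf.put((byte) 0);   // magic v0
        buf.put((byte) 0);   // attributes: no compression
        buf.putInt(-1);      // key length -1 means a null key
        buf.putInt(v.length);
        buf.put(v);
        buf.putInt(0, (int) crcOf(buf)); // absolute write of the checksum
        buf.rewind();
        return buf;
    }

    // CRC-32 over bytes [4, limit) without disturbing the caller's buffer position.
    static long crcOf(ByteBuffer record) {
        CRC32 crc = new CRC32();
        ByteBuffer body = record.duplicate();
        body.position(4);
        crc.update(body);
        return crc.getValue();
    }

    public static void validate(ByteBuffer record) {
        if (record.remaining() < RECORD_OVERHEAD_V0) {
            throw new IllegalStateException(
                "Record size is less than the minimum record overhead (" + RECORD_OVERHEAD_V0 + ")");
        }
        long stored = Integer.toUnsignedLong(record.getInt(0));
        long computed = crcOf(record);
        if (stored != computed) {
            throw new IllegalStateException(
                "Record is corrupt (stored crc = " + stored + ", computed crc = " + computed + ")");
        }
    }

    public static void main(String[] args) {
        validate(build("hello")); // passes silently

        ByteBuffer corrupted = build("hello");
        corrupted.put(corrupted.limit() - 1, (byte) 'X'); // flip one payload byte
        try {
            validate(corrupted);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }

        try {
            validate(ByteBuffer.allocate(10)); // too short to be a record
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```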
[jira] [Comment Edited] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data
[ https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508661#comment-16508661 ] Eugen Feller edited comment on KAFKA-6977 at 6/11/18 8:13 PM: -- Hi [~mjsax]. Interesting. Thanks a lot. I think the topic itself is likely consumed by multiple downstream consumers. However, only this job actually consumes it for the purposes of writing out to a database. Just tested consuming the topic with plain KafkaConsumer and it works. I also went over all the stack traces seen in that context and found the following: {code:java} KafkaException: Record for partition our_stats_topic_0-17 at offset 217641273 is invalid, cause: Record is corrupt (stored crc = 3302026163, computed crc = 3432237873) 1 at maybeEnsureValid (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1002) 2 at nextFetchedRecord (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1059) 3 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1090) 4 at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944) 5 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567) 6 at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528) 7 at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086) 8 at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043) 9 at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536) 10 at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490) 11 at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480) 12 at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457) KafkaException: Received exception when fetching the next record from our_stats_topic_0-17. If needed, please seek past the record to continue consumption. 
1 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1110) 2 at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944) 3 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567) 4 at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528) 5 at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086) 6 at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043) 7 at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536) 8 at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490) 9 at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480) 10 at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457) {code} In this particular instance I think the following stack trace was seen at the same time with error code 2: {code:java} CorruptRecordException: Record size is less than the minimum record overhead (14) KafkaException: Received exception when fetching the next record from our_stats_topic_0-16. If needed, please seek past the record to continue consumption. 
1 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:1076) 2 at access$1200 (org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.java:944) 3 at fetchRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:567) 4 at fetchedRecords (org.apache.kafka.clients.consumer.internals.Fetcher.java:528) 5 at pollOnce (org.apache.kafka.clients.consumer.KafkaConsumer.java:1086) 6 at poll (org.apache.kafka.clients.consumer.KafkaConsumer.java:1043) 7 at pollRequests (org.apache.kafka.streams.processor.internals.StreamThread.java:536) 8 at runOnce (org.apache.kafka.streams.processor.internals.StreamThread.java:490) 9 at runLoop (org.apache.kafka.streams.processor.internals.StreamThread.java:480) 10 at run (org.apache.kafka.streams.processor.internals.StreamThread.java:457) {code} I am wondering what could have led to error code 2, and possibly to the CorruptRecordException above, and what the best way to mitigate them would be. Do the records get corrupted on the wire somehow?
[jira] [Comment Edited] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread
[ https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508566#comment-16508566 ] Eugen Feller edited comment on KAFKA-6990 at 6/11/18 7:11 PM: -- Hi [~mjsax]. Thank you very much for your input. That makes total sense to me. My current defaults are as follows:
{code:java}
requestTimeout: Duration = 30 minutes,
maxPollInterval: Duration = 20 minutes,
maxPollRecords: Long = 50,
fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES,
fetchMaxWait: Duration = 10 minutes,
sessionTimeout: Duration = 30 seconds,
heartbeatInterval: Duration = 5 seconds,
stateDirCleanupDelay: Long = Long.MaxValue,
numStreamThreads: Long = 1,
numStandbyReplicas: Long = 0,
cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L
{code}
Should I further increase max.poll.interval.ms (currently 20 minutes)? This job implements the following logic:
{code:java}
stream().foreach(record => {
  // Buffered write this record to MongoDB
  // Await.result(record, 5 seconds)
})
{code}
Could it be that writes to Mongo take too long and that this leads to problems? I was under the impression that with the heartbeat interval set to 5 seconds in Kafka 0.11.0.1, we should have two threads: one that sends heartbeats (so that the consumer is considered alive) and one that actually calls poll(). In that case, blocking too long inside foreach should not kick us out of the consumer group, should it? I will try to collect DEBUG level logs today.
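For reference, the defaults listed in the comment can be written as a plain `java.util.Properties` object with the standard consumer and Kafka Streams config keys. The mapping from Eugen's field names (`requestTimeout`, `maxPollInterval`, ...) to these keys is an assumption based on the names. `fetchMaxBytes` is left at the client default and is therefore omitted. The only check included is the documented guideline that heartbeat.interval.ms should typically be no higher than one third of session.timeout.ms.

```java
import java.util.Properties;

// The settings from the comment expressed with standard config keys (mapping assumed from field names).
public class VisitsAppConfig {

    public static Properties build() {
        Properties props = new Properties();
        // Consumer settings; Kafka Streams passes these through to its internal consumers.
        props.put("request.timeout.ms", String.valueOf(30 * 60 * 1000));   // 30 minutes
        props.put("max.poll.interval.ms", String.valueOf(20 * 60 * 1000)); // 20 minutes
        props.put("max.poll.records", "50");
        props.put("fetch.max.wait.ms", String.valueOf(10 * 60 * 1000));    // 10 minutes
        props.put("session.timeout.ms", String.valueOf(30 * 1000));        // 30 seconds
        props.put("heartbeat.interval.ms", String.valueOf(5 * 1000));      // 5 seconds
        // Kafka Streams settings.
        props.put("state.cleanup.delay.ms", String.valueOf(Long.MAX_VALUE));
        props.put("num.stream.threads", "1");
        props.put("num.standby.replicas", "0");
        props.put("cache.max.bytes.buffering", String.valueOf(128L * 1024 * 1024));
        return props;
    }

    // Guideline from the consumer docs: heartbeat interval at most a third of the session timeout.
    public static boolean heartbeatWithinGuideline(Properties props) {
        long session = Long.parseLong(props.getProperty("session.timeout.ms"));
        long heartbeat = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        return heartbeat * 3 <= session;
    }

    public static void main(String[] args) {
        Properties props = build();
        props.forEach((k, v) -> System.out.println(k + "=" + v));
        System.out.println("heartbeat within guideline: " + heartbeatWithinGuideline(props)); // true
    }
}
```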
[jira] [Comment Edited] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread
[ https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508566#comment-16508566 ] Eugen Feller edited comment on KAFKA-6990 at 6/11/18 7:10 PM: -- Hi [~mjsax]. Thank you very much for your input. That makes total sense to me. My current defaults are as follows: {code:java} requestTimeout: Duration = 30 minutes, maxPollInterval: Duration = 20 minutes, maxPollRecords: Long = 50, fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES, fetchMaxWait: Duration = 10 minutes, sessionTimeout: Duration = 30 seconds, heartbeatInterval: Duration = 5 seconds, stateDirCleanupDelay: Long = Long.MaxValue, numStreamThreads: Long = 1, numStandbyReplicas: Long = 0, cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code} Should I increase max.poll.interval.ms further (currently 20 minutes)? This job implements the following logic: {code:java} stream().foreach(record => { // Buffered write this record to MongoDB // Await.result(record, 5 seconds) }){code} Could it be that writes to MongoDB take too long and that this leads to the problems? I was under the impression that, with the heartbeat interval set to 5 seconds in Kafka 0.11.0.1, there should be two threads: one that sends heartbeats (so that we are considered alive) and one that actually calls poll(). In that case, blocking for too long inside foreach should not kick us out of the consumer group, right? I will try to collect DEBUG-level logs today. was (Author: efeller): Hi [~mjsax]. Thank you very much for your input. That makes total sense to me.
My current defaults are as follows: {code:java} requestTimeout: Duration = 30 minutes, maxPollInterval: Duration = 20 minutes, maxPollRecords: Long = 50, fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES, fetchMaxWait: Duration = 10 minutes, sessionTimeout: Duration = 30 seconds, heartbeatInterval: Duration = 5 seconds, stateDirCleanupDelay: Long = Long.MaxValue, numStreamThreads: Long = 1, numStandbyReplicas: Long = 0, cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code} Should I increase max.poll.interval.ms further (currently 20 minutes)? This job implements the following logic: {code:java} stream().foreach(record => { // Buffered write to MongoDB // Await.result(5 seconds) }){code} Could it be that writes to MongoDB take too long and that this leads to the problems? I was under the impression that, with the heartbeat interval set to 5 seconds in Kafka 0.11.0.1, there should be two threads: one that sends heartbeats (so that we are considered alive) and one that actually calls poll(). In that case, blocking for too long inside foreach should not kick us out of the consumer group, right? I will try to collect DEBUG-level logs today.
[jira] [Comment Edited] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread
[ https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508566#comment-16508566 ] Eugen Feller edited comment on KAFKA-6990 at 6/11/18 7:09 PM: -- Hi [~mjsax]. Thank you very much for your input. That makes total sense to me. My current defaults are as follows: {code:java} requestTimeout: Duration = 30 minutes, maxPollInterval: Duration = 20 minutes, maxPollRecords: Long = 50, fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES, fetchMaxWait: Duration = 10 minutes, sessionTimeout: Duration = 30 seconds, heartbeatInterval: Duration = 5 seconds, stateDirCleanupDelay: Long = Long.MaxValue, numStreamThreads: Long = 1, numStandbyReplicas: Long = 0, cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code} Should I increase max.poll.interval.ms further (currently 20 minutes)? This job implements the following logic: {code:java} stream().foreach(record => { // Buffered write to MongoDB // Await.result(5 seconds) }){code} Could it be that writes to MongoDB take too long and that this leads to the problems? I was under the impression that, with the heartbeat interval set to 5 seconds in Kafka 0.11.0.1, there should be two threads: one that sends heartbeats (so that we are considered alive) and one that actually calls poll(). In that case, blocking for too long inside foreach should not kick us out of the consumer group, right? I will try to collect DEBUG-level logs today. was (Author: efeller): Hi [~mjsax]. Thank you very much for your input. That makes total sense to me.
My current defaults are as follows: {code:java} requestTimeout: Duration = 30 minutes, maxPollInterval: Duration = 20 minutes, maxPollRecords: Long = 1000, fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES, fetchMaxWait: Duration = 10 minutes, sessionTimeout: Duration = 30 seconds, heartbeatInterval: Duration = 5 seconds, stateDirCleanupDelay: Long = Long.MaxValue, numStreamThreads: Long = 1, numStandbyReplicas: Long = 0, cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code} Should I increase max.poll.interval.ms further (currently 20 minutes)? For this particular job, max.poll.records is set to 50. This job implements the following logic: {code:java} stream().foreach(record => { // Buffered write to MongoDB // Await.result(5 seconds) }){code} Could it be that writes to MongoDB take too long and that this leads to the problems? I was under the impression that, with the heartbeat interval set to 5 seconds in Kafka 0.11.0.1, there should be two threads: one that sends heartbeats (so that we are considered alive) and one that actually calls poll(). In that case, blocking for too long inside foreach should not kick us out of the consumer group, right? I will try to collect DEBUG-level logs today.
[jira] [Comment Edited] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread
[ https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508566#comment-16508566 ] Eugen Feller edited comment on KAFKA-6990 at 6/11/18 7:08 PM: -- Hi [~mjsax]. Thank you very much for your input. That makes total sense to me. My current defaults are as follows: {code:java} requestTimeout: Duration = 30 minutes, maxPollInterval: Duration = 20 minutes, maxPollRecords: Long = 1000, fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES, fetchMaxWait: Duration = 10 minutes, sessionTimeout: Duration = 30 seconds, heartbeatInterval: Duration = 5 seconds, stateDirCleanupDelay: Long = Long.MaxValue, numStreamThreads: Long = 1, numStandbyReplicas: Long = 0, cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code} Should I increase max.poll.interval.ms further (currently 20 minutes)? For this particular job, max.poll.records is set to 50. This job implements the following logic: {code:java} stream().foreach(record => { // Buffered write to MongoDB // Await.result(5 seconds) }){code} Could it be that writes to MongoDB take too long and that this leads to the problems? I was under the impression that, with the heartbeat interval set to 5 seconds in Kafka 0.11.0.1, there should be two threads: one that sends heartbeats (so that we are considered alive) and one that actually calls poll(). In that case, blocking for too long inside foreach should not kick us out of the consumer group, right? I will try to collect DEBUG-level logs today. was (Author: efeller): Hi [~mjsax]. Thank you very much for your input. That makes total sense to me.
My current defaults are as follows: {code:java} requestTimeout: Duration = 30 minutes, maxPollInterval: Duration = 20 minutes, maxPollRecords: Long = 1000, fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES, fetchMaxWait: Duration = 10 minutes, sessionTimeout: Duration = 30 seconds, heartbeatInterval: Duration = 5 seconds, stateDirCleanupDelay: Long = Long.MaxValue, numStreamThreads: Long = 1, numStandbyReplicas: Long = 0, cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code} Should I increase max.poll.interval.ms further (currently 20 minutes)? For this particular job, max.poll.records is set to 50. This job implements the following logic: {code:java} stream().foreach(record => { // Buffered write to MongoDB // Await.result(5 seconds) }){code} Could it be that writes to MongoDB take too long and that this leads to the problems? I was under the impression that, with the heartbeat interval set to 5 seconds in Kafka 0.11.0.1, there should be two threads: one that sends heartbeats (so that we are considered alive) and one that actually calls poll(). In that case, blocking for too long inside foreach should not kick us out of the consumer group, right? I will try to collect DEBUG-level logs today.
[jira] [Comment Edited] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread
[ https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508566#comment-16508566 ] Eugen Feller edited comment on KAFKA-6990 at 6/11/18 7:08 PM: -- Hi [~mjsax]. Thank you very much for your input. That makes total sense to me. My current defaults are as follows: {code:java} requestTimeout: Duration = 30 minutes, maxPollInterval: Duration = 20 minutes, maxPollRecords: Long = 1000, fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES, fetchMaxWait: Duration = 10 minutes, sessionTimeout: Duration = 30 seconds, heartbeatInterval: Duration = 5 seconds, stateDirCleanupDelay: Long = Long.MaxValue, numStreamThreads: Long = 1, numStandbyReplicas: Long = 0, cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code} Should I increase max.poll.interval.ms further (currently 20 minutes)? For this particular job, max.poll.records is set to 50. This job implements the following logic: {code:java} stream().foreach(record => { // Buffered write to MongoDB // Await.result(5 seconds) }){code} Could it be that writes to MongoDB take too long and that this leads to the problems? I was under the impression that, with the heartbeat interval set to 5 seconds in Kafka 0.11.0.1, there should be two threads: one that sends heartbeats (so that we are considered alive) and one that actually calls poll(). In that case, blocking for too long inside foreach should not kick us out of the consumer group, right? I will try to collect DEBUG-level logs today. was (Author: efeller): Hi [~mjsax]. Thank you very much for your input. That makes total sense to me.
My current defaults are as follows: {code:java} requestTimeout: Duration = 30 minutes, maxPollInterval: Duration = 20 minutes, maxPollRecords: Long = 1000, fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES, fetchMaxWait: Duration = 10 minutes, sessionTimeout: Duration = 30 seconds, heartbeatInterval: Duration = 5 seconds, stateDirCleanupDelay: Long = Long.MaxValue, numStreamThreads: Long = 1, numStandbyReplicas: Long = 0, cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code} Should I increase max.poll.interval.ms further (currently 20 minutes)? This particular job implements the following logic: {code:java} stream().foreach(record => { // Buffered write to MongoDB // Await.result(5 seconds) }){code} Could it be that writes to MongoDB take too long and that this leads to the problems? I was under the impression that, with the heartbeat interval set to 5 seconds in Kafka 0.11.0.1, there should be two threads: one that sends heartbeats (so that we are considered alive) and one that actually calls poll(). In that case, blocking for too long inside foreach should not kick us out of the consumer group, right? I will try to collect DEBUG-level logs today.
[jira] [Commented] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread
[ https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508566#comment-16508566 ] Eugen Feller commented on KAFKA-6990: - Hi [~mjsax]. Thank you very much for your input. That makes total sense to me. My current defaults are as follows: {code:java} requestTimeout: Duration = 30 minutes, maxPollInterval: Duration = 20 minutes, maxPollRecords: Long = 1000, fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES, fetchMaxWait: Duration = 10 minutes, sessionTimeout: Duration = 30 seconds, heartbeatInterval: Duration = 5 seconds, stateDirCleanupDelay: Long = Long.MaxValue, numStreamThreads: Long = 1, numStandbyReplicas: Long = 0, cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L{code} Should I increase max.poll.interval.ms further (currently 20 minutes)? This particular job implements the following logic: {code:java} stream().foreach(record => { // Buffered write to MongoDB // Await.result(5 seconds) }){code} Could it be that writes to MongoDB take too long and that this leads to the problems? I was under the impression that, with the heartbeat interval set to 5 seconds in Kafka 0.11.0.1, there should be two threads: one that sends heartbeats (so that we are considered alive) and one that actually calls poll(). In that case, blocking for too long inside foreach should not kick us out of the consumer group, right? I will try to collect DEBUG-level logs today. > CommitFailedException; this task may be no longer owned by the thread > - > > Key: KAFKA-6990 > URL: https://issues.apache.org/jira/browse/KAFKA-6990 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.1 >Reporter: Eugen Feller >Priority: Blocker > > We are seeing a lot of CommitFailedExceptions on one of our Kafka stream > apps. Running Kafka Streams 0.11.0.1 and Kafka Broker 0.10.2.1.
Full error > message: > {code:java} > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] INFO > org.apache.kafka.streams.KafkaStreams - stream-client > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d] State transition from > REBALANCING to RUNNING. > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.StreamTask - task [0_0] Failed > offset commits > {sightings_sighting_byclientmac_0-0=OffsetAndMetadata{offset=11, > metadata=''}, > mactocontactmappings_contactmapping_byclientmac_0-0=OffsetAndMetadata{offset=26, > metadata=''}} due to CommitFailedException > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit > stream task 0_0 during commit state due to CommitFailedException; this task > may be no longer owned by the thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit > stream task 0_1 during commit state due to CommitFailedException; this task > may be no longer owned by the thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.StreamTask - task [0_1] Failed > offset commits > {mactocontactmappings_contactmapping_byclientmac_0-1=OffsetAndMetadata{offset=38, > metadata=''}, sightings_sighting_byclientmac_0-1=OffsetAndMetadata{offset=1, > metadata=''}} due to CommitFailedException > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.StreamTask - task [0_2] Failed > offset commits > {mactocontactmappings_contactmapping_byclientmac_0-2=OffsetAndMetadata{offset=8, > metadata=''}, > 
sightings_sighting_byclientmac_0-2=OffsetAndMetadata{offset=24, metadata=''}} > due to CommitFailedException > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit > stream task 0_2 during commit state due to CommitFailedException; this task > may be no longer owned by the thread > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN > org.apache.kafka.streams.processor.internals.StreamTask - task [0_3] Failed > offset commits > {sightings_sighting_byclientmac_0-3=OffsetAndMetadata{offset=21, > metadata=''}, > mactocontactmappings_contactmapping_byclientmac_0-3=OffsetAndMetadata{offset=102, > metadata=''}} due to CommitFailedException > [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN >
[jira] [Commented] (KAFKA-7039) DelegatingClassLoader creates plugin instance even if its not Versioned
[ https://issues.apache.org/jira/browse/KAFKA-7039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508556#comment-16508556 ] ASF GitHub Bot commented on KAFKA-7039: --- mageshn opened a new pull request #5191: KAFKA-7039 : Create an instance of the plugin only it's a Versioned Plugin URL: https://github.com/apache/kafka/pull/5191 Create an instance of the plugin only if it's a Versioned Plugin. *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behavior change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > DelegatingClassLoader creates plugin instance even if its not Versioned > --- > > Key: KAFKA-7039 > URL: https://issues.apache.org/jira/browse/KAFKA-7039 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Magesh kumar Nandakumar >Assignee: Magesh kumar Nandakumar >Priority: Critical > Fix For: 2.0.0 > > > The versioned interface was introduced as part of > [KIP-285|https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin]. > DelegatingClassLoader is now attempting to create an instance of all the > plugins, even if it's not required. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7039) DelegatingClassLoader creates plugin instance even if its not Versioned
Magesh kumar Nandakumar created KAFKA-7039: -- Summary: DelegatingClassLoader creates plugin instance even if its not Versioned Key: KAFKA-7039 URL: https://issues.apache.org/jira/browse/KAFKA-7039 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.0.0 Reporter: Magesh kumar Nandakumar Assignee: Magesh kumar Nandakumar Fix For: 2.0.0 The versioned interface was introduced as part of [KIP-285|https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin]. DelegatingClassLoader is now attempting to create an instance of all the plugins, even if it's not required.
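The fix described in the PR amounts to checking the plugin class before instantiating it. Below is a minimal sketch that assumes the check is a plain Class.isAssignableFrom test; it is not DelegatingClassLoader's actual code. The Versioned interface here is a local stand-in assumed to mirror Connect's org.apache.kafka.connect.components.Versioned. versionFor, UNDEFINED_VERSION and the two plugin classes are hypothetical.

```java
// Sketch: instantiate a plugin class only when it implements Versioned.
public class VersionedPluginScan {

    // Local stand-in for Connect's Versioned interface (assumed shape: String version()).
    public interface Versioned {
        String version();
    }

    // Hypothetical plugin that reports a version.
    public static class VersionedPlugin implements Versioned {
        @Override
        public String version() {
            return "1.0";
        }
    }

    // Hypothetical plugin without a version; counts how often it is constructed.
    public static class PlainPlugin {
        static int constructed = 0;

        public PlainPlugin() {
            constructed++;
        }
    }

    // Hypothetical placeholder reported for plugins that do not expose a version.
    static final String UNDEFINED_VERSION = "undefined";

    static String versionFor(Class<?> pluginClass) {
        if (!Versioned.class.isAssignableFrom(pluginClass)) {
            // Not Versioned: report the placeholder without ever calling the constructor.
            return UNDEFINED_VERSION;
        }
        try {
            Versioned instance = (Versioned) pluginClass.getDeclaredConstructor().newInstance();
            return instance.version();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + pluginClass.getName(), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(versionFor(VersionedPlugin.class)); // 1.0
        System.out.println(versionFor(PlainPlugin.class));     // undefined
        System.out.println(PlainPlugin.constructed);           // 0
    }
}
```

The counter in PlainPlugin makes the point of the change visible: a non-Versioned plugin is still described, but its constructor never runs.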
[jira] [Commented] (KAFKA-6546) Add ENDPOINT_NOT_FOUND_ON_LEADER error code for missing listener
[ https://issues.apache.org/jira/browse/KAFKA-6546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508476#comment-16508476 ] ASF GitHub Bot commented on KAFKA-6546: --- rajinisivaram opened a new pull request #5189: KAFKA-6546: Use LISTENER_NOT_FOUND_ON_LEADER error for missing listener URL: https://github.com/apache/kafka/pull/5189 For metadata request version 6 and above, use a different error code to indicate a missing listener on the leader broker, to enable diagnosis of listener configuration issues. > Add ENDPOINT_NOT_FOUND_ON_LEADER error code for missing listener > > > Key: KAFKA-6546 > URL: https://issues.apache.org/jira/browse/KAFKA-6546 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.1.0 > > > In 1.1, if an endpoint is available on the broker processing a metadata > request, but the corresponding listener is not available on the leader of a > partition, LEADER_NOT_AVAILABLE is returned (earlier versions returned > UNKNOWN_SERVER_ERROR). This could indicate broker misconfiguration where some > brokers are not configured with all listeners, or it could indicate a > transient error when listeners are dynamically added. We want to treat the > error as a transient error to process dynamic updates, but we should notify > clients of the actual error. This change should be made when the MetadataRequest > version is updated so that LEADER_NOT_AVAILABLE is returned to older clients.
> See [https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration] > and [https://github.com/apache/kafka/pull/4539] for details.
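The version-dependent behaviour described above can be sketched as follows. This is a hypothetical illustration, not the broker's implementation. The ErrorCode enum is a local stand-in that borrows the name used in the PR title. The version-6 threshold comes from the PR description.

```java
// Sketch: pick the error for a partition whose leader lacks the requested listener,
// depending on the MetadataRequest version the client sent.
public class MetadataErrorChoice {

    // Local stand-in for the relevant error codes (names taken from the PR/issue text).
    enum ErrorCode { NONE, LEADER_NOT_AVAILABLE, LISTENER_NOT_FOUND_ON_LEADER }

    // Per the PR description: version 6 and above get the new, more specific error.
    static final short FIRST_VERSION_WITH_LISTENER_ERROR = 6;

    static ErrorCode partitionError(boolean leaderHasListener, short requestVersion) {
        if (leaderHasListener) {
            return ErrorCode.NONE;
        }
        // Older clients keep seeing LEADER_NOT_AVAILABLE, so their behaviour is unchanged.
        return requestVersion >= FIRST_VERSION_WITH_LISTENER_ERROR
                ? ErrorCode.LISTENER_NOT_FOUND_ON_LEADER
                : ErrorCode.LEADER_NOT_AVAILABLE;
    }

    public static void main(String[] args) {
        for (short version = 4; version <= 6; version++) {
            System.out.println("v" + version + ": " + partitionError(false, version));
        }
    }
}
```

Per the issue text, both codes would still be treated as transient by clients. The new code only makes a listener misconfiguration visible to newer clients.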
[jira] [Commented] (KAFKA-7029) ReplicaVerificationTool should not use the deprecated SimpleConsumer
[ https://issues.apache.org/jira/browse/KAFKA-7029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508472#comment-16508472 ] ASF GitHub Bot commented on KAFKA-7029: --- omkreddy opened a new pull request #5188: KAFKA-7029: Update ReplicaVerificationTool to use Java Consumer URL: https://github.com/apache/kafka/pull/5188 > ReplicaVerificationTool should not use the deprecated SimpleConsumer > > > Key: KAFKA-7029 > URL: https://issues.apache.org/jira/browse/KAFKA-7029 > Project: Kafka > Issue Type: Bug >Reporter: Ismael Juma >Assignee: Manikumar >Priority: Major > > Unless there's a reason not to, the simplest option would be to use KafkaConsumer.
[jira] [Updated] (KAFKA-6697) JBOD configured broker should not die if log directory is invalid
[ https://issues.apache.org/jira/browse/KAFKA-6697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-6697: Issue Type: Bug (was: Improvement) > JBOD configured broker should not die if log directory is invalid > - > > Key: KAFKA-6697 > URL: https://issues.apache.org/jira/browse/KAFKA-6697 > Project: Kafka > Issue Type: Bug >Reporter: Dong Lin >Assignee: Dong Lin >Priority: Major > Fix For: 2.0.0 > > > Currently, a JBOD-configured broker still dies on startup if > dir.getCanonicalPath() throws an IOException. We should mark such a log directory > as offline, and the broker should keep running as long as there is a good disk.
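The desired startup behaviour can be sketched as below, assuming each directory is checked independently. The broker's real startup code is structured differently. CanonicalResolver is a hypothetical seam that lets the example simulate getCanonicalPath() failures. LogDirStartup and checkDirs are also hypothetical names.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: tolerate bad JBOD log directories at startup instead of exiting.
public class LogDirStartup {

    // Hypothetical seam; in production this would simply call File.getCanonicalPath().
    interface CanonicalResolver {
        String resolve(File dir) throws IOException;
    }

    static final class Result {
        final List<String> online = new ArrayList<>();
        final List<String> offline = new ArrayList<>();
    }

    static Result checkDirs(List<File> dirs, CanonicalResolver resolver) {
        Result result = new Result();
        for (File dir : dirs) {
            try {
                result.online.add(resolver.resolve(dir));
            } catch (IOException e) {
                // Mark just this directory offline and continue with the others.
                result.offline.add(dir.getPath());
            }
        }
        if (result.online.isEmpty()) {
            // Only fail startup when no usable log directory remains.
            throw new IllegalStateException("No usable log directory; offline: " + result.offline);
        }
        return result;
    }

    public static void main(String[] args) {
        List<File> dirs = Arrays.asList(new File("/data/disk1"), new File("/data/disk2"));
        // Simulate a failing disk2; other directories resolve normally.
        CanonicalResolver flaky = dir -> {
            if (dir.getName().equals("disk2")) {
                throw new IOException("simulated failure for " + dir);
            }
            return dir.getCanonicalPath();
        };
        Result result = checkDirs(dirs, flaky);
        System.out.println("online=" + result.online + " offline=" + result.offline);
    }
}
```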
[jira] [Assigned] (KAFKA-7037) delete topic command replaces '+' from the topic name which leads incorrect topic deletion
[ https://issues.apache.org/jira/browse/KAFKA-7037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vahid Hashemian reassigned KAFKA-7037: -- Assignee: Vahid Hashemian > delete topic command replaces '+' from the topic name which leads incorrect > topic deletion > -- > > Key: KAFKA-7037 > URL: https://issues.apache.org/jira/browse/KAFKA-7037 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.0, 1.0.0 >Reporter: Sandeep Nemuri >Assignee: Vahid Hashemian >Priority: Major > > When executing a delete command, the Kafka CLI tool removes the "+" symbol > and deletes the wrong topic. In the case below, if _"*test+topic"*_ is > deleted, Kafka deletes _*testtopic.*_ > {code:java} > [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh > --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 > --topic testtopic > Created topic "testtopic". > [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh > --zookeeper `hostname`:2181 --topic test+topic --delete > Topic testtopic is marked for deletion.{code} > The delete topic command strips '+' from the topic name.
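One possible explanation, not confirmed in this report: the kafka-topics.sh help text says that --topic also accepts a regular expression, except with --create. Under that assumption nothing is being stripped. In the regex test+topic, t+ means "one or more t", so the pattern matches the existing topic testtopic. The sketch below uses java.util.regex to contrast matching the argument as a regex with matching it literally via Pattern.quote. TopicRegexDemo and selectTopics are hypothetical names, not the tool's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class TopicRegexDemo {

    // Returns the existing topics that a --topic argument would select,
    // assuming the argument is interpreted as a (full-match) regular expression.
    static List<String> selectTopics(String topicArg, List<String> existing) {
        Pattern pattern = Pattern.compile(topicArg);
        List<String> selected = new ArrayList<>();
        for (String topic : existing) {
            if (pattern.matcher(topic).matches()) {
                selected.add(topic);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> existing = Arrays.asList("testtopic", "other");
        // "t+" means one or more 't', so "test+topic" matches "testtopic".
        System.out.println(selectTopics("test+topic", existing));                // [testtopic]
        // Pattern.quote makes the argument literal, so nothing matches here.
        System.out.println(selectTopics(Pattern.quote("test+topic"), existing)); // []
    }
}
```

If this is the cause, escaping the regex symbol in the argument would avoid the accidental match against testtopic.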
[jira] [Resolved] (KAFKA-6946) Keep the session id for incremental fetch when fetch responses are throttled
[ https://issues.apache.org/jira/browse/KAFKA-6946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin resolved KAFKA-6946. - Resolution: Fixed > Keep the session id for incremental fetch when fetch responses are throttled > - > > Key: KAFKA-6946 > URL: https://issues.apache.org/jira/browse/KAFKA-6946 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.0.0 >Reporter: Jon Lee >Priority: Major > > The current patch for KAFKA-6028 (KIP-219) sends a FetchResponse with > INVALID_SESSION_ID if the response needs to be throttled due to quota > violation. If it is for incremental fetch, this will make the client reset > its session and send a full fetch request next time. This is not a > correctness issue, but it may affect performance when fetches are throttled. > In case of incremental fetch, a throttled response should use the same > session id as before so that the next unthrottled response can be in the same > session. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
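A simplified Java model shows why the old behaviour costs performance rather than correctness. It assumes the client drops its incremental session whenever a response carries `INVALID_SESSION_ID`, and therefore sends a full fetch next. That constant's value is 0 in Kafka's `FetchMetadata`. `FetchSessionModel` and its methods are hypothetical and are not the real `FetchSessionHandler` API.

```java
/** Hypothetical, simplified model of an incremental fetch session; not Kafka's FetchSessionHandler. */
final class FetchSessionModel {
    /** Same value as FetchMetadata.INVALID_SESSION_ID in Kafka (0). */
    static final int INVALID_SESSION_ID = 0;

    private int sessionId = INVALID_SESSION_ID;

    /** Server side: the session id placed in an empty, throttled FetchResponse. */
    static int throttledResponseSessionId(int currentSessionId, boolean keepSession) {
        // Before the fix the broker always answered a throttled fetch with INVALID_SESSION_ID.
        return keepSession ? currentSessionId : INVALID_SESSION_ID;
    }

    /** Client side: adopt the session id from the response; an invalid id ends the session. */
    void onResponse(int responseSessionId) {
        sessionId = responseSessionId;
    }

    /** True if the next request must be a full fetch listing every partition. */
    boolean nextFetchIsFull() {
        return sessionId == INVALID_SESSION_ID;
    }
}
```

Keeping the session id means the first unthrottled response after a quota violation can stay incremental.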
[jira] [Commented] (KAFKA-6860) NPE when reinitializeStateStores with eos enabled
[ https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508424#comment-16508424 ] ASF GitHub Bot commented on KAFKA-6860: --- mjsax opened a new pull request #5187: KAFKA-6860: Fix NPE in Kafka Streams with EOS enabled URL: https://github.com/apache/kafka/pull/5187 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > NPE when reinitializeStateStores with eos enabled > - > > Key: KAFKA-6860 > URL: https://issues.apache.org/jira/browse/KAFKA-6860 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 > Environment: mac, kafka1.1 >Reporter: ko byoung kwon >Assignee: Matthias J. Sax >Priority: Major > Fix For: 2.0.0 > > Original Estimate: 2h > Remaining Estimate: 2h > > *Symptom* > With EOS enabled, reinitializing state stores hits an NPE because the checkpoint > is null.
> {code:java} > 2018-05-02 18:05:17.156 ERROR 60836 --- [-StreamThread-1] > o.a.k.s.p.internals.StreamThread : stream-thread > [kafka-stream-application-d6ec1dfb-9b7f-42dd-8b28-899ff3d1ad98-StreamThread-1] > Encountered the following error during processing: > java.lang.NullPointerException: null > at > org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:66) > ~[kafka-streams-1.1.0.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155) > ~[kafka-streams-1.1.0.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:230) > ~[kafka-streams-1.1.0.jar:na] > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) > ~[kafka-streams-1.1.0.jar:na] > {code} > *How to reproduce* > *configure as* > - changelog topic with short `retention.ms` and `delete` policy (just to > reproduce the symptom easily) > ex) retention.ms=3,cleanup.policy=delete > - exactly-once semantics enabled > - no cleanup > *Step* > - two tasks [0_0] and [0_1], two Spring Boot instances (assigned was#1: task[0_0], > was#2: task[0_1]) > - write some data to each state store (the changelog topic will soon erase those > messages because of the short "retention.ms") > - when was#2 is killed, was#1 restores task[0_1]'s data into its own > RocksDB > - In the process, it tries to write the checkpoint and an error > occurs (AbstractStateManager #66). > {code:java} > // My code > Map<String, String> topicConfiguration = new HashMap<>(); > topicConfiguration.putIfAbsent("cleanup.policy", "delete"); > topicConfiguration.putIfAbsent("file.delete.delay.ms", "0"); > topicConfiguration.putIfAbsent("retention.ms", "3000"); > builder.stream(properties.getSourceTopic(), >Consumed.with(Serdes.Long(), Serdes.String())) >.groupByKey() >.count(Materialized
> .<Long, Long, KeyValueStore<Bytes, byte[]>>as(ORDER_STORE_NAME) > .withKeySerde(Serdes.Long()) > .withValueSerde(Serdes.Long()) > .withLoggingEnabled(topicConfiguration)); > {code} > *Suggestion* > When EOS is enabled, the checkpoint will be null. > I think we need to add some code to create a checkpoint, > as follows: > {code:java} > // # At org.apache.kafka.streams.processor.internals.AbstractStateManager #66 > // # suggestion start > if (checkpoint == null) { > checkpoint = new OffsetCheckpoint(new File(baseDir, > CHECKPOINT_FILE_NAME)); > } > // # suggestion end > try { > checkpoint.write(checkpointableOffsets); > } catch (final IOException fatalException) { > log.error("Failed to write offset checkpoint file to {} while > re-initializing {}: {}", checkpoint, stateStores, fatalException); > throw new StreamsException("Failed to reinitialize global store.", >
[jira] [Commented] (KAFKA-7035) Kafka Processor's init() method sometimes is not called
[ https://issues.apache.org/jira/browse/KAFKA-7035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508425#comment-16508425 ] Oleksandr Konopko commented on KAFKA-7035: -- OK, but the processor constructor and processor.init() are not an atomic piece of logic, right? * data processed by A and B is not reprocessed by C and D * will check... > Kafka Processor's init() method sometimes is not called > --- > > Key: KAFKA-7035 > URL: https://issues.apache.org/jira/browse/KAFKA-7035 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Oleksandr Konopko >Priority: Critical > Attachments: TransformProcessor.java > > > Scenario: > 1. We process a Kafka topic using the Processor API > 2. We want to collect metrics (let's say we just count the number of processed > entities, for simplicity) > 3. How we tried to organize this: > * process data with the process() method and send it down the stream with the context > * on each call of the process() method, update the counter > * schedule a punctuate function which sends the metric to a special topic. The metric > is built from the counter > You can find the code (we removed all business-sensitive code from it, so > it should be easy to read) in the attachment > > Problematic Kafka Streams behaviour that I can see by logging every step: > 1. We have 80 messages in the input topic > 2. Kafka Streams creates 4 Processor instances. Let's name them: ProcessorA, > ProcessorB, ProcessorC and ProcessorD > 3. ProcessorA and ProcessorB receive 1-5% of the data. The data is processed > correctly and the results are sent down the stream. The counter is updated > 4. The init() method was not called for ProcessorA and ProcessorB > 5. ProcessorC and ProcessorD are created and they start to receive all the > rest of the data (95-99%) > 6. The init() method is called for both ProcessorC and ProcessorD. It initiates > punctuation, which causes Metrics messages to be created and sent down the metric > stream periodically > 7.
ProcessorA and ProcessorB are closed. init() was never called for them, so the > Metric entity was not sent to the metrics topic > 8. Processing is finished. > > In the end: > Expected: > * 80 entities were processed and sent to the Sink > * Metrics entities contain counters which sum up to 80 > Actual results: > * 80 entities were processed and sent to the Sink > * Metrics entities contain counters which sum up to some number 3-6% less > than 80, for example 786543 > > Problem: > * the init() method call is not guaranteed > * there is no way to guarantee that all work was done by the punctuate method > before close() > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
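The counter accounting can be made robust by reporting any remainder on close, not only from the punctuator. The Java model below is framework-free; `CountingProcessor` and its lifecycle methods are hypothetical stand-ins, not the Kafka Streams `Processor` interface. Two assumptions apply:

- Counting does not depend on `init()` having run.
- Whatever a scheduled punctuation has not yet reported is flushed to the metrics sink in `close()`.

This models only the bookkeeping; it does not show how a real topology would deliver the final report.

```java
import java.util.function.LongConsumer;

/** Hypothetical processor model (not the Kafka Streams Processor interface). */
final class CountingProcessor {
    private final LongConsumer metricsSink; // receives one count per report
    private long unreported;                // records counted since the last report

    CountingProcessor(LongConsumer metricsSink) {
        this.metricsSink = metricsSink;
    }

    /** Per-record callback; counting does not depend on any init step having run. */
    void process(Object record) {
        unreported++;
    }

    /** Periodic callback, standing in for a scheduled punctuation. */
    void punctuate() {
        report();
    }

    /** Shutdown callback: report the remainder so it is not silently dropped. */
    void close() {
        report();
    }

    private void report() {
        if (unreported > 0) {
            metricsSink.accept(unreported);
            unreported = 0;
        }
    }
}
```

Consider a processor that never gets a punctuation, like ProcessorA or ProcessorB in the scenario above. Its count still reaches the sink when it is closed, so the reported counts sum to the number of processed records.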
[jira] [Commented] (KAFKA-6946) Keep the session id for incremental fetch when fetch responses are throttled
[ https://issues.apache.org/jira/browse/KAFKA-6946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508420#comment-16508420 ] ASF GitHub Bot commented on KAFKA-6946: --- lindong28 closed pull request #5164: KAFKA-6946: Keep the session id for incremental fetch when fetch responses are throttled URL: https://github.com/apache/kafka/pull/5164 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala index 7a47780a135..68f79cace38 100644 --- a/core/src/main/scala/kafka/server/FetchSession.scala +++ b/core/src/main/scala/kafka/server/FetchSession.scala @@ -290,6 +290,12 @@ trait FetchContext extends Logging { def partitionsToLogString(partitions: util.Collection[TopicPartition]): String = FetchSession.partitionsToLogString(partitions, isTraceEnabled) + + /** +* Return an empty throttled response due to quota violation. +*/ + def getThrottledResponse(throttleTimeMs: Int): FetchResponse[Records] = +new FetchResponse(Errors.NONE, new FetchSession.RESP_MAP, throttleTimeMs, INVALID_SESSION_ID) } /** @@ -474,6 +480,21 @@ class IncrementalFetchContext(private val time: Time, } } } + + override def getThrottledResponse(throttleTimeMs: Int): FetchResponse[Records] = { +session.synchronized { + // Check to make sure that the session epoch didn't change in between + // creating this fetch context and generating this response. + val expectedEpoch = JFetchMetadata.nextEpoch(reqMetadata.epoch()) + if (session.epoch != expectedEpoch) { +info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " + + s"got ${session.epoch}. 
Possible duplicate request.") +new FetchResponse(Errors.INVALID_FETCH_SESSION_EPOCH, new FetchSession.RESP_MAP, throttleTimeMs, session.id) + } else { +new FetchResponse(Errors.NONE, new FetchSession.RESP_MAP, throttleTimeMs, session.id) + } +} + } } case class LastUsedKey(val lastUsedMs: Long, diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6d9e3d115b8..f7e9ec98fdb 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -655,8 +655,7 @@ class KafkaApis(val requestChannel: RequestChannel, quotas.request.throttle(request, requestThrottleTimeMs, sendResponse) } // If throttling is required, return an empty response. - unconvertedFetchResponse = new FetchResponse(Errors.NONE, new util.LinkedHashMap[TopicPartition, -FetchResponse.PartitionData[Records]](), maxThrottleTimeMs, INVALID_SESSION_ID) + unconvertedFetchResponse = fetchContext.getThrottledResponse(maxThrottleTimeMs) } else { // Get the actual response. This will update the fetch context. 
unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions) diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala index 84efa6b684d..b79692d69d9 100755 --- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala @@ -201,25 +201,34 @@ class FetchSessionTest { assertEquals(Errors.INVALID_FETCH_SESSION_EPOCH, context6.updateAndGenerateResponseData(respData2).error()) +// Test generating a throttled response for the incremental fetch session +val reqData7 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] +val context7 = fetchManager.newContext( + new JFetchMetadata(resp2.sessionId(), 2), reqData7, EMPTY_PART_LIST, false) +val resp7 = context7.getThrottledResponse(100) +assertEquals(Errors.NONE, resp7.error()) +assertEquals(resp2.sessionId(), resp7.sessionId()) +assertEquals(100, resp7.throttleTimeMs()) + // Close the incremental fetch session. val prevSessionId = resp5.sessionId var nextSessionId = prevSessionId do { - val reqData7 = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData] - reqData7.put(new TopicPartition("bar", 0), new FetchRequest.PartitionData(0, 0, 100)) - reqData7.put(new TopicPartition("bar", 1), new FetchRequest.PartitionData(10, 0, 100)) - val context7 = fetchManager.newContext( -new JFetchMetadata(prevSessionId, FINAL_EPOCH), reqData7, EMPTY_PART_LIST, false) - assertEquals(classOf[SessionlessFetchContext], context7.getClass) + val reqData8 = new
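The epoch check that the patch adds to `IncrementalFetchContext.getThrottledResponse` can be modelled in plain Java. The `nextEpoch` rule is assumed to follow Kafka's `FetchMetadata.nextEpoch`:

- A negative epoch stays at the final epoch `-1`.
- `Integer.MAX_VALUE` wraps to 1.
- Any other epoch increments by 1.

`ThrottleEpochCheck` and `ThrottledResponse` are hypothetical names, and errors are plain strings rather than the `Errors` enum.

```java
/** Hypothetical model of the epoch check done before answering a throttled incremental fetch. */
final class ThrottleEpochCheck {
    static final int FINAL_EPOCH = -1;

    /** Assumed to mirror FetchMetadata.nextEpoch: FINAL_EPOCH is sticky, MAX_VALUE wraps to 1. */
    static int nextEpoch(int prevEpoch) {
        if (prevEpoch < 0) {
            return FINAL_EPOCH;
        } else if (prevEpoch == Integer.MAX_VALUE) {
            return 1;
        } else {
            return prevEpoch + 1;
        }
    }

    /** Error name plus the session id carried by the throttled response. */
    static final class ThrottledResponse {
        final String error;
        final int sessionId;

        ThrottledResponse(String error, int sessionId) {
            this.error = error;
            this.sessionId = sessionId;
        }
    }

    /**
     * As in the diff: the session id is always kept, but if the session epoch moved on between
     * creating the fetch context and building the response (e.g. a duplicate request), report
     * INVALID_FETCH_SESSION_EPOCH instead of NONE.
     */
    static ThrottledResponse throttledResponse(int sessionId, int sessionEpoch, int requestEpoch) {
        int expectedEpoch = nextEpoch(requestEpoch);
        String error = (sessionEpoch != expectedEpoch) ? "INVALID_FETCH_SESSION_EPOCH" : "NONE";
        return new ThrottledResponse(error, sessionId);
    }
}
```

In both branches the response echoes the existing session id. That is the point of the fix: the client keeps its session even when it is told about an epoch mismatch.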
[jira] [Commented] (KAFKA-7035) Kafka Processor's init() method sometimes is not called
[ https://issues.apache.org/jira/browse/KAFKA-7035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508402#comment-16508402 ] Guozhang Wang commented on KAFKA-7035: -- Hi [~akonopko], processor.init() should always be called when the task is initialized for the first time. I've just checked the code in 1.0.0 and confirmed it is still the case. BTW, could you elaborate on the scenario: why were four processors created, and why did ProcessorC and D take over the data from ProcessorA and B? Are there only two input partitions available? What was the total num.threads for this application? > Kafka Processor's init() method sometimes is not called > --- > > Key: KAFKA-7035 > URL: https://issues.apache.org/jira/browse/KAFKA-7035 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Oleksandr Konopko >Priority: Critical > Attachments: TransformProcessor.java > > > Scenario: > 1. We process a Kafka topic using the Processor API > 2. We want to collect metrics (let's say we just count the number of processed > entities, for simplicity) > 3. How we tried to organize this: > * process data with the process() method and send it down the stream with the context > * on each call of the process() method, update the counter > * schedule a punctuate function which sends the metric to a special topic. The metric > is built from the counter > You can find the code (we removed all business-sensitive code from it, so > it should be easy to read) in the attachment > > Problematic Kafka Streams behaviour that I can see by logging every step: > 1. We have 80 messages in the input topic > 2. Kafka Streams creates 4 Processor instances. Let's name them: ProcessorA, > ProcessorB, ProcessorC and ProcessorD > 3. ProcessorA and ProcessorB receive 1-5% of the data. The data is processed > correctly and the results are sent down the stream. The counter is updated > 4. The init() method was not called for ProcessorA and ProcessorB > 5.
ProcessorC and ProcessorD are created and they start to receive all the > rest of the data (95-99%) > 6. The init() method is called for both ProcessorC and ProcessorD. It initiates > punctuation, which causes Metrics messages to be created and sent down the metric > stream periodically > 7. ProcessorA and ProcessorB are closed. init() was never called for them, so the > Metric entity was not sent to the metrics topic > 8. Processing is finished. > > In the end: > Expected: > * 80 entities were processed and sent to the Sink > * Metrics entities contain counters which sum up to 80 > Actual results: > * 80 entities were processed and sent to the Sink > * Metrics entities contain counters which sum up to some number 3-6% less > than 80, for example 786543 > > Problem: > * the init() method call is not guaranteed > * there is no way to guarantee that all work was done by the punctuate method > before close() > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6860) NPE when reinitializeStateStores with eos enabled
[ https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6860: --- Fix Version/s: 2.0.0 > NPE when reinitializeStateStores with eos enabled > - > > Key: KAFKA-6860 > URL: https://issues.apache.org/jira/browse/KAFKA-6860 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 > Environment: mac, kafka1.1 >Reporter: ko byoung kwon >Assignee: Matthias J. Sax >Priority: Major > Fix For: 2.0.0 > > Original Estimate: 2h > Remaining Estimate: 2h > > *Symptom* > With EOS enabled, reinitializing state stores hits an NPE because the checkpoint > is null. > {code:java} > 2018-05-02 18:05:17.156 ERROR 60836 --- [-StreamThread-1] > o.a.k.s.p.internals.StreamThread : stream-thread > [kafka-stream-application-d6ec1dfb-9b7f-42dd-8b28-899ff3d1ad98-StreamThread-1] > Encountered the following error during processing: > java.lang.NullPointerException: null > at > org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:66) > ~[kafka-streams-1.1.0.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155) > ~[kafka-streams-1.1.0.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:230) > ~[kafka-streams-1.1.0.jar:na] > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) > ~[kafka-streams-1.1.0.jar:na] > {code} > *How to reproduce* > *configure as* > - changelog topic with short `retention.ms` and `delete` policy (just to > reproduce the symptom easily) > ex) retention.ms=3,cleanup.policy=delete > - exactly-once semantics enabled > - no cleanup > *Step* > - two tasks [0_0] and [0_1], two Spring Boot instances (assigned was#1: task[0_0], > was#2: task[0_1]) > - write some data to each state store (the changelog
topic will soon erase those > messages because of the short "retention.ms") > - when was#2 is killed, was#1 restores task[0_1]'s data into its own > RocksDB > - In the process, it tries to write the checkpoint and an error > occurs (AbstractStateManager #66). > {code:java} > // My code > Map<String, String> topicConfiguration = new HashMap<>(); > topicConfiguration.putIfAbsent("cleanup.policy", "delete"); > topicConfiguration.putIfAbsent("file.delete.delay.ms", "0"); > topicConfiguration.putIfAbsent("retention.ms", "3000"); > builder.stream(properties.getSourceTopic(), >Consumed.with(Serdes.Long(), Serdes.String())) >.groupByKey() >.count(Materialized > .<Long, Long, KeyValueStore<Bytes, byte[]>>as(ORDER_STORE_NAME) > .withKeySerde(Serdes.Long()) > .withValueSerde(Serdes.Long()) > .withLoggingEnabled(topicConfiguration)); > {code} > *Suggestion* > When EOS is enabled, the checkpoint will be null. > I think we need to add some code to create a checkpoint, > as follows: > {code:java} > // # At org.apache.kafka.streams.processor.internals.AbstractStateManager #66 > // # suggestion start > if (checkpoint == null) { > checkpoint = new OffsetCheckpoint(new File(baseDir, > CHECKPOINT_FILE_NAME)); > } > // # suggestion end > try { > checkpoint.write(checkpointableOffsets); > } catch (final IOException fatalException) { > log.error("Failed to write offset checkpoint file to {} while > re-initializing {}: {}", checkpoint, stateStores, fatalException); > throw new StreamsException("Failed to reinitialize global store.", > fatalException); > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
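The reporter suggests creating the checkpoint lazily when it is null instead of dereferencing it. Below is a self-contained Java sketch of that null guard. It assumes a simplified checkpoint that writes one `partition offset` line per entry. This is not Kafka's on-disk `.checkpoint` format.

`SimpleCheckpoint` and `LazyCheckpointStateManager` are hypothetical stand-ins for `OffsetCheckpoint` and `AbstractStateManager`. This reflects the suggestion in the ticket, not necessarily the fix that was merged.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for OffsetCheckpoint: one "partition offset" line per entry. */
final class SimpleCheckpoint {
    private final Path file;

    SimpleCheckpoint(Path file) {
        this.file = file;
    }

    void write(Map<String, Long> offsets) throws IOException {
        List<String> lines = new ArrayList<>();
        offsets.forEach((partition, offset) -> lines.add(partition + " " + offset));
        Files.write(file, lines, StandardCharsets.UTF_8);
    }
}

/** Hypothetical state manager showing the lazy-creation guard from the suggestion. */
final class LazyCheckpointStateManager {
    static final String CHECKPOINT_FILE_NAME = ".checkpoint";

    private final Path baseDir;
    private SimpleCheckpoint checkpoint; // left null when EOS is enabled, as in the report

    LazyCheckpointStateManager(Path baseDir, boolean eosEnabled) {
        this.baseDir = baseDir;
        this.checkpoint = eosEnabled ? null : new SimpleCheckpoint(baseDir.resolve(CHECKPOINT_FILE_NAME));
    }

    /** Re-initialization path: create the checkpoint on demand instead of hitting an NPE. */
    void reinitialize(Map<String, Long> checkpointableOffsets) {
        if (checkpoint == null) {
            checkpoint = new SimpleCheckpoint(baseDir.resolve(CHECKPOINT_FILE_NAME));
        }
        try {
            checkpoint.write(checkpointableOffsets);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to write offset checkpoint file", e);
        }
    }
}
```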
[jira] [Assigned] (KAFKA-6860) NPE when reinitializeStateStores with eos enabled
[ https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-6860: -- Assignee: Matthias J. Sax > NPE when reinitializeStateStores with eos enabled > - > > Key: KAFKA-6860 > URL: https://issues.apache.org/jira/browse/KAFKA-6860 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 > Environment: mac, kafka1.1 >Reporter: ko byoung kwon >Assignee: Matthias J. Sax >Priority: Major > Original Estimate: 2h > Remaining Estimate: 2h > > *Symptom* > With EOS enabled, reinitializing state stores hits an NPE because the checkpoint > is null. > {code:java} > 2018-05-02 18:05:17.156 ERROR 60836 --- [-StreamThread-1] > o.a.k.s.p.internals.StreamThread : stream-thread > [kafka-stream-application-d6ec1dfb-9b7f-42dd-8b28-899ff3d1ad98-StreamThread-1] > Encountered the following error during processing: > java.lang.NullPointerException: null > at > org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:66) > ~[kafka-streams-1.1.0.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155) > ~[kafka-streams-1.1.0.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:230) > ~[kafka-streams-1.1.0.jar:na] > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) > ~[kafka-streams-1.1.0.jar:na] > {code} > *How to reproduce* > *configure as* > - changelog topic with short `retention.ms` and `delete` policy (just to > reproduce the symptom easily) > ex) retention.ms=3,cleanup.policy=delete > - exactly-once semantics enabled > - no cleanup > *Step* > - two tasks [0_0] and [0_1], two Spring Boot instances (assigned was#1: task[0_0], > was#2: task[0_1]) > - write some data to each state store (the changelog topic will
soon erase those > messages because of the short "retention.ms") > - when was#2 is killed, was#1 restores task[0_1]'s data into its own > RocksDB > - In the process, it tries to write the checkpoint and an error > occurs (AbstractStateManager #66). > {code:java} > // My code > Map<String, String> topicConfiguration = new HashMap<>(); > topicConfiguration.putIfAbsent("cleanup.policy", "delete"); > topicConfiguration.putIfAbsent("file.delete.delay.ms", "0"); > topicConfiguration.putIfAbsent("retention.ms", "3000"); > builder.stream(properties.getSourceTopic(), >Consumed.with(Serdes.Long(), Serdes.String())) >.groupByKey() >.count(Materialized > .<Long, Long, KeyValueStore<Bytes, byte[]>>as(ORDER_STORE_NAME) > .withKeySerde(Serdes.Long()) > .withValueSerde(Serdes.Long()) > .withLoggingEnabled(topicConfiguration)); > {code} > *Suggestion* > When EOS is enabled, the checkpoint will be null. > I think we need to add some code to create a checkpoint, > as follows: > {code:java} > // # At org.apache.kafka.streams.processor.internals.AbstractStateManager #66 > // # suggestion start > if (checkpoint == null) { > checkpoint = new OffsetCheckpoint(new File(baseDir, > CHECKPOINT_FILE_NAME)); > } > // # suggestion end > try { > checkpoint.write(checkpointableOffsets); > } catch (final IOException fatalException) { > log.error("Failed to write offset checkpoint file to {} while > re-initializing {}: {}", checkpoint, stateStores, fatalException); > throw new StreamsException("Failed to reinitialize global store.", > fatalException); > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6860) NPE when reinitializeStateStores with eos enabled
[ https://issues.apache.org/jira/browse/KAFKA-6860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508317#comment-16508317 ] Guozhang Wang commented on KAFKA-6860: -- I see. Thanks for the explanation [~mjsax]. And your proposed fix makes sense to me. I think a more general solution would also fix the double checkpointing in the non-EOS case: today we checkpoint in `suspend` if EOS is not turned on, and always in `closeSuspended`. So with EOS we only checkpoint in `closeSuspended`, while without EOS we checkpoint in both, hence we unnecessarily write the checkpoint twice when closing. But I think this general fix is better done together with some refactoring of the ProcessorStateManager code, so it need not be included in this JIRA. > NPE when reinitializeStateStores with eos enabled > - > > Key: KAFKA-6860 > URL: https://issues.apache.org/jira/browse/KAFKA-6860 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 > Environment: mac, kafka1.1 >Reporter: ko byoung kwon >Priority: Major > Original Estimate: 2h > Remaining Estimate: 2h > > *Symptom* > With EOS enabled, reinitializing state stores hits an NPE because the checkpoint > is null.
> {code:java} > 2018-05-02 18:05:17.156 ERROR 60836 --- [-StreamThread-1] > o.a.k.s.p.internals.StreamThread : stream-thread > [kafka-stream-application-d6ec1dfb-9b7f-42dd-8b28-899ff3d1ad98-StreamThread-1] > Encountered the following error during processing: > java.lang.NullPointerException: null > at > org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:66) > ~[kafka-streams-1.1.0.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155) > ~[kafka-streams-1.1.0.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:230) > ~[kafka-streams-1.1.0.jar:na] > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) > ~[kafka-streams-1.1.0.jar:na] > {code} > *How to reproduce* > *configure as* > - changelog topic with short `retention.ms` and `delete` policy (just to > reproduce the symptom easily) > ex) retention.ms=3,cleanup.policy=delete > - exactly-once semantics enabled > - no cleanup > *Step* > - two tasks [0_0] and [0_1], two Spring Boot instances (assigned was#1: task[0_0], > was#2: task[0_1]) > - write some data to each state store (the changelog topic will soon erase those > messages because of the short "retention.ms") > - when was#2 is killed, was#1 restores task[0_1]'s data into its own > RocksDB > - In the process, it tries to write the checkpoint and an error > occurs (AbstractStateManager #66). > {code:java} > // My code > Map<String, String> topicConfiguration = new HashMap<>(); > topicConfiguration.putIfAbsent("cleanup.policy", "delete"); > topicConfiguration.putIfAbsent("file.delete.delay.ms", "0"); > topicConfiguration.putIfAbsent("retention.ms", "3000"); > builder.stream(properties.getSourceTopic(), >Consumed.with(Serdes.Long(), Serdes.String())) >.groupByKey() >.count(Materialized
> .<Long, Long, KeyValueStore<Bytes, byte[]>>as(ORDER_STORE_NAME) > .withKeySerde(Serdes.Long()) > .withValueSerde(Serdes.Long()) > .withLoggingEnabled(topicConfiguration)); > {code} > *Suggestion* > When EOS is enabled, the checkpoint will be null. > I think we need to add some code to create a checkpoint, > as follows: > {code:java} > // # At org.apache.kafka.streams.processor.internals.AbstractStateManager #66 > // # suggestion start > if (checkpoint == null) { > checkpoint = new OffsetCheckpoint(new File(baseDir, > CHECKPOINT_FILE_NAME)); > } > // # suggestion end > try { > checkpoint.write(checkpointableOffsets); > } catch (final IOException fatalException) { > log.error("Failed to write offset checkpoint file to {} while > re-initializing {}: {}", checkpoint, stateStores, fatalException); > throw new StreamsException("Failed to reinitialize global store.", > fatalException); > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
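Guozhang's point about double checkpointing can be made concrete with a small counter model. It assumes, as the comment describes, two rules:

- `suspend` writes a checkpoint only when EOS is off.
- `closeSuspended` always writes one.

Under those rules, a non-EOS close writes twice. A "dirty" flag that skips writes when nothing changed is one hypothetical way to drop the redundant write. `CheckpointLifecycle` is illustrative only and is not `ProcessorStateManager` code.

```java
/** Hypothetical model of the suspend/closeSuspended checkpoint writes described in the comment. */
final class CheckpointLifecycle {
    private final boolean eosEnabled;
    private final boolean skipRedundantWrites;
    private boolean dirty = true; // offsets changed since the last checkpoint
    int writes;                   // number of checkpoint writes performed

    CheckpointLifecycle(boolean eosEnabled, boolean skipRedundantWrites) {
        this.eosEnabled = eosEnabled;
        this.skipRedundantWrites = skipRedundantWrites;
    }

    void suspend() {
        if (!eosEnabled) {
            checkpoint(); // non-EOS: checkpoint on suspend
        }
    }

    void closeSuspended() {
        checkpoint();     // both modes: checkpoint on close
    }

    private void checkpoint() {
        if (skipRedundantWrites && !dirty) {
            return;       // nothing changed since the last write
        }
        writes++;
        dirty = false;
    }
}
```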
[jira] [Updated] (KAFKA-6889) Fix Streams system test to only specify used log files
[ https://issues.apache.org/jira/browse/KAFKA-6889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-6889: --- Attachment: (was: Screen Shot 2018-06-11 at 7.15.02 PM.png) > Fix Streams system test to only specify used log files > -- > > Key: KAFKA-6889 > URL: https://issues.apache.org/jira/browse/KAFKA-6889 > Project: Kafka > Issue Type: Task > Components: streams, system tests >Reporter: Matthias J. Sax >Priority: Minor > Labels: beginner, easyfix > > In `streams.py`, the class `StreamsTestBaseService` lists many log files that > are only used in certain tests. For all tests that do not use those log > files, WARN messages are produced during the test run: > {noformat} > [WARNING - 2018-05-09 13:51:22,065 - test - compress_service_logs - > lineno:131]: Error compressing log /mnt/streams/streams.stdout.0-6: service > ['worker7']>: ubuntu@worker7: Command 'cd "$(dirname > /mnt/streams/streams.stdout.0-6)" && f="$(basename > /mnt/streams/streams.stdout.0-6)" && tar czf "$f.tgz" "$f" && rm -rf > /mnt/streams/streams.stdout.0-6' returned non-zero exit status 2. Remote > error message: tar: streams.stdout.0-6: Cannot stat: No such file or directory > tar: Exiting with failure status due to previous errors > {noformat} > These messages spam the output and might be misleading. We should update the > Streams system tests accordingly, so that each test only specifies the > log-file names it actually uses, to avoid the WARN message. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
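The WARN comes from `tar` being asked to compress a declared log file that the test never created ("Cannot stat: No such file or directory"). The ticket's fix is to have each test declare only the log files it uses.

A complementary, hypothetical guard would skip declared files that do not exist before compressing. It is sketched here in Java, although the system tests themselves are Python. `LogCollector.existingLogs` is an illustrative name, not part of the Kafka system-test harness.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: keep only declared log files that actually exist before compressing them. */
final class LogCollector {
    static List<Path> existingLogs(List<Path> declaredLogs) {
        return declaredLogs.stream()
                .filter(Files::isRegularFile) // a missing file is what makes `tar` fail with "Cannot stat"
                .collect(Collectors.toList());
    }
}
```

This only hides the symptom. Declaring the right files per test, as the ticket proposes, keeps the service definitions accurate.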
[jira] [Commented] (KAFKA-6889) Fix Streams system test to only specify used log files
[ https://issues.apache.org/jira/browse/KAFKA-6889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508285#comment-16508285 ] Stanislav Kozlovski commented on KAFKA-6889: What do you mean by Jenkins having read access? I personally cannot open the link sent by Matthias: https://imgur.com/yt7Su2e > Fix Streams system test to only specify used log files > -- > > Key: KAFKA-6889 > URL: https://issues.apache.org/jira/browse/KAFKA-6889 > Project: Kafka > Issue Type: Task > Components: streams, system tests >Reporter: Matthias J. Sax >Priority: Minor > Labels: beginner, easyfix > > In `streams.py`, the class `StreamsTestBaseService` lists many log files that > are only used in certain tests. For all tests that do not use those log > files, WARN messages are produced during the test run: > {noformat} > [WARNING - 2018-05-09 13:51:22,065 - test - compress_service_logs - > lineno:131]: Error compressing log /mnt/streams/streams.stdout.0-6: service > ['worker7']>: ubuntu@worker7: Command 'cd "$(dirname > /mnt/streams/streams.stdout.0-6)" && f="$(basename > /mnt/streams/streams.stdout.0-6)" && tar czf "$f.tgz" "$f" && rm -rf > /mnt/streams/streams.stdout.0-6' returned non-zero exit status 2. Remote > error message: tar: streams.stdout.0-6: Cannot stat: No such file or directory > tar: Exiting with failure status due to previous errors > {noformat} > These messages spam the output and might be misleading. We should update the > Streams system tests accordingly, so that each test only specifies the > log-file names it actually uses, to avoid the WARN message. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6889) Fix Streams system test to only specify used log files
[ https://issues.apache.org/jira/browse/KAFKA-6889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-6889: --- Attachment: Screen Shot 2018-06-11 at 7.15.02 PM.png > Fix Streams system test to only specify used log files > -- > > Key: KAFKA-6889 > URL: https://issues.apache.org/jira/browse/KAFKA-6889 > Project: Kafka > Issue Type: Task > Components: streams, system tests >Reporter: Matthias J. Sax >Priority: Minor > Labels: beginner, easyfix > Attachments: Screen Shot 2018-06-11 at 7.15.02 PM.png > > > In `streams.py`, the class `StreamsTestBaseService` lists many log files that > are only used in certain tests. For all tests that do not use those log > files, WARN messages are produced during the test run: > {noformat} > [WARNING - 2018-05-09 13:51:22,065 - test - compress_service_logs - > lineno:131]: Error compressing log /mnt/streams/streams.stdout.0-6: service > ['worker7']>: ubuntu@worker7: Command 'cd "$(dirname > /mnt/streams/streams.stdout.0-6)" && f="$(basename > /mnt/streams/streams.stdout.0-6)" && tar czf "$f.tgz" "$f" && rm -rf > /mnt/streams/streams.stdout.0-6' returned non-zero exit status 2. Remote > error message: tar: streams.stdout.0-6: Cannot stat: No such file or directory > tar: Exiting with failure status due to previous errors > {noformat} > These messages spam the output and might be misleading. We should update the > Streams system tests accordingly, so that each test only specifies the > log-file names it actually uses, to avoid the WARN message. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6889) Fix Streams system test to only specify used log files
[ https://issues.apache.org/jira/browse/KAFKA-6889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-6889: --- Attachment: (was: Screen Shot 2018-06-11 at 7.15.02 PM.png) > Fix Streams system test to only specify used log files > -- > > Key: KAFKA-6889 > URL: https://issues.apache.org/jira/browse/KAFKA-6889 > Project: Kafka > Issue Type: Task > Components: streams, system tests >Reporter: Matthias J. Sax >Priority: Minor > Labels: beginner, easyfix > Attachments: Screen Shot 2018-06-11 at 7.15.02 PM.png > > > In `streams.py` the class `StreamsTestBaseService` lists many log files that > are only used in certain tests. For all tests that do not use those log > files, WARN messages are produced during the test run: > {noformat} > [WARNING - 2018-05-09 13:51:22,065 - test - compress_service_logs - > lineno:131]: Error compressing log /mnt/streams/streams.stdout.0-6: service > ['worker7']>: ubuntu@worker7: Command 'cd "$(dirname > /mnt/streams/streams.stdout.0-6)" && f="$(basename > /mnt/streams/streams.stdout.0-6)" && tar czf "$f.tgz" "$f" && rm -rf > /mnt/streams/streams.stdout.0-6' returned non-zero exit status 2. Remote > error message: tar: streams.stdout.0-6: Cannot stat: No such file or directory > tar: Exiting with failure status due to previous errors > {noformat} > Those messages spam the output and might be misleading. We should update the > Streams system tests accordingly, so that each test only specifies the > log-file names it actually uses, to avoid the WARN message. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7038) Support AdminClient Example
darion yaphet created KAFKA-7038: Summary: Support AdminClient Example Key: KAFKA-7038 URL: https://issues.apache.org/jira/browse/KAFKA-7038 Project: Kafka Issue Type: New Feature Components: admin Reporter: darion yaphet -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6889) Fix Streams system test to only specify used log files
[ https://issues.apache.org/jira/browse/KAFKA-6889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508243#comment-16508243 ] Guozhang Wang commented on KAFKA-6889: -- Confluent Jenkins has read access but not write access, i.e. you can check the results of a given job but cannot create new jobs. > Fix Streams system test to only specify used log files > -- > > Key: KAFKA-6889 > URL: https://issues.apache.org/jira/browse/KAFKA-6889 > Project: Kafka > Issue Type: Task > Components: streams, system tests >Reporter: Matthias J. Sax >Priority: Minor > Labels: beginner, easyfix > > In `streams.py` the class `StreamsTestBaseService` lists many log files that > are only used in certain tests. For all tests that do not use those log > files, WARN messages are produced during the test run: > {noformat} > [WARNING - 2018-05-09 13:51:22,065 - test - compress_service_logs - > lineno:131]: Error compressing log /mnt/streams/streams.stdout.0-6: service > ['worker7']>: ubuntu@worker7: Command 'cd "$(dirname > /mnt/streams/streams.stdout.0-6)" && f="$(basename > /mnt/streams/streams.stdout.0-6)" && tar czf "$f.tgz" "$f" && rm -rf > /mnt/streams/streams.stdout.0-6' returned non-zero exit status 2. Remote > error message: tar: streams.stdout.0-6: Cannot stat: No such file or directory > tar: Exiting with failure status due to previous errors > {noformat} > Those messages spam the output and might be misleading. We should update the > Streams system tests accordingly, so that each test only specifies the > log-file names it actually uses, to avoid the WARN message. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7037) delete topic command replaces '+' from the topic name which leads incorrect topic deletion
[ https://issues.apache.org/jira/browse/KAFKA-7037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandeep Nemuri updated KAFKA-7037: -- Description: While executing a delete command, the Kafka CLI tool removes the "+" symbol and deletes the wrong topic. In the case below, if _"*test+topic"*_ is deleted, Kafka deletes _*testtopic.*_ {code:java} [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 --topic testtopic Created topic "testtopic". [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper `hostname`:2181 --topic test+topic --delete Topic testtopic is marked for deletion.{code} The delete topic command removes '+' from the topic name. was: While executing a delete command, the Kafka CLI tool removes the "+" symbol and deletes the wrong topic. In the case below, if _"*test+topic"*_ is deleted, Kafka deletes _*testtopic.*_ {code:java} [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 --topic testtopic Created topic "testtopic". [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper `hostname`:2181 --topic test+topic --delete Topic testtopic is marked for deletion.{code} The delete topic command removes '+' and a few other special characters from the topic name: [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala#L28-L33] > delete topic command replaces '+' from the topic name which leads incorrect > topic deletion > -- > > Key: KAFKA-7037 > URL: https://issues.apache.org/jira/browse/KAFKA-7037 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.0, 1.0.0 >Reporter: Sandeep Nemuri >Priority: Major > > While executing a delete command, the Kafka CLI tool removes the "+" symbol > and deletes the wrong topic.
In the case below, if _"*test+topic"*_ is > deleted, Kafka deletes _*testtopic.*_ > {code:java} > [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh > --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 > --topic testtopic > Created topic "testtopic". > [kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh > --zookeeper `hostname`:2181 --topic test+topic --delete > Topic testtopic is marked for deletion.{code} > The delete topic command removes '+' from the topic name. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
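The symptom matches what happens when the `--topic` value is treated as a regular expression rather than as a literal name, which the linked `TopicFilter` code suggests. In `test+topic`, the `+` means "one or more `t`", so the pattern also matches `testtopic`. The Python sketch below only demonstrates those regex semantics, and Java's `java.util.regex` treats this pattern the same way. The helper names are hypothetical, and this is not the `kafka-topics.sh` code path.

```python
import re


def matching_topics(topic_arg, existing_topics):
    """Select topics the way a regex-based filter would: full match of the pattern."""
    pattern = re.compile(topic_arg)
    return [t for t in existing_topics if pattern.fullmatch(t)]


def matching_topics_literal(topic_arg, existing_topics):
    """Select topics by exact name by escaping regex metacharacters first."""
    pattern = re.compile(re.escape(topic_arg))
    return [t for t in existing_topics if pattern.fullmatch(t)]


existing = ["testtopic", "testttopic", "othertopic"]
as_regex = matching_topics("test+topic", existing)          # 't+' matches one or more 't'
as_literal = matching_topics_literal("test+topic", existing)  # no topic has this exact name
```

Escaping the name first (Java provides `Pattern.quote` for this) is one way a tool could offer exact-name deletion alongside pattern matching.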
[jira] [Commented] (KAFKA-6788) Grouping consumer requests per consumer coordinator in admin client
[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508215#comment-16508215 ] ASF GitHub Bot commented on KAFKA-6788: --- cyrusv closed pull request #4878: KAFKA-6788: Combine queries for describe and delete groups in AdminCl… URL: https://github.com/apache/kafka/pull/4878 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index fa3f943555b..cd79453d29a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -2252,14 +2252,17 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection describedGroupIds = new HashSet<>(); + for (final Map.Entry> entry : futures.entrySet()) { // skip sending request for those futures that already failed. 
if (entry.getValue().isCompletedExceptionally()) continue; final String groupId = entry.getKey(); +if (describedGroupIds.contains(groupId)) { +continue; +} final long startFindCoordinatorMs = time.milliseconds(); final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs()); @@ -2274,53 +2277,82 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection future = futures.get(groupId); -final DescribeGroupsResponse.GroupMetadata groupMetadata = response.groups().get(groupId); +final Set groupIdsToDescribe = new HashSet<>(); +for (ListGroupsResponse.Group group : listResponse.groups()) { +groupIdsToDescribe.add(group.groupId()); +} -final Errors groupError = groupMetadata.error(); -if (groupError != Errors.NONE) { -// TODO: KAFKA-6789, we can retry based on the error code - future.completeExceptionally(groupError.exception()); -} else { -final String protocolType = groupMetadata.protocolType(); -if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) { -final List members = groupMetadata.members(); -final List consumers = new ArrayList<>(members.size()); - -for (DescribeGroupsResponse.GroupMember groupMember : members) { -final PartitionAssignor.Assignment assignment = - ConsumerProtocol.deserializeAssignment( - ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment(; - -final MemberDescription memberDescription = -new MemberDescription( -groupMember.memberId(), -groupMember.clientId(), - groupMember.clientHost(), -new MemberAssignment(assignment.partitions())); -consumers.add(memberDescription); +runnable.call(new Call("describeConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) { + +@Override +AbstractRequest.Builder createRequest(int timeoutMs) { +return new DescribeGroupsRequest.Builder(Collections.singletonList(groupId)); +} + +@Override +void handleResponse(AbstractResponse abstractResponse) { +final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse; 
+for (String describedCandidate : groupIdsToDescribe) { +if (response.groups().containsKey(describedCandidate)) { + describedGroupIds.add(describedCandidate); +} +
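The idea in the KAFKA-6788 title is to batch describe/delete requests per group coordinator instead of sending one request per group. It can be sketched as follows. `find_coordinator` stands in for a FindCoordinator lookup and is a hypothetical callable. This is an illustration of the grouping step only, not the `KafkaAdminClient` implementation in the diff above.

```python
from collections import defaultdict


def batch_by_coordinator(group_ids, find_coordinator):
    """Group consumer-group ids by the broker id that coordinates them,
    so that one request per coordinator can cover several groups."""
    batches = defaultdict(list)
    for group_id in dict.fromkeys(group_ids):  # drop duplicates, keep first-seen order
        batches[find_coordinator(group_id)].append(group_id)
    return dict(batches)


# Hypothetical coordinator assignment: broker 1 owns g1 and g3, broker 2 owns g2.
coordinators = {"g1": 1, "g2": 2, "g3": 1}
batches = batch_by_coordinator(["g1", "g2", "g3", "g1"], coordinators.__getitem__)
```

Dropping duplicate group ids plays the same role as the `describedGroupIds` set in the diff: each group is described at most once.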
[jira] [Commented] (KAFKA-7033) Modify AbstractOptions's timeoutMs as Long type
[ https://issues.apache.org/jira/browse/KAFKA-7033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508205#comment-16508205 ] ASF GitHub Bot commented on KAFKA-7033: --- darionyaphet opened a new pull request #5186: [KAFKA-7033] Modify AbstractOptions's timeoutMs as Long type URL: https://github.com/apache/kafka/pull/5186 [KAFKA-7033 | Modify AbstractOptions's timeoutMs as Long type](https://issues.apache.org/jira/browse/KAFKA-7033) Currently AbstractOptions's timeoutMs is an Integer; using a Long to represent the timeout in milliseconds may be better. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) > Modify AbstractOptions's timeoutMs as Long type > --- > > Key: KAFKA-7033 > URL: https://issues.apache.org/jira/browse/KAFKA-7033 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 1.1.0 >Reporter: darion yaphet >Priority: Minor > > Currently AbstractOptions's timeoutMs is an Integer; using a Long to represent > the timeout in milliseconds may be better. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
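For scale, a signed 32-bit Java `int` caps a millisecond timeout at 2^31 - 1 ms, roughly 24.86 days. A 64-bit `long` is effectively unbounded. The arithmetic is quick to check:

```python
INT_MAX = 2**31 - 1               # largest Java int
LONG_MAX = 2**63 - 1              # largest Java long
MS_PER_DAY = 24 * 60 * 60 * 1000  # milliseconds in one day

int_days = INT_MAX / MS_PER_DAY                # longest int-typed timeout, in days
long_years = LONG_MAX / MS_PER_DAY / 365.25    # longest long-typed timeout, in years
```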
[jira] [Updated] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue
[ https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-3042: -- Fix Version/s: (was: 2.0.0) 2.1.0 Moving this out to 2.1.0 since there is no fix yet. > updateIsr should stop after failed several times due to zkVersion issue > --- > > Key: KAFKA-3042 > URL: https://issues.apache.org/jira/browse/KAFKA-3042 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.10.0.0 > Environment: jdk 1.7 > centos 6.4 >Reporter: Jiahongchao >Assignee: Dong Lin >Priority: Major > Labels: reliability > Fix For: 2.1.0 > > Attachments: controller.log, server.log.2016-03-23-01, > state-change.log > > > Sometimes one broker may repeatedly log > "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR" > I think this is because the broker considers itself the leader when in fact it is > a follower. > So after several failed tries, it needs to find out who the leader is. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
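A minimal sketch of the behavior the reporter suggests: retry the conditional ISR update a bounded number of times, then stop and refresh leadership information instead of logging the same zkVersion mismatch forever. `try_update_isr` and `refresh_leader` are hypothetical callables, and `max_attempts=3` is an arbitrary choice. This is not the broker's actual code.

```python
def update_isr_with_limit(try_update_isr, refresh_leader, max_attempts=3):
    """Attempt a conditional ISR update; after max_attempts consecutive
    version mismatches, give up and re-resolve who the leader is.

    try_update_isr() returns True on success, False on a zkVersion mismatch.
    Returns (outcome, attempts_used).
    """
    for attempt in range(1, max_attempts + 1):
        if try_update_isr():
            return "updated", attempt
    # Repeated mismatches suggest this broker's view of leadership is stale.
    refresh_leader()
    return "refreshed_leader", max_attempts
```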
[jira] [Updated] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active (KIP-211)
[ https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-4682: -- Fix Version/s: (was: 2.0.0) 2.1.0 Moving this out to 2.1.0. > Committed offsets should not be deleted if a consumer is still active > (KIP-211) > --- > > Key: KAFKA-4682 > URL: https://issues.apache.org/jira/browse/KAFKA-4682 > Project: Kafka > Issue Type: Bug >Reporter: James Cheng >Assignee: Vahid Hashemian >Priority: Major > Labels: kip > Fix For: 2.1.0 > > > Kafka will delete committed offsets that are older than > offsets.retention.minutes > If there is an active consumer on a low traffic partition, it is possible > that Kafka will delete the committed offset for that consumer. Once the > offset is deleted, a restart or a rebalance of that consumer will cause the > consumer to not find any committed offset and start consuming from > earliest/latest (depending on auto.offset.reset). I'm not sure, but a broker > failover might also cause you to start reading from auto.offset.reset (due to > broker restart, or coordinator failover). > I think that Kafka should only delete offsets for inactive consumers. The > timer should only start after a consumer group goes inactive. For example, if > a consumer group goes inactive, then after 1 week, delete the offsets for > that consumer group. This is a solution that [~junrao] mentioned in > https://issues.apache.org/jira/browse/KAFKA-3806?focusedCommentId=15323521=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15323521 > The current workarounds are to: > # Commit an offset on every partition you own on a regular basis, making sure > that it is more frequent than offsets.retention.minutes (a broker-side > setting that a consumer might not be aware of) > or > # Turn the value of offsets.retention.minutes up really really high. You have > to make sure it is higher than any valid low-traffic rate that you want to > support. 
For example, if you want to support a topic where someone produces > once a month, you would have to set offsets.retention.minutes to 1 month. > or > # Turn on enable.auto.commit (this is essentially #1, but easier to > implement). > None of these are ideal. > #1 can be spammy. It requires your consumers to know something about how the > brokers are configured. Sometimes it is out of your control. MirrorMaker, for > example, only commits offsets on partitions where it receives data. And it is > duplication that you need to put into all of your consumers. > #2 has disk-space impact on the broker (in __consumer_offsets) as well as > memory impact on the broker (to answer OffsetFetch). > #3 I think has the potential for message loss (the consumer might commit on > messages that are not yet fully processed) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
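The proposed rule is to start the retention clock only once the consumer group becomes inactive. It could be expressed as a predicate like the one below. The parameter names are hypothetical. This is a sketch of the idea described in the issue, not the KIP-211 implementation.

```python
from typing import Optional


def offsets_expired(now_ms: int, retention_ms: int, group_active: bool,
                    group_empty_since_ms: Optional[int]) -> bool:
    """Offsets of an active group never expire. For an inactive group, the
    retention timer starts when the group became empty, not at commit time."""
    if group_active or group_empty_since_ms is None:
        return False
    return now_ms - group_empty_since_ms >= retention_ms


WEEK_MS = 7 * 24 * 60 * 60 * 1000  # e.g. "after 1 week, delete the offsets"
```

With this rule, a consumer on a low-traffic partition keeps its committed offset for as long as the group stays active, however old that commit is.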
[jira] [Updated] (KAFKA-5054) ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized
[ https://issues.apache.org/jira/browse/KAFKA-5054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-5054: -- Fix Version/s: (was: 2.0.0) 2.1.0 Since there has been no activity on this for a long time, moving out to 2.1.0. > ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized > > > Key: KAFKA-5054 > URL: https://issues.apache.org/jira/browse/KAFKA-5054 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Damian Guy >Assignee: Damian Guy >Priority: Critical > Fix For: 2.1.0 > > > {{putIfAbsent}} and {{delete}} should be synchronized as they involve at > least 2 operations on the underlying store and may result in inconsistent > results if someone were to query via IQ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
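Synchronization matters here because `putIfAbsent` is a read followed by a write plus a changelog append. Another thread, or an interactive query, can therefore observe the store between those steps. The Python sketch below is a toy change-logging wrapper that holds one lock across each compound operation. The class and its `changelog` list are hypothetical stand-ins, not the Kafka Streams classes.

```python
import threading


class ChangeLoggingStore:
    """Toy key-value store that appends every mutation to a changelog."""

    def __init__(self):
        self._data = {}
        self.changelog = []
        self._lock = threading.Lock()

    def put(self, key, value):
        with self._lock:
            self._data[key] = value
            self.changelog.append((key, value))

    def put_if_absent(self, key, value):
        # Read + write + log happen atomically with respect to other callers.
        with self._lock:
            existing = self._data.get(key)
            if existing is None:
                self._data[key] = value
                self.changelog.append((key, value))
            return existing

    def delete(self, key):
        # Remove the old value and log a tombstone (None) under the same lock.
        with self._lock:
            old = self._data.pop(key, None)
            self.changelog.append((key, None))
            return old

    def get(self, key):
        with self._lock:
            return self._data.get(key)
```

Note that `put_if_absent` does not call `put`: `threading.Lock` is not reentrant, so nesting the two would deadlock.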
[jira] [Updated] (KAFKA-3190) KafkaProducer should not invoke callback in send()
[ https://issues.apache.org/jira/browse/KAFKA-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-3190: -- Fix Version/s: (was: 2.0.0) 2.1.0 Since there has been no activity on the PR for several months, moving this out to 2.1.0. > KafkaProducer should not invoke callback in send() > -- > > Key: KAFKA-3190 > URL: https://issues.apache.org/jira/browse/KAFKA-3190 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 0.9.0.0 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin >Priority: Critical > Fix For: 2.1.0 > > > Currently KafkaProducer will invoke callback.onComplete() if it receives an > ApiException during send(). This breaks the guarantee that callback will be > invoked in order. It seems ApiException in send() only comes from metadata > refresh. If so, we can probably simply throw it instead of invoking > callback(). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4701) Allow kafka brokers to dynamically reload truststore without restarting.
[ https://issues.apache.org/jira/browse/KAFKA-4701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-4701: -- Fix Version/s: (was: 2.0.0) 2.1.0 We have added dynamic update of truststore using AdminClient/kafka-configs.sh with new truststore files. Moving this out to 2.1.0 to see if we need to support updates without filename changes. > Allow kafka brokers to dynamically reload truststore without restarting. > > > Key: KAFKA-4701 > URL: https://issues.apache.org/jira/browse/KAFKA-4701 > Project: Kafka > Issue Type: Improvement > Components: security >Reporter: Allen Xiang >Priority: Major > Labels: security > Fix For: 2.1.0 > > > Right now, in order to add SSL clients (update broker truststores), a rolling > restart of all brokers is required. This is very time-consuming and > unnecessary. A dynamic truststore manager is needed to reload the truststore from > the file system without restarting brokers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4808) send of null key to a compacted topic should throw error back to user
[ https://issues.apache.org/jira/browse/KAFKA-4808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-4808: -- Fix Version/s: (was: 2.0.0) 2.1.0 Since there has been no activity on the PR for several months, moving this out to 2.1.0. > send of null key to a compacted topic should throw error back to user > - > > Key: KAFKA-4808 > URL: https://issues.apache.org/jira/browse/KAFKA-4808 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 0.10.2.0 >Reporter: Ismael Juma >Assignee: Mayuresh Gharat >Priority: Major > Fix For: 2.1.0 > > > If a message with a null key is produced to a compacted topic, the broker > returns `CorruptRecordException`, which is a retriable exception. As such, > the producer keeps retrying until retries are exhausted or request.timeout.ms > expires and eventually throws a TimeoutException. This is confusing and not > user-friendly. > We should throw a meaningful error back to the user. From an implementation > perspective, we would have to use a non retriable error code to avoid this > issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
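One way to surface this error early is a non-retriable check on the client before the record is sent. The sketch below assumes the client already knows which topics are compacted (`compacted_topics` is a hypothetical input) and uses a hypothetical exception name. The issue itself proposes a non-retriable broker error code, which a client-side check like this would complement rather than replace.

```python
class InvalidRecordError(Exception):
    """Hypothetical non-retriable error: retrying cannot make the record valid."""


def validate_record(topic, key, compacted_topics):
    """Reject a null key for a compacted topic immediately, instead of letting
    retries run until a TimeoutException hides the real cause."""
    # Compaction retains the latest value per key, so a null key has no meaning there.
    if key is None and topic in compacted_topics:
        raise InvalidRecordError(f"compacted topic {topic!r} requires a non-null key")
```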
[jira] [Updated] (KAFKA-4850) RocksDb cannot use Bloom Filters
[ https://issues.apache.org/jira/browse/KAFKA-4850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-4850: -- Fix Version/s: (was: 2.0.0) 2.1.0 Since there has been no activity on the PR for several months, moving out to 2.1.0. > RocksDb cannot use Bloom Filters > > > Key: KAFKA-4850 > URL: https://issues.apache.org/jira/browse/KAFKA-4850 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Eno Thereska >Assignee: Bharat Viswanadham >Priority: Major > Fix For: 2.1.0 > > > Bloom Filters would speed up RocksDb lookups. However they currently do not > work in RocksDb 5.0.2. This has been fixed in trunk, but we'll have to wait > until that is released and tested. > Then we can add the line in RocksDbStore.java in openDb: > tableConfig.setFilter(new BloomFilter(10)); -- This message was sent by Atlassian JIRA (v7.6.3#76005)
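For context on `new BloomFilter(10)`: in RocksDB's Java API the argument is the number of bits per key. The textbook Bloom-filter estimate is p = (1 - e^(-k/b))^k for b bits per key and k hash functions, with the optimum at k = b ln 2. Using that estimate (an assumption, since RocksDB's filter formats deviate from it somewhat), 10 bits per key gives a false-positive rate just under 1%:

```python
import math


def bloom_false_positive_rate(bits_per_key, num_hashes=None):
    """Textbook estimate p = (1 - exp(-k / b)) ** k for b bits per key."""
    if num_hashes is None:
        # The optimum k = b * ln 2, rounded to a whole number of hash functions.
        num_hashes = max(1, round(bits_per_key * math.log(2)))
    return (1 - math.exp(-num_hashes / bits_per_key)) ** num_hashes


rate = bloom_false_positive_rate(10)  # k = 7, rate is roughly 0.008
```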
[jira] [Updated] (KAFKA-3297) More optimally balanced partition assignment strategy (new consumer)
[ https://issues.apache.org/jira/browse/KAFKA-3297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-3297: -- Fix Version/s: (was: 2.0.0) 2.1.0 Since there has been no activity on the PR for several months, moving this out to 2.1.0. > More optimally balanced partition assignment strategy (new consumer) > > > Key: KAFKA-3297 > URL: https://issues.apache.org/jira/browse/KAFKA-3297 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Andrew Olson >Assignee: Andrew Olson >Priority: Major > Fix For: 2.1.0 > > > While the roundrobin partition assignment strategy is an improvement over the > range strategy, when the consumer topic subscriptions are not identical > (previously disallowed but will be possible as of KAFKA-2172) it can produce > heavily skewed assignments. As suggested > [here|https://issues.apache.org/jira/browse/KAFKA-2172?focusedCommentId=14530767=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14530767] > it would be nice to have a strategy that attempts to assign an equal number > of partitions to each consumer in a group, regardless of how similar their > individual topic subscriptions are. We can accomplish this by tracking the > number of partitions assigned to each consumer, and having the partition > assignment loop assign each partition to a consumer interested in that topic > with the least number of partitions assigned. > Additionally, we can optimize the distribution fairness by adjusting the > partition assignment order: > * Topics with fewer consumers are assigned first. > * In the event of a tie for least consumers, the topic with more partitions > is assigned first. > The general idea behind these two rules is to keep the most flexible > assignment choices available as long as possible by starting with the most > constrained partitions/consumers. > This JIRA addresses the new consumer. For the original high-level consumer, > see KAFKA-2435. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
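The two ordering rules and the least-loaded assignment described in KAFKA-3297 can be sketched in a few lines of Python. This illustrates the proposal under simplifying assumptions: subscriptions are given as a dict, and ties between equally loaded consumers are broken by name. It is not the assignor that Kafka eventually shipped.

```python
def balanced_assign(partitions_per_topic, subscriptions):
    """partitions_per_topic: {topic: partition_count}
    subscriptions: {consumer: set_of_topics}
    Returns {consumer: [(topic, partition), ...]}."""
    assignment = {c: [] for c in subscriptions}
    interested = {
        t: sorted(c for c, topics in subscriptions.items() if t in topics)
        for t in partitions_per_topic
    }
    # Most constrained first: fewer consumers, then more partitions.
    order = sorted(
        (t for t in partitions_per_topic if interested[t]),
        key=lambda t: (len(interested[t]), -partitions_per_topic[t], t),
    )
    for topic in order:
        for p in range(partitions_per_topic[topic]):
            # Give the partition to the interested consumer with the fewest so far.
            target = min(interested[topic], key=lambda c: (len(assignment[c]), c))
            assignment[target].append((topic, p))
    return assignment


# A subscribes only to t1; B subscribes to t1 and t2.
result = balanced_assign({"t1": 2, "t2": 2}, {"A": {"t1"}, "B": {"t1", "t2"}})
```

Assigning `t2` first (only B wants it) leaves both `t1` partitions free for A, so each consumer ends up with two partitions.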
[jira] [Updated] (KAFKA-5952) Refactor Consumer Fetcher metrics
[ https://issues.apache.org/jira/browse/KAFKA-5952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-5952: -- Fix Version/s: (was: 2.0.0) 2.1.0 Since there has been no activity on the PR for several months, moving this out to 2.1.0. > Refactor Consumer Fetcher metrics > - > > Key: KAFKA-5952 > URL: https://issues.apache.org/jira/browse/KAFKA-5952 > Project: Kafka > Issue Type: Sub-task >Reporter: James Cheng >Assignee: James Cheng >Priority: Major > Fix For: 2.1.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5951) Autogenerate Producer RecordAccumulator metrics
[ https://issues.apache.org/jira/browse/KAFKA-5951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-5951: -- Fix Version/s: (was: 2.0.0) 2.1.0 Since there has been no activity on this for several months, moving this out to 2.1.0. > Autogenerate Producer RecordAccumulator metrics > --- > > Key: KAFKA-5951 > URL: https://issues.apache.org/jira/browse/KAFKA-5951 > Project: Kafka > Issue Type: Sub-task >Reporter: James Cheng >Assignee: James Cheng >Priority: Major > Fix For: 2.1.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5403) Transactions system test should dedup consumed messages by offset
[ https://issues.apache.org/jira/browse/KAFKA-5403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-5403: -- Fix Version/s: (was: 2.0.0) 2.1.0 Since there has been no activity on this for several months, moving this out to 2.1.0. > Transactions system test should dedup consumed messages by offset > - > > Key: KAFKA-5403 > URL: https://issues.apache.org/jira/browse/KAFKA-5403 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 >Reporter: Apurva Mehta >Assignee: Apurva Mehta >Priority: Major > Fix For: 2.1.0 > > > In KAFKA-5396, we saw that the consumers which verify the data in multiple > topics could read the same offsets multiple times, for instance when a > rebalance happens. > This would detect spurious duplicates, causing the test to fail. We should > dedup the consumed messages by offset and only fail the test if we have > duplicate values for a unique set of offsets. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
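A sketch of the suggested dedup: key each consumed record by (topic, partition, offset), so that re-reading an offset after a rebalance collapses to one entry. A value then counts as a duplicate only if it appears at two distinct offsets. The record tuple shape is an assumption for illustration, not the format used by the actual system test.

```python
from collections import Counter


def find_real_duplicates(records):
    """records: iterable of (topic, partition, offset, value) tuples.
    Re-reads of the same offset are ignored; returns the values that occur
    at more than one distinct offset."""
    by_offset = {}
    for topic, partition, offset, value in records:
        by_offset[(topic, partition, offset)] = value
    counts = Counter(by_offset.values())
    return sorted(v for v, n in counts.items() if n > 1)
```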
[jira] [Updated] (KAFKA-4862) Kafka client connect to a shutdown node will block for a long time
[ https://issues.apache.org/jira/browse/KAFKA-4862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-4862: -- Fix Version/s: (was: 2.0.0) 2.1.0 The KIP associated with this JIRA has not been accepted yet, moving out to 2.1.0 > Kafka client connect to a shutdown node will block for a long time > -- > > Key: KAFKA-4862 > URL: https://issues.apache.org/jira/browse/KAFKA-4862 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0, 0.10.2.0 >Reporter: Pengwei >Assignee: Pengwei >Priority: Major > Fix For: 2.1.0 > > > Currently, in our test environment, we found that after one of the broker nodes crashes (reboot > or OS crash), the client may still be connecting to the crashed node to send a > metadata request or another request, and it takes several minutes to > notice that the connection has timed out before trying another node to send the > request. As a result, the client may still be unaware of the metadata change after > several minutes. > We don't have a connection timeout for the network client; we should add a > connection timeout for the client. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
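A minimal sketch of a client-side connection timeout: bound each connection attempt and fall through to the next node, instead of waiting for the operating system's TCP connect timeout. It uses Python's standard `socket.create_connection`, whose `timeout` argument applies to the connect attempt. The node list, the 5-second default, and the `connect` injection point (used here so the logic can be exercised without a network) are hypothetical. This does not show how the Kafka Java client eventually addressed the issue.

```python
import socket


def connect_to_any(nodes, timeout_s=5.0, connect=socket.create_connection):
    """Try each (host, port) in turn, giving up on a node after timeout_s
    seconds, and return the first connected socket."""
    errors = []
    for host, port in nodes:
        try:
            return connect((host, port), timeout=timeout_s)
        except OSError as exc:  # socket.timeout is a subclass of OSError
            errors.append((host, port, exc))
    raise ConnectionError(f"no node reachable: {errors}")
```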
[jira] [Updated] (KAFKA-4665) Inconsistent handling of non-existing topics in offset fetch handling
[ https://issues.apache.org/jira/browse/KAFKA-4665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-4665: -- Fix Version/s: (was: 2.0.0) 2.1.0 Since there has been no activity on this for several months, moving this out to 2.1.0. > Inconsistent handling of non-existing topics in offset fetch handling > - > > Key: KAFKA-4665 > URL: https://issues.apache.org/jira/browse/KAFKA-4665 > Project: Kafka > Issue Type: Bug > Components: consumer, core >Reporter: Jason Gustafson >Assignee: Vahid Hashemian >Priority: Major > Fix For: 2.1.0 > > > For version 0 of the offset fetch API, the broker returns > UNKNOWN_TOPIC_OR_PARTITION for any topics/partitions which do not exist at > the time of fetching. In later versions, we skip this check. We do, however, > continue to return UNKNOWN_TOPIC_OR_PARTITION for authorization errors (i.e. > if the principal does not have Describe access to the corresponding topic). > We should probably make this behavior consistent across versions. > Note also that currently the consumer raises {{KafkaException}} when it > encounters an UNKNOWN_TOPIC_OR_PARTITION error in the offset fetch response, > which is inconsistent with how we usually handle this error. This probably > doesn't cause any problems currently only because of the inconsistency > mentioned in the first paragraph above. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4307) Inconsistent parameters between console producer and consumer
[ https://issues.apache.org/jira/browse/KAFKA-4307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-4307: -- Fix Version/s: (was: 2.0.0) 2.1.0 Since there has been no activity on this for several months, moving this out to 2.1.0. > Inconsistent parameters between console producer and consumer > - > > Key: KAFKA-4307 > URL: https://issues.apache.org/jira/browse/KAFKA-4307 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.10.1.0 >Reporter: Gwen Shapira >Assignee: Balint Molnar >Priority: Major > Labels: newbie > Fix For: 2.1.0 > > > kafka-console-producer uses --broker-list while kafka-console-consumer uses > --bootstrap-server. > Let's add --bootstrap-server to the producer for some consistency? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4249) Document how to customize GC logging options for broker
[ https://issues.apache.org/jira/browse/KAFKA-4249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-4249: -- Fix Version/s: (was: 2.0.0) 2.1.0 Since there has been no activity on this for several months, moving this out to 2.1.0. > Document how to customize GC logging options for broker > --- > > Key: KAFKA-4249 > URL: https://issues.apache.org/jira/browse/KAFKA-4249 > Project: Kafka > Issue Type: Improvement > Components: documentation >Affects Versions: 0.10.0.1 >Reporter: Jim Hoagland >Assignee: Tom Bentley >Priority: Major > Fix For: 2.1.0 > > > We wanted to enable GC logging for Kafka broker and saw that you can set > GC_LOG_ENABLED=true. However, this didn't do what we wanted. For example, > the GC log will be overwritten every time the broker gets restarted. It > wasn't clear how we could do that (no documentation of it that I can find), > so I did some research by looking at the source code and did some testing and > found that KAFKA_GC_LOG_OPTS could be set with alternate JVM options prior to > starting broker. I posted my solution to StackOverflow: > > http://stackoverflow.com/questions/39854424/how-to-enable-gc-logging-for-apache-kafka-brokers-while-preventing-log-file-ove > (feel free to critique) > That solution is now public, but it seems like the Kafka documentation should > say how to do this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-3554) Generate actual data with specific compression ratio and add multi-thread support in the ProducerPerformance tool.
[ https://issues.apache.org/jira/browse/KAFKA-3554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram updated KAFKA-3554:
--
Fix Version/s: (was: 2.0.0)
               2.1.0

Since there has been no activity on this for several weeks, moving this out to 2.1.0.

> Generate actual data with specific compression ratio and add multi-thread
> support in the ProducerPerformance tool.
>
> Key: KAFKA-3554
> URL: https://issues.apache.org/jira/browse/KAFKA-3554
> Project: Kafka
> Issue Type: Improvement
> Affects Versions: 0.9.0.1
> Reporter: Jiangjie Qin
> Assignee: Jiangjie Qin
> Priority: Major
> Fix For: 2.1.0
>
> Currently ProducerPerformance always generates the payload with the same
> bytes. This does not work well for testing compressed data, because the
> payload is extremely compressible no matter how big it is.
> We can make some changes to make it more useful for compressed messages.
> Currently I am generating a payload containing integers from a given range.
> By adjusting the range of the integers, we can get different compression
> ratios.
> API-wise, we can either let users specify the integer range or the expected
> compression ratio (we will do some probing to find the corresponding range
> for the users).
> Besides that, in many cases it is useful to have multiple producer threads
> when the producer threads themselves are the bottleneck. Admittedly, people
> can run multiple ProducerPerformance instances to achieve a similar result,
> but that still differs from the real case where people actually use the
> producer.
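One way to read the "integers from a given range" idea, as a sketch: draw each payload byte uniformly from `[0, value_range)` and measure how well the result compresses. zlib (DEFLATE, the algorithm gzip uses) stands in for the producer's compression codec. The helper names, the single-byte encoding, and the linear probe in `range_for_ratio` are assumptions for illustration, not the ProducerPerformance code.

```python
import random
import zlib


def make_payload(size, value_range, seed=0):
    """Return `size` bytes, each drawn uniformly from [0, value_range)."""
    if not 1 <= value_range <= 256:
        raise ValueError("value_range must be between 1 and 256")
    rng = random.Random(seed)  # fixed seed keeps runs reproducible
    return bytes(rng.choices(range(value_range), k=size))


def compression_ratio(payload):
    """Uncompressed size divided by zlib-compressed size."""
    return len(payload) / len(zlib.compress(payload))


def range_for_ratio(target, size=4096):
    """Probe from the widest range downward and return the first range whose
    payload compresses at least `target` times. Narrower ranges carry less
    entropy per byte, so they compress better."""
    for value_range in range(256, 0, -1):
        if compression_ratio(make_payload(size, value_range)) >= target:
            return value_range
    raise ValueError("target ratio not reachable at this payload size")
```

Letting users pass a target ratio and probing for the range, as the issue suggests, hides the codec-specific relationship between range and ratio; the probe has to be rerun per codec and payload size, since both affect the achieved ratio.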
[jira] [Updated] (KAFKA-4203) Java producer default max message size does not align with broker default
[ https://issues.apache.org/jira/browse/KAFKA-4203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram updated KAFKA-4203:
--
Fix Version/s: (was: 2.0.0)
               2.1.0

Since there has been no activity on this for several months, moving this out to 2.1.0.

> Java producer default max message size does not align with broker default
>
> Key: KAFKA-4203
> URL: https://issues.apache.org/jira/browse/KAFKA-4203
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8.1
> Reporter: Grant Henke
> Assignee: Ismael Juma
> Priority: Major
> Fix For: 2.1.0
>
> The Java producer sets max.request.size = 1048576 (the base 2 version of 1 MB
> (MiB)).
> The broker sets max.message.bytes = 1000012 (the base 10 value of 1 MB + 12
> bytes for overhead).
> This means that by default the producer can try to produce messages larger
> than the broker will accept, resulting in RecordTooLargeExceptions.
> There were no similar issues in the old producer because it sets
> max.message.size = 1000000 (the base 10 value of 1 MB).
> I propose we increase the broker default for max.message.bytes to 1048588
> (the base 2 value of 1 MB (MiB) + 12 bytes for overhead) so that any message
> produced with default configs from either producer does not result in a
> RecordTooLargeException.
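The mismatch can be checked with a little arithmetic. This is a simplified model: it treats both limits as applying to a single record's size and ignores the request and batch framing overhead that `max.request.size` also counts. `rejected_by_broker` is a hypothetical helper, not a Kafka API.

```python
MIB = 1024 * 1024

PRODUCER_MAX_REQUEST_SIZE = MIB            # 1048576, base-2 "1 MB"
BROKER_MAX_MESSAGE_BYTES = 1_000_000 + 12  # 1000012, base-10 "1 MB" + 12 bytes
PROPOSED_BROKER_MAX = MIB + 12             # 1048588, proposed broker default


def rejected_by_broker(size, broker_max=BROKER_MAX_MESSAGE_BYTES,
                       producer_max=PRODUCER_MAX_REQUEST_SIZE):
    """Simplified model: True if the producer's limit lets a record of
    `size` bytes through but the broker's limit does not."""
    return broker_max < size <= producer_max


# Width of the window of sizes that pass the producer but fail on the broker.
gap = PRODUCER_MAX_REQUEST_SIZE - BROKER_MAX_MESSAGE_BYTES
```

With the current defaults the window is 48564 bytes wide; with the proposed broker default of 1048588 it is empty, because the broker limit then exceeds the producer limit.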