[jira] [Commented] (KAFKA-7245) Deprecate WindowStore#put(key, value)
[ https://issues.apache.org/jira/browse/KAFKA-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885855#comment-16885855 ] Omkar Mestry commented on KAFKA-7245: - So should I start making changes in the code to remove the usage of the method? > Deprecate WindowStore#put(key, value) > - > > Key: KAFKA-7245 > URL: https://issues.apache.org/jira/browse/KAFKA-7245 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Omkar Mestry >Priority: Minor > Labels: kip, newbie > > We want to remove `WindowStore#put(key, value)` – for this, we first need to > deprecate it via a KIP and remove it later. > Instead of using `WindowStore#put(key, value)` we need to migrate code to > specify the timestamp explicitly using `WindowStore#put(key, value, > timestamp)`. The current code base uses the explicit call to set the timestamp > in production code already. The simplified `put(key, value)` is only used in > tests, and thus, we would need to update those tests. > KIP-474 :- > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115526545] -- This message was sent by Atlassian JIRA (v7.6.14#76016)
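The deprecation path described above can be sketched with a hypothetical interface; this is an illustration of the pattern (deprecated convenience overload delegating to the explicit-timestamp one), not the actual Kafka Streams `WindowStore` API:

```java
// Sketch of the deprecation pattern discussed in the ticket: keep the
// single-timestamp-free put(key, value) for compatibility but mark it
// @Deprecated, and have callers migrate to the overload with an explicit
// timestamp. "SimpleWindowStore" is a made-up name for illustration.
interface SimpleWindowStore<K, V> {

    /** @deprecated use {@link #put(Object, Object, long)} with an explicit timestamp */
    @Deprecated
    default void put(K key, V value) {
        // Falls back to wall-clock time, which is exactly the ambiguity
        // the KIP wants to remove from the API surface.
        put(key, value, System.currentTimeMillis());
    }

    void put(K key, V value, long windowStartTimestamp);
}
```

Test code would then always call the three-argument overload, making the record timestamp explicit.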
[jira] [Comment Edited] (KAFKA-7711) Add a bounded flush() API to Kafka Producer
[ https://issues.apache.org/jira/browse/KAFKA-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885795#comment-16885795 ] Richard Yu edited comment on KAFKA-7711 at 7/16/19 3:01 AM: [~hachikuji] I think I've seen this issue come up multiple times in the past. Not sure exactly which issue numbers. Do you think this is an issue that really needs to be tackled? Edit: Got it somewhat mixed up. There have been issues with Kafka's send API in the past, for example KAFKA-6705, but not the flush() API. It might be good to have something like it though. was (Author: yohan123): [~hachikuji] I think I've seen this issue come up multiple times in the past. Not sure exactly which issue numbers. Do you think this is an issue that really needs to be tackled? > Add a bounded flush() API to Kafka Producer > > > Key: KAFKA-7711 > URL: https://issues.apache.org/jira/browse/KAFKA-7711 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: kun du >Priority: Minor > Labels: needs-kip > > Currently the call to Producer.flush() can hang for an indeterminate > time. > It is a good idea to add a bounded flush() API that times out if the producer is > unable to flush all the batched records in a limited time. In this way the > caller of flush() has a chance to decide what to do next instead of just waiting > forever. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-7711) Add a bounded flush() API to Kafka Producer
[ https://issues.apache.org/jira/browse/KAFKA-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885795#comment-16885795 ] Richard Yu commented on KAFKA-7711: --- [~hachikuji] I think I've seen this issue come up multiple times in the past. Not sure exactly which issue numbers. Do you think this is an issue that really needs to be tackled? > Add a bounded flush() API to Kafka Producer > > > Key: KAFKA-7711 > URL: https://issues.apache.org/jira/browse/KAFKA-7711 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: kun du >Priority: Minor > Labels: needs-kip > > Currently the call to Producer.flush() can hang for an indeterminate > time. > It is a good idea to add a bounded flush() API that times out if the producer is > unable to flush all the batched records in a limited time. In this way the > caller of flush() has a chance to decide what to do next instead of just waiting > forever. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
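Until a bounded flush() exists in the producer API, the behavior the ticket asks for can be approximated on the caller side. A minimal sketch in plain Java, with a `Runnable` standing in for the real `KafkaProducer.flush()` call (this is a workaround pattern, not a proposed implementation):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class BoundedFlush {
    /**
     * Runs the (potentially unbounded) flush on a helper thread and bounds
     * the wait with Future.get. Returns true if the flush completed in time.
     */
    public static boolean flushWithTimeout(Runnable flushable, long timeout, TimeUnit unit) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.submit(flushable).get(timeout, unit);
            return true;                          // flush completed within the bound
        } catch (TimeoutException e) {
            return false;                         // bounded: the caller decides what to do next
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            executor.shutdownNow();               // best effort; the flush itself may keep running
        }
    }
}
```

Note the caveat in the last comment: timing out the wait does not abort the underlying flush, which is why a first-class bounded flush() in the producer would be preferable.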
[jira] [Commented] (KAFKA-8555) Flaky test ExampleConnectIntegrationTest#testSourceConnector
[ https://issues.apache.org/jira/browse/KAFKA-8555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885767#comment-16885767 ] Matthias J. Sax commented on KAFKA-8555: [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/326/testReport/junit/org.apache.kafka.connect.integration/ExampleConnectIntegrationTest/testSourceConnector/] > Flaky test ExampleConnectIntegrationTest#testSourceConnector > > > Key: KAFKA-8555 > URL: https://issues.apache.org/jira/browse/KAFKA-8555 > Project: Kafka > Issue Type: Bug >Reporter: Boyang Chen >Priority: Major > Attachments: log-job139.txt, log-job141.txt, log-job23145.txt, > log-job23215.txt, log-job6046.txt > > > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/22798/console] > *02:03:21* > org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11/connect/runtime/build/reports/testOutput/org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector.test.stdout*02:03:21* > *02:03:21* > org.apache.kafka.connect.integration.ExampleConnectIntegrationTest > > testSourceConnector FAILED*02:03:21* > org.apache.kafka.connect.errors.DataException: Insufficient records committed > by connector simple-conn in 15000 millis. Records expected=2000, > actual=1013*02:03:21* at > org.apache.kafka.connect.integration.ConnectorHandle.awaitCommits(ConnectorHandle.java:188)*02:03:21* > at > org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector(ExampleConnectIntegrationTest.java:181)*02:03:21* -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8672) RebalanceSourceConnectorsIntegrationTest#testReconfigConnector
Matthias J. Sax created KAFKA-8672: -- Summary: RebalanceSourceConnectorsIntegrationTest#testReconfigConnector Key: KAFKA-8672 URL: https://issues.apache.org/jira/browse/KAFKA-8672 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.4.0 Reporter: Matthias J. Sax [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6281/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testReconfigConnector/] {quote}java.lang.RuntimeException: Could not find enough records. found 33, expected 100 at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.consume(EmbeddedKafkaCluster.java:306) at org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testReconfigConnector(RebalanceSourceConnectorsIntegrationTest.java:180){quote} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8412) Still a nullpointer exception thrown on shutdown while flushing before closing producers
[ https://issues.apache.org/jira/browse/KAFKA-8412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885740#comment-16885740 ] Matthias J. Sax commented on KAFKA-8412: Just cycling back to this. I am not sure if we should do the "close-and-then-recreate" strategy. The issue seems to be that we blindly call `task#close()` for all tasks within `AssignedTasks#close()`. However, it seems that a proper fix would be to distinguish between active and suspended tasks, and call `closeSuspended()` for suspended ones and `close()` for all others? With regard to KAFKA-7678: re-reading the ticket, the report says that it should be a graceful shutdown via SIGTERM. Hence, I am wondering why the unclean shutdown path is actually executed (i.e., why do we call `maybeAbortTransactionAndCloseRecordCollector()` – this should only happen for an unclean shutdown). \cc [~pachilo] who reported and worked on the first NPE > Still a nullpointer exception thrown on shutdown while flushing before > closing producers > > > Key: KAFKA-8412 > URL: https://issues.apache.org/jira/browse/KAFKA-8412 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.1 >Reporter: Sebastiaan >Assignee: Matthias J. Sax >Priority: Minor > > I found a closed issue and replied there but decided to open one myself > because although they're related they're slightly different. The original > issue is at https://issues.apache.org/jira/browse/KAFKA-7678 > The fix there has been to implement a null check around closing a producer > because in some cases the producer is already null there (has been closed > already). > In version 2.1.1 we are getting a very similar exception, but in the 'flush' > method that is called pre-close. 
This is in the log: > {code:java} > message: stream-thread > [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed > while closing StreamTask 1_26 due to the following error: > logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks > java.lang.NullPointerException: null > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245) > at > org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397) > at > org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code} > Followed by: > > {code:java} > message: task [1_26] Could not close task due to the following error: > logger_name: org.apache.kafka.streams.processor.internals.StreamTask > java.lang.NullPointerException: null > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245) > at > org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397) > at > 
org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
> If I look at the source code at this point, I see a nice null check in the
> close method, but not in the flush method that is called just before that:
> {code:java}
> public void flush() {
>     this.log.debug("Flushing producer");
>     this.producer.flush();
>     this.checkForException();
> }
>
> public void close() {
>     this.log.debug("Closing producer");
>     if (this.producer != null) {
>         this.producer.close();
>         this.producer = null;
>     }
>     this.checkForException();
> }{code}
> It seems to my (ignorant) eye that the flush method should also be wrapped in
> a null check in the same way.
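The null-check fix the reporter suggests can be sketched with a toy stand-in; this models the guard pattern only, not the actual `RecordCollectorImpl` internals (the field and methods below are illustrative):

```java
// Hypothetical shape of the suggested fix: guard flush() with the same
// null check close() already has, so flushing after the producer was
// closed becomes a no-op instead of an NPE.
interface SafeCloseable { void close(); }

final class GuardedCollector {
    private SafeCloseable producer = () -> { };   // stand-in for the Kafka producer
    private boolean flushed = false;

    void flush() {
        if (producer != null) {   // the guard the report says is missing
            flushed = true;       // stands in for producer.flush()
        }
    }

    void close() {
        if (producer != null) {   // the existing guard quoted in the report
            producer.close();
            producer = null;
        }
    }

    boolean wasFlushed() { return flushed; }
}
```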
[jira] [Commented] (KAFKA-8179) Incremental Rebalance Protocol for Kafka Consumer
[ https://issues.apache.org/jira/browse/KAFKA-8179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885726#comment-16885726 ] ASF GitHub Bot commented on KAFKA-8179: --- ableegoldman commented on pull request #7095: KAFKA-8179: Minor, add ownedPartitions to PartitionAssignor#subscription URL: https://github.com/apache/kafka/pull/7095 This is just a minor PR for KIP-429, which needs `ConsumerCoordinator` to pass the current assignment to the assignor when building the subscription. Since this is in the internal package and a cursory search of GitHub suggests not too many people have implemented this method, we just change the signature rather than add a new method and deprecate the old one. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Incremental Rebalance Protocol for Kafka Consumer > - > > Key: KAFKA-8179 > URL: https://issues.apache.org/jira/browse/KAFKA-8179 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > > Recently the Kafka community has been promoting cooperative rebalancing to mitigate the > pain points of the stop-the-world rebalancing protocol. This ticket is > created to initiate that idea in the Kafka consumer client, which will be > beneficial for heavy-stateful consumers such as Kafka Streams applications. > In short, the scope of this ticket includes reducing unnecessary rebalance > latency due to heavy partition migration: i.e. partitions being revoked and > re-assigned. This would make the built-in consumer assignors (range, > round-robin, etc.) aware of previously assigned partitions and sticky > on a best-effort basis. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8671) NullPointerException occurs if topic associated with GlobalKTable changes
[ https://issues.apache.org/jira/browse/KAFKA-8671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885709#comment-16885709 ] Matthias J. Sax commented on KAFKA-8671: This issue might be related to KAFKA-5998 – if we cannot reproduce on `trunk` any longer, the fix of KAFKA-5998 might fix this one, too. > NullPointerException occurs if topic associated with GlobalKTable changes > - > > Key: KAFKA-8671 > URL: https://issues.apache.org/jira/browse/KAFKA-8671 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Alex Leung >Priority: Critical > > The following NullPointerException occurs when the global/.checkpoint file > contains a line with a topic previously associated with (but no longer > configured for) a GlobalKTable: > {code:java} > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:85) > at > org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:241) > at > org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290){code} > > After line 84 > ([https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java#L84]) > `sourceNodeAndDeserializer` is null for the old, but still valid, topic. 
> This can be reproduced with the following sequence: > # create a GlobalKTable associated with topic, 'global-topic1' > # change the topic associated with the GlobalKTable to 'global-topic2' > ## at this point, the global/.checkpoint file will contain lines for both > topics > # produce messages to previous topic ('global-topic1') > # the consumer will attempt to consume from global-topic1, but no > deserializer associated with global-topic1 will be found and the NPE will > occur > It looks like the following recent commit has included checkpoint validations > that may prevent this issue: > https://github.com/apache/kafka/commit/53b4ce5c00d61be87962f603682873665155cec4#diff-cc98a6c20f2a8483e1849aea6921c34dR425 -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Resolved] (KAFKA-8637) WriteBatch objects leak off-heap memory
[ https://issues.apache.org/jira/browse/KAFKA-8637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman resolved KAFKA-8637. Resolution: Fixed > WriteBatch objects leak off-heap memory > --- > > Key: KAFKA-8637 > URL: https://issues.apache.org/jira/browse/KAFKA-8637 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0, 2.2.0, 2.3.0 >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.1.2, 2.2.2, 2.4.0, 2.3.1 > > > In 2.1 we did some refactoring that led to the WriteBatch objects in > RocksDBSegmentedBytesStore#restoreAllInternal being created in a separate > method, rather than in a try-with-resources statement as used elsewhere. This > causes a memory leak as the WriteBatches are no longer closed automatically. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
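The leak pattern described above (closeable objects created outside try-with-resources) can be illustrated with a toy `AutoCloseable` in place of RocksDB's `WriteBatch`; names are illustrative, not the actual Kafka code:

```java
// Toy model of the bug: a closeable resource counts how many instances
// are still open, so a leak is directly observable.
final class Batch implements AutoCloseable {
    static int open = 0;
    Batch() { open++; }
    @Override public void close() { open--; }
}

final class Restore {
    // Leaky shape: the batch escapes the creating method and, unless every
    // caller remembers to close it on every path, it is never released.
    static Batch leakyCreate() {
        return new Batch();
    }

    // Fixed shape: creation and use live inside try-with-resources,
    // so close() runs even if the body throws.
    static void restore() {
        try (Batch batch = new Batch()) {
            // ... write records into the batch and flush it ...
        }
    }
}
```

For an off-heap resource like a RocksDB `WriteBatch`, the garbage collector never reclaims the native allocation, which is why the missing `close()` leaks memory rather than just delaying cleanup.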
[jira] [Commented] (KAFKA-8650) Streams does not work as expected with auto.offset.reset=none
[ https://issues.apache.org/jira/browse/KAFKA-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885707#comment-16885707 ] Matthias J. Sax commented on KAFKA-8650: Splitting out initialization is not easy. The problem is that one might start multiple instances at once, but only one instance should do the initialization. Furthermore, processing can only start after the initialization is completed. At the moment, we rely on the consumer rebalance protocol to ensure this property: the group leader does the initialization. If we separate both, there are two problems if you start multiple instances at the same time: (1) Which instance should do the initialization? (2) How do other instances know that the initialization is finished and that they can start processing? Maybe it would be possible to enhance the reset tool though: it might be possible to hand in the `StreamsBuilder` so the reset tool could set up the topics accordingly (many open questions remain though; it's just a rough idea). > Streams does not work as expected with auto.offset.reset=none > - > > Key: KAFKA-8650 > URL: https://issues.apache.org/jira/browse/KAFKA-8650 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.3.0 >Reporter: Raman Gupta >Priority: Major > Labels: needs-kip > > The auto.offset.reset policy of none is useful as a safety measure, > especially when > * exactly-once processing is desired, or > * at-least-once is desired, but it is expensive to reprocess from the > beginning. > In this case, using "none" forces the ops team to explicitly set the offset > before the stream can re-start processing, in the (hopefully rare) situations > in which the stream consumer offset has been lost for some reason, or in the > case of a new stream that should not start processing from the beginning or > the end, but somewhere in the middle (this scenario might occur during topic > migrations). 
> Kafka streams really only supports auto.offset.reset of earliest or latest > (see the `Topology.AutoOffsetReset` enum). It is also possible to use the > auto.offset.reset configuration value, but this works suboptimally because if > the streams application reset tool is used (even with a specific offset > specified), the offset is set for the input topic, but it is not, and cannot > be, set for the internal topics, which won't exist yet. > The internal topics are created by Kafka streams at startup time, but because > the auto.offset.reset policy of "none" is passed to the consumer of those > internal topics, the Kafka stream fails to start with a > "NoOffsetForPartitionException". > Proposals / options: > 1) Allow auto.offset.reset=none to be specified in Consumed.with() so that it > affects the input topics, but not the internal topics. > 2) Allow streams to be configured with auto.offset.reset=none, but explicitly > set the offset to 0 for newly created internal topics. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
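The configuration conflict described above can be summarized in a small (purely illustrative, not a working setup) config sketch:

```properties
# Sketch of the failure mode described in the ticket: the application-level
# reset policy is inherited by the consumers of Streams-managed internal
# topics. Newly created internal topics start with no committed offsets,
# so "none" makes their consumers fail with NoOffsetForPartitionException.
auto.offset.reset=none
# Proposal (1) would scope "none" to input topics only (via Consumed.with());
# proposal (2) would keep this global setting but explicitly set the offset
# to 0 for newly created internal topics.
```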
[jira] [Created] (KAFKA-8671) NullPointerException occurs if topic associated with GlobalKTable changes
Alex Leung created KAFKA-8671: - Summary: NullPointerException occurs if topic associated with GlobalKTable changes Key: KAFKA-8671 URL: https://issues.apache.org/jira/browse/KAFKA-8671 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.0.0 Reporter: Alex Leung The following NullPointerException occurs when the global/.checkpoint file contains a line with a topic previously associated with (but no longer configured for) a GlobalKTable: {code:java} java.lang.NullPointerException at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:85) at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:241) at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290){code} After line 84 ([https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java#L84]) `sourceNodeAndDeserializer` is null for the old, but still valid, topic. This can be reproduced with the following sequence: # create a GlobalKTable associated with topic, 'global-topic1' # change the topic associated with the GlobalKTable to 'global-topic2' ## at this point, the global/.checkpoint file will contain lines for both topics # produce messages to previous topic ('global-topic1') # the consumer will attempt to consume from global-topic1, but no deserializer associated with global-topic1 will be found and the NPE will occur It looks like the following recent commit has included checkpoint validations that may prevent this issue: https://github.com/apache/kafka/commit/53b4ce5c00d61be87962f603682873665155cec4#diff-cc98a6c20f2a8483e1849aea6921c34dR425 -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8670) kafka-topics.sh shows IllegalArgumentException when describing all topics if no topics exist on the cluster
[ https://issues.apache.org/jira/browse/KAFKA-8670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885704#comment-16885704 ] ASF GitHub Bot commented on KAFKA-8670: --- wyuka commented on pull request #7094: KAFKA-8670 and KAFKA-8053: Fix kafka-topics.sh --describe without --t… URL: https://github.com/apache/kafka/pull/7094 …opic mentioned if there are no topics in cluster; improve error message. If there are no topics in a cluster, kafka-topics.sh --describe without a --topic option should return an empty list, not throw an exception. If there are no matching topics, throw IllegalArgumentException but with a better error message. ### KAFKA-8670 We pass a boolean flag to the `ensureTopicExists` method indicating whether to throw an exception if there are no topics in the cluster. In the case of `kafka-topics.sh --describe`, the exception **should NOT** be thrown if either of these is true: 1. A `--topic` option was not passed to the CLI. In that case, the output should be empty. 2. An `--if-exists` option was passed to the CLI. Earlier, the first condition was not part of the check. This bugfix adds the first condition mentioned above to the check. ### KAFKA-8053 Earlier, when `./kafka-topics.sh` was run with a `--topic` argument that was not found in the list of topics, the exception message was always "Topics in [] does not exist". This was an unhelpful message. I have changed it to "No topic matching found". *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* I added the necessary unit test to check for this case. Also, I ran these commands. 
The output before and after my change are mentioned inline.

**Describe without topic name - Before**
```
./kafka-topics.sh --zookeeper 10.0.32.180:2181,10.0.16.211:2181,10.0.0.220:2181 --describe
Error while executing topic command : Topics in [] does not exist
[2019-07-15 23:23:25,674] ERROR java.lang.IllegalArgumentException: Topics in [] does not exist
    at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
    at kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)
```

**Describe without topic name - After (no output)**
```
./kafka-topics.sh --zookeeper 10.0.32.180:2181,10.0.16.211:2181,10.0.0.220:2181 --describe
```

**Describe with topic name - Before**
```
./kafka-topics.sh --zookeeper 10.0.32.180:2181,10.0.16.211:2181,10.0.0.220:2181 --describe --topic topic-1
Error while executing topic command : Topics in [] does not exist
[2019-07-15 23:23:36,152] ERROR java.lang.IllegalArgumentException: Topics in [] does not exist
    at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
    at kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)
```

**Describe with topic name - After**
```
Error while executing topic command : No topic matching name 'topic-1' found
[2019-07-15 23:24:55,169] ERROR java.lang.IllegalArgumentException: No topic matching 'topic-1' found
    at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:445)
    at kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:358)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:67)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)
```

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > kafka-topics.sh shows IllegalArgumentException when describing all topics if > no topics exist on the cluster > --- > > Key: KAFKA-8670 > URL: https://issues.apache.org/jira/browse/KAFKA-8670 > Project: Kafka >
[jira] [Commented] (KAFKA-8637) WriteBatch objects leak off-heap memory
[ https://issues.apache.org/jira/browse/KAFKA-8637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885702#comment-16885702 ] ASF GitHub Bot commented on KAFKA-8637: --- guozhangwang commented on pull request #7050: KAFKA-8637: WriteBatch objects leak off-heap memory URL: https://github.com/apache/kafka/pull/7050 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > WriteBatch objects leak off-heap memory > --- > > Key: KAFKA-8637 > URL: https://issues.apache.org/jira/browse/KAFKA-8637 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0, 2.2.0, 2.3.0 >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.1.2, 2.2.2, 2.4.0, 2.3.1 > > > In 2.1 we did some refactoring that led to the WriteBatch objects in > RocksDBSegmentedBytesStore#restoreAllInternal being created in a separate > method, rather than in a try-with-resources statement as used elsewhere. This > causes a memory leak as the WriteBatches are no longer closed automatically -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8620) Race condition in StreamThread state change
[ https://issues.apache.org/jira/browse/KAFKA-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8620: --- Fix Version/s: 2.3.1 2.2.2 2.1.2 > Race condition in StreamThread state change > --- > > Key: KAFKA-8620 > URL: https://issues.apache.org/jira/browse/KAFKA-8620 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.1.2, 2.2.2, 2.4.0, 2.3.1 > > > In the call to `StreamThread.addRecordsToTasks` we don't have synchronization > when we attempt to extract active tasks. If after one long poll in runOnce > the application state changes to PENDING_SHUTDOWN, there is a potential close > on TaskManager which erases the active tasks map, thus triggering NPE and > bringing the thread state to a false shutdown. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
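The race described above (one thread reading active tasks while shutdown clears them) and its guarded counterpart can be sketched with a toy model; this is not the actual `StreamThread`/`TaskManager` code, just the synchronization pattern:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the KAFKA-8620 race: a reader extracts tasks from a map
// that a concurrent close() may null out. Guarding both the read and the
// close with one lock turns the post-shutdown read into a harmless null
// instead of an NPE.
final class TaskHolder {
    private final Object lock = new Object();
    private Map<Integer, String> activeTasks = new HashMap<>();

    String taskFor(int partition) {
        synchronized (lock) {
            // Guarded read: after close() this returns null rather than
            // dereferencing the cleared map.
            return activeTasks == null ? null : activeTasks.get(partition);
        }
    }

    void close() {
        synchronized (lock) {
            activeTasks = null;   // models TaskManager erasing active tasks on shutdown
        }
    }
}
```

An alternative design is to check the thread state (e.g. PENDING_SHUTDOWN) under the same lock before touching the tasks at all; either way the read and the shutdown must not interleave.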
[jira] [Updated] (KAFKA-8602) StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic
[ https://issues.apache.org/jira/browse/KAFKA-8602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-8602: - Fix Version/s: 2.4.0 > StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic > - > > Key: KAFKA-8602 > URL: https://issues.apache.org/jira/browse/KAFKA-8602 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.1 >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Critical > Fix For: 2.4.0 > > > StreamThread dies with the following exception: > {code:java} > java.lang.IllegalStateException: Consumer is not subscribed to any topics or > assigned any partitions > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1199) > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) > {code} > The reason is that the restore consumer is not subscribed to any topic. This > happens when a StreamThread gets assigned standby tasks for sub-topologies > with just state stores with disabled logging. > To reproduce the bug start two applications with one StreamThread and one > standby replica each and the following topology. 
The input topic should have two partitions:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final String stateStoreName = "myTransformState";
> final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
>     Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName),
>                                 Serdes.Integer(),
>                                 Serdes.Integer())
>           .withLoggingDisabled();
> builder.addStateStore(keyValueStoreBuilder);
> builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()))
>     .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
>         private KeyValueStore<Integer, Integer> state;
>
>         @SuppressWarnings("unchecked")
>         @Override
>         public void init(final ProcessorContext context) {
>             state = (KeyValueStore<Integer, Integer>) context.getStateStore(stateStoreName);
>         }
>
>         @Override
>         public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
>             final KeyValue<Integer, Integer> result = new KeyValue<>(key, value);
>             return result;
>         }
>
>         @Override
>         public void close() {}
>     }, stateStoreName)
>     .to(OUTPUT_TOPIC);
> {code}
> Both StreamThreads should die with the above exception.
> The root cause is that standby tasks are created although all state stores of
> the sub-topology have logging disabled. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8602) StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic
[ https://issues.apache.org/jira/browse/KAFKA-8602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885676#comment-16885676 ] ASF GitHub Bot commented on KAFKA-8602: --- bbejeck commented on pull request #7092: KAFKA-8602: Separate PR for 2.3 branch URL: https://github.com/apache/kafka/pull/7092 This PR is from the original work by @cadonna in #7008. Due to incompatible changes in trunk that should not get cherry-picked back, a separate PR is required for this bug fix ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic > - > > Key: KAFKA-8602 > URL: https://issues.apache.org/jira/browse/KAFKA-8602 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.1 >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Critical > > StreamThread dies with the following exception: > {code:java} > java.lang.IllegalStateException: Consumer is not subscribed to any topics or > assigned any partitions > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1199) > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) > {code} > 
The reason is that the restore consumer is not subscribed to any topic. This > happens when a StreamThread gets assigned standby tasks for sub-topologies > with just state stores with disabled logging. > To reproduce the bug start two applications with one StreamThread and one > standby replica each and the following topology. The input topic should have > two partitions: > {code:java} > final StreamsBuilder builder = new StreamsBuilder(); > final String stateStoreName = "myTransformState"; > final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder = > Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName), > Serdes.Integer(), > Serdes.Integer()) > .withLoggingDisabled(); > builder.addStateStore(keyValueStoreBuilder); > builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer())) > .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() { > private KeyValueStore<Integer, Integer> state; > @SuppressWarnings("unchecked") > @Override > public void init(final ProcessorContext context) { > state = (KeyValueStore<Integer, Integer>) > context.getStateStore(stateStoreName); > } > @Override > public KeyValue<Integer, Integer> transform(final Integer key, > final Integer value) { > final KeyValue<Integer, Integer> result = new KeyValue<>(key, > value); > return result; > } > @Override > public void close() {} > }, stateStoreName) > .to(OUTPUT_TOPIC); > {code} > Both StreamThreads should die with the above exception. > The root cause is that standby tasks are created although all state stores of > the sub-topology have logging disabled. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
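The root cause above suggests a simple guard: a standby task can only restore from changelog topics, so there is nothing for the restore consumer to subscribe to when every store has logging disabled. The sketch below is an illustrative Java reduction of that idea, not the actual Streams fix; `StoreConfig` is a hypothetical holder type.

```java
import java.util.List;

// Illustrative sketch, not Kafka Streams code: standby task creation should
// be skipped unless at least one state store in the sub-topology has logging
// (and therefore a changelog topic to restore from) enabled.
public class StandbyTaskCheck {

    // Hypothetical holder for a store's name and logging flag.
    static final class StoreConfig {
        final String name;
        final boolean loggingEnabled;
        StoreConfig(String name, boolean loggingEnabled) {
            this.name = name;
            this.loggingEnabled = loggingEnabled;
        }
    }

    static boolean shouldCreateStandbyTask(List<StoreConfig> stores) {
        // No changelogged store -> the restore consumer would have no topic
        // to subscribe to -> do not create a standby task at all.
        return stores.stream().anyMatch(s -> s.loggingEnabled);
    }
}
```

With the topology from the report (a single store built `.withLoggingDisabled()`), this check returns false, so no standby task is created and the restore consumer is never polled without a subscription.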
[jira] [Commented] (KAFKA-8670) kafka-topics.sh shows IllegalArgumentException when describing all topics if no topics exist on the cluster
[ https://issues.apache.org/jira/browse/KAFKA-8670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885667#comment-16885667 ] ASF GitHub Bot commented on KAFKA-8670: --- wyuka commented on pull request #7091: KAFKA-8670: Fix kafka-topics.sh --describe without --topic when cluster has no topics. URL: https://github.com/apache/kafka/pull/7091 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > kafka-topics.sh shows IllegalArgumentException when describing all topics if > no topics exist on the cluster > --- > > Key: KAFKA-8670 > URL: https://issues.apache.org/jira/browse/KAFKA-8670 > Project: Kafka > Issue Type: Bug > Components: admin, tools >Affects Versions: 2.2.0, 2.3.0, 2.2.1 >Reporter: Tirtha Chatterjee >Assignee: Tirtha Chatterjee >Priority: Major > > When trying to describe all topics with the kafka-topics.sh utility, a user would run > kafka-topics.sh --describe without passing a --topic option. If there are no > topics on the cluster, Kafka returns an error with IllegalArgumentException. > {code:java} > ./kafka-topics.sh --zookeeper > 172.16.7.230:2181,172.16.17.27:2181,172.16.10.89:2181 --describe > Error while executing topic command : Topics in [] does not exist > [2019-07-07 03:33:15,288] ERROR java.lang.IllegalArgumentException: Topics in > [] does not exist > at > kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416) > at > kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332) > at kafka.admin.TopicCommand$.main(TopicCommand.scala:66) > at kafka.admin.TopicCommand.main(TopicCommand.scala) > (kafka.admin.TopicCommand$) > {code} > > If no --topic option is passed to the command, and there are no topics on > the cluster, the command should not fail, rather have empty output. 
-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8620) Race condition in StreamThread state change
[ https://issues.apache.org/jira/browse/KAFKA-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885655#comment-16885655 ] ASF GitHub Bot commented on KAFKA-8620: --- mjsax commented on pull request #7021: KAFKA-8620: fix NPE due to race condition during shutdown while rebalancing URL: https://github.com/apache/kafka/pull/7021 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Race condition in StreamThread state change > --- > > Key: KAFKA-8620 > URL: https://issues.apache.org/jira/browse/KAFKA-8620 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > In the call to `StreamThread.addRecordsToTasks` we don't have synchronization > when we attempt to extract active tasks. If after one long poll in runOnce > the application state changes to PENDING_SHUTDOWN, there is a potential close > on TaskManager which erases the active tasks map, thus triggering NPE and > bringing the thread state to a false shutdown. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
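The race described above follows a classic pattern: one thread dereferences a shared map while a shutdown path clears it. A minimal, illustrative Java sketch of the defensive pattern (read the shared reference once into a local, then null-check) is below; the class and method names are hypothetical, not the actual StreamThread/TaskManager code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the race and its guard, not Kafka code: if a
// shutdown thread erases the active-task map while the polling thread is
// still using it, the polling thread hits a NullPointerException. Reading
// the volatile reference once and null-checking it avoids the NPE.
public class ActiveTaskLookup {
    private volatile Map<Integer, String> activeTasks = new ConcurrentHashMap<>();

    public void register(int partition, String task) {
        activeTasks.put(partition, task);
    }

    // Simulates TaskManager.close() erasing the active tasks on shutdown.
    public void shutdown() {
        activeTasks = null;
    }

    public String taskFor(int partition) {
        Map<Integer, String> tasks = activeTasks;  // read the reference once
        if (tasks == null) {
            return null;  // shutting down; caller treats this as "no task"
        }
        return tasks.get(partition);
    }
}
```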
[jira] [Updated] (KAFKA-8620) Race condition in StreamThread state change
[ https://issues.apache.org/jira/browse/KAFKA-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8620: --- Affects Version/s: 2.1.0 > Race condition in StreamThread state change > --- > > Key: KAFKA-8620 > URL: https://issues.apache.org/jira/browse/KAFKA-8620 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > In the call to `StreamThread.addRecordsToTasks` we don't have synchronization > when we attempt to extract active tasks. If after one long poll in runOnce > the application state changes to PENDING_SHUTDOWN, there is a potential close > on TaskManager which erases the active tasks map, thus triggering NPE and > bringing the thread state to a false shutdown. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8650) Streams does not work as expected with auto.offset.reset=none
[ https://issues.apache.org/jira/browse/KAFKA-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885640#comment-16885640 ] Raman Gupta commented on KAFKA-8650: There is also a related API change / improvement that I believe should be in the KIP if one is created. Currently, a new (or reset) stream initializes itself when "start" is called i.e. "start" is overloaded to mean "initialize and start". If "initialize" was possible without start, then it would be easy to call "initialize" to create the stream's topology, then use the application reset tool to set the desired offsets, then use "start" to actually start the stream at the desired place. This would allow the user to carry out this process without actually relying on the system to throw an exception at stream startup time, post-init without an offset. This can also be done in advance of allowing auto.offset.reset = none for streams, which, when done, would just become a safety on the stream.start trigger. > Streams does not work as expected with auto.offset.reset=none > - > > Key: KAFKA-8650 > URL: https://issues.apache.org/jira/browse/KAFKA-8650 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.3.0 >Reporter: Raman Gupta >Priority: Major > Labels: needs-kip > > The auto.offset.reset policy of none is useful as a safety measure, > especially when > * exactly-once processing is desired, or > * at-least-once is desired, but it is expensive to reprocess from the > beginning. > In this case, using "none" forces the ops team to explicitly set the offset > before the stream can re-start processing, in the (hopefully rare) situations > in which the stream consumer offset has been lost for some reason, or in the > case of a new stream that should not start processing from the beginning or > the end, but somewhere in the middle (this scenario might occur during topic > migrations). 
> Kafka streams really only supports auto.offset.reset of earliest or latest > (see the `Topology.AutoOffsetReset` enum). It is also possible to use the > auto.offset.reset configuration value, but this works suboptimally because if > the streams application reset tool is used (even with a specific offset > specified), the offset is set for the input topic, but it is not, and cannot > be, set for the internal topics, which won't exist yet. > The internal topics are created by Kafka streams at startup time, but because > the auto.offset.reset policy of "none" is passed to the consumer of those > internal topics, the Kafka stream fails to start with a > "NoOffsetForPartitionException". > Proposals / options: > 1) Allow auto.offset.reset=none to be specified in Consumed.with() so that it > affects the input topics, but not the internal topics. > 2) Allow streams to be configured with auto.offset.reset=none, but explicitly > set the offset to 0 for newly created internal topics. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
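For reference, the policy under discussion is an ordinary consumer setting. A minimal sketch of the configuration is below; the broker address and group id are placeholders, and with `"none"` the consumer throws `NoOffsetForPartitionException` when no committed offset exists instead of silently jumping to earliest or latest — which, per the report, is exactly what happens on Streams' freshly created internal topics:

```java
import java.util.Properties;

// Minimal sketch of the consumer configuration discussed above.
// "bootstrap.servers" and "group.id" are placeholder values.
public class ResetPolicyConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("group.id", "my-streams-app");          // hypothetical group id
        props.put("auto.offset.reset", "none");           // fail fast when no offset exists
        return props;
    }
}
```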
[jira] [Commented] (KAFKA-8670) kafka-topics.sh shows IllegalArgumentException when describing all topics if no topics exist on the cluster
[ https://issues.apache.org/jira/browse/KAFKA-8670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885632#comment-16885632 ] ASF GitHub Bot commented on KAFKA-8670: --- wyuka commented on pull request #7091: KAFKA-8670: Fix kafka-topics.sh --describe without --topic when cluster has no topics. URL: https://github.com/apache/kafka/pull/7091 If there are no topics in a cluster, kafka-topics.sh --describe without a --topic option should return an empty list, not throw an exception. We pass a boolean flag to the `ensureTopicExists` method indicating whether to throw an exception if there are no topics in the cluster. In case of `kafka-topics.sh --describe`, the exception **should NOT** be thrown if either of these is true: 1. A `--topic` option was not passed to the CLI. In that case, the output should be empty. 2. A `--if-exists` option was passed to the CLI. Earlier, the first condition was not part of the check. This bugfix adds the first condition mentioned above to the check. I added the necessary unit test to check for this case. Also, I created a Kafka cluster, and without creating any topics on it, ran ``` ./kafka-topics.sh --zookeeper --describe ``` This did not throw any exception, like it used to earlier. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > kafka-topics.sh shows IllegalArgumentException when describing all topics if > no topics exist on the cluster > --- > > Key: KAFKA-8670 > URL: https://issues.apache.org/jira/browse/KAFKA-8670 > Project: Kafka > Issue Type: Bug > Components: admin, tools >Affects Versions: 2.2.0, 2.3.0, 2.2.1 >Reporter: Tirtha Chatterjee >Assignee: Tirtha Chatterjee >Priority: Major > > When trying to describe all topics with the kafka-topics.sh utility, a user would run > kafka-topics.sh --describe without passing a --topic option. If there are no > topics on the cluster, Kafka returns an error with IllegalArgumentException. > {code:java} > ./kafka-topics.sh --zookeeper > 172.16.7.230:2181,172.16.17.27:2181,172.16.10.89:2181 --describe > Error while executing topic command : Topics in [] does not exist > [2019-07-07 03:33:15,288] ERROR java.lang.IllegalArgumentException: Topics in > [] does not exist > at > kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416) > at > kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332) > at kafka.admin.TopicCommand$.main(TopicCommand.scala:66) > at kafka.admin.TopicCommand.main(TopicCommand.scala) > (kafka.admin.TopicCommand$) > {code} > > If no --topic option is passed to the command, and there are no topics on > the cluster, the command should not fail, rather have empty output. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
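The boolean-flag check described in the PR comment can be sketched in a few lines. The real implementation is Scala, in `kafka.admin.TopicCommand`; this Java reduction uses a simplified, hypothetical signature purely for illustration:

```java
import java.util.List;

// Illustrative Java sketch of the guard described in the PR above; not the
// actual (Scala) TopicCommand code. The exception is raised only when the
// caller explicitly required the topics to exist, i.e. a --topic option was
// given and --if-exists was not.
public class TopicExistenceCheck {

    static void ensureTopicExists(List<String> foundTopics,
                                  List<String> requestedTopics,
                                  boolean requireTopicExists) {
        if (foundTopics.isEmpty() && requireTopicExists) {
            throw new IllegalArgumentException(
                "Topics in " + requestedTopics + " does not exist");
        }
    }
}
```

For a bare `--describe` on an empty cluster, `requireTopicExists` is false, so the method returns normally and the command prints nothing instead of failing.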
[jira] [Updated] (KAFKA-8638) Preferred Leader Blacklist (deprioritized list)
[ https://issues.apache.org/jira/browse/KAFKA-8638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana updated KAFKA-8638: -- Description: Currently, Kafka preferred leader election picks the broker_id in the topic/partition replica assignments in a priority order when the broker is in ISR. The preferred leader is the broker id in the first position of the replica list. There are use cases where, even though the first broker in the replica assignment is in ISR, it needs to be moved to the end of the ordering (lowest priority) when deciding leadership during preferred leader election. Let’s use topic/partition replica (1,2,3) as an example. 1 is the preferred leader. When preferred leader election is run, it will pick 1 as the leader if 1 is in ISR; if 1 is not online and in ISR, then pick 2; if 2 is not in ISR, then pick 3 as the leader. There are use cases where, even though 1 is in ISR, we would like it to be moved to the end of the ordering (lowest priority) when deciding leadership during preferred leader election. Below is a list of use cases: * If broker_id 1 is a swapped failed host and brought up with last segments or latest offset without historical data (there is another effort on this), it's better for it to not serve leadership till it's caught up. * The cross-data center cluster has AWS instances which have less computing power than the on-prem bare metal machines. We could put the AWS broker_ids in the Preferred Leader Blacklist, so on-prem brokers can be elected leaders, without changing the reassignment ordering of the replicas. * If broker_id 1 is constantly losing leadership after some time ("flapping"), we would want to exclude 1 from being a leader unless all other brokers of this topic/partition are offline. The “flapping” effect was seen in the past when 2 or more brokers were bad: when they lost leadership constantly/quickly, the sets of partition replicas they belong to would see leadership constantly changing. 
The ultimate solution is to swap these bad hosts. But for quick mitigation, we can also put the bad hosts in the Preferred Leader Blacklist to move their priority for being elected leader to the lowest. * If the controller is busy serving an extra load of metadata requests and other tasks, we would like to move the controller's leaderships to other brokers to lower its CPU load. Currently, bouncing the controller to make it lose leadership would not work, because after the bounce the controller fails over to another broker. * Avoid bouncing a broker in order to make it lose its leadership: it would be good if we had a way to specify which broker should be excluded from serving traffic/leadership (without changing the replica assignment ordering by reassignments, even though that's quick), and run preferred leader election. A bouncing broker will cause temporary URP, and sometimes other issues. Also, a bounced broker (e.g. broker_id 1) can temporarily lose all its leadership, but if another broker (e.g. broker_id 2) fails or gets bounced, some of its leaderships will likely fail over to broker_id 1 in a replica set of 3 brokers. If broker_id 1 is in the blacklist, then in such a scenario, even with broker_id 2 offline, the 3rd broker can take leadership. The current work-around for the above is to change the topic/partition's replica reassignments to move broker_id 1 from the first position to the last position and run preferred leader election, e.g. (1, 2, 3) => (2, 3, 1). This changes the replica reassignments, and we need to keep track of the original one and restore it if things change (e.g. the controller fails over to another broker, or the swapped empty broker catches up). That’s a rather tedious task. The KIP is located at [KIP-491|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120736982] was: Currently, the kafka preferred leader election will pick the broker_id in the topic/partition replica assignments in a priority order when the broker is in ISR. 
The preferred leader is the broker id in the first position of replica. There are use-cases that, even the first broker in the replica assignment is in ISR, there is a need for it to be moved to the end of ordering (lowest priority) when deciding leadership during preferred leader election. Let’s use topic/partition replica (1,2,3) as an example. 1 is the preferred leader. When preferred leadership is run, it will pick 1 as the leader if it's ISR, if 1 is not online and in ISR, then pick 2, if 2 is not in ISR, then pick 3 as the leader. There are use cases that, even 1 is in ISR, we would like it to be moved to the end of ordering (lowest priority) when deciding leadership during preferred leader election. Below is a list of use cases: * (If broker_id 1 is a swapped failed host and brought up with last segments or latest offset without historical data (There is another effort on this), it's better for it to not
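The deprioritized ("blacklist") election described in the ticket can be sketched as follows. This is a hypothetical illustration of the KIP-491 idea, not Kafka's controller code: pick the first in-sync replica in assignment order, falling back to deprioritized brokers only when no other in-sync replica is available.

```java
import java.util.List;
import java.util.Set;

// Hypothetical sketch of preferred leader election with a blacklist of
// deprioritized brokers (KIP-491 idea); not actual Kafka controller code.
public class PreferredLeaderElection {

    static int electLeader(List<Integer> replicas, Set<Integer> isr,
                           Set<Integer> deprioritized) {
        // Pass 1: assignment order is the priority order; skip
        // deprioritized brokers even if they are in ISR.
        for (int broker : replicas) {
            if (isr.contains(broker) && !deprioritized.contains(broker)) {
                return broker;
            }
        }
        // Pass 2: every non-deprioritized replica is out of ISR, so fall
        // back to a deprioritized broker rather than leave no leader.
        for (int broker : replicas) {
            if (isr.contains(broker)) {
                return broker;
            }
        }
        return -1;  // no eligible leader at all
    }
}
```

With replicas (1,2,3) all in ISR and broker 1 blacklisted, this elects 2 without touching the replica assignment — the work-around the ticket calls tedious.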
[jira] [Resolved] (KAFKA-8602) StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic
[ https://issues.apache.org/jira/browse/KAFKA-8602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck resolved KAFKA-8602. Resolution: Fixed > StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic > - > > Key: KAFKA-8602 > URL: https://issues.apache.org/jira/browse/KAFKA-8602 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.1 >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Critical > > StreamThread dies with the following exception: > {code:java} > java.lang.IllegalStateException: Consumer is not subscribed to any topics or > assigned any partitions > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1199) > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) > {code} > The reason is that the restore consumer is not subscribed to any topic. This > happens when a StreamThread gets assigned standby tasks for sub-topologies > with just state stores with disabled logging. > To reproduce the bug start two applications with one StreamThread and one > standby replica each and the following topology. 
The input topic should have > two partitions: > {code:java} > final StreamsBuilder builder = new StreamsBuilder(); > final String stateStoreName = "myTransformState"; > final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder = > Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName), > Serdes.Integer(), > Serdes.Integer()) > .withLoggingDisabled(); > builder.addStateStore(keyValueStoreBuilder); > builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer())) > .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() { > private KeyValueStore<Integer, Integer> state; > @SuppressWarnings("unchecked") > @Override > public void init(final ProcessorContext context) { > state = (KeyValueStore<Integer, Integer>) > context.getStateStore(stateStoreName); > } > @Override > public KeyValue<Integer, Integer> transform(final Integer key, > final Integer value) { > final KeyValue<Integer, Integer> result = new KeyValue<>(key, > value); > return result; > } > @Override > public void close() {} > }, stateStoreName) > .to(OUTPUT_TOPIC); > {code} > Both StreamThreads should die with the above exception. > The root cause is that standby tasks are created although all state stores of > the sub-topology have logging disabled. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8602) StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic
[ https://issues.apache.org/jira/browse/KAFKA-8602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885595#comment-16885595 ] ASF GitHub Bot commented on KAFKA-8602: --- bbejeck commented on pull request #7008: KAFKA-8602: Fix bug in stand-by task creation URL: https://github.com/apache/kafka/pull/7008 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > StreamThread Dies Because Restore Consumer is not Subscribed to Any Topic > - > > Key: KAFKA-8602 > URL: https://issues.apache.org/jira/browse/KAFKA-8602 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.1 >Reporter: Bruno Cadonna >Assignee: Bruno Cadonna >Priority: Critical > > StreamThread dies with the following exception: > {code:java} > java.lang.IllegalStateException: Consumer is not subscribed to any topics or > assigned any partitions > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1199) > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1126) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:923) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774) > {code} > The reason is that the restore consumer is not subscribed to any topic. This > happens when a StreamThread gets assigned standby tasks for sub-topologies > with just state stores with disabled logging. > To reproduce the bug start two applications with one StreamThread and one > standby replica each and the following topology. 
The input topic should have > two partitions: > {code:java} > final StreamsBuilder builder = new StreamsBuilder(); > final String stateStoreName = "myTransformState"; > final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder = > Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(stateStoreName), > Serdes.Integer(), > Serdes.Integer()) > .withLoggingDisabled(); > builder.addStateStore(keyValueStoreBuilder); > builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer())) > .transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() { > private KeyValueStore<Integer, Integer> state; > @SuppressWarnings("unchecked") > @Override > public void init(final ProcessorContext context) { > state = (KeyValueStore<Integer, Integer>) > context.getStateStore(stateStoreName); > } > @Override > public KeyValue<Integer, Integer> transform(final Integer key, > final Integer value) { > final KeyValue<Integer, Integer> result = new KeyValue<>(key, > value); > return result; > } > @Override > public void close() {} > }, stateStoreName) > .to(OUTPUT_TOPIC); > {code} > Both StreamThreads should die with the above exception. > The root cause is that standby tasks are created although all state stores of > the sub-topology have logging disabled. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-4212) Add a key-value store that is a TTL persistent cache
[ https://issues.apache.org/jira/browse/KAFKA-4212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885498#comment-16885498 ] James Ritt commented on KAFKA-4212: --- Thanks [~ableegoldman] & [~mjsax], you both make good points. Unfortunately the `Map` argument to `setConfig()` appears to be populated from `org.apache.kafka.streams.processor.ProcessorContext#appConfigs`, meaning it's effectively just pulling in the current `StreamsConfig`. In order to keep the configs consolidated as per Sophie's request, what do we think about defining a new method on the `RocksDBConfigSetter` interface? We could possibly create it with a default (for backwards compatibility): something like `default Integer getTtl(final String storeName, final Options options) { return null; }`, with the idea that we'd check this method's result to determine which RocksDB constructor to call. > Add a key-value store that is a TTL persistent cache > > > Key: KAFKA-4212 > URL: https://issues.apache.org/jira/browse/KAFKA-4212 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 0.10.0.1 >Reporter: Elias Levy >Priority: Major > Labels: api > > Some jobs need to maintain as state a large set of key-values for some > period of time. I.e. they need to maintain a TTL cache of values potentially > larger than memory. > Currently Kafka Streams provides non-windowed and windowed key-value stores. > Neither is an exact fit to this use case. > The {{RocksDBStore}}, a {{KeyValueStore}}, stores one value per key as > required, but does not support expiration. The TTL option of RocksDB is > explicitly not used. > The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via segment > dropping, but it stores multiple items per key, based on their timestamp. > But this store can be repurposed as a cache by fetching the items in reverse > chronological order and returning the first item found. 
> KAFKA-2594 introduced a fixed-capacity in-memory LRU caching store, but here > we desire a variable-capacity memory-overflowing TTL caching store. > Although {{RocksDBWindowsStore}} can be repurposed as a cache, it would be > useful to have an official and proper TTL cache API and implementation. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
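The interface extension proposed in the comment above can be sketched as follows. This is not an actual Kafka Streams API: `getTtl` is the hypothetical method from the proposal, and `Object` stands in for `org.rocksdb.Options` so the sketch stays self-contained. The default implementation is what keeps existing `RocksDBConfigSetter` implementations source-compatible.

```java
import java.util.Map;

// Hypothetical sketch of the proposal in the comment above; not a real
// Kafka Streams interface. A default method lets existing config setters
// compile unchanged while new ones can opt in to a per-store TTL.
public interface TtlAwareConfigSetter {

    // Mirrors RocksDBConfigSetter.setConfig; Object stands in for
    // org.rocksdb.Options to avoid a RocksDB dependency in this sketch.
    void setConfig(String storeName, Object options, Map<String, Object> configs);

    // Returning null means "no TTL": the store is opened with the plain
    // RocksDB constructor. A non-null value would select the TTL variant
    // (e.g. TtlDB) instead.
    default Integer getTtl(String storeName, Object options) {
        return null;
    }
}
```

A caller would check `getTtl(...)` at store-open time and branch to the TTL-enabled constructor only when it returns a value.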
[jira] [Commented] (KAFKA-7931) Java Client: if all ephemeral brokers fail, client can never reconnect to brokers
[ https://issues.apache.org/jira/browse/KAFKA-7931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885489#comment-16885489 ] Brian commented on KAFKA-7931: -- Would love to see a patch. How do you know you've solved the issue? Are you going to try and get this merged back into [https://github.com/helm/charts/tree/master/incubator/kafka] > Java Client: if all ephemeral brokers fail, client can never reconnect to > brokers > - > > Key: KAFKA-7931 > URL: https://issues.apache.org/jira/browse/KAFKA-7931 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.1.0 >Reporter: Brian >Priority: Critical > > Steps to reproduce: > * Setup kafka cluster in GKE, with bootstrap server address configured to > point to a load balancer that exposes all GKE nodes > * Run producer that emits values into a partition with 3 replicas > * Kill every broker in the cluster > * Wait for brokers to restart > Observed result: > The java client cannot find any of the nodes even though they have all > recovered. I see messages like "Connection to node 30 (/10.6.0.101:9092) > could not be established. Broker may not be available.". > Note, this is *not* a duplicate of > https://issues.apache.org/jira/browse/KAFKA-7890. I'm using the client > version that contains the fix for > https://issues.apache.org/jira/browse/KAFKA-7890. > Versions: > Kafka: kafka version 2.1.0, using confluentinc/cp-kafka/5.1.0 docker image > Client: trunk from a few days ago (git sha > 9f7e6b291309286e3e3c1610e98d978773c9d504), to pull in the fix for KAFKA-7890 > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8670) kafka-topics.sh shows IllegalArgumentException when describing all topics if no topics exist on the cluster
[ https://issues.apache.org/jira/browse/KAFKA-8670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tirtha Chatterjee updated KAFKA-8670: - Summary: kafka-topics.sh shows IllegalArgumentException when describing all topics if no topics exist on the cluster (was: kafka-topics.sh shows IllegalArgumentException when describing all topics if no topics exist) > kafka-topics.sh shows IllegalArgumentException when describing all topics if > no topics exist on the cluster > --- > > Key: KAFKA-8670 > URL: https://issues.apache.org/jira/browse/KAFKA-8670 > Project: Kafka > Issue Type: Bug > Components: admin, tools >Affects Versions: 2.2.0, 2.3.0, 2.2.1 >Reporter: Tirtha Chatterjee >Assignee: Tirtha Chatterjee >Priority: Major > > When trying to describe all topics with the kafka-topics.sh utility, a user would run > kafka-topics.sh --describe without passing a --topic option. If there are no > topics on the cluster, Kafka returns an error with IllegalArgumentException. > {code:java} > ./kafka-topics.sh --zookeeper > 172.16.7.230:2181,172.16.17.27:2181,172.16.10.89:2181 --describe > Error while executing topic command : Topics in [] does not exist > [2019-07-07 03:33:15,288] ERROR java.lang.IllegalArgumentException: Topics in > [] does not exist > at > kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416) > at > kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332) > at kafka.admin.TopicCommand$.main(TopicCommand.scala:66) > at kafka.admin.TopicCommand.main(TopicCommand.scala) > (kafka.admin.TopicCommand$) > {code} > > If no --topic option is passed to the command, and there are no topics on > the cluster, the command should not fail, rather have empty output. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8670) kafka-topics.sh shows IllegalArgumentException when describing all topics if no topics exist
[ https://issues.apache.org/jira/browse/KAFKA-8670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tirtha Chatterjee updated KAFKA-8670: - Description: When trying to describe all topics with the kafka-topics.sh utility, a user would run kafka-topics.sh --describe without passing a --topic option. If there are no topics on the cluster, Kafka returns an error with IllegalArgumentException. {code:java} ./kafka-topics.sh --zookeeper 172.16.7.230:2181,172.16.17.27:2181,172.16.10.89:2181 --describe Error while executing topic command : Topics in [] does not exist [2019-07-07 03:33:15,288] ERROR java.lang.IllegalArgumentException: Topics in [] does not exist at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416) at kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332) at kafka.admin.TopicCommand$.main(TopicCommand.scala:66) at kafka.admin.TopicCommand.main(TopicCommand.scala) (kafka.admin.TopicCommand$) {code} If no --topic option is passed to the command, and there are no topics on the cluster, the command should not fail, rather have empty output. was: When trying to describe all the kafka-topics.sh utility, a user would run kafka-topics.sh --describe without passing a --topic option. If there are no topics on the cluster, Kafka returns an error with IllegalArgumentException. 
{code:java} ./kafka-topics.sh --zookeeper 172.16.7.230:2181,172.16.17.27:2181,172.16.10.89:2181 --describe Error while executing topic command : Topics in [] does not exist [2019-07-07 03:33:15,288] ERROR java.lang.IllegalArgumentException: Topics in [] does not exist at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416) at kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332) at kafka.admin.TopicCommand$.main(TopicCommand.scala:66) at kafka.admin.TopicCommand.main(TopicCommand.scala) (kafka.admin.TopicCommand$) {code} If no --topic option is passed to the command, it should not fail, rather have empty output. > kafka-topics.sh shows IllegalArgumentException when describing all topics if > no topics exist > > > Key: KAFKA-8670 > URL: https://issues.apache.org/jira/browse/KAFKA-8670 > Project: Kafka > Issue Type: Bug > Components: admin, tools >Affects Versions: 2.2.0, 2.3.0, 2.2.1 >Reporter: Tirtha Chatterjee >Assignee: Tirtha Chatterjee >Priority: Major > > When trying to describe all topics with the kafka-topics.sh utility, a user would run > kafka-topics.sh --describe without passing a --topic option. If there are no > topics on the cluster, Kafka returns an error with IllegalArgumentException. 
> {code:java} > ./kafka-topics.sh --zookeeper > 172.16.7.230:2181,172.16.17.27:2181,172.16.10.89:2181 --describe > Error while executing topic command : Topics in [] does not exist > [2019-07-07 03:33:15,288] ERROR java.lang.IllegalArgumentException: Topics in > [] does not exist > at > kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416) > at > kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332) > at kafka.admin.TopicCommand$.main(TopicCommand.scala:66) > at kafka.admin.TopicCommand.main(TopicCommand.scala) > (kafka.admin.TopicCommand$) > {code} > > If no --topic option is passed to the command, and there are no topics on > the cluster, the command should not fail, rather have empty output. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
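The intended behavior can be sketched as a small guard: fail only when the user explicitly named topics that do not exist; with no --topic given and an empty cluster, there is simply nothing to describe. This is an illustrative sketch in plain Java, not the actual Scala TopicCommand code, and the helper name is hypothetical.

```java
import java.util.Collections;
import java.util.List;

public class DescribeGuard {
    // Hypothetical helper: decide which topics to describe. Throws only when
    // the user explicitly requested a topic that is missing.
    static List<String> topicsToDescribe(List<String> requested, List<String> existing) {
        if (requested.isEmpty()) {
            // --describe without --topic: describe whatever exists, possibly nothing
            return existing;
        }
        if (!existing.containsAll(requested)) {
            throw new IllegalArgumentException("Topics in " + requested + " does not exist");
        }
        return requested;
    }

    public static void main(String[] args) {
        // No --topic passed and no topics on the cluster: empty output, no exception
        System.out.println(topicsToDescribe(Collections.emptyList(), Collections.emptyList())); // prints []
    }
}
```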
[jira] [Commented] (KAFKA-8670) kafka-topics.sh shows IllegalArgumentException when describing all topics if no topics exist
[ https://issues.apache.org/jira/browse/KAFKA-8670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885476#comment-16885476 ] Tirtha Chatterjee commented on KAFKA-8670: -- This bug was introduced as part of this commit [https://github.com/apache/kafka/commit/55334453a561646b303e7de961e5990345effc15] I have written a fix for this issue. Will be publishing it soon. > kafka-topics.sh shows IllegalArgumentException when describing all topics if > no topics exist > > > Key: KAFKA-8670 > URL: https://issues.apache.org/jira/browse/KAFKA-8670 > Project: Kafka > Issue Type: Bug > Components: admin, tools >Affects Versions: 2.2.0, 2.3.0, 2.2.1 >Reporter: Tirtha Chatterjee >Assignee: Tirtha Chatterjee >Priority: Major > > When trying to describe all the kafka-topics.sh utility, a user would run > kafka-topics.sh --describe without passing a --topic option. If there are no > topics on the cluster, Kafka returns an error with IllegalArgumentException. > {code:java} > ./kafka-topics.sh --zookeeper > 172.16.7.230:2181,172.16.17.27:2181,172.16.10.89:2181 --describe > Error while executing topic command : Topics in [] does not exist > [2019-07-07 03:33:15,288] ERROR java.lang.IllegalArgumentException: Topics in > [] does not exist > at > kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416) > at > kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332) > at kafka.admin.TopicCommand$.main(TopicCommand.scala:66) > at kafka.admin.TopicCommand.main(TopicCommand.scala) > (kafka.admin.TopicCommand$) > {code} > > If no --topic option is passed to the command, it should not fail, rather > have empty output. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8670) kafka-topics.sh shows IllegalArgumentException when describing all topics if no topics exist
[ https://issues.apache.org/jira/browse/KAFKA-8670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885474#comment-16885474 ] Tirtha Chatterjee commented on KAFKA-8670: -- This issue is distinct from https://issues.apache.org/jira/browse/KAFKA-8053 which is about Kafka's error message being confusing when --describe is called with a --topic argument, but there are no topics on the cluster. The distinction here is that this ticket addresses the case where no --topic is passed, while the other focuses on a better error message when --topic is indeed passed, but there are no topics on the cluster. > kafka-topics.sh shows IllegalArgumentException when describing all topics if > no topics exist > > > Key: KAFKA-8670 > URL: https://issues.apache.org/jira/browse/KAFKA-8670 > Project: Kafka > Issue Type: Bug > Components: admin, tools >Affects Versions: 2.2.0, 2.3.0, 2.2.1 >Reporter: Tirtha Chatterjee >Assignee: Tirtha Chatterjee >Priority: Major > > When trying to describe all the kafka-topics.sh utility, a user would run > kafka-topics.sh --describe without passing a --topic option. If there are no > topics on the cluster, Kafka returns an error with IllegalArgumentException. > {code:java} > ./kafka-topics.sh --zookeeper > 172.16.7.230:2181,172.16.17.27:2181,172.16.10.89:2181 --describe > Error while executing topic command : Topics in [] does not exist > [2019-07-07 03:33:15,288] ERROR java.lang.IllegalArgumentException: Topics in > [] does not exist > at > kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416) > at > kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332) > at kafka.admin.TopicCommand$.main(TopicCommand.scala:66) > at kafka.admin.TopicCommand.main(TopicCommand.scala) > (kafka.admin.TopicCommand$) > {code} > > If no --topic option is passed to the command, it should not fail, rather > have empty output. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8670) kafka-topics.sh shows IllegalArgumentException when describing all topics if no topics exist
Tirtha Chatterjee created KAFKA-8670: Summary: kafka-topics.sh shows IllegalArgumentException when describing all topics if no topics exist Key: KAFKA-8670 URL: https://issues.apache.org/jira/browse/KAFKA-8670 Project: Kafka Issue Type: Bug Components: admin, tools Affects Versions: 2.2.1, 2.3.0, 2.2.0 Reporter: Tirtha Chatterjee Assignee: Tirtha Chatterjee When trying to describe all topics with the kafka-topics.sh utility, a user would run kafka-topics.sh --describe without passing a --topic option. If there are no topics on the cluster, Kafka returns an error with IllegalArgumentException. {code:java} ./kafka-topics.sh --zookeeper 172.16.7.230:2181,172.16.17.27:2181,172.16.10.89:2181 --describe Error while executing topic command : Topics in [] does not exist [2019-07-07 03:33:15,288] ERROR java.lang.IllegalArgumentException: Topics in [] does not exist at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416) at kafka.admin.TopicCommand$ZookeeperTopicService.describeTopic(TopicCommand.scala:332) at kafka.admin.TopicCommand$.main(TopicCommand.scala:66) at kafka.admin.TopicCommand.main(TopicCommand.scala) (kafka.admin.TopicCommand$) {code} If no --topic option is passed to the command, it should not fail but rather produce empty output. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8669) Add java security providers in Kafka Security config
[ https://issues.apache.org/jira/browse/KAFKA-8669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sai Sandeep updated KAFKA-8669: --- Description: Currently, Kafka supports ssl.keymanager.algorithm and ssl.trustmanager.algorithm parameters as part of its security config. These parameters can be configured to load the key managers and trust managers which provide keys and certificates for SSL handshakes with the clients/server. The algorithms configured by these parameters must be registered by Java security provider classes. These provider classes are configured as JVM properties through the java.security file. An example file is given below: {code:java} $ cat /usr/lib/jvm/jdk-8-oracle-x64/jre/lib/security/java.security ... security.provider.1=sun.security.provider.Sun security.provider.2=sun.security.rsa.SunRsaSign security.provider.3=sun.security.ec.SunEC … {code} Custom keymanager and trustmanager algorithms can be used to supply the Kafka brokers with keys and certificates; these algorithms can replace the traditional, non-scalable static keystore and truststore JKS files. To take advantage of these custom algorithms, we want to support a java security provider parameter in the security config. This parameter can be used by Kafka brokers or Kafka clients (when connecting to the brokers). The security providers can also be used for configuring security in SASL-based communication. was: Currently kafka supports ssl.keymanager.algorithm and ssl.trustmanager.algorithm parameters as part of secure config. These parameters can be configured to load the key manager and trust managers which provide keys and certificates for ssl handshakes with the clients/server. The algorithms configured by parameters need to be registered by Java security provider classes. These provider classes are configured as JVM properties through java.security file. An example file given below ``` $ cat /usr/lib/jvm/jdk-8-oracle-x64/jre/lib/security/java.security ... 
security.provider.1=sun.security.provider.Sun security.provider.2=sun.security.rsa.SunRsaSign security.provider.3=sun.security.ec.SunEC … ``` Custom keymanager and trustmanager algorithms can be used to supply the kafka brokers with keys and certificates, these algorithms can be used to replace the traditional, non-scalable static keystore and truststore jks files. To take advantage of these custom algorithms, we want to support java security provider parameter in security config. This param can be used by kafka brokers or kafka clients(when connecting to the kafka brokers). The security providers can also be used for configuring security in SASL based communication too. > Add java security providers in Kafka Security config > > > Key: KAFKA-8669 > URL: https://issues.apache.org/jira/browse/KAFKA-8669 > Project: Kafka > Issue Type: Improvement >Reporter: Sai Sandeep >Priority: Minor > > Currently kafka supports ssl.keymanager.algorithm and > ssl.trustmanager.algorithm parameters as part of secure config. These > parameters can be configured to load the key manager and trust managers which > provide keys and certificates for ssl handshakes with the clients/server. The > algorithms configured by parameters need to be registered by Java security > provider classes. These provider classes are configured as JVM properties > through java.security file. An example file given below > {code:java} > $ cat /usr/lib/jvm/jdk-8-oracle-x64/jre/lib/security/java.security > ... > security.provider.1=sun.security.provider.Sun > security.provider.2=sun.security.rsa.SunRsaSign > security.provider.3=sun.security.ec.SunEC > … > {code} > Custom keymanager and trustmanager algorithms can be used to supply the kafka > brokers with keys and certificates, these algorithms can be used to replace > the traditional, non-scalable static keystore and truststore jks files. > To take advantage of these custom algorithms, we want to support java > security provider parameter in security config. 
This param can be used by > kafka brokers or kafka clients(when connecting to the kafka brokers). The > security providers can also be used for configuring security in SASL based > communication too. > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8669) Add java security providers in Kafka Security config
Sai Sandeep created KAFKA-8669: -- Summary: Add java security providers in Kafka Security config Key: KAFKA-8669 URL: https://issues.apache.org/jira/browse/KAFKA-8669 Project: Kafka Issue Type: Improvement Reporter: Sai Sandeep Currently, Kafka supports ssl.keymanager.algorithm and ssl.trustmanager.algorithm parameters as part of its security config. These parameters can be configured to load the key managers and trust managers which provide keys and certificates for SSL handshakes with the clients/server. The algorithms configured by these parameters must be registered by Java security provider classes. These provider classes are configured as JVM properties through the java.security file. An example file is given below: ``` $ cat /usr/lib/jvm/jdk-8-oracle-x64/jre/lib/security/java.security ... security.provider.1=sun.security.provider.Sun security.provider.2=sun.security.rsa.SunRsaSign security.provider.3=sun.security.ec.SunEC … ``` Custom keymanager and trustmanager algorithms can be used to supply the Kafka brokers with keys and certificates; these algorithms can replace the traditional, non-scalable static keystore and truststore JKS files. To take advantage of these custom algorithms, we want to support a java security provider parameter in the security config. This parameter can be used by Kafka brokers or Kafka clients (when connecting to the brokers). The security providers can also be used for configuring security in SASL-based communication. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
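The providers registered through the java.security file above can be inspected at runtime with the standard JCA API; this is plain JDK code, independent of Kafka. (Programmatic registration via Security.addProvider is what a broker-side config parameter would effectively drive; the config name itself is still to be defined by the KIP.)

```java
import java.security.Provider;
import java.security.Security;

public class ListProviders {
    public static void main(String[] args) {
        // Providers appear in the priority order defined by the
        // security.provider.N entries in java.security (or added at runtime
        // via Security.addProvider / Security.insertProviderAt).
        for (Provider p : Security.getProviders()) {
            System.out.println(p.getName() + ": " + p.getInfo());
        }
    }
}
```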
[jira] [Commented] (KAFKA-8613) Set default grace period to 0
[ https://issues.apache.org/jira/browse/KAFKA-8613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885438#comment-16885438 ] Matthias J. Sax commented on KAFKA-8613: [~LillianY] – thanks for your interest. However, this ticket can only be addressed in a major release, as we need to maintain backward compatibility in minor releases. There are no plans at the moment to do a major release. Hence, this ticket is blocked until the community decides to do a 3.0 release. > Set default grace period to 0 > - > > Key: KAFKA-8613 > URL: https://issues.apache.org/jira/browse/KAFKA-8613 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 3.0.0 >Reporter: Bruno Cadonna >Priority: Blocker > > Currently, the grace period is set to retention time if the grace period is > not specified explicitly. The reason for setting the default grace period to > retention time was backward compatibility. Topologies that were implemented > before the introduction of the grace period, added late arriving records to a > window as long as the window existed, i.e., as long as its retention time was > not elapsed. > This unintuitive default grace period has already caused confusion among > users. > For the next major release, we should set the default grace period to > {{Duration.ZERO}}. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
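The semantics behind the proposed change can be illustrated with a toy model (an assumed simplification, not Kafka Streams code): a window accepts a late record only while observed stream time has not passed window end plus grace. A default grace equal to retention time therefore keeps windows open for the whole retention period, while Duration.ZERO closes them as soon as stream time passes the window end.

```java
public class GraceDemo {
    // Toy model of windowed-aggregation grace semantics (simplified sketch,
    // not the actual Kafka Streams implementation).
    static boolean accepted(long recordTs, long windowStart, long windowSize,
                            long grace, long observedStreamTime) {
        long windowEnd = windowStart + windowSize;
        boolean inWindow = recordTs >= windowStart && recordTs < windowEnd;
        boolean windowStillOpen = observedStreamTime < windowEnd + grace;
        return inWindow && windowStillOpen;
    }

    public static void main(String[] args) {
        // Late record at ts=95 for window [0, 100), seen when stream time is 150:
        System.out.println(accepted(95, 0, 100, 0, 150));   // grace 0: dropped -> false
        System.out.println(accepted(95, 0, 100, 100, 150)); // grace = "retention": kept -> true
    }
}
```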
[jira] [Created] (KAFKA-8668) Improve broker shutdown time
Zhanxiang (Patrick) Huang created KAFKA-8668: Summary: Improve broker shutdown time Key: KAFKA-8668 URL: https://issues.apache.org/jira/browse/KAFKA-8668 Project: Kafka Issue Type: Improvement Reporter: Zhanxiang (Patrick) Huang Assignee: Zhanxiang (Patrick) Huang During LogManager shutdown, we need to call {{timeIndex.maybeAppend()}} on each log segment close. Since in 2.0 we completely skip the sanity check and do lazy mmap on indexes of segments below the recovery point, in {{timeIndex.maybeAppend()}} we may need to mmap the time index file for the inactive segment if it was not mmapped before, which is not necessary because the time index hasn't changed at all. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8667) Improve leadership transition time
Zhanxiang (Patrick) Huang created KAFKA-8667: Summary: Improve leadership transition time Key: KAFKA-8667 URL: https://issues.apache.org/jira/browse/KAFKA-8667 Project: Kafka Issue Type: Improvement Reporter: Zhanxiang (Patrick) Huang Assignee: Zhanxiang (Patrick) Huang When the replica fetcher thread processes a fetch response, it holds the {{partitionMapLock}}. If at the same time a LeaderAndIsr request comes in, it will be blocked at the end of its processing when calling {{shutdownIdleFetcherThread}}, because it needs to acquire the {{partitionMapLock}} of each replica fetcher thread to check whether any partition is assigned to that fetcher, and the request handler thread performs this check sequentially across the fetcher threads. For example, in a cluster with 20 brokers and num.replica.fetcher.thread set to 32, if each fetcher thread holds its lock a little longer, the total time for the request handler thread to finish shutdownIdleFetcherThread can grow substantially because it waits on the partitionMapLock of every fetcher thread in turn. If the LeaderAndIsr request is blocked for more than request.timeout.ms (default 30s) in the broker, the request send thread on the controller side will time out while waiting for the response, establish a new connection to the broker, and re-send the request, which breaks in-order delivery because we will have more than one channel talking to the broker. Moreover, this may make the lock contention problem worse or saturate the request handler threads because duplicate control requests are sent to the broker multiple times. In our own testing, we saw up to *8 duplicate LeaderAndIsrRequest* being sent to the broker during a bounce, and the 99th percentile LeaderAndIsr local time went up to ~500s. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8650) Streams does not work as expected with auto.offset.reset=none
[ https://issues.apache.org/jira/browse/KAFKA-8650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885435#comment-16885435 ] Matthias J. Sax commented on KAFKA-8650: It's a first time report / request. Hence, it does not seem to be a problem for many users atm. Hard to judge. – Also, there is a workaround (even if not easy to apply). > Streams does not work as expected with auto.offset.reset=none > - > > Key: KAFKA-8650 > URL: https://issues.apache.org/jira/browse/KAFKA-8650 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.3.0 >Reporter: Raman Gupta >Priority: Major > Labels: needs-kip > > The auto.offset.reset policy of none is useful as a safety measure, > especially when > * exactly-once processing is desired, or > * at-least-once is desired, but it is expensive to reprocess from the > beginning. > In this case, using "none" forces the ops team to explicitly set the offset > before the stream can re-start processing, in the (hopefully rare) situations > in which the stream consumer offset has been lost for some reason, or in the > case of a new stream that should not start processing from the beginning or > the end, but somewhere in the middle (this scenario might occur during topic > migrations). > Kafka streams really only supports auto.offset.reset of earliest or latest > (see the `Topology.AutoOffsetReset` enum). It is also possible to use the > auto.offset.reset configuration value, but this works suboptimally because if > the streams application reset tool is used (even with a specific offset > specified), the offset is set for the input topic, but it is not, and cannot > be, set for the internal topics, which won't exist yet. > The internal topics are created by Kafka streams at startup time, but because > the auto.offset.reset policy of "none" is passed to the consumer of those > internal topics, the Kafka stream fails to start with a > "NoOffsetForPartitionException". 
> Proposals / options: > 1) Allow auto.offset.reset=none to be specified in Consumed.with() so that it > affects the input topics, but not the internal topics. > 2) Allow streams to be configured with auto.offset.reset=none, but explicitly > set the offset to 0 for newly created internal topics. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8666) Improve Documentation on usage of Materialized config object
Bill Bejeck created KAFKA-8666: -- Summary: Improve Documentation on usage of Materialized config object Key: KAFKA-8666 URL: https://issues.apache.org/jira/browse/KAFKA-8666 Project: Kafka Issue Type: Improvement Components: documentation, streams Reporter: Bill Bejeck When using the Materialized object, if the user wants to name the state store with {code:java} Materialized.as("MyStoreName"){code} and subsequently provide the key and value serdes, the calls to do so must take the form {code:java} Materialized.as("MyStoreName").withKeySerde(keySerde).withValueSerde(valueSerde) {code} If users instead do the following {code:java} Materialized.as("MyStoreName").with(keySerde, valueSerde) {code} the Materialized instance created by the "as(storeName)" call is replaced by a new Materialized instance resulting from the "with(...)" call, and any configuration on the first Materialized instance is lost. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
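The pitfall comes down to with(...) being a static factory while withKeySerde/withValueSerde are instance methods: Java lets you invoke a static method through an instance reference, so the chain compiles but silently produces a fresh object. A minimal stand-in builder (hypothetical, not the real Kafka Streams class) makes the behavior easy to see:

```java
// Minimal stand-in for the Materialized builder, illustrating why
// as("name").with(...) silently discards the store name.
public class MaterializedDemo {
    public static class Mat {
        public final String storeName;
        public String keySerde;
        public String valueSerde;

        Mat(String storeName) { this.storeName = storeName; }

        public static Mat as(String storeName) { return new Mat(storeName); }

        // Static factory: builds a brand-new instance with no store name
        public static Mat with(String keySerde, String valueSerde) {
            Mat m = new Mat(null); // the name set by as(...) is lost
            m.keySerde = keySerde;
            m.valueSerde = valueSerde;
            return m;
        }

        // Instance methods: configure and return the SAME instance
        public Mat withKeySerde(String keySerde) { this.keySerde = keySerde; return this; }
        public Mat withValueSerde(String valueSerde) { this.valueSerde = valueSerde; return this; }
    }

    public static void main(String[] args) {
        // Safe: the name set by as(...) survives the chained instance calls
        Mat ok = Mat.as("MyStoreName").withKeySerde("String").withValueSerde("Integer");
        System.out.println(ok.storeName); // prints MyStoreName

        // Pitfall: with(...) is static, so it returns a different instance
        Mat lost = Mat.as("MyStoreName").with("String", "Integer");
        System.out.println(lost.storeName); // prints null
    }
}
```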
[jira] [Created] (KAFKA-8665) WorkerSourceTask race condition when rebalance occurs before task has started
Chris Cranford created KAFKA-8665: - Summary: WorkerSourceTask race condition when rebalance occurs before task has started Key: KAFKA-8665 URL: https://issues.apache.org/jira/browse/KAFKA-8665 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.2.0 Reporter: Chris Cranford In our project we have several {{SourceTask}} implementations that perform a set of sequential steps in the {{SourceTask#start}} call. It's possible that during these sequential operations a rebalance occurs, which leads to the situation where the {{WorkerSourceTask}} transitions to the state where {{startedShutdownBeforeStartCompleted=true}} and the runtime then starts a brand new task. This is mostly a problem for specific named resources that are registered when the call to {{SourceTask#start}} happens and unregistered later when the call to {{SourceTask#stop}} occurs. For us specifically, this is a problem with JMX resource registration/unregistration. We register those beans at the end of the call to {{SourceTask#start}} and unregister in the call to {{stop}}. Due to the order of start/stop pairs combined with where a rebalance is triggered, this leads to:
# Register JMX beans when SourceTask A1 is started.
# Register JMX beans when SourceTask A2 is started with rebalance.
## JMX beans fail to register as they're already registered.
# SourceTask A1 finally stops, triggering unregistering of the JMX beans.
In our use case we're experiencing a problem with the registration/unregistration of JMX resources due to the nature of how a rebalance is triggered while the task hasn't yet fully started and never gets stopped. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
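One defensive workaround on the task side (a sketch, not Connect's actual code; the bean name and helper are hypothetical) is to make JMX registration idempotent, replacing a bean left behind by a task instance that was stopped late or never stopped:

```java
import java.lang.management.ManagementFactory;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class JmxGuardDemo {
    // Standard MBean pattern: management interface named <Impl>MBean
    public interface TaskMetricsMBean { int getStarts(); }
    public static class TaskMetrics implements TaskMetricsMBean {
        public int getStarts() { return 1; }
    }

    // Idempotent registration: if a previous task instance left its bean
    // behind (rebalance before stop), replace it instead of failing with
    // InstanceAlreadyExistsException.
    public static boolean registerIdempotently(String nameStr) {
        try {
            ObjectName name = new ObjectName(nameStr);
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            if (server.isRegistered(name)) {
                server.unregisterMBean(name); // stale registration from the old task
            }
            server.registerMBean(new TaskMetrics(), name);
            return true;
        } catch (JMException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = "connector.demo:type=TaskMetrics,task=A";
        // Simulates task A1 starting, then task A2 starting after a rebalance:
        System.out.println(registerIdempotently(name) && registerIdempotently(name)); // prints true
    }
}
```

The cleaner fix is of course in the runtime's start/stop ordering, but an idempotent guard keeps the task usable until that lands.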
[jira] [Commented] (KAFKA-8646) Materialized.withLoggingDisabled() does not disable changelog topics creation
[ https://issues.apache.org/jira/browse/KAFKA-8646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885391#comment-16885391 ] Bill Bejeck commented on KAFKA-8646: Thanks again [~jmhostalet] for reporting this. After looking into this, I've been able to determine that Kafka Streams does the right thing and does not create changelog topics when using Materialized.withLoggingDisabled(). The issue seems to be how you are passing your key and value serdes to the Materialized object. This issue is a subtle one, so please bear with a somewhat lengthy explanation. The call "Materialized.as(aggregation-updater)" returns a _*new Materialized instance*_, using the provided name for the state store (which in turn would be the base name for the changelog topic if needed, but I'll get to that in a minute). Now the chained call "withLoggingDisabled()" sets the "loggingEnabled" flag to false and returns _*the same Materialized instance*_ created before by the call "as(store-name)". But the method call "Materialized.with(keySerde, valueSerde)" is a static method on Materialized and returns a _*brand new Materialized instance,*_ and this new Materialized instance is the one passed into Kafka Streams. Since the "loggingEnabled" flag defaults to true, Kafka Streams is creating the changelog topic. Here's some example code I ran to prove my theory. First, I'll change from using the static "with(keySerde, valueSerde)" method to using "withKeySerde" followed by the "withValueSerde" method. {code:java} KStream<String, Integer> intKstream = builder.stream("input"); Aggregator<String, Integer, Integer> aggregator = (k, v, va) -> v + va; intKstream.groupByKey().aggregate(() -> 0, aggregator, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("aggregation-updater") .withLoggingDisabled().withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer())).toStream().to("output"); {code} And here's the result of running "kafka-topics --list" after starting the streams application: 
{noformat} ./kafka-topics.sh --zookeeper localhost:2181 --list __consumer_offsets input output{noformat} Now I'll change the streams application to use the static "with(keySerde, valueSerde)" method. {code:java} intKstream.groupByKey().aggregate(() -> 0, aggregator, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("aggregation-updater") .withLoggingDisabled().with(Serdes.String(), Serdes.Integer())).toStream().to("output"); {code} This time we'll see Kafka Streams has created the changelog topic: {noformat} ./kafka-topics.sh --zookeeper localhost:2181 --list __consumer_offsets create-unnecessary-changelog-internal-KSTREAM-AGGREGATE-STATE-STORE-01-changelog input output{noformat} Also, you'll notice the name of the changelog topic is a generated one when it should be "aggregation-updater-changelog"; since the new Materialized instance is used, the store name is lost as well. So if you could first delete any unwanted changelog topics, then update your code to use {code:java} Materialized.as("aggregation-updater") .withLoggingDisabled() .withKeySerde(Serdes.String()) .withValueSerde(new MyDtoSerde()){code} restart your application, and confirm that Kafka Streams is not creating changelog topics, we can close this ticket. But we should update our documentation to clearly state the correct method calls to use when naming a state store. Thanks again, Bill > Materialized.withLoggingDisabled() does not disable changelog topics creation > - > > Key: KAFKA-8646 > URL: https://issues.apache.org/jira/browse/KAFKA-8646 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0 >Reporter: jmhostalet >Assignee: Bill Bejeck >Priority: Minor > > I have a cluster with 3 brokers running version 0.11 > My kafka-streams app was using kafka-client 0.11.0.1 but recently I've > migrated to 2.3.0 > I have no executed any migration as my data is disposable, therefore I have > deleted all intermediate topics, except input and output topics. 
> My streams config is: > {code:java} > application.id = consumer-id-v1.00 > application.server = > bootstrap.servers = [foo1:9092, foo2:9092, foo3:9092] > buffered.records.per.partition = 1000 > cache.max.bytes.buffering = 524288000 > client.id = > commit.interval.ms = 3 > connections.max.idle.ms = 54 > default.deserialization.exception.handler = class > org.apache.kafka.streams.errors.LogAndFailExceptionHandler > default.key.serde = class > org.apache.kafka.common.serialization.Serdes$StringSerde > default.production.exception.handler = class > org.apache.kafka.streams.errors.DefaultProductionExceptionHandler > default.timestamp.extractor = class com.acme.stream.TimeExtractor > default.value.serde = class com.acme.serde.MyDtoSerde > max.task.idle.ms = 0 > metadata.max.age.ms = 30 > metric.reporters
[jira] [Commented] (KAFKA-8604) kafka log dir was marked as offline because of deleting segments of __consumer_offsets failed
[ https://issues.apache.org/jira/browse/KAFKA-8604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885316#comment-16885316 ] Alex Bertzeletos commented on KAFKA-8604: - [~ymxz] So, in your case, setting the log.retention.check.interval.ms=3000 helped as a workaround? > kafka log dir was marked as offline because of deleting segments of > __consumer_offsets failed > - > > Key: KAFKA-8604 > URL: https://issues.apache.org/jira/browse/KAFKA-8604 > Project: Kafka > Issue Type: Bug > Components: log cleaner >Affects Versions: 1.0.1 >Reporter: songyingshuan >Priority: Major > Attachments: error-logs.log > > > We encountered a problem in our product env without any foresight. When kafka > broker trying to clean __consumer_offsets-38 (and only happents to this > partition), the log shows > it failed, and marking the whole disk/log dir offline, and this leads to a > negative impact on some normal partitions (because of the ISR list of those > partitions decrease). > we had to restart the broker server to reuse the disk/dir which was marked as > offline. BUT!! this problem occurs periodically with the same reason so we > have to restart broker periodically. > we read some source code of kafka-1.0.1, but cannot make sure why this > happends. And The cluster status had been good until this problem suddenly > attacked us. > the error log is something like this : > > {code:java} > 2019-06-25 00:11:26,241 INFO kafka.log.TimeIndex: Deleting index > /data6/kafka/data/__consumer_offsets-38/012855596978.timeindex.deleted > 2019-06-25 00:11:26,258 ERROR kafka.server.LogDirFailureChannel: Error while > deleting segments for __consumer_offsets-38 in dir /data6/kafka/data > java.io.IOException: Delete of log .log.deleted failed. 
> at kafka.log.LogSegment.delete(LogSegment.scala:496) > at > kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply$mcV$sp(Log.scala:1596) > at kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply(Log.scala:1596) > at kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply(Log.scala:1596) > at kafka.log.Log.maybeHandleIOException(Log.scala:1669) > at kafka.log.Log.kafka$log$Log$$deleteSeg$1(Log.scala:1595) > at > kafka.log.Log$$anonfun$kafka$log$Log$$asyncDeleteSegment$1.apply$mcV$sp(Log.scala:1599) > at > kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > 2019-06-25 00:11:26,265 ERROR kafka.utils.KafkaScheduler: Uncaught exception > in scheduled task 'delete-file' > org.apache.kafka.common.errors.KafkaStorageException: Error while deleting > segments for __consumer_offsets-38 in dir /data6/kafka/data > Caused by: java.io.IOException: Delete of log > .log.deleted failed. 
> at kafka.log.LogSegment.delete(LogSegment.scala:496) > at > kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply$mcV$sp(Log.scala:1596) > at kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply(Log.scala:1596) > at kafka.log.Log$$anonfun$kafka$log$Log$$deleteSeg$1$1.apply(Log.scala:1596) > at kafka.log.Log.maybeHandleIOException(Log.scala:1669) > at kafka.log.Log.kafka$log$Log$$deleteSeg$1(Log.scala:1595) > at > kafka.log.Log$$anonfun$kafka$log$Log$$asyncDeleteSegment$1.apply$mcV$sp(Log.scala:1599) > at > kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > 2019-06-25 00:11:26,268 INFO kafka.server.ReplicaManager: [ReplicaManager >
[jira] [Assigned] (KAFKA-8453) AdminClient describeTopic should handle partition level errors
[ https://issues.apache.org/jira/browse/KAFKA-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lee Dongjin reassigned KAFKA-8453: -- Assignee: Lee Dongjin > AdminClient describeTopic should handle partition level errors > -- > > Key: KAFKA-8453 > URL: https://issues.apache.org/jira/browse/KAFKA-8453 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Lee Dongjin >Priority: Major > > The Metadata response may contain the following partition-level error codes: > LEADER_NOT_AVAILABLE, REPLICA_NOT_AVAILABLE, and LISTENER_NOT_FOUND. The > AdminClient at the moment does not appear to be checking these error codes. > This seems mostly harmless in the case of LEADER_NOT_AVAILABLE and > REPLICA_NOT_AVAILABLE, but potentially we should raise an error in the case > of LISTENER_NOT_FOUND since the result would otherwise be misleading. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8453) AdminClient describeTopic should handle partition level errors
[ https://issues.apache.org/jira/browse/KAFKA-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885289#comment-16885289 ] ASF GitHub Bot commented on KAFKA-8453: --- dongjinleekr commented on pull request #7089: KAFKA-8453: AdminClient describeTopic should handle partition level errors URL: https://github.com/apache/kafka/pull/7089 Here is the draft fix. The approach is simple: 1. Add a new method, `MetadataResponse#partitionErrors`, which returns a map from topic to the set of its partition errors. (It follows how `MetadataResponse#topicMetadata` does.) 2. `Call#handleResponse` in `KafkaAdminClient#describeTopics` now uses both the topic-level and the partition-level error maps; if there is a `ListenerNotFound` error in the partition-level map, it calls `KafkaFuture#completeExceptionally`. However, since it now throws a new exception, it breaks the following tests, which are related to leader election: - `kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable` - `kafka.api.AdminClientIntegrationTest.testElectUncleanLeadersAndNoop` - `kafka.api.AdminClientIntegrationTest.testElectUncleanLeadersForManyPartitions` - `kafka.api.AdminClientIntegrationTest.testElectUncleanLeadersForAllPartitions` - `kafka.api.AdminClientIntegrationTest.testElectUncleanLeadersForOnePartition` - `kafka.api.AdminClientIntegrationTest.testElectUncleanLeadersWhenNoLiveBrokers` - `kafka.api.SaslSslAdminClientIntegrationTest.testElectUncleanLeadersAndNoop` - `kafka.api.SaslSslAdminClientIntegrationTest.testElectUncleanLeadersForManyPartitions` - `kafka.api.SaslSslAdminClientIntegrationTest.testElectUncleanLeadersForAllPartitions` - `kafka.api.SaslSslAdminClientIntegrationTest.testElectUncleanLeadersForOnePartition` - `kafka.api.SaslSslAdminClientIntegrationTest.testElectUncleanLeadersWhenNoLiveBrokers` - `kafka.admin.TopicCommandWithAdminClientTest.testDescribeUnavailablePartitions` - 
`kafka.admin.TopicCommandWithAdminClientTest.testDescribeUnderMinIsrPartitionsMixed` Which approach would be appropriate to fix the problem above? Do you have something in mind? cc/ @hachikuji ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > AdminClient describeTopic should handle partition level errors > -- > > Key: KAFKA-8453 > URL: https://issues.apache.org/jira/browse/KAFKA-8453 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Major > > The Metadata response may contain the following partition-level error codes: > LEADER_NOT_AVAILABLE, REPLICA_NOT_AVAILABLE, and LISTENER_NOT_FOUND. The > AdminClient at the moment does not appear to be checking these error codes. > This seems mostly harmless in the case of LEADER_NOT_AVAILABLE and > REPLICA_NOT_AVAILABLE, but potentially we should raise an error in the case > of LISTENER_NOT_FOUND since the result would otherwise be misleading. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
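The tolerate-vs-fail decision described in the PR comment can be sketched as pure logic. The enum and method names below are illustrative assumptions based on the discussion, not the actual code in PR #7089:

```java
import java.util.Map;

public class PartitionErrorCheck {
    // Partition-level error codes a Metadata response may carry (relevant subset only)
    enum MetadataError { NONE, LEADER_NOT_AVAILABLE, REPLICA_NOT_AVAILABLE, LISTENER_NOT_FOUND }

    // LEADER_NOT_AVAILABLE and REPLICA_NOT_AVAILABLE are transient and mostly harmless,
    // but LISTENER_NOT_FOUND would make the describeTopics result misleading,
    // so only that error fails the call.
    static boolean shouldFail(Map<Integer, MetadataError> partitionErrors) {
        return partitionErrors.containsValue(MetadataError.LISTENER_NOT_FOUND);
    }

    public static void main(String[] args) {
        System.out.println(shouldFail(Map.of(0, MetadataError.LEADER_NOT_AVAILABLE))); // false
        System.out.println(shouldFail(Map.of(0, MetadataError.LISTENER_NOT_FOUND)));   // true
    }
}
```

This also illustrates why the leader-election tests break: they intentionally create partitions in degraded states, so any newly raised exception changes their observed behavior.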
[jira] [Commented] (KAFKA-8024) UtilsTest.testFormatBytes fails with german locale
[ https://issues.apache.org/jira/browse/KAFKA-8024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885284#comment-16885284 ] Patrik Kleindl commented on KAFKA-8024: --- [~sujayopensource] You need to ask for permissions on the mailing list. I have assigned this to myself now as a PR is already available. > UtilsTest.testFormatBytes fails with german locale > -- > > Key: KAFKA-8024 > URL: https://issues.apache.org/jira/browse/KAFKA-8024 > Project: Kafka > Issue Type: Bug >Reporter: Patrik Kleindl >Assignee: Patrik Kleindl >Priority: Trivial > > The unit test fails when the default locale is not English (in my case, deAT) > assertEquals("1.1 MB", formatBytes((long) (1.1 * 1024 * 1024))); > > org.apache.kafka.common.utils.UtilsTest > testFormatBytes FAILED > org.junit.ComparisonFailure: expected:<1[.]1 MB> but was:<1[,]1 MB> > at org.junit.Assert.assertEquals(Assert.java:115) > at org.junit.Assert.assertEquals(Assert.java:144) > at > org.apache.kafka.common.utils.UtilsTest.testFormatBytes(UtilsTest.java:106) > > The easiest fix in this case should be adding > {code:java} > jvmArgs '-Duser.language=en -Duser.country=US'{code} > to the test configuration > [https://github.com/apache/kafka/blob/b03e8c234a8aeecd10c2c96b683cfb39b24b548a/build.gradle#L270] > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
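The failure is easy to reproduce without Kafka at all: `String.format` with no explicit locale uses the JVM default, and German-style locales format decimals with a comma. A minimal sketch:

```java
import java.util.Locale;

public class LocaleFormatDemo {
    public static void main(String[] args) {
        // With a German default locale, %.1f renders the decimal separator as a comma...
        Locale.setDefault(Locale.GERMANY);
        System.out.println(String.format("%.1f MB", 1.1)); // 1,1 MB
        // ...so tests comparing against "1.1 MB" fail unless the locale is pinned,
        // either via the jvmArgs workaround above or with an explicit locale:
        System.out.println(String.format(Locale.ROOT, "%.1f MB", 1.1)); // 1.1 MB
    }
}
```

Pinning the test JVM's locale fixes every such assertion at once; passing an explicit `Locale` to each format call is the more targeted alternative.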
[jira] [Assigned] (KAFKA-8024) UtilsTest.testFormatBytes fails with german locale
[ https://issues.apache.org/jira/browse/KAFKA-8024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Patrik Kleindl reassigned KAFKA-8024: - Assignee: Patrik Kleindl > UtilsTest.testFormatBytes fails with german locale > -- > > Key: KAFKA-8024 > URL: https://issues.apache.org/jira/browse/KAFKA-8024 > Project: Kafka > Issue Type: Bug >Reporter: Patrik Kleindl >Assignee: Patrik Kleindl >Priority: Trivial > > The unit test fails when the default locale is not English (in my case, deAT) > assertEquals("1.1 MB", formatBytes((long) (1.1 * 1024 * 1024))); > > org.apache.kafka.common.utils.UtilsTest > testFormatBytes FAILED > org.junit.ComparisonFailure: expected:<1[.]1 MB> but was:<1[,]1 MB> > at org.junit.Assert.assertEquals(Assert.java:115) > at org.junit.Assert.assertEquals(Assert.java:144) > at > org.apache.kafka.common.utils.UtilsTest.testFormatBytes(UtilsTest.java:106) > > The easiest fix in this case should be adding > {code:java} > jvmArgs '-Duser.language=en -Duser.country=US'{code} > to the test configuration > [https://github.com/apache/kafka/blob/b03e8c234a8aeecd10c2c96b683cfb39b24b548a/build.gradle#L270] > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Comment Edited] (KAFKA-7931) Java Client: if all ephemeral brokers fail, client can never reconnect to brokers
[ https://issues.apache.org/jira/browse/KAFKA-7931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885113#comment-16885113 ] Sam Weston edited comment on KAFKA-7931 at 7/15/19 11:40 AM: - Good news! I've got to the bottom of it! The fix is to use a DNS name as the advertised listener instead of the Pod IP address (in my case the Kubernetes headless service). Now I can restart containers as quickly as I like and my Java apps don't get upset. e.g. KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://pulseplatform-dev-kafka-0.pulseplatform-dev-kafka-headless.pulseplatform-dev:9092 where the headless service is called pulseplatform-dev-kafka-headless, my namespace is pulseplatform-dev and the pod is called pulseplatform-dev-kafka-0 If you're using the incubator helm chart let me know and I'll provide more details of my values file. was (Author: cablespaghetti): Good news! I've got to the bottom of it! The fix is to use a DNS name as the advertised listener instead of the Pod IP address (in my case the Kubernetes headless service). Now I can restart containers as quickly as I like and my Java apps don't get upset. e.g. 
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://pulseplatform-dev-kafka-0.pulseplatform-dev-kafka-headless.pulseplatform-dev:9092 where the headless service is called pulseplatform-dev-kafka-headless, my namespace is pulseplatform-dev and the pod is called pulseplatform-dev-kafka-0 > Java Client: if all ephemeral brokers fail, client can never reconnect to > brokers > - > > Key: KAFKA-7931 > URL: https://issues.apache.org/jira/browse/KAFKA-7931 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.1.0 >Reporter: Brian >Priority: Critical > > Steps to reproduce: > * Setup kafka cluster in GKE, with bootstrap server address configured to > point to a load balancer that exposes all GKE nodes > * Run producer that emits values into a partition with 3 replicas > * Kill every broker in the cluster > * Wait for brokers to restart > Observed result: > The java client cannot find any of the nodes even though they have all > recovered. I see messages like "Connection to node 30 (/10.6.0.101:9092) > could not be established. Broker may not be available.". > Note, this is *not* a duplicate of > https://issues.apache.org/jira/browse/KAFKA-7890. I'm using the client > version that contains the fix for > https://issues.apache.org/jira/browse/KAFKA-7890. > Versions: > Kafka: kafka version 2.1.0, using confluentinc/cp-kafka/5.1.0 docker image > Client: trunk from a few days ago (git sha > 9f7e6b291309286e3e3c1610e98d978773c9d504), to pull in the fix for KAFKA-7890 > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-7931) Java Client: if all ephemeral brokers fail, client can never reconnect to brokers
[ https://issues.apache.org/jira/browse/KAFKA-7931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16885113#comment-16885113 ] Sam Weston commented on KAFKA-7931: --- Good news! I've got to the bottom of it! The fix is to use a DNS name as the advertised listener instead of the Pod IP address (in my case the Kubernetes headless service). Now I can restart containers as quickly as I like and my Java apps don't get upset. e.g. KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://pulseplatform-dev-kafka-0.pulseplatform-dev-kafka-headless.pulseplatform-dev:9092 where the headless service is called pulseplatform-dev-kafka-headless, my namespace is pulseplatform-dev and the pod is called pulseplatform-dev-kafka-0 > Java Client: if all ephemeral brokers fail, client can never reconnect to > brokers > - > > Key: KAFKA-7931 > URL: https://issues.apache.org/jira/browse/KAFKA-7931 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.1.0 >Reporter: Brian >Priority: Critical > > Steps to reproduce: > * Setup kafka cluster in GKE, with bootstrap server address configured to > point to a load balancer that exposes all GKE nodes > * Run producer that emits values into a partition with 3 replicas > * Kill every broker in the cluster > * Wait for brokers to restart > Observed result: > The java client cannot find any of the nodes even though they have all > recovered. I see messages like "Connection to node 30 (/10.6.0.101:9092) > could not be established. Broker may not be available.". > Note, this is *not* a duplicate of > https://issues.apache.org/jira/browse/KAFKA-7890. I'm using the client > version that contains the fix for > https://issues.apache.org/jira/browse/KAFKA-7890. > Versions: > Kafka: kafka version 2.1.0, using confluentinc/cp-kafka/5.1.0 docker image > Client: trunk from a few days ago (git sha > 9f7e6b291309286e3e3c1610e98d978773c9d504), to pull in the fix for KAFKA-7890 > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8658) A way to configure the jmx rmi port
[ https://issues.apache.org/jira/browse/KAFKA-8658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16884970#comment-16884970 ] Agostino Sarubbo commented on KAFKA-8658: - Hello Richard, thanks for the fast response. It LGTM. > A way to configure the jmx rmi port > --- > > Key: KAFKA-8658 > URL: https://issues.apache.org/jira/browse/KAFKA-8658 > Project: Kafka > Issue Type: Improvement > Components: metrics >Affects Versions: 1.0.0 > Environment: Centos 7 >Reporter: Agostino Sarubbo >Priority: Minor > > Hello, > I'm on kafka-1.0.0 so I'm not sure if it is fixed in the current version. > Atm we are using the following in the service script to use JMX: > Environment=JMX_PORT=7666 > However there is no way to set the jmx_rmi_port. When there is no > specification for jmx_rmi_port the jvm assigns a random port. This > complicates the way we manage the firewall. > Would be great if there is a way to set the jmx_rmi_port in the same way, > e.g.: > Environment=JMX_RMI_PORT=7667 > The variable used during the jvm start is: > -Dcom.sun.management.jmxremote.rmi.port= -- This message was sent by Atlassian JIRA (v7.6.14#76016)
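Until a dedicated knob exists, the RMI port can already be pinned by passing the JVM flag directly. A sketch of the service-script workaround, assuming a Kafka launch script that honors `KAFKA_JMX_OPTS` (as `kafka-run-class.sh` does; the exact flag set to carry over depends on your Kafka version):

```
# systemd unit fragment (illustrative): pin both the JMX port and the RMI port
# so the firewall only needs two fixed rules instead of a random ephemeral port.
Environment=JMX_PORT=7666
Environment="KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=7666 -Dcom.sun.management.jmxremote.rmi.port=7667 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
```

Setting `com.sun.management.jmxremote.rmi.port` equal to the registry port is also common, which collapses both to a single firewall rule.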
[jira] [Updated] (KAFKA-8664) non-JSON format messages when streaming data from Kafka to Mongo
[ https://issues.apache.org/jira/browse/KAFKA-8664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vu Le updated KAFKA-8664: - Description: Hi team, I can stream data from Kafka to MongoDB with JSON messages. I use MongoDB Kafka Connector ([https://github.com/mongodb/mongo-kafka/blob/master/docs/install.md]) However, if I send a non-JSON message the connector dies. Please see the log file for details. My config file: {code:java} name=mongo-sink topics=test connector.class=com.mongodb.kafka.connect.MongoSinkConnector tasks.max=1 key.ignore=true # Specific global MongoDB Sink Connector configuration connection.uri=mongodb://localhost:27017 database=test_kafka collection=transaction max.num.retries=3 retries.defer.timeout=5000 type.name=kafka-connect key.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter=org.apache.kafka.connect.json.JsonConverter value.converter.schemas.enable=false {code} I have two separate questions: # how to ignore messages that are not in JSON format? # how to define a default key for this kind of message (for example: abc -> \{ "non-json": "abc" } ) Thanks was: Hi team, I can stream data from Kafka to MongoDB with JSON messages. I use [MongoDB Kafka Connector|https://github.com/mongodb/mongo-kafka/blob/master/docs/install.md] However, if I send a non-JSON message the connector dies. Please see the log file for details. 
My config file: {code:java} name=mongo-sink topics=test connector.class=com.mongodb.kafka.connect.MongoSinkConnector tasks.max=1 key.ignore=true # Specific global MongoDB Sink Connector configuration connection.uri=mongodb://localhost:27017 database=test_kafka collection=transaction max.num.retries=3 retries.defer.timeout=5000 type.name=kafka-connect key.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter=org.apache.kafka.connect.json.JsonConverter value.converter.schemas.enable=false {code} I have two separate questions: # how to ignore messages that are not in JSON format? # how to define a default key for this kind of message (for example: abc -> \{ "non-json": "abc" } ) Thanks > non-JSON format messages when streaming data from Kafka to Mongo > > > Key: KAFKA-8664 > URL: https://issues.apache.org/jira/browse/KAFKA-8664 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.1.1 >Reporter: Vu Le >Priority: Major > Attachments: MongoSinkConnector.properties, > log_error_when_stream_data_not_a_json_format.txt > > > Hi team, > I can stream data from Kafka to MongoDB with JSON messages. I use MongoDB > Kafka Connector > ([https://github.com/mongodb/mongo-kafka/blob/master/docs/install.md]) > However, if I send a non-JSON message the connector dies. Please see > the log file for details. 
> My config file: > {code:java} > name=mongo-sink > topics=test > connector.class=com.mongodb.kafka.connect.MongoSinkConnector > tasks.max=1 > key.ignore=true > # Specific global MongoDB Sink Connector configuration > connection.uri=mongodb://localhost:27017 > database=test_kafka > collection=transaction > max.num.retries=3 > retries.defer.timeout=5000 > type.name=kafka-connect > key.converter=org.apache.kafka.connect.json.JsonConverter > key.converter.schemas.enable=false > value.converter=org.apache.kafka.connect.json.JsonConverter > value.converter.schemas.enable=false > {code} > I have two separate questions: > # how to ignore messages that are not in JSON format? > # how to define a default key for this kind of message (for example: abc -> > \{ "non-json": "abc" } ) > Thanks -- This message was sent by Atlassian JIRA (v7.6.14#76016)
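Regarding question 1: since Kafka 2.0 (KIP-298), Kafka Connect sink connectors support error-tolerance and dead-letter-queue settings, so a sink running on 2.1.1 can skip or divert records that fail JSON conversion instead of killing the task. An illustrative fragment to add to the sink config above (the DLQ topic name is a made-up example):

```
# Tolerate records that fail conversion instead of failing the task
errors.tolerance=all
# Optionally route the failing raw records to a dead letter queue topic
errors.deadletterqueue.topic.name=mongo-sink-dlq
errors.deadletterqueue.context.headers.enable=true
# Log the failures so they are not silently dropped
errors.log.enable=true
errors.log.include.messages=true
```

Question 2 (wrapping a bare value in a default JSON key) is not covered by these settings; it would need a custom converter or a Single Message Transform.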
[jira] [Updated] (KAFKA-8664) non-JSON format messages when streaming data from Kafka to Mongo
[ https://issues.apache.org/jira/browse/KAFKA-8664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vu Le updated KAFKA-8664: - Description: Hi team, I can stream data from Kafka to MongoDB with JSON messages. I use [MongoDB Kafka Connector|https://github.com/mongodb/mongo-kafka/blob/master/docs/install.md] However, if I send a non-JSON message the connector dies. Please see the log file for details. My config file: {code:java} name=mongo-sink topics=test connector.class=com.mongodb.kafka.connect.MongoSinkConnector tasks.max=1 key.ignore=true # Specific global MongoDB Sink Connector configuration connection.uri=mongodb://localhost:27017 database=test_kafka collection=transaction max.num.retries=3 retries.defer.timeout=5000 type.name=kafka-connect key.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter=org.apache.kafka.connect.json.JsonConverter value.converter.schemas.enable=false {code} I have two separate questions: # how to ignore messages that are not in JSON format? # how to define a default key for this kind of message (for example: abc -> \{ "non-json": "abc" } ) Thanks was: Hi team, I can stream data from Kafka to MongoDB with JSON messages. I use [MongoDB Kafka Connector|https://github.com/mongodb/mongo-kafka/blob/master/docs/install.md] However, if I send a non-JSON message the connector dies. Please see the log file for details. 
My config file: {code:java} name=mongo-sink topics=test connector.class=com.mongodb.kafka.connect.MongoSinkConnector tasks.max=1 key.ignore=true # Specific global MongoDB Sink Connector configuration connection.uri=mongodb://localhost:27017 database=test_kafka collection=transaction max.num.retries=3 retries.defer.timeout=5000 type.name=kafka-connect key.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter=org.apache.kafka.connect.json.JsonConverter value.converter.schemas.enable=false {code} I have two separate questions: # how to ignore messages that are not in JSON format? # how to define a default key for this kind of message (for example: abc -> \{ "non-json": "abc" } ) > non-JSON format messages when streaming data from Kafka to Mongo > > > Key: KAFKA-8664 > URL: https://issues.apache.org/jira/browse/KAFKA-8664 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.1.1 >Reporter: Vu Le >Priority: Major > Attachments: MongoSinkConnector.properties, > log_error_when_stream_data_not_a_json_format.txt > > > Hi team, > I can stream data from Kafka to MongoDB with JSON messages. I use [MongoDB > Kafka > Connector|https://github.com/mongodb/mongo-kafka/blob/master/docs/install.md] > However, if I send a non-JSON message the connector dies. Please see > the log file for details. 
> My config file: > {code:java} > name=mongo-sink > topics=test > connector.class=com.mongodb.kafka.connect.MongoSinkConnector > tasks.max=1 > key.ignore=true > # Specific global MongoDB Sink Connector configuration > connection.uri=mongodb://localhost:27017 > database=test_kafka > collection=transaction > max.num.retries=3 > retries.defer.timeout=5000 > type.name=kafka-connect > key.converter=org.apache.kafka.connect.json.JsonConverter > key.converter.schemas.enable=false > value.converter=org.apache.kafka.connect.json.JsonConverter > value.converter.schemas.enable=false > {code} > I have two separate questions: > # how to ignore messages that are not in JSON format? > # how to define a default key for this kind of message (for example: abc -> > \{ "non-json": "abc" } ) > Thanks -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8664) non-JSON format messages when streaming data from Kafka to Mongo
Vu Le created KAFKA-8664: Summary: non-JSON format messages when streaming data from Kafka to Mongo Key: KAFKA-8664 URL: https://issues.apache.org/jira/browse/KAFKA-8664 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.1.1 Reporter: Vu Le Attachments: MongoSinkConnector.properties, log_error_when_stream_data_not_a_json_format.txt Hi team, I can stream data from Kafka to MongoDB with JSON messages. I use [MongoDB Kafka Connector|https://github.com/mongodb/mongo-kafka/blob/master/docs/install.md] However, if I send a non-JSON message the connector dies. Please see the log file for details. My config file: {code:java} name=mongo-sink topics=test connector.class=com.mongodb.kafka.connect.MongoSinkConnector tasks.max=1 key.ignore=true # Specific global MongoDB Sink Connector configuration connection.uri=mongodb://localhost:27017 database=test_kafka collection=transaction max.num.retries=3 retries.defer.timeout=5000 type.name=kafka-connect key.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter=org.apache.kafka.connect.json.JsonConverter value.converter.schemas.enable=false {code} I have two separate questions: # how to ignore messages that are not in JSON format? # how to define a default key for this kind of message (for example: abc -> \{ "non-json": "abc" } ) -- This message was sent by Atlassian JIRA (v7.6.14#76016)