[jira] [Updated] (KAFKA-9526) Augment topology description with serdes
[ https://issues.apache.org/jira/browse/KAFKA-9526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-9526: --- Labels: needs-kip (was: ) > Augment topology description with serdes > > > Key: KAFKA-9526 > URL: https://issues.apache.org/jira/browse/KAFKA-9526 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Labels: needs-kip > > Today we have multiple ways to infer and inherit serdes along the topology, > and only fall back to the configured serde when inference does not apply. So > it is a bit hard for users to reason about which operators inside the topology > still lack a serde specification. > So I'd propose we augment the topology description with serde information on > source / sink and state store operators. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9526) Augment topology description with serdes
[ https://issues.apache.org/jira/browse/KAFKA-9526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-9526: --- Component/s: streams > Augment topology description with serdes > > > Key: KAFKA-9526 > URL: https://issues.apache.org/jira/browse/KAFKA-9526 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > > Today we have multiple ways to infer and inherit serdes along the topology, > and only fall back to the configured serde when inference does not apply. So > it is a bit hard for users to reason about which operators inside the topology > still lack a serde specification. > So I'd propose we augment the topology description with serde information on > source / sink and state store operators. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9523) Reduce flakiness of BranchedMultiLevelRepartitionConnectedTopologyTest
[ https://issues.apache.org/jira/browse/KAFKA-9523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-9523: --- Component/s: unit tests streams > Reduce flakiness of BranchedMultiLevelRepartitionConnectedTopologyTest > -- > > Key: KAFKA-9523 > URL: https://issues.apache.org/jira/browse/KAFKA-9523 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Boyang Chen >Priority: Major > > KAFKA-9335 introduced an integration test to verify that the topology builder > itself can survive building a complex topology. This test sometimes gets flaky > on the stream client to broker connection, so we should consider > making it less flaky by either converting it to a unit test or focusing on > making the test logic more robust. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8307) Kafka Streams should provide some mechanism to determine topology equality and compatibility
[ https://issues.apache.org/jira/browse/KAFKA-8307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032806#comment-17032806 ] John Roesler commented on KAFKA-8307: - Thanks for the comment, [~guozhang]. It closely matches what has been bouncing around my head, in general that a full fix probably requires a number of different features, and specifically, that we need some way to map "compatible" topologies to each other. This would also let us deal better with topology evolution. But the thought is not very well formed at all. > Kafka Streams should provide some mechanism to determine topology equality > and compatibility > > > Key: KAFKA-8307 > URL: https://issues.apache.org/jira/browse/KAFKA-8307 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > Labels: user-experience > > Currently, Streams provides no mechanism to compare two topologies. This is a > common operation when users want to have tests verifying that they don't > accidentally alter their topology. They would save the known-good topology > and then add a unit test verifying the current code against that known-good > state. > However, because there's no way to do this comparison properly, everyone is > reduced to using the string format of the topology (from > `Topology#describe().toString()`). The major drawback is that the string > format is meant for human consumption. It is neither machine-parseable nor > stable. So, these compatibility tests are doomed to fail when any minor, > non-breaking, change is made either to the application, or to the library. > This trains everyone to update the test whenever it fails, undermining its > utility. > We should fix this problem, and provide both a mechanism to serialize the > topology and to compare two topologies for compatibility. All in all, I think > we need: > # a way to serialize/deserialize topology structure in a machine-parseable > format that is future-compatible. 
Offhand, I'd recommend serializing the > topology structure as JSON, and establishing a policy that attributes should > only be added to the object graph, never removed. Note, it's out of scope to > be able to actually run a deserialized topology; we only want to save and > load the structure (not the logic) to facilitate comparisons. > # a method to verify the *equality* of two topologies... This method tells > you that the two topologies are structurally identical. We can't know if the > logic of any operator has changed, only if the structure of the graph is > changed. We can consider whether other graph properties, like serdes, should > be included. > # a method to verify the *compatibility* of two topologies... This method > tells you that moving from topology A to topology B does not require an > application reset. Note that this operation is not commutative: > `A.compatibleWith(B)` does not imply `B.compatibleWith(A)`. We can discuss > whether `A.compatibleWith(B) && B.compatibleWith(A)` implies `A.equals(B)` (I > think not necessarily, because we may want "equality" to be stricter than > "compatibility"). -- This message was sent by Atlassian Jira (v8.3.4#803005)
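The serialization idea in point 1 can be sketched with plain JDK types: model the topology as a node-to-successors map and emit a canonical (key-sorted) JSON string, so that two structurally identical topologies always serialize to the same bytes regardless of how they were built. This is an illustrative sketch only, not the Streams API; the class and node names are hypothetical.

```java
import java.util.*;
import java.util.stream.*;

/**
 * Minimal sketch of a stable, machine-parseable topology serialization:
 * a node -> successors map rendered as canonical JSON, with keys and
 * successor lists sorted so the output is order-independent.
 */
public class TopologyJson {
    static String toCanonicalJson(Map<String, Set<String>> topo) {
        return new TreeMap<>(topo).entrySet().stream()
            .map(e -> "\"" + e.getKey() + "\":["
                    + e.getValue().stream().sorted()
                          .map(s -> "\"" + s + "\"")
                          .collect(Collectors.joining(","))
                    + "]")
            .collect(Collectors.joining(",", "{", "}"));
    }

    public static void main(String[] args) {
        Map<String, Set<String>> t1 = new LinkedHashMap<>();
        t1.put("KSTREAM-SOURCE-00", Set.of("KSTREAM-SINK-01"));
        t1.put("KSTREAM-SINK-01", Set.of());

        Map<String, Set<String>> t2 = new LinkedHashMap<>();
        t2.put("KSTREAM-SINK-01", Set.of());                    // different build order
        t2.put("KSTREAM-SOURCE-00", Set.of("KSTREAM-SINK-01"));

        // Canonical form is order-independent, so string equality
        // is exactly structural equality of the graphs.
        System.out.println(toCanonicalJson(t1).equals(toCanonicalJson(t2))); // true
    }
}
```

With an additive-only attribute policy, older readers can ignore unknown JSON fields, which is what makes the format future-compatible.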
[jira] [Commented] (KAFKA-9518) NullPointerException on out-of-order topologies
[ https://issues.apache.org/jira/browse/KAFKA-9518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032805#comment-17032805 ] John Roesler commented on KAFKA-9518: - Sure thing. I've linked them. > NullPointerException on out-of-order topologies > --- > > Key: KAFKA-9518 > URL: https://issues.apache.org/jira/browse/KAFKA-9518 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.1, 2.4.0, 2.3.1 >Reporter: Murilo Tavares >Priority: Minor > Attachments: kafka-streams-testing.zip > > > I have a KafkaStreams application that dynamically builds a topology based on a Map of > input-to-output topics. Since the map was not sorted, iteration was > unpredictable, and different instances could have different orders. When this > happens, KafkaStreams throws an exception during REBALANCE. > > I was able to reproduce this using the attached java project. The project is > a pretty simple Maven project with one class. It starts 2 instances in > parallel, with the same input-to-output topics, but one instance takes the > topics in a reversed order. > > The exception is this: > {noformat} > Exception in thread > "MY-APP-81e9dc0b-1459-4499-93d6-b5c03da60e18-StreamThread-1" > org.apache.kafka.streams.errors.StreamsException: stream-thread > [MY-APP-81e9dc0b-1459-4499-93d6-b5c03da60e18-StreamThread-1] Failed to > rebalance. 
> at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:852) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) > Caused by: java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:234) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:176) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:355) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:313) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:298) > at > org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160) > at > org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267) > at > 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843) > ... 3 more{noformat} > > The topology for both instances: > {code:java} > // instance1 > Topologies: > Sub-topology: 0 > Source: KSTREAM-SOURCE-00 (topics: [topicA]) > --> KSTREAM-SINK-01 > Sink: KSTREAM-SINK-01 (topic: topicA-repartitioned) > <-- KSTREAM-SOURCE-00 > Sub-topology: 1 > Source: KSTREAM-SOURCE-02 (topics: [topicB]) > --> KSTREAM-SINK-03 > Sink: KSTREAM-SINK-03 (topic: topicB-repartitioned) > <-- KSTREAM-SOURCE-02 > // instance2 > Topologies: > Sub-topology: 0 > Source: KSTREAM-SOURCE-00 (topics: [topicB]) > --> KSTREAM-SINK-01 > Sink: KSTREAM-SINK-01 (topic: topicB-repartitioned) > <-- KSTREAM-SOURCE-00 > Sub-topology: 1 > Source: KSTREAM-SOURCE-02 (topics: [topicA]) > --> KSTREAM-SINK-03 > Sink: KSTREAM-SINK-03 (topic: topicA-repartitioned) > <-- KSTREAM-SOURCE-02{code} > In my actual project, I fixed the issue by sorting the topics map > accordingly, but it would be nice to have at least a
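The workaround the reporter describes (sorting the topics map) can be sketched as follows. A TreeMap iterates its keys in sorted order regardless of insertion order, so every instance builds its sub-topologies in the same sequence. The topic names come from the report; the class and method are illustrative.

```java
import java.util.*;

// Sketch of the reported workaround: hold the input->output topic map in a
// sorted map so all instances see the same iteration order and therefore
// assign the same sub-topology indexes.
public class DeterministicTopics {
    static List<String> deterministicOrder(Map<String, String> topics) {
        // TreeMap normalizes iteration to natural key order.
        return new ArrayList<>(new TreeMap<>(topics).keySet());
    }

    public static void main(String[] args) {
        Map<String, String> instance1 = new LinkedHashMap<>();
        instance1.put("topicA", "topicA-repartitioned");
        instance1.put("topicB", "topicB-repartitioned");

        Map<String, String> instance2 = new LinkedHashMap<>();
        instance2.put("topicB", "topicB-repartitioned"); // reversed insertion order
        instance2.put("topicA", "topicA-repartitioned");

        // Both instances now iterate the topics identically.
        System.out.println(deterministicOrder(instance1)
                .equals(deterministicOrder(instance2))); // true
    }
}
```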
[jira] [Commented] (KAFKA-9374) Worker can be disabled by blocked connectors
[ https://issues.apache.org/jira/browse/KAFKA-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032791#comment-17032791 ] ASF GitHub Bot commented on KAFKA-9374: --- C0urante commented on pull request #8069: KAFKA-9374: Make connector interactions asynchronous URL: https://github.com/apache/kafka/pull/8069 [Jira ticket](https://issues.apache.org/jira/browse/KAFKA-9374) These changes allow herders to continue to function even when a connector they are running hangs in its start, stop, initialize, validate, and/or config methods. The main idea is to make these connector interactions asynchronous and accept a callback that can be invoked upon the completion (successful or otherwise) of these interactions. The distributed herder handles any follow-up logic by adding a new herder request to its queue in that callback, which helps preserve some synchronization and ordering guarantees provided by the current tick model. There are several items that still need to be addressed: 1) The standalone herder has not been updated to utilize the new asynchronous connector wrapper API provided by the Worker and AbstractHerder classes 2) There is a minor TODO in the DistributedHerderTest class regarding the need to migrate some testing logic into the AbstractHerderTest class 3) More significantly, since connector shutdown is now handled asynchronously, there are two problems with the current changes: a - It is possible (even likely) that a new instance of a connector will be created before an older instance has been shut down. This is especially problematic if a connector claims a shared resource such as a port number and could potentially lead to unnecessary connector failure on startup. b - There is no time allocated during shutdown of the herder for its connectors to shut down, which may lead to improper resource cleanup. 
Existing unit tests for the distributed herder and worker have been modified to reflect these changes, and a new integration test named `BlockingConnectorTest` has been added to ensure that they work in practice. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Worker can be disabled by blocked connectors > > > Key: KAFKA-9374 > URL: https://issues.apache.org/jira/browse/KAFKA-9374 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0, > 2.2.0, 2.1.1, 2.3.0, 2.2.1, 2.2.2, 2.4.0, 2.3.1 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > If a connector hangs during any of its {{initialize}}, {{start}}, {{stop}}, > {{taskConfigs}}, {{taskClass}}, {{version}}, {{config}}, or {{validate}} > methods, the worker will be disabled for some types of requests thereafter, > including connector creation, connector reconfiguration, and connector > deletion. > -This only occurs in distributed mode and is due to the threading model used > by the > [DistributedHerder|https://github.com/apache/kafka/blob/03f763df8a8d9482d8c099806336f00cf2521465/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java] > class.- This affects both distributed and standalone mode. Distributed > herders perform some connector work synchronously in their {{tick}} thread, > which also handles group membership and some REST requests. The majority of > the herder methods for the standalone herder are {{synchronized}}, including > those for creating, updating, and deleting connectors; as long as one of > those methods blocks, all subsequent calls to any of these methods will also > be blocked. 
> > One potential solution could be to treat connectors that fail to start, stop, > etc. in time similarly to tasks that fail to stop within the [task graceful > shutdown timeout > period|https://github.com/apache/kafka/blob/03f763df8a8d9482d8c099806336f00cf2521465/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L121-L126] > by handling all connector interactions on a separate thread, waiting for > them to complete within a timeout, and abandoning the thread (and > transitioning the connector to the {{FAILED}} state, if it has been created > at all) if that timeout expires. -- This message was sent by Atlassian Jira (v8.3.4#803005)
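The timeout-based handling proposed above can be sketched with plain java.util.concurrent primitives: run the connector call on a separate daemon thread, wait up to a timeout, and abandon the thread (marking the connector FAILED) if the timeout expires. This is an illustrative sketch under those assumptions, not the Connect runtime's actual code; the class, method, and state strings are hypothetical.

```java
import java.util.concurrent.*;

// Sketch: bound a possibly-hanging connector interaction with a timeout,
// abandoning the worker thread instead of blocking the herder forever.
public class TimedConnectorCall {
    static String callWithTimeout(Runnable connectorOp, long timeoutMs) {
        ExecutorService exec = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // an abandoned thread must not block JVM shutdown
            return t;
        });
        Future<?> f = exec.submit(connectorOp);
        try {
            f.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "STARTED";
        } catch (TimeoutException e) {
            f.cancel(true); // best effort; the connector may ignore interrupts
            return "FAILED";
        } catch (Exception e) {
            return "FAILED";
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // A well-behaved connector op completes within the timeout.
        System.out.println(callWithTimeout(() -> {}, 1_000));   // STARTED
        // A hanging op is abandoned once the timeout expires.
        System.out.println(callWithTimeout(() -> {
            try { Thread.sleep(60_000); } catch (InterruptedException ignored) {}
        }, 200));                                               // FAILED
    }
}
```

Note the caveat the sketch shares with the proposal: cancellation is cooperative, so a connector stuck in uninterruptible code keeps its thread alive until it returns on its own.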
[jira] [Created] (KAFKA-9526) Augment topology description with serdes
Guozhang Wang created KAFKA-9526: Summary: Augment topology description with serdes Key: KAFKA-9526 URL: https://issues.apache.org/jira/browse/KAFKA-9526 Project: Kafka Issue Type: Improvement Reporter: Guozhang Wang Assignee: Guozhang Wang Today we have multiple ways to infer and inherit serdes along the topology, and only fall back to the configured serde when inference does not apply. So it is a bit hard for users to reason about which operators inside the topology still lack a serde specification. So I'd propose we augment the topology description with serde information on source / sink and state store operators. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9525) Allow explicit rebalance triggering on the Consumer
[ https://issues.apache.org/jira/browse/KAFKA-9525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman reassigned KAFKA-9525: -- Assignee: Sophie Blee-Goldman > Allow explicit rebalance triggering on the Consumer > --- > > Key: KAFKA-9525 > URL: https://issues.apache.org/jira/browse/KAFKA-9525 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > Labels: needs-kip > > Currently the only way to explicitly trigger a rebalance is by unsubscribing > the consumer. This has two drawbacks: it does not work with static > membership, and it causes the consumer to revoke all its currently owned > partitions. Streams relies on being able to enforce a rebalance for its > version probing upgrade protocol and the upcoming KIP-441, both of which > should be able to work with static membership and be able to leverage the > improvements of KIP-429 to no longer revoke all owned partitions. > We should add an API that will allow users to explicitly trigger a rebalance > without going through #unsubscribe -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9525) Allow explicit rebalance triggering on the Consumer
[ https://issues.apache.org/jira/browse/KAFKA-9525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032788#comment-17032788 ] John Roesler commented on KAFKA-9525: - Agreed. If "trigger a rebalance" is the desired operation, then the best thing is to offer a way to do exactly that. > Allow explicit rebalance triggering on the Consumer > --- > > Key: KAFKA-9525 > URL: https://issues.apache.org/jira/browse/KAFKA-9525 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: needs-kip > > Currently the only way to explicitly trigger a rebalance is by unsubscribing > the consumer. This has two drawbacks: it does not work with static > membership, and it causes the consumer to revoke all its currently owned > partitions. Streams relies on being able to enforce a rebalance for its > version probing upgrade protocol and the upcoming KIP-441, both of which > should be able to work with static membership and be able to leverage the > improvements of KIP-429 to no longer revoke all owned partitions. > We should add an API that will allow users to explicitly trigger a rebalance > without going through #unsubscribe -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9206) Consumer should handle `CORRUPT_MESSAGE` error code in fetch response
[ https://issues.apache.org/jira/browse/KAFKA-9206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Agam Brahma reassigned KAFKA-9206: -- Assignee: Agam Brahma > Consumer should handle `CORRUPT_MESSAGE` error code in fetch response > - > > Key: KAFKA-9206 > URL: https://issues.apache.org/jira/browse/KAFKA-9206 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Agam Brahma >Priority: Major > > This error code is possible, for example, when the broker scans the log to > find the fetch offset after the index lookup. Currently this results in a > slightly obscure message such as the following: > {code:java} > java.lang.IllegalStateException: Unexpected error code 2 while fetching > data{code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9507) AdminClient should check for missing committed offsets
[ https://issues.apache.org/jira/browse/KAFKA-9507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-9507. Fix Version/s: 2.4.1 2.3.2 2.5.0 Resolution: Fixed > AdminClient should check for missing committed offsets > -- > > Key: KAFKA-9507 > URL: https://issues.apache.org/jira/browse/KAFKA-9507 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: David Mao >Priority: Major > Labels: newbie > Fix For: 2.5.0, 2.3.2, 2.4.1 > > > I noticed this exception getting raised: > {code} > Caused by: java.lang.IllegalArgumentException: Invalid negative offset > at > org.apache.kafka.clients.consumer.OffsetAndMetadata.(OffsetAndMetadata.java:50) > at > org.apache.kafka.clients.admin.KafkaAdminClient$24$1.handleResponse(KafkaAdminClient.java:2832) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1032) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1160) > {code} > The AdminClient should check for negative offsets in OffsetFetch responses in > the api `listConsumerGroupOffsets`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
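The gist of the fix can be sketched independently of the AdminClient internals: a negative offset in an OffsetFetch response means the group has no committed offset for that partition, so it should surface as null (absent) rather than be passed to the OffsetAndMetadata constructor, which rejects negative values. The class and method names below are illustrative, not Kafka code.

```java
import java.util.*;

// Sketch: map negative fetched offsets (no committed offset) to null instead
// of constructing an offset object that would throw IllegalArgumentException.
public class MissingOffsets {
    static Map<String, Long> sanitize(Map<String, Long> fetched) {
        Map<String, Long> out = new HashMap<>();
        // HashMap permits null values, so "no committed offset" is representable.
        fetched.forEach((tp, off) -> out.put(tp, off < 0 ? null : off));
        return out;
    }

    public static void main(String[] args) {
        Map<String, Long> raw = Map.of("test-1-0", 42L, "test-1-1", -1L);
        // test-1-1 maps to null: the group never committed for that partition.
        System.out.println(sanitize(raw).get("test-1-1") == null); // true
    }
}
```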
[jira] [Commented] (KAFKA-9507) AdminClient should check for missing committed offsets
[ https://issues.apache.org/jira/browse/KAFKA-9507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032770#comment-17032770 ] ASF GitHub Bot commented on KAFKA-9507: --- hachikuji commented on pull request #8057: KAFKA-9507 AdminClient should check for missing committed offsets URL: https://github.com/apache/kafka/pull/8057 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > AdminClient should check for missing committed offsets > -- > > Key: KAFKA-9507 > URL: https://issues.apache.org/jira/browse/KAFKA-9507 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: David Mao >Priority: Major > Labels: newbie > > I noticed this exception getting raised: > {code} > Caused by: java.lang.IllegalArgumentException: Invalid negative offset > at > org.apache.kafka.clients.consumer.OffsetAndMetadata.(OffsetAndMetadata.java:50) > at > org.apache.kafka.clients.admin.KafkaAdminClient$24$1.handleResponse(KafkaAdminClient.java:2832) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:1032) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1160) > {code} > The AdminClient should check for negative offsets in OffsetFetch responses in > the api `listConsumerGroupOffsets`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
[ https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032769#comment-17032769 ] Jason Gustafson edited comment on KAFKA-8940 at 2/8/20 12:41 AM: - Found a new instance of this: https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4554/testReport/junit/org.apache.kafka.streams.integration/SmokeTestDriverIntegrationTest/shouldWorkWithRebalance/ {code} java.lang.AssertionError: verifying tagg fail: key=470 tagg=[ConsumerRecord(topic = tagg, partition = 0, leaderEpoch = 0, offset = 32, CreateTime = 1581120009534, serialized key size = 3, serialized value size = 8, headers = RecordHeaders(headers = [], isReadOnly = false), key = 470, value = 1)] expected=0 taggEvents: [ConsumerRecord(topic = tagg, partition = 0, leaderEpoch = 0, offset = 32, CreateTime = 1581120009534, serialized key size = 3, serialized value size = 8, headers = RecordHeaders(headers = [], isReadOnly = false), key = 470, value = 1)] verifying suppressed min-suppressed verifying min-suppressed with 20 keys fail: resultCount=20 expectedCount=10 result=[[4-1003@158112000/158120640], [7-1006@158103360/158112000], [1-1000@158103360/158112000], [9-1008@158112000/158120640], [0-999@158112000/158120640], [9-1008@158103360/158112000], [3-1002@158103360/158112000], [3-1002@158112000/158120640], [8-1007@158112000/158120640], [8-1007@158103360/158112000], [2-1001@158112000/158120640], [2-1001@158103360/158112000], [7-1006@158112000/158120640], [1-1000@158112000/158120640], [0-999@158103360/158112000], [4-1003@158103360/158112000], [6-1005@158112000/158120640], [6-1005@158103360/158112000], [5-1004@158103360/158112000], [5-1004@158112000/158120640]] expected=[7-1006, 3-1002, 0-999, 1-1000, 4-1003, 2-1001, 5-1004, 8-1007, 6-1005, 9-1008] verifying suppressed sws-suppressed verifying min with 10 keys min fail: key=7-1006 actual=535 expected=7 {code} was (Author: hachikuji): Found a new instance of this: 
https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4554/testReport/junit/org.apache.kafka.streams.integration/SmokeTestDriverIntegrationTest/shouldWorkWithRebalance/ ``` java.lang.AssertionError: verifying tagg fail: key=470 tagg=[ConsumerRecord(topic = tagg, partition = 0, leaderEpoch = 0, offset = 32, CreateTime = 1581120009534, serialized key size = 3, serialized value size = 8, headers = RecordHeaders(headers = [], isReadOnly = false), key = 470, value = 1)] expected=0 taggEvents: [ConsumerRecord(topic = tagg, partition = 0, leaderEpoch = 0, offset = 32, CreateTime = 1581120009534, serialized key size = 3, serialized value size = 8, headers = RecordHeaders(headers = [], isReadOnly = false), key = 470, value = 1)] verifying suppressed min-suppressed verifying min-suppressed with 20 keys fail: resultCount=20 expectedCount=10 result=[[4-1003@158112000/158120640], [7-1006@158103360/158112000], [1-1000@158103360/158112000], [9-1008@158112000/158120640], [0-999@158112000/158120640], [9-1008@158103360/158112000], [3-1002@158103360/158112000], [3-1002@158112000/158120640], [8-1007@158112000/158120640], [8-1007@158103360/158112000], [2-1001@158112000/158120640], [2-1001@158103360/158112000], [7-1006@158112000/158120640], [1-1000@158112000/158120640], [0-999@158103360/158112000], [4-1003@158103360/158112000], [6-1005@158112000/158120640], [6-1005@158103360/158112000], [5-1004@158103360/158112000], [5-1004@158112000/158120640]] expected=[7-1006, 3-1002, 0-999, 1-1000, 4-1003, 2-1001, 5-1004, 8-1007, 6-1005, 9-1008] verifying suppressed sws-suppressed verifying min with 10 keys min fail: key=7-1006 actual=535 expected=7 ``` > Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance > - > > Key: KAFKA-8940 > URL: https://issues.apache.org/jira/browse/KAFKA-8940 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Labels: flaky-test > Fix For: 2.5.0 > > > I lost the screen shot 
unfortunately... it reports the set of expected > records does not match the received records. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
[ https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032769#comment-17032769 ] Jason Gustafson commented on KAFKA-8940: Found a new instance of this: https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4554/testReport/junit/org.apache.kafka.streams.integration/SmokeTestDriverIntegrationTest/shouldWorkWithRebalance/ ``` java.lang.AssertionError: verifying tagg fail: key=470 tagg=[ConsumerRecord(topic = tagg, partition = 0, leaderEpoch = 0, offset = 32, CreateTime = 1581120009534, serialized key size = 3, serialized value size = 8, headers = RecordHeaders(headers = [], isReadOnly = false), key = 470, value = 1)] expected=0 taggEvents: [ConsumerRecord(topic = tagg, partition = 0, leaderEpoch = 0, offset = 32, CreateTime = 1581120009534, serialized key size = 3, serialized value size = 8, headers = RecordHeaders(headers = [], isReadOnly = false), key = 470, value = 1)] verifying suppressed min-suppressed verifying min-suppressed with 20 keys fail: resultCount=20 expectedCount=10 result=[[4-1003@158112000/158120640], [7-1006@158103360/158112000], [1-1000@158103360/158112000], [9-1008@158112000/158120640], [0-999@158112000/158120640], [9-1008@158103360/158112000], [3-1002@158103360/158112000], [3-1002@158112000/158120640], [8-1007@158112000/158120640], [8-1007@158103360/158112000], [2-1001@158112000/158120640], [2-1001@158103360/158112000], [7-1006@158112000/158120640], [1-1000@158112000/158120640], [0-999@158103360/158112000], [4-1003@158103360/158112000], [6-1005@158112000/158120640], [6-1005@158103360/158112000], [5-1004@158103360/158112000], [5-1004@158112000/158120640]] expected=[7-1006, 3-1002, 0-999, 1-1000, 4-1003, 2-1001, 5-1004, 8-1007, 6-1005, 9-1008] verifying suppressed sws-suppressed verifying min with 10 keys min fail: key=7-1006 actual=535 expected=7 ``` > Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance > - > > Key: KAFKA-8940 > URL: 
https://issues.apache.org/jira/browse/KAFKA-8940 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Labels: flaky-test > Fix For: 2.5.0 > > > I lost the screen shot unfortunately... it reports the set of expected > records does not match the received records. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9254) Updating Kafka Broker configuration dynamically twice reverts log configuration to default
[ https://issues.apache.org/jira/browse/KAFKA-9254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032766#comment-17032766 ] ASF GitHub Bot commented on KAFKA-9254: --- soondenana commented on pull request #8067: KAFKA-9254; Overridden topic configs are reset after dynamic default change (#7870) URL: https://github.com/apache/kafka/pull/8067 Currently, when a dynamic change is made to the broker-level default log configuration, existing log configs will be recreated with empty overridden configs. In that case, when dynamic broker configs are updated a second time, the topic-level configs are lost. This can cause unexpected data loss, for example, if the cleanup policy changes from "compact" to "delete." Reviewers: Rajini Sivaram , Jason Gustafson (cherry picked from commit 0e7f867041959c5d77727c7f5ce32d363fa09fc2) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Updating Kafka Broker configuration dynamically twice reverts log > configuration to default > -- > > Key: KAFKA-9254 > URL: https://issues.apache.org/jira/browse/KAFKA-9254 > Project: Kafka > Issue Type: Bug > Components: config, log, replication >Affects Versions: 2.0.1, 2.1.1, 2.2.2, 2.4.0, 2.3.1 >Reporter: fenghong >Assignee: huxihx >Priority: Critical > Fix For: 2.0.2, 2.1.2, 2.2.3, 2.3.2, 2.4.1 > > > We are engineers at Huobi and have encountered a Kafka bug. > Modifying DynamicBrokerConfig more than once invalidates unrelated topic-level > configuration. > The bug reproduction method is as follows: > # Set Kafka Broker config server.properties min.insync.replicas=3 > # Create topic test-1 and set the topic's level config min.insync.replicas=2 > # Dynamically modify the configuration twice as shown below > {code:java} > bin/kafka-configs.sh --bootstrap-server xxx:9092 --entity-type brokers > --entity-default --alter --add-config log.message.timestamp.type=LogAppendTime > bin/kafka-configs.sh --bootstrap-server xxx:9092 --entity-type brokers > --entity-default --alter --add-config log.retention.ms=60480 > {code} > # Stop a Kafka server; the exception shown below is raised > org.apache.kafka.common.errors.NotEnoughReplicasException: Number of insync > replicas for partition test-1-0 is [2], below required minimum [3] > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9525) Allow explicit rebalance triggering on the Consumer
[ https://issues.apache.org/jira/browse/KAFKA-9525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032763#comment-17032763 ] Sophie Blee-Goldman commented on KAFKA-9525: Well, the point is we want to be able to trigger a rebalance without removing a member. That would definitely solve the static membership problem, but we still want the option to trigger a rebalance that is as lightweight as possible for KIP-441, i.e. without having to revoke all partitions or actually leave the group. > Allow explicit rebalance triggering on the Consumer > --- > > Key: KAFKA-9525 > URL: https://issues.apache.org/jira/browse/KAFKA-9525 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: needs-kip > > Currently the only way to explicitly trigger a rebalance is by unsubscribing > the consumer. This has two drawbacks: it does not work with static > membership, and it causes the consumer to revoke all its currently owned > partitions. Streams relies on being able to enforce a rebalance for its > version probing upgrade protocol and the upcoming KIP-441, both of which > should be able to work with static membership and be able to leverage the > improvements of KIP-429 to no longer revoke all owned partitions. > We should add an API that will allow users to explicitly trigger a rebalance > without going through #unsubscribe -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9525) Allow explicit rebalance triggering on the Consumer
[ https://issues.apache.org/jira/browse/KAFKA-9525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032762#comment-17032762 ] Boyang Chen commented on KAFKA-9525: If you are talking about application-level rebalance triggering, after https://issues.apache.org/jira/browse/KAFKA-9146 we could potentially have the admin client remove a single member to trigger a rebalance, which is pretty convenient. > Allow explicit rebalance triggering on the Consumer > --- > > Key: KAFKA-9525 > URL: https://issues.apache.org/jira/browse/KAFKA-9525 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: needs-kip > > Currently the only way to explicitly trigger a rebalance is by unsubscribing > the consumer. This has two drawbacks: it does not work with static > membership, and it causes the consumer to revoke all its currently owned > partitions. Streams relies on being able to enforce a rebalance for its > version probing upgrade protocol and the upcoming KIP-441, both of which > should be able to work with static membership and be able to leverage the > improvements of KIP-429 to no longer revoke all owned partitions. > We should add an API that will allow users to explicitly trigger a rebalance > without going through #unsubscribe -- This message was sent by Atlassian Jira (v8.3.4#803005)
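The approach Boyang describes can be sketched with the 2.4 AdminClient API. Note this only applies to static members (those that set a group.instance.id); the group id, instance id, and bootstrap address below are placeholders:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

public class ForceRebalance {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Removing one static member (identified by its group.instance.id)
            // forces the group to rebalance. "my-group" and "instance-1"
            // are illustrative names.
            admin.removeMembersFromConsumerGroup(
                    "my-group",
                    new RemoveMembersFromConsumerGroupOptions(
                        Collections.singleton(new MemberToRemove("instance-1"))))
                .all().get();
        }
    }
}
```

The drawback, as Sophie notes in the comment above this one, is that the removed member must rejoin and revoke its partitions, so this is not the lightweight trigger Streams wants.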
[jira] [Commented] (KAFKA-9519) Deprecate ZooKeeper access for kafka-configs.sh
[ https://issues.apache.org/jira/browse/KAFKA-9519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032760#comment-17032760 ] ASF GitHub Bot commented on KAFKA-9519: --- cmccabe commented on pull request #8056: KAFKA-9519: Deprecating ZK for ConfigCommand URL: https://github.com/apache/kafka/pull/8056 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Deprecate ZooKeeper access for kafka-configs.sh > > > Key: KAFKA-9519 > URL: https://issues.apache.org/jira/browse/KAFKA-9519 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 2.5.0 >Reporter: Sanjana Kaundinya >Assignee: Sanjana Kaundinya >Priority: Major > Fix For: 2.5.0 > > > As part of KIP-555 access for zookeeper must be deprecated for > kafka-configs.sh. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-4090) JVM runs into OOM if (Java) client uses a SSL port without setting the security protocol
[ https://issues.apache.org/jira/browse/KAFKA-4090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032753#comment-17032753 ] David Mollitor commented on KAFKA-4090: --- OK. I took a stab at this and I added a unit test to simulate the SSL response laid out in KIP-498. I don't think the answer is to use some sort of max buffer size. That feature may be nice as protection from a misconfigured server, but not for this use case. As pointed out in KIP-498, the server identifies that this is a plain-text connection, but the client is unable to identify the connection as being SSL-enabled. How is that the case? Well, the server buffers data as it is being read in chunks (8K perhaps). Once it reads enough data into the buffer, it runs a quick scan on the bytes to try to detect SSL vs. plaintext. I tried putting the logic into the client, but it was very difficult. The client does not buffer data in chunks in the same way. It reads 4 bytes from the stream, and when those four bytes are read, it parses the size and creates the necessary payload buffer. It's very simple. There's just not a lot of flexibility in the existing setup. Once the data is consumed from the stream, you can't put it back. That is, there is no way to read 16 bytes, check for SSL, and if it's not SSL, put the bytes back into the front of the stream (or just inspect without consuming) for normal processing. Once those 16 bytes are consumed, they are consumed... even if the stream has two 8-byte packets one after the other. In this scenario, reading 16 bytes to parse one packet just ate a packet. To detect SSL, Netty (Apache 2.0) requires 5 bytes. So, I am doing the same. I am consuming 4 bytes and not loading a buffer at that time. If the stream produces just 1 more byte, then I check if SSL is present, and if not, I create the required buffer. It is much like a lazy-load implementation. The payload buffer doesn't get created unless it absolutely has to. 
Unfortunately, the unit tests bit me hard, because they all assume there will be two reads: one 4-byte read to get the payload size (and create the payload buffer), then one read to get the payload itself. So, the mocking all mimics two reads. My code does 3 reads: 1 for the size, 1 to get one more byte to test for SSL, then 1 for the payload. For anything that is a "live" test, with a test socket actually receiving data, this is brittle but generally works because it is very unlikely that the socket won't have at least 4 bytes for the size in the first read. However, I think some of these live tests would fail if only 1 or 2 bytes came from the stream at a time. There is one unit test that counts the number of reads that take place. I had to disable it because it assumes the payload buffer is created after only 1 read (a flaky assumption if the data comes in slowly); it blocks (for testing purposes) any subsequent reads, so my implementation never creates the lazy payload buffer because a second read is never triggered. > JVM runs into OOM if (Java) client uses a SSL port without setting the > security protocol > > > Key: KAFKA-4090 > URL: https://issues.apache.org/jira/browse/KAFKA-4090 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.9.0.1, 0.10.0.1, 2.1.0 >Reporter: Jaikiran Pai >Assignee: Alexandre Dupriez >Priority: Major > > Quoting from the mail thread that was sent to Kafka mailing list: > {quote} > We have been using Kafka 0.9.0.1 (server and Java client libraries). So far > we had been using it with plaintext transport but recently have been > considering upgrading to using SSL. It mostly works except that a > mis-configured producer (and even consumer) causes a hard to relate > OutOfMemory exception and thus causing the JVM in which the client is > running, to go into a bad state. We can consistently reproduce that OOM very > easily. 
We decided to check if this is something that is fixed in 0.10.0.1 so > upgraded one of our test systems to that version (both server and client > libraries) but still see the same issue. Here's how it can be easily > reproduced > 1. Enable SSL listener on the broker via server.properties, as per the Kafka > documentation > {code} > listeners=PLAINTEXT://:9092,SSL://:9093 > ssl.keystore.location= > ssl.keystore.password=pass > ssl.key.password=pass > ssl.truststore.location= > ssl.truststore.password=pass > {code} > 2. Start zookeeper and kafka server > 3. Create a "oom-test" topic (which will be used for these tests): > {code} > kafka-topics.sh --zookeeper localhost:2181 --create --topic oom-test > --partitions 1 --replication-factor 1 > {code} > 4. Create a simple producer which
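The detection heuristic David Mollitor describes (classify the connection once 5 bytes have arrived) can be sketched as follows. The class and method names are illustrative; Netty's actual isEncrypted() check additionally handles the SSLv2 record format, which this omits:

```java
public class SslDetector {
    // Netty needs 5 bytes before it can classify an SSLv3/TLS record header.
    static final int RECORD_HEADER_LENGTH = 5;

    // An SSLv3/TLS record begins with a content type of 20-23
    // (change_cipher_spec, alert, handshake, application_data)
    // followed by the major protocol version, which is always 3.
    static boolean looksLikeTls(byte[] header) {
        if (header.length < RECORD_HEADER_LENGTH) {
            throw new IllegalArgumentException("need " + RECORD_HEADER_LENGTH + " bytes");
        }
        int contentType = header[0] & 0xFF;
        int majorVersion = header[1] & 0xFF;
        return contentType >= 20 && contentType <= 23 && majorVersion == 3;
    }

    public static void main(String[] args) {
        // A TLS 1.0 handshake record header vs. a plaintext Kafka size prefix.
        System.out.println(looksLikeTls(new byte[]{0x16, 0x03, 0x01, 0x00, 0x2f})); // true
        System.out.println(looksLikeTls(new byte[]{0x00, 0x00, 0x00, 0x10, 0x00})); // false
    }
}
```

This is why the client must peek at a fifth byte before committing the first four bytes to a size parse: with only the 4-byte size prefix consumed, the two formats are indistinguishable.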
[jira] [Commented] (KAFKA-4090) JVM runs into OOM if (Java) client uses a SSL port without setting the security protocol
[ https://issues.apache.org/jira/browse/KAFKA-4090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032746#comment-17032746 ] ASF GitHub Bot commented on KAFKA-4090: --- belugabehr commented on pull request #8066: KAFKA-4090: Validate SSL connection in client URL: https://github.com/apache/kafka/pull/8066 Issue has been around for a while. Will post update on JIRA. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > JVM runs into OOM if (Java) client uses a SSL port without setting the > security protocol > > > Key: KAFKA-4090 > URL: https://issues.apache.org/jira/browse/KAFKA-4090 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.9.0.1, 0.10.0.1, 2.1.0 >Reporter: Jaikiran Pai >Assignee: Alexandre Dupriez >Priority: Major > > Quoting from the mail thread that was sent to Kafka mailing list: > {quote} > We have been using Kafka 0.9.0.1 (server and Java client libraries). So far > we had been using it with plaintext transport but recently have been > considering upgrading to using SSL. It mostly works except that a > mis-configured producer (and even consumer) causes a hard to relate > OutOfMemory exception and thus causing the JVM in which the client is > running, to go into a bad state. We can consistently reproduce that OOM very > easily. We decided to check if this is something that is fixed in 0.10.0.1 so > upgraded one of our test systems to that version (both server and client > libraries) but still see the same issue. Here's how it can be easily > reproduced > 1. 
Enable SSL listener on the broker via server.properties, as per the Kafka > documentation > {code} > listeners=PLAINTEXT://:9092,SSL://:9093 > ssl.keystore.location= > ssl.keystore.password=pass > ssl.key.password=pass > ssl.truststore.location= > ssl.truststore.password=pass > {code} > 2. Start zookeeper and kafka server > 3. Create a "oom-test" topic (which will be used for these tests): > {code} > kafka-topics.sh --zookeeper localhost:2181 --create --topic oom-test > --partitions 1 --replication-factor 1 > {code} > 4. Create a simple producer which sends a single message to the topic via > Java (new producer) APIs: > {code} > public class OOMTest { > public static void main(final String[] args) throws Exception { > final Properties kafkaProducerConfigs = new Properties(); > // NOTE: Intentionally use a SSL port without specifying > security.protocol as SSL > > kafkaProducerConfigs.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9093"); > > kafkaProducerConfigs.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, > StringSerializer.class.getName()); > > kafkaProducerConfigs.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, > StringSerializer.class.getName()); > try (KafkaProducer<String, String> producer = new > KafkaProducer<>(kafkaProducerConfigs)) { > System.out.println("Created Kafka producer"); > final String topicName = "oom-test"; > final String message = "Hello OOM!"; > // send a message to the topic > final Future<RecordMetadata> recordMetadataFuture = > producer.send(new ProducerRecord<>(topicName, message)); > final RecordMetadata sentRecordMetadata = > recordMetadataFuture.get(); > System.out.println("Sent message '" + message + "' to topic '" + > topicName + "'"); > } > System.out.println("Tests complete"); > } > } > {code} > Notice that the server URL is using a SSL endpoint localhost:9093 but isn't > specifying any of the other necessary SSL configs like security.protocol. > 5. 
For the sake of easily reproducing this issue, run this class with a max > heap size of 256MB (-Xmx256M). Running this code throws up the following > OutOfMemoryError in one of the Sender threads: > {code} > 18:33:25,770 ERROR [KafkaThread] - Uncaught exception in > kafka-producer-network-thread | producer-1: > java.lang.OutOfMemoryError: Java heap space > at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) > at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) > at > org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93) > at >
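The allocation failure in the stack trace above follows directly from misreading the TLS record header as a request size. A minimal sketch (the class name is illustrative): the first four bytes of a TLS handshake record, read as a big-endian int, demand hundreds of megabytes in a single allocation:

```java
import java.nio.ByteBuffer;

public class TlsSizeMisread {
    // Treat the first 4 bytes on the wire as a big-endian payload size,
    // exactly as the plaintext client does before allocating its receive buffer.
    static int misreadSize(byte[] firstFour) {
        return ByteBuffer.wrap(firstFour).getInt();
    }

    public static void main(String[] args) {
        // A TLS handshake record starts 0x16 0x03 0x01 ...; read as an int
        // this is 0x16030100 = 369,295,616 bytes (~352 MB), far beyond -Xmx256M.
        System.out.println(misreadSize(new byte[]{0x16, 0x03, 0x01, 0x00}));
    }
}
```

Hence the OutOfMemoryError in NetworkReceive.readFromReadableChannel: the "size" is garbage, and ByteBuffer.allocate faithfully tries to honor it.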
[jira] [Commented] (KAFKA-9504) Memory leak in KafkaMetrics registered to MBean
[ https://issues.apache.org/jira/browse/KAFKA-9504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032739#comment-17032739 ] Ted Yu commented on KAFKA-9504: --- It seems that closing the metrics is not enough to prevent the memory leak: {code} Utils.closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", firstException); Utils.closeQuietly(metrics, "consumer metrics", firstException); {code} > Memory leak in KafkaMetrics registered to MBean > --- > > Key: KAFKA-9504 > URL: https://issues.apache.org/jira/browse/KAFKA-9504 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.4.0 >Reporter: Andreas Holmén >Priority: Major > > After close() is called on a KafkaConsumer, some registered MBeans are not > unregistered, causing a leak. > > > {code:java} > import static > org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; > import java.lang.management.ManagementFactory; > import java.util.HashMap; > import java.util.Map; > import javax.management.MBeanServer; > import org.apache.kafka.clients.consumer.KafkaConsumer; > import org.apache.kafka.common.serialization.ByteArrayDeserializer; > public class Leaker { > private static String bootstrapServers = "hostname:9092"; > > public static void main(String[] args) throws InterruptedException { > MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); > Map<String, Object> props = new HashMap<>(); > props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); > > int beans = mBeanServer.getMBeanCount(); > for (int i = 0; i < 100; i++) { >KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props, new > ByteArrayDeserializer(), new ByteArrayDeserializer()); >consumer.close(); > } > int newBeans = mBeanServer.getMBeanCount(); > System.out.println("\nbeans delta: " + (newBeans - beans)); > } > } > {code} > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9525) Allow explicit rebalance triggering on the Consumer
Sophie Blee-Goldman created KAFKA-9525: -- Summary: Allow explicit rebalance triggering on the Consumer Key: KAFKA-9525 URL: https://issues.apache.org/jira/browse/KAFKA-9525 Project: Kafka Issue Type: Improvement Components: clients Reporter: Sophie Blee-Goldman Currently the only way to explicitly trigger a rebalance is by unsubscribing the consumer. This has two drawbacks: it does not work with static membership, and it causes the consumer to revoke all its currently owned partitions. Streams relies on being able to enforce a rebalance for its version probing upgrade protocol and the upcoming KIP-441, both of which should be able to work with static membership and be able to leverage the improvements of KIP-429 to no longer revoke all owned partitions. We should add an API that will allow users to explicitly trigger a rebalance without going through #unsubscribe -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9524) Default window retention does not consider grace period
[ https://issues.apache.org/jira/browse/KAFKA-9524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032715#comment-17032715 ] Michael Bingham commented on KAFKA-9524: Good point about the workaround. Thanks [~vvcephei]! > Default window retention does not consider grace period > --- > > Key: KAFKA-9524 > URL: https://issues.apache.org/jira/browse/KAFKA-9524 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Michael Bingham >Priority: Minor > > In a windowed aggregation, if you specify a window size larger than the > default window retention (1 day), Streams will implicitly set retention > accordingly to accommodate windows of that size. For example, > {code:java} > .windowedBy(TimeWindows.of(Duration.ofDays(20))) > {code} > In this case, Streams will implicitly set window retention to 20 days, and no > exceptions will occur. > However, if you also include a non-zero grace period on the window, such as: > {code:java} > .windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5))) > {code} > In this case, Streams will still implicitly set the window retention 20 days > (not 20 days + 5 minutes grace), and an exception will be thrown: > {code:java} > Exception in thread "main" java.lang.IllegalArgumentException: The retention > period of the window store KSTREAM-KEY-SELECT-02 must be no smaller > than its window size plus the grace period. Got size=[172800], > grace=[30], retention=[172800]{code} > Ideally, Streams should include grace period when implicitly setting window > retention. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9524) Default window retention does not consider grace period
[ https://issues.apache.org/jira/browse/KAFKA-9524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032714#comment-17032714 ] John Roesler commented on KAFKA-9524: - Thanks for the report, [~mikebin]! FWIW, the workaround is to also set the retention time on the store (using Materialized) to be at least windowSize + grace. I just mention that for anyone who happens to find this ticket after searching the error message. I agree that we could and should configure the store's retention properly. If I recall correctly, we need to remove the deprecated method: > org.apache.kafka.streams.kstream.TimeWindows#maintainMs and the other deprecated members of Windows and its children. They've been deprecated since 2.1, so it's probably been long enough by now for us to go ahead and remove them and make Streams behave more nicely in this case. > Default window retention does not consider grace period > --- > > Key: KAFKA-9524 > URL: https://issues.apache.org/jira/browse/KAFKA-9524 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Michael Bingham >Priority: Minor > > In a windowed aggregation, if you specify a window size larger than the > default window retention (1 day), Streams will implicitly set retention > accordingly to accommodate windows of that size. For example, > {code:java} > .windowedBy(TimeWindows.of(Duration.ofDays(20))) > {code} > In this case, Streams will implicitly set window retention to 20 days, and no > exceptions will occur. 
> However, if you also include a non-zero grace period on the window, such as: > {code:java} > .windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5))) > {code} > In this case, Streams will still implicitly set the window retention 20 days > (not 20 days + 5 minutes grace), and an exception will be thrown: > {code:java} > Exception in thread "main" java.lang.IllegalArgumentException: The retention > period of the window store KSTREAM-KEY-SELECT-02 must be no smaller > than its window size plus the grace period. Got size=[172800], > grace=[30], retention=[172800]{code} > Ideally, Streams should include grace period when implicitly setting window > retention. -- This message was sent by Atlassian Jira (v8.3.4#803005)
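The workaround John Roesler mentions (set the store retention explicitly via Materialized) can be sketched as follows; the store name, key/value types, and surrounding method are illustrative, assuming the 2.4 Streams API:

```java
import java.time.Duration;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.WindowStore;

public class RetentionWorkaround {
    static KTable<Windowed<String>, Long> countWithGrace(KStream<String, String> stream) {
        Duration windowSize = Duration.ofDays(20);
        Duration grace = Duration.ofMinutes(5);
        return stream.groupByKey()
            .windowedBy(TimeWindows.of(windowSize).grace(grace))
            // Explicit retention >= window size + grace sidesteps the
            // implicit default, which ignores the grace period.
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts-store")
                .withRetention(windowSize.plus(grace)));
    }
}
```

With withRetention set to windowSize.plus(grace), the IllegalArgumentException quoted in the description no longer triggers, since the store's retention now satisfies the size-plus-grace check.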
[jira] [Updated] (KAFKA-9524) Default window retention does not consider grace period
[ https://issues.apache.org/jira/browse/KAFKA-9524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Bingham updated KAFKA-9524: --- Description: In a windowed aggregation, if you specify a window size larger than the default window retention (1 day), Streams will implicitly set retention accordingly to accommodate windows of that size. For example, {code:java} .windowedBy(TimeWindows.of(Duration.ofDays(20))) {code} In this case, Streams will implicitly set window retention to 20 days, and no exceptions will occur. However, if you also include a non-zero grace period on the window, such as: {code:java} .windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5))) {code} In this case, Streams will still implicitly set the window retention 20 days (not 20 days + 5 minutes grace), and an exception will be thrown: {code:java} Exception in thread "main" java.lang.IllegalArgumentException: The retention period of the window store KSTREAM-KEY-SELECT-02 must be no smaller than its window size plus the grace period. Got size=[172800], grace=[30], retention=[172800]{code} Ideally, Streams should include grace period when implicitly setting window retention. was: In a windowed aggregation, if you specify a window size larger than the default window retention (1 day), Streams will implicitly set retention accordingly to accommodate windows of that size. For example, {code:java} .windowedBy(TimeWindows.of(Duration.ofDays(20))) {code} In this case, Streams will implicitly set window retention to 20 days, and no exceptions will occur. 
However, if you also include a non-zero grace period on the window, such as: {code:java} .windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5))) {code} In this case, Streams will still implicitly set the window retention 20 days (not 20 days + 5 minutes grace), and an exception will be thrown: Exception in thread "main" java.lang.IllegalArgumentException: The retention period of the window store KSTREAM-KEY-SELECT-02 must be no smaller than its window size plus the grace period. Got size=[172800], grace=[30], retention=[172800] Ideally, Streams should include grace period when implicitly setting window retention. > Default window retention does not consider grace period > --- > > Key: KAFKA-9524 > URL: https://issues.apache.org/jira/browse/KAFKA-9524 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Michael Bingham >Priority: Minor > > In a windowed aggregation, if you specify a window size larger than the > default window retention (1 day), Streams will implicitly set retention > accordingly to accommodate windows of that size. For example, > {code:java} > .windowedBy(TimeWindows.of(Duration.ofDays(20))) > {code} > In this case, Streams will implicitly set window retention to 20 days, and no > exceptions will occur. > However, if you also include a non-zero grace period on the window, such as: > {code:java} > .windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5))) > {code} > In this case, Streams will still implicitly set the window retention 20 days > (not 20 days + 5 minutes grace), and an exception will be thrown: > {code:java} > Exception in thread "main" java.lang.IllegalArgumentException: The retention > period of the window store KSTREAM-KEY-SELECT-02 must be no smaller > than its window size plus the grace period. Got size=[172800], > grace=[30], retention=[172800]{code} > Ideally, Streams should include grace period when implicitly setting window > retention. 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9524) Default window retention does not consider grace period
[ https://issues.apache.org/jira/browse/KAFKA-9524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Bingham updated KAFKA-9524: --- Description: In a windowed aggregation, if you specify a window size larger than the default window retention (1 day), Streams will implicitly set retention accordingly to accommodate windows of that size. For example, {code:java} .windowedBy(TimeWindows.of(Duration.ofDays(20))) {code} In this case, Streams will implicitly set window retention to 20 days, and no exceptions will occur. However, if you also include a non-zero grace period on the window, such as: {code:java} .windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5))) {code} In this case, Streams will still implicitly set the window retention 20 days (not 20 days + 5 minutes grace), and an exception will be thrown: Exception in thread "main" java.lang.IllegalArgumentException: The retention period of the window store KSTREAM-KEY-SELECT-02 must be no smaller than its window size plus the grace period. Got size=[172800], grace=[30], retention=[172800] Ideally, Streams should include grace period when implicitly setting window retention. was: In a windowed aggregation, if you specify a window size larger than the default window retention (1 day), Streams will implicitly set retention accordingly to accommodate windows of that size. For example, {code:java} .windowedBy(TimeWindows.of(Duration.ofDays(20))) {code} In this case, Streams will implicitly set window retention to 20 days, and no exceptions will occur. 
However, if you also include a non-zero grace period on the window, such as: {code:java} .windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5))) {code} In this case, Streams will still implicitly set the window retention 20 days (not 20 days + 5 minutes grace), and an exception will be thrown: Exception in thread "main" java.lang.IllegalArgumentException: The retention period of the window store KSTREAM-KEY-SELECT-02 must be no smaller than its window size plus the grace period. Got size=[172800], grace=[30], retention=[172800] Ideally, Streams should include grace period when implicitly setting window retention. > Default window retention does not consider grace period > --- > > Key: KAFKA-9524 > URL: https://issues.apache.org/jira/browse/KAFKA-9524 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Michael Bingham >Priority: Minor > > In a windowed aggregation, if you specify a window size larger than the > default window retention (1 day), Streams will implicitly set retention > accordingly to accommodate windows of that size. For example, > {code:java} > .windowedBy(TimeWindows.of(Duration.ofDays(20))) > {code} > In this case, Streams will implicitly set window retention to 20 days, and no > exceptions will occur. > However, if you also include a non-zero grace period on the window, such as: > {code:java} > .windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5))) > {code} > In this case, Streams will still implicitly set the window retention 20 days > (not 20 days + 5 minutes grace), and an exception will be thrown: > Exception in thread "main" java.lang.IllegalArgumentException: The retention > period of the window store KSTREAM-KEY-SELECT-02 must be no smaller > than its window size plus the grace period. Got size=[172800], > grace=[30], retention=[172800] > Ideally, Streams should include grace period when implicitly setting window > retention. 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9524) Default window retention does not consider grace period
Michael Bingham created KAFKA-9524: -- Summary: Default window retention does not consider grace period Key: KAFKA-9524 URL: https://issues.apache.org/jira/browse/KAFKA-9524 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.4.0 Reporter: Michael Bingham In a windowed aggregation, if you specify a window size larger than the default window retention (1 day), Streams will implicitly set retention accordingly to accommodate windows of that size. For example, {code:java} .windowedBy(TimeWindows.of(Duration.ofDays(20))) {code} In this case, Streams will implicitly set window retention to 20 days, and no exceptions will occur. However, if you also include a non-zero grace period on the window, such as: {code:java} .windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5))) {code} In this case, Streams will still implicitly set the window retention 20 days (not 20 days + 5 minutes grace), and an exception will be thrown: Exception in thread "main" java.lang.IllegalArgumentException: The retention period of the window store KSTREAM-KEY-SELECT-02 must be no smaller than its window size plus the grace period. Got size=[172800], grace=[30], retention=[172800] Ideally, Streams should include grace period when implicitly setting window retention. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9523) Reduce flakiness of BranchedMultiLevelRepartitionConnectedTopologyTest
Boyang Chen created KAFKA-9523: -- Summary: Reduce flakiness of BranchedMultiLevelRepartitionConnectedTopologyTest Key: KAFKA-9523 URL: https://issues.apache.org/jira/browse/KAFKA-9523 Project: Kafka Issue Type: Test Reporter: Boyang Chen KAFKA-9335 introduces an integration test to verify that the topology builder itself can survive building a complex topology. This test is sometimes flaky due to the stream client's connection to the broker, so we should make it less flaky by either converting it to a unit test or making the test logic more robust. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9509) Fix flaky test MirrorConnectorsIntegrationTest.testReplication
[ https://issues.apache.org/jira/browse/KAFKA-9509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032689#comment-17032689 ] ASF GitHub Bot commented on KAFKA-9509: --- hachikuji commented on pull request #8048: KAFKA-9509: Fixing flakiness of MirrorConnectorsIntegrationTest.testReplication URL: https://github.com/apache/kafka/pull/8048 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fix flaky test MirrorConnectorsIntegrationTest.testReplication > -- > > Key: KAFKA-9509 > URL: https://issues.apache.org/jira/browse/KAFKA-9509 > Project: Kafka > Issue Type: Test > Components: mirrormaker >Affects Versions: 2.4.0, 2.4.1, 2.5.0 >Reporter: Sanjana Kaundinya >Assignee: Sanjana Kaundinya >Priority: Major > Fix For: 2.5.0 > > > The test > org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication > is a flaky test for MirrorMaker 2.0. Its flakiness lies in the timing of > when the connectors and tasks are started up. The fix for this would make it > such that when the connectors are started up, to wait until the REST endpoint > returns a positive number of tasks to be confident that we can start testing. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9248) Foreign key join does not pickup default serdes and dies with NPE
[ https://issues.apache.org/jira/browse/KAFKA-9248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-9248. Resolution: Duplicate Agreed. Closing this ticket as duplicate. > Foreign key join does not pickup default serdes and dies with NPE > - > > Key: KAFKA-9248 > URL: https://issues.apache.org/jira/browse/KAFKA-9248 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Major > > The foreign key join operator only works if `Serdes` are passed in by the > user via corresponding API methods. > If one tries to fall back to default Serdes from `StreamsConfig` the operator > does not pick up those Serdes but dies with a NPE. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9518) NullPointerException on out-of-order topologies
[ https://issues.apache.org/jira/browse/KAFKA-9518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032663#comment-17032663 ] Matthias J. Sax commented on KAFKA-9518: Thanks for digging out the other Jira [~vvcephei] – can we properly link all of those as "related" so they show up on top? It gets lost easily in comments. Still unclear to me to what extent we can fix it, though... > NullPointerException on out-of-order topologies > --- > > Key: KAFKA-9518 > URL: https://issues.apache.org/jira/browse/KAFKA-9518 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.1, 2.4.0, 2.3.1 >Reporter: Murilo Tavares >Priority: Minor > Attachments: kafka-streams-testing.zip > > > I have a KafkaStreams application that dynamically builds a topology based on > a Map of input-to-output topics. Since the map was not sorted, iteration was > unpredictable, and different instances could have different orders. When this > happens, KafkaStreams throws an exception during REBALANCE. > > I was able to reproduce this using the attached Java project. The project is > a pretty simple Maven project with one class. It starts 2 instances in > parallel, with the same input-to-output topics, but one instance takes the > topics in reversed order. > > The exception is this: > {noformat} > Exception in thread > "MY-APP-81e9dc0b-1459-4499-93d6-b5c03da60e18-StreamThread-1" > org.apache.kafka.streams.errors.StreamsException: stream-thread > [MY-APP-81e9dc0b-1459-4499-93d6-b5c03da60e18-StreamThread-1] Failed to > rebalance. 
> at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:852) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) > Caused by: java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:234) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:176) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:355) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:313) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:298) > at > org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160) > at > org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267) > at > 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843) > ... 3 more{noformat} > > The topology for both instances: > {code:java} > // instance1 > Topologies: > Sub-topology: 0 > Source: KSTREAM-SOURCE-00 (topics: [topicA]) > --> KSTREAM-SINK-01 > Sink: KSTREAM-SINK-01 (topic: topicA-repartitioned) > <-- KSTREAM-SOURCE-00 > Sub-topology: 1 > Source: KSTREAM-SOURCE-02 (topics: [topicB]) > --> KSTREAM-SINK-03 > Sink: KSTREAM-SINK-03 (topic: topicB-repartitioned) > <-- KSTREAM-SOURCE-02 > // instance2 > Topologies: > Sub-topology: 0 > Source: KSTREAM-SOURCE-00 (topics: [topicB]) > --> KSTREAM-SINK-01 > Sink: KSTREAM-SINK-01 (topic: topicB-repartitioned) > <-- KSTREAM-SOURCE-00 > Sub-topology: 1 > Source: KSTREAM-SOURCE-02 (topics: [topicA]) > --> KSTREAM-SINK-03 > Sink: KSTREAM-SINK-03 (topic:
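While the underlying bug is open, the root cause described above can be sidestepped on the application side by iterating a sorted view of the input-to-output map, so every instance wires its sub-topologies in the same order. A plain-Java sketch of the idea (topic names taken from the report; the actual stream wiring is only indicated in a comment):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class DeterministicOrder {
    public static void main(String[] args) {
        // Unordered map, as in the report: HashMap iteration order is
        // unspecified and can differ between instances.
        Map<String, String> inputToOutput = new HashMap<>();
        inputToOutput.put("topicB", "topicB-repartitioned");
        inputToOutput.put("topicA", "topicA-repartitioned");

        // A TreeMap view iterates in key order regardless of insertion order,
        // so every instance builds its sub-topologies in the same sequence.
        for (Map.Entry<String, String> e : new TreeMap<>(inputToOutput).entrySet()) {
            // builder.stream(e.getKey()).to(e.getValue());  // per-pair wiring
            System.out.println(e.getKey() + " -> " + e.getValue()); // topicA always printed first
        }
    }
}
```

With the sorted view, both instances in the attached reproducer would generate identical sub-topology numbering.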
[jira] [Commented] (KAFKA-9498) Topic validation during the creation trigger unnecessary TopicChange events
[ https://issues.apache.org/jira/browse/KAFKA-9498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032654#comment-17032654 ] ASF GitHub Bot commented on KAFKA-9498: --- dajac commented on pull request #8062: KAFKA-9498; Topic validation during the creation trigger unnecessary TopicChange events URL: https://github.com/apache/kafka/pull/8062 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Topic validation during the creation trigger unnecessary TopicChange events > > > Key: KAFKA-9498 > URL: https://issues.apache.org/jira/browse/KAFKA-9498 > Project: Kafka > Issue Type: Bug >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > > I have found out that the topic validation logic, which is executed when a > CreateTopicPolicy is configured or when validateOnly is set, triggers unnecessary > TopicChange events in the controller. In the worst case, it can trigger up to > one event per created topic, which leads to overloading the controller. > This happens because the validation logic reads all the topics from ZK using > the method getAllTopicsInCluster provided by the KafkaZKClient. This method > registers a watch every time the topics are read from Zookeeper. > I think that we should make the watch registration optional for this call in > order to avoid this unwanted behaviour. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9248) Foreign key join does not pickup default serdes and dies with NPE
[ https://issues.apache.org/jira/browse/KAFKA-9248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032632#comment-17032632 ] John Roesler commented on KAFKA-9248: - Hey [~mjsax], I just saw this ticket. I think it's the same as https://issues.apache.org/jira/browse/KAFKA-9517, do you agree? If so, I'd close this one as a duplicate, since 9517 already has a PR. > Foreign key join does not pickup default serdes and dies with NPE > - > > Key: KAFKA-9248 > URL: https://issues.apache.org/jira/browse/KAFKA-9248 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Major > > The foreign key join operator only works if `Serdes` are passed in by the > user via corresponding API methods. > If one tries to fall back to default Serdes from `StreamsConfig` the operator > does not pick up those Serdes but dies with a NPE. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9517) KTable Joins Without Materialized Argument Yield Results That Further Joins NPE On
[ https://issues.apache.org/jira/browse/KAFKA-9517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler reassigned KAFKA-9517: --- Assignee: John Roesler > KTable Joins Without Materialized Argument Yield Results That Further Joins > NPE On > -- > > Key: KAFKA-9517 > URL: https://issues.apache.org/jira/browse/KAFKA-9517 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Paul Snively >Assignee: John Roesler >Priority: Blocker > Fix For: 2.5.0, 2.4.1 > > > The `KTable` API implemented > [here|https://github.com/apache/kafka/blob/2.4.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842-L844] > calls `doJoinOnForeignKey` with an argument of > `Materialized.with(null, null)`, as apparently do several other APIs. As the > comment spanning [these lines|#L1098-L1099] makes clear, the result is a > `KTable` whose `valueSerde` (as a `KTableImpl`) is `null`. Therefore, > attempts to `join` etc. on the resulting `KTable` fail with a > `NullPointerException`. > While there is an obvious workaround—explicitly construct the required > `Materialized` and use the APIs that take it as an argument—I have to admit I > find the existence of public APIs with this sort of bug, particularly when > the bug is literally documented as a comment in the source code, astonishing > to the point of incredulity. It calls the quality and trustworthiness of > Kafka Streams into serious question, and if a resolution is not forthcoming > within a week, we will be left with no other option but to consider technical > alternatives. -- This message was sent by Atlassian Jira (v8.3.4#803005)
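For reference, the workaround described in the report (explicitly constructing the `Materialized` with serdes instead of relying on defaults) looks roughly like this against the 2.4 foreign-key join API. This is a sketch only: the topic names, store name, extractor, and joiner are illustrative, and it needs kafka-streams 2.4.x on the classpath.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class FkJoinWorkaround {
    static void build() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> orders = builder.table("orders");
        KTable<String, String> customers = builder.table("customers");

        // Passing serdes explicitly via Materialized sidesteps the null
        // valueSerde left behind by the Materialized.with(null, null) default.
        KTable<String, String> joined = orders.join(
                customers,
                orderValue -> orderValue,                    // foreign-key extractor (illustrative)
                (order, customer) -> order + "/" + customer, // joiner (illustrative)
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("joined-store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String()));
    }
}
```

Until the fix lands, any of the join overloads that accept a `Materialized` can be used this way.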
[jira] [Commented] (KAFKA-9517) KTable Joins Without Materialized Argument Yield Results That Further Joins NPE On
[ https://issues.apache.org/jira/browse/KAFKA-9517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032630#comment-17032630 ] John Roesler commented on KAFKA-9517: - Hi [~psnively], thanks for your kind words. Actually, the 2.4.1 release is currently in progress. Fortunately, we caught these issues early enough to have them included as blockers for the release. The release plan is here: https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+2.4.1 Of course, it cannot progress until the blockers are resolved, which is the biggest wild-card in the timeline. As soon as the blockers are cleared, Bill (who volunteered to drive the release) will be able to give a better estimate about the timeline. As you said, the biggest thing you can do to help is to check out the PRs and test them. I think you'll need both #8015 and #8061. As per Apache Kafka policies, the PRs are actually based on trunk, so you'll want to squash them and cherry-pick them onto 2.4 to get an accurate proxy for 2.4.1. This is actually a huge help, since even extensive testing can have subtle but important gaps (which is how we wound up with these bugs to begin with). The other big thing you can do, if you have time, is review the PRs. You've already become familiar enough with the code to identify the root cause even before I saw it, and a fresh perspective is always helpful. 
Thanks again, -John > KTable Joins Without Materialized Argument Yield Results That Further Joins > NPE On > -- > > Key: KAFKA-9517 > URL: https://issues.apache.org/jira/browse/KAFKA-9517 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Paul Snively >Priority: Blocker > Fix For: 2.5.0, 2.4.1 > > > The `KTable` API implemented [[here||#L842-L844]] > [https://github.com/apache/kafka/blob/2.4.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842-L844] > []|#L842-L844]] calls `doJoinOnForeignKey` with an argument of > `Materialized.with(null, null)`, as apparently do several other APIs. As the > comment spanning [these lines|#L1098-L1099]] makes clear, the result is a > `KTable` whose `valueSerde` (as a `KTableImpl`) is `null`. Therefore, > attempts to `join` etc. on the resulting `KTable` fail with a > `NullPointerException`. > While there is an obvious workaround—explicitly construct the required > `Materialized` and use the APIs that take it as an argument—I have to admit I > find the existence of public APIs with this sort of bug, particularly when > the bug is literally documented as a comment in the source code, astonishing > to the point of incredulity. It calls the quality and trustworthiness of > Kafka Streams into serious question, and if a resolution is not forthcoming > within a week, we will be left with no other option but to consider technical > alternatives. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message
[ https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032625#comment-17032625 ] Randall Hauch commented on KAFKA-7052: -- Any change to an API requires a KIP, even if it's a backward compatible change (which most are). {quote}"fail" for records with a schema (it's raising an {{IllegalArgumentException}} but could be NPE of course if the exact exception type is a concern; I think it shouldn't, as the original NPE really is a weakness of the existing implementation that shouldn't be relied upon {quote} I'm not sure that the type of exception matters, and IAE would be more clear. > ExtractField SMT throws NPE - needs clearer error message > - > > Key: KAFKA-7052 > URL: https://issues.apache.org/jira/browse/KAFKA-7052 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Robin Moffatt >Priority: Major > > With the following Single Message Transform: > {code:java} > "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key", > "transforms.ExtractId.field":"id"{code} > Kafka Connect errors with : > {code:java} > java.lang.NullPointerException > at > org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61) > at > org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code} > There should be a better error message here, identifying the reason for the > NPE. > Version: Confluent Platform 4.1.1 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9517) KTable Joins Without Materialized Argument Yield Results That Further Joins NPE On
[ https://issues.apache.org/jira/browse/KAFKA-9517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-9517: --- Fix Version/s: 2.4.1 2.5.0 > KTable Joins Without Materialized Argument Yield Results That Further Joins > NPE On > -- > > Key: KAFKA-9517 > URL: https://issues.apache.org/jira/browse/KAFKA-9517 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Paul Snively >Priority: Critical > Fix For: 2.5.0, 2.4.1 > > > The `KTable` API implemented [[here||#L842-L844]] > [https://github.com/apache/kafka/blob/2.4.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842-L844] > []|#L842-L844]] calls `doJoinOnForeignKey` with an argument of > `Materialized.with(null, null)`, as apparently do several other APIs. As the > comment spanning [these lines|#L1098-L1099]] makes clear, the result is a > `KTable` whose `valueSerde` (as a `KTableImpl`) is `null`. Therefore, > attempts to `join` etc. on the resulting `KTable` fail with a > `NullPointerException`. > While there is an obvious workaround—explicitly construct the required > `Materialized` and use the APIs that take it as an argument—I have to admit I > find the existence of public APIs with this sort of bug, particularly when > the bug is literally documented as a comment in the source code, astonishing > to the point of incredulity. It calls the quality and trustworthiness of > Kafka Streams into serious question, and if a resolution is not forthcoming > within a week, we will be left with no other option but to consider technical > alternatives. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9517) KTable Joins Without Materialized Argument Yield Results That Further Joins NPE On
[ https://issues.apache.org/jira/browse/KAFKA-9517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-9517: --- Priority: Blocker (was: Critical) > KTable Joins Without Materialized Argument Yield Results That Further Joins > NPE On > -- > > Key: KAFKA-9517 > URL: https://issues.apache.org/jira/browse/KAFKA-9517 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Paul Snively >Priority: Blocker > Fix For: 2.5.0, 2.4.1 > > > The `KTable` API implemented [[here||#L842-L844]] > [https://github.com/apache/kafka/blob/2.4.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842-L844] > []|#L842-L844]] calls `doJoinOnForeignKey` with an argument of > `Materialized.with(null, null)`, as apparently do several other APIs. As the > comment spanning [these lines|#L1098-L1099]] makes clear, the result is a > `KTable` whose `valueSerde` (as a `KTableImpl`) is `null`. Therefore, > attempts to `join` etc. on the resulting `KTable` fail with a > `NullPointerException`. > While there is an obvious workaround—explicitly construct the required > `Materialized` and use the APIs that take it as an argument—I have to admit I > find the existence of public APIs with this sort of bug, particularly when > the bug is literally documented as a comment in the source code, astonishing > to the point of incredulity. It calls the quality and trustworthiness of > Kafka Streams into serious question, and if a resolution is not forthcoming > within a week, we will be left with no other option but to consider technical > alternatives. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9517) KTable Joins Without Materialized Argument Yield Results That Further Joins NPE On
[ https://issues.apache.org/jira/browse/KAFKA-9517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032616#comment-17032616 ] ASF GitHub Bot commented on KAFKA-9517: --- vvcephei commented on pull request #8061: KAFKA-9517: Fix default serdes with FK join URL: https://github.com/apache/kafka/pull/8061 During the KIP-213 implementation and verification, we neglected to test the code path for falling back to default serdes if none are given in the topology. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > KTable Joins Without Materialized Argument Yield Results That Further Joins > NPE On > -- > > Key: KAFKA-9517 > URL: https://issues.apache.org/jira/browse/KAFKA-9517 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Paul Snively >Priority: Critical > > The `KTable` API implemented [[here||#L842-L844]] > [https://github.com/apache/kafka/blob/2.4.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842-L844] > []|#L842-L844]] calls `doJoinOnForeignKey` with an argument of > `Materialized.with(null, null)`, as apparently do several other APIs. As the > comment spanning [these lines|#L1098-L1099]] makes clear, the result is a > `KTable` whose `valueSerde` (as a `KTableImpl`) is `null`. Therefore, > attempts to `join` etc. on the resulting `KTable` fail with a > `NullPointerException`. 
> While there is an obvious workaround—explicitly construct the required > `Materialized` and use the APIs that take it as an argument—I have to admit I > find the existence of public APIs with this sort of bug, particularly when > the bug is literally documented as a comment in the source code, astonishing > to the point of incredulity. It calls the quality and trustworthiness of > Kafka Streams into serious question, and if a resolution is not forthcoming > within a week, we will be left with no other option but to consider technical > alternatives. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032604#comment-17032604 ] Evan Williams commented on KAFKA-4084: -- [~sql_consulting] That would be great! We are running the latest Confluent 5.4. However, Zookeeper is whatever is bundled with 5.3 currently. Another option I was thinking about is separating client, replication, and controller traffic via multiple NICs. At the moment, we just have one listener/advertised listener and one NIC per broker. I guess this would also guarantee that client traffic would be unaffected even if threads get exhausted by replication traffic? > automated leader rebalance causes replication downtime for clusters with too > many partitions > > > Key: KAFKA-4084 > URL: https://issues.apache.org/jira/browse/KAFKA-4084 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Tom Crayford >Priority: Major > Labels: reliability > Fix For: 1.1.0 > > > If you enable {{auto.leader.rebalance.enable}} (which is on by default), and > you have a cluster with many partitions, there is a severe amount of > replication downtime following a restart. This causes > `UnderReplicatedPartitions` to fire, and replication is paused. > This is because the current automated leader rebalance mechanism changes > leaders for *all* imbalanced partitions at once, instead of doing it > gradually. This effectively stops all replica fetchers in the cluster > (assuming there are enough imbalanced partitions), and restarts them. This > can take minutes on busy clusters, during which no replication is happening > and user data is at risk. Clients with {{acks=-1}} also see issues at this > time, because replication is effectively stalled. > To quote Todd Palino from the mailing list: > bq. There is an admin CLI command to trigger the preferred replica election > manually. 
There is also a broker configuration “auto.leader.rebalance.enable” > which you can set to have the broker automatically perform the PLE when > needed. DO NOT USE THIS OPTION. There are serious performance issues when > doing so, especially on larger clusters. It needs some development work that > has not been fully identified yet. > This setting is extremely useful for smaller clusters, but with high > partition counts causes the huge issues stated above. > One potential fix could be adding a new configuration for the number of > partitions to do automated leader rebalancing for at once, and *stop* once > that number of leader rebalances are in flight, until they're done. There may > be better mechanisms, and I'd love to hear if anybody has any ideas. -- This message was sent by Atlassian Jira (v8.3.4#803005)
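The mitigation suggested at the end of the description, capping the number of leader rebalances in flight, amounts to processing the imbalanced partitions in bounded batches. A plain-Java sketch of that batching, where `maxInFlight` stands in for the hypothetical new configuration and the actual election call is only indicated in a comment:

```java
import java.util.List;

public class BatchedRebalance {
    /** Splits the imbalanced partitions into batches of at most maxInFlight. */
    public static int countBatches(List<String> imbalanced, int maxInFlight) {
        int batches = 0;
        for (int i = 0; i < imbalanced.size(); i += maxInFlight) {
            List<String> batch =
                    imbalanced.subList(i, Math.min(i + maxInFlight, imbalanced.size()));
            // electPreferredLeaders(batch);  // hypothetical: wait for this batch
            //                                // to finish before starting the next
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(countBatches(List.of("t-0", "t-1", "t-2", "t-3", "t-4"), 2)); // prints "3"
    }
}
```

Bounding the in-flight count keeps most replica fetchers running while the elections proceed gradually, which is the behavior the ticket asks for.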
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032598#comment-17032598 ] GEORGE LI commented on KAFKA-4084: -- [~blodsbror] Which Kafka version are you running? Maybe I can provide a diff for the KIP-491 changes for you to apply at your end to try it out? > automated leader rebalance causes replication downtime for clusters with too > many partitions > > > Key: KAFKA-4084 > URL: https://issues.apache.org/jira/browse/KAFKA-4084 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Tom Crayford >Priority: Major > Labels: reliability > Fix For: 1.1.0 > > > If you enable {{auto.leader.rebalance.enable}} (which is on by default), and > you have a cluster with many partitions, there is a severe amount of > replication downtime following a restart. This causes > `UnderReplicatedPartitions` to fire, and replication is paused. > This is because the current automated leader rebalance mechanism changes > leaders for *all* imbalanced partitions at once, instead of doing it > gradually. This effectively stops all replica fetchers in the cluster > (assuming there are enough imbalanced partitions), and restarts them. This > can take minutes on busy clusters, during which no replication is happening > and user data is at risk. Clients with {{acks=-1}} also see issues at this > time, because replication is effectively stalled. > To quote Todd Palino from the mailing list: > bq. There is an admin CLI command to trigger the preferred replica election > manually. There is also a broker configuration “auto.leader.rebalance.enable” > which you can set to have the broker automatically perform the PLE when > needed. DO NOT USE THIS OPTION. There are serious performance issues when > doing so, especially on larger clusters. It needs some development work that > has not been fully identified yet. 
> This setting is extremely useful for smaller clusters, but with high > partition counts causes the huge issues stated above. > One potential fix could be adding a new configuration for the number of > partitions to do automated leader rebalancing for at once, and *stop* once > that number of leader rebalances are in flight, until they're done. There may > be better mechanisms, and I'd love to hear if anybody has any ideas. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9274) Gracefully handle timeout exceptions on Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-9274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032577#comment-17032577 ] ASF GitHub Bot commented on KAFKA-9274: --- guozhangwang commented on pull request #8060: KAFKA-9274: Gracefully handle timeout exception [WIP] URL: https://github.com/apache/kafka/pull/8060 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Gracefully handle timeout exceptions on Kafka Streams > - > > Key: KAFKA-9274 > URL: https://issues.apache.org/jira/browse/KAFKA-9274 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Boyang Chen >Priority: Major > > Right now streams don't treat timeout exception as retriable in general by > throwing it to the application level. If not handled by the user, this would > kill the stream thread unfortunately. > In fact, timeouts happen mostly due to network issue or server side > unavailability. Hard failure on client seems to be an over-kill. > We would like to discuss what's the best practice to handle timeout > exceptions on Streams. The current state is still brainstorming and > consolidate all the cases that contain timeout exception within this ticket. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message
[ https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032552#comment-17032552 ] Gunnar Morling commented on KAFKA-7052: --- Thanks for commenting, [~rhauch]! My intention was to actually keep the current behavior by means of the default setting of the new option: * "fail" for records with a schema (it's raising an {{IllegalArgumentException}} but could be NPE of course if the exact exception type is a concern; I think it shouldn't be, as the original NPE really is a weakness of the existing implementation that shouldn't be relied upon) * "return-null" for records without schema I.e. without any explicit setting, the behavior will be exactly the same as today (ignoring the changed exception type). That's why I didn't assume that'd need a KIP, but as per what you're saying, any new option mandates a KIP? > ExtractField SMT throws NPE - needs clearer error message > - > > Key: KAFKA-7052 > URL: https://issues.apache.org/jira/browse/KAFKA-7052 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Robin Moffatt >Priority: Major > > With the following Single Message Transform: > {code:java} > "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key", > "transforms.ExtractId.field":"id"{code} > Kafka Connect errors with: > {code:java} > java.lang.NullPointerException > at > org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61) > at > org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code} > There should be a better error message here, identifying the reason for the > NPE. > Version: Confluent Platform 4.1.1 -- This message was sent by Atlassian Jira (v8.3.4#803005)
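The proposed defaults ("fail" for records with a schema, "return-null" for schemaless records) can be illustrated with a small helper. This is purely hypothetical illustration code, not the actual ExtractField implementation; `failOnMissing` stands in for the proposed option:

```java
import java.util.Map;

public class ExtractFieldSketch {
    /**
     * Hypothetical illustration of the proposed behavior: with failOnMissing
     * (the schema case) a missing field raises IllegalArgumentException with
     * a clear message instead of an opaque NPE; otherwise (the schemaless
     * case) it simply returns null.
     */
    public static Object extract(Map<String, Object> value, String field, boolean failOnMissing) {
        if (value == null || !value.containsKey(field)) {
            if (failOnMissing) {
                throw new IllegalArgumentException("Unknown field: " + field);
            }
            return null;
        }
        return value.get(field);
    }
}
```

Either way the user sees a clear outcome rather than a bare NullPointerException from inside the transformation chain.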
[jira] [Commented] (KAFKA-8307) Kafka Streams should provide some mechanism to determine topology equality and compatibility
[ https://issues.apache.org/jira/browse/KAFKA-8307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032525#comment-17032525 ] Guozhang Wang commented on KAFKA-8307: -- Thanks for the summary [~vvcephei]. I agree with you that having a verification mechanism to check that two topologies are compatible or equal would be valuable. Besides that, though, I'm thinking that Streams itself should be robust against the order in which user code declares operators (and, more importantly, the naming suffixes of the operators) -- since we now have an internal logical representation of the topology before generating the physical Topology, we should make the generation process "deterministic", such that no matter whether you write: stream1.join(stream2) stream1.groupBy().aggregate() OR stream1.groupBy().aggregate() stream1.join(stream2) The generated topology would be the same in terms of operator ordering (today they would be different). > Kafka Streams should provide some mechanism to determine topology equality > and compatibility > > > Key: KAFKA-8307 > URL: https://issues.apache.org/jira/browse/KAFKA-8307 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > Labels: user-experience > > Currently, Streams provides no mechanism to compare two topologies. This is a > common operation when users want to have tests verifying that they don't > accidentally alter their topology. They would save the known-good topology > and then add a unit test verifying the current code against that known-good > state. > However, because there's no way to do this comparison properly, everyone is > reduced to using the string format of the topology (from > `Topology#describe().toString()`). The major drawback is that the string > format is meant for human consumption. It is neither machine-parseable nor > stable. 
So, these compatibility tests are doomed to fail when any minor, > non-breaking, change is made either to the application, or to the library. > This trains everyone to update the test whenever it fails, undermining its > utility. > We should fix this problem, and provide both a mechanism to serialize the > topology and to compare two topologies for compatibility. All in all, I think > we need: > # a way to serialize/deserialize topology structure in a machine-parseable > format that is future-compatible. Offhand, I'd recommend serializing the > topology structure as JSON, and establishing a policy that attributes should > only be added to the object graph, never removed. Note, it's out of scope to > be able to actually run a deserialized topology; we only want to save and > load the structure (not the logic) to facilitate comparisons. > # a method to verify the *equality* of two topologies... This method tells > you that the two topologies are structurally identical. We can't know if the > logic of any operator has changed, only if the structure of the graph is > changed. We can consider whether other graph properties, like serdes, should > be included. > # a method to verify the *compatibility* of two topologies... This method > tells you that moving from topology A to topology B does not require an > application reset. Note that this operation is not commutative: > `A.compatibleWith(B)` does not imply `B.compatibleWith(A)`. We can discuss > whether `A.compatibleWith(B) && B.compatibleWith(A)` implies `A.equals(B)` (I > think not necessarily, because we may want "equality" to be stricter than > "compatibility"). -- This message was sent by Atlassian Jira (v8.3.4#803005)
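The instability of comparing `Topology#describe().toString()` output can be seen with the auto-generated operator names: a purely cosmetic renumbering changes the string and breaks the test. A plain-Java sketch of a heuristic, hypothetical normalization shows why a structural, machine-parseable format is the better target:

```java
public class TopologyCompare {
    /**
     * Strips the auto-generated numeric suffixes (e.g. KSTREAM-SOURCE-0000000002)
     * so two descriptions differing only in numbering compare equal. This is a
     * heuristic workaround, not a real equality or compatibility check: it cannot
     * tell whether the underlying graph structure actually changed.
     */
    public static String normalize(String description) {
        return description.replaceAll("-\\d{10}", "-N");
    }

    public static void main(String[] args) {
        String a = "Source: KSTREAM-SOURCE-0000000000 (topics: [topicA])";
        String b = "Source: KSTREAM-SOURCE-0000000002 (topics: [topicA])";
        System.out.println(normalize(a).equals(normalize(b))); // prints "true"
    }
}
```

A JSON serialization of the topology structure, as proposed in the ticket, would make such fragile string munging unnecessary.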
[jira] [Commented] (KAFKA-9517) KTable Joins Without Materialized Argument Yield Results That Further Joins NPE On
[ https://issues.apache.org/jira/browse/KAFKA-9517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032520#comment-17032520 ] Paul Snively commented on KAFKA-9517: - Speaking of things I can do: I am downloading PR #8015 as a patch, and will apply it locally, build the appropriate `.jar`s, and we will attempt to reproduce the issues we've seen given that PR, and report back. > KTable Joins Without Materialized Argument Yield Results That Further Joins > NPE On > -- > > Key: KAFKA-9517 > URL: https://issues.apache.org/jira/browse/KAFKA-9517 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Paul Snively >Priority: Critical > > The `KTable` API implemented > [here|https://github.com/apache/kafka/blob/2.4.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842-L844] > calls `doJoinOnForeignKey` with an argument of > `Materialized.with(null, null)`, as apparently do several other APIs. As the > comment spanning [these lines|#L1098-L1099] makes clear, the result is a > `KTable` whose `valueSerde` (as a `KTableImpl`) is `null`. Therefore, > attempts to `join` etc. on the resulting `KTable` fail with a > `NullPointerException`. > While there is an obvious workaround—explicitly construct the required > `Materialized` and use the APIs that take it as an argument—I have to admit I > find the existence of public APIs with this sort of bug, particularly when > the bug is literally documented as a comment in the source code, astonishing > to the point of incredulity. It calls the quality and trustworthiness of > Kafka Streams into serious question, and if a resolution is not forthcoming > within a week, we will be left with no other option but to consider technical > alternatives. -- This message was sent by Atlassian Jira (v8.3.4#803005)
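The serde-propagation bug described above can be modeled in a few lines. This is a toy Python sketch, not Kafka's actual code (the `Table` class and `materialized_serde` parameter are invented): a join materialized with null serdes yields a table whose value serde is `None`, so the *next* join fails, while passing explicit serdes — the workaround the reporter mentions — works.

```python
class Table:
    """Toy stand-in for KTableImpl: tracks only the value serde."""
    def __init__(self, value_serde):
        self.value_serde = value_serde

    def join(self, other, materialized_serde=None):
        # Mimics Materialized.with(null, null): the result carries
        # whatever serde was passed in -- possibly None.
        if self.value_serde is None:
            # Stand-in for the NullPointerException seen in the report.
            raise TypeError("value serde is null; cannot build join result")
        return Table(materialized_serde)

t1 = Table(value_serde="json")
broken = t1.join(Table("json"))            # result's serde is None
try:
    broken.join(Table("json"))             # the *further* join blows up
except TypeError as e:
    print("further join failed:", e)

# Workaround: supply the serde explicitly on every join.
ok = t1.join(Table("json"), materialized_serde="json")
ok.join(Table("json"), materialized_serde="json")  # succeeds
```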
[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message
[ https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032514#comment-17032514 ] Randall Hauch commented on KAFKA-7052: -- On second thought, I'm not sure we can change the behavior without risking backward-incompatible changes. First, the current behavior when the SMT can't be applied is to fail with an NPE. We try to use meaningful exceptions with useful error messages, and the lack of any code to check for this situation likely means this case was simply not considered and throwing an NPE is unintentional. This is an argument for changing the behavior to skip any record for which the specified field is not found. Second, it's probably not practical for users to rely upon this existing NPE behavior when used with a +source+ connector, since that would leave the connector in a failed state without advancing offsets. Essentially, the connector would be stuck and unable to continue unless the configuration is changed. However, if someone were to use this SMT with a +sink+ connector and use the DLQ functionality, they might be relying upon the NPE to signal that the record should go to the DLQ. If we were to change the behavior, the record would no longer go to the DLQ and instead would get sent to any subsequent transformation and ultimately to the sink connector. Therefore, changing to have the SMT skip any record for which the field is not found is technically not a backward compatible change, and this would require a KIP or, better yet, require introducing a new configuration property (as mentioned in my previous comment) that would default to "fail" to maintain backward compatibility. Thoughts? 
> ExtractField SMT throws NPE - needs clearer error message > - > > Key: KAFKA-7052 > URL: https://issues.apache.org/jira/browse/KAFKA-7052 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Robin Moffatt >Priority: Major > > With the following Single Message Transform: > {code:java} > "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key", > "transforms.ExtractId.field":"id"{code} > Kafka Connect errors with : > {code:java} > java.lang.NullPointerException > at > org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61) > at > org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code} > There should be a better error message here, identifying the reason for the > NPE. > Version: Confluent Platform 4.1.1 -- This message was sent by Atlassian Jira (v8.3.4#803005)
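The "better error message" fix discussed above amounts to an explicit field check before extraction. A minimal Python sketch (not Connect's actual Java implementation — the real SMT operates on Connect `Struct`/`Map` values, and the message format here follows Robin's suggested wording, not shipped code):

```python
class DataException(Exception):
    """Stand-in for org.apache.kafka.connect.errors.DataException."""

def extract_field(record_key, field):
    """Sketch of ExtractField$Key with an explicit check instead of an NPE."""
    if not isinstance(record_key, dict) or field not in record_key:
        # A descriptive error instead of a bare NullPointerException.
        raise DataException(
            f"ExtractField: field '{field}' not found in Key {record_key!r}")
    return record_key[field]

print(extract_field({"id": 42}, "id"))   # 42
try:
    extract_field({"uid": 1}, "id")
except DataException as e:
    print(e)
```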
[jira] [Commented] (KAFKA-8810) Add mechanism to detect topology mismatch between streams instances
[ https://issues.apache.org/jira/browse/KAFKA-8810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032509#comment-17032509 ] John Roesler commented on KAFKA-8810: - This issue has been lurking for a while, and has been reported a number of different ways. It seems to take two forms: 1. changing the topology at all (in apparently compatible ways) can renumber operators and corrupt the application upon restart 2. changing the topology in combination with a rolling bounce results in members executing a different topology than the leader, which leads to extra problems (such as NPEs) https://issues.apache.org/jira/browse/KAFKA-7669 is related, and seems to be more about just changing the topology at all https://issues.apache.org/jira/browse/KAFKA-8307 proposes a fix https://issues.apache.org/jira/browse/KAFKA-8810 seems to be a duplicate of KAFKA-8307 and https://issues.apache.org/jira/browse/KAFKA-9518 reports an exception that results from a rolling-bounce topology change. > Add mechanism to detect topology mismatch between streams instances > --- > > Key: KAFKA-8810 > URL: https://issues.apache.org/jira/browse/KAFKA-8810 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Vinoth Chandar >Priority: Major > > Noticed this while reading through the StreamsPartitionAssignor related code. > If a user accidentally deploys a different topology on one of the instances, > there is no mechanism to detect this and refuse assignment/take action. Given > Kafka Streams is designed as an embeddable library, I feel this is rather an > important scenario to handle. For example, Kafka Streams is embedded into a web > front-end tier and operators deploy a hot fix for a site issue to a few > instances that are leaking memory, and that accidentally also deploys some > topology changes with it. > Please feel free to close the issue, if it's a duplicate. (Could not find a > ticket for this) -- This message was sent by Atlassian Jira (v8.3.4#803005)
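One shape the requested detection mechanism could take — purely a sketch, Streams does not do this today — is for each member to embed a digest of its topology description in its subscription metadata, so the group leader can refuse assignment to instances running a different topology:

```python
import hashlib

def topology_digest(description: str) -> str:
    """Hash a canonical topology description. Hypothetically, each member
    would embed this digest in its subscription userdata."""
    return hashlib.sha256(description.encode()).hexdigest()

def mismatched_members(leader_description, member_descriptions):
    """Return the members whose topology digest differs from the leader's."""
    expected = topology_digest(leader_description)
    return [member for member, desc in member_descriptions.items()
            if topology_digest(desc) != expected]

members = {
    "a": "source->agg->sink",
    "b": "source->agg->sink",
    "c": "source->join->sink",  # hot-fixed instance with a changed topology
}
print(mismatched_members("source->agg->sink", members))  # ['c']
```

The leader could then exclude the mismatched member from assignment (or log a clear error) instead of letting it execute a divergent topology.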
[jira] [Commented] (KAFKA-8307) Kafka Streams should provide some mechanism to determine topology equality and compatibility
[ https://issues.apache.org/jira/browse/KAFKA-8307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032511#comment-17032511 ] John Roesler commented on KAFKA-8307: - This issue has been lurking for a while, and has been reported a number of different ways. It seems to take two forms: 1. changing the topology at all (in apparently compatible ways) can renumber operators and corrupt the application upon restart 2. changing the topology in combination with a rolling bounce results in members executing a different topology than the leader, which leads to extra problems (such as NPEs) https://issues.apache.org/jira/browse/KAFKA-7669 is related, and seems to be more about just changing the topology at all https://issues.apache.org/jira/browse/KAFKA-8307 proposes a fix https://issues.apache.org/jira/browse/KAFKA-8810 seems to be a duplicate of KAFKA-8307 and https://issues.apache.org/jira/browse/KAFKA-9518 reports an exception that results from a rolling-bounce topology change. > Kafka Streams should provide some mechanism to determine topology equality > and compatibility > > > Key: KAFKA-8307 > URL: https://issues.apache.org/jira/browse/KAFKA-8307 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > Labels: user-experience > > Currently, Streams provides no mechanism to compare two topologies. This is a > common operation when users want to have tests verifying that they don't > accidentally alter their topology. They would save the known-good topology > and then add a unit test verifying the current code against that known-good > state. > However, because there's no way to do this comparison properly, everyone is > reduced to using the string format of the topology (from > `Topology#describe().toString()`). The major drawback is that the string > format is meant for human consumption. It is neither machine-parseable nor > stable. 
So, these compatibility tests are doomed to fail when any minor, > non-breaking, change is made either to the application, or to the library. > This trains everyone to update the test whenever it fails, undermining its > utility. > We should fix this problem, and provide both a mechanism to serialize the > topology and to compare two topologies for compatibility. All in all, I think > we need: > # a way to serialize/deserialize topology structure in a machine-parseable > format that is future-compatible. Offhand, I'd recommend serializing the > topology structure as JSON, and establishing a policy that attributes should > only be added to the object graph, never removed. Note, it's out of scope to > be able to actually run a deserialized topology; we only want to save and > load the structure (not the logic) to facilitate comparisons. > # a method to verify the *equality* of two topologies... This method tells > you that the two topologies are structurally identical. We can't know if the > logic of any operator has changed, only if the structure of the graph is > changed. We can consider whether other graph properties, like serdes, should > be included. > # a method to verify the *compatibility* of two topologies... This method > tells you that moving from topology A to topology B does not require an > application reset. Note that this operation is not commutative: > `A.compatibleWith(B)` does not imply `B.compatibleWith(A)`. We can discuss > whether `A.compatibleWith(B) && B.compatibleWith(A)` implies `A.equals(B)` (I > think not necessarily, because we may want "equality" to be stricter than > "compatibility"). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9518) NullPointerException on out-of-order topologies
[ https://issues.apache.org/jira/browse/KAFKA-9518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032508#comment-17032508 ] John Roesler commented on KAFKA-9518: - This issue has been lurking for a while, and has been reported a number of different ways. It seems to take two forms: 1. changing the topology at all (in apparently compatible ways) can renumber operators and corrupt the application upon restart 2. changing the topology in combination with a rolling bounce results in members executing a different topology than the leader, which leads to extra problems (such as NPEs) https://issues.apache.org/jira/browse/KAFKA-7669 is related, and seems to be more about just changing the topology at all https://issues.apache.org/jira/browse/KAFKA-8307 proposes a fix https://issues.apache.org/jira/browse/KAFKA-8810 seems to be a duplicate of KAFKA-8307 and https://issues.apache.org/jira/browse/KAFKA-9518 reports an exception that results from a rolling-bounce topology change. > NullPointerException on out-of-order topologies > --- > > Key: KAFKA-9518 > URL: https://issues.apache.org/jira/browse/KAFKA-9518 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.1, 2.4.0, 2.3.1 >Reporter: Murilo Tavares >Priority: Minor > Attachments: kafka-streams-testing.zip > > > I have a KafkaStreams application that dynamically builds a topology based on a Map of > input-to-output topics. Since the map was not sorted, iteration was > unpredictable, and different instances could have different orders. When this > happens, KafkaStreams throws an exception during REBALANCE. > > I was able to reproduce this using the attached java project. The project is > a pretty simple Maven project with one class. It starts 2 instances in > parallel, with the same input-to-output topics, but one instance takes the > topics in a reversed order. 
> > The exception is this: > {noformat} > Exception in thread > "MY-APP-81e9dc0b-1459-4499-93d6-b5c03da60e18-StreamThread-1" > org.apache.kafka.streams.errors.StreamsException: stream-thread > [MY-APP-81e9dc0b-1459-4499-93d6-b5c03da60e18-StreamThread-1] Failed to > rebalance. > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:852) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) > Caused by: java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:234) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:176) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:355) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:313) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:298) > at > org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160) > at > org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:272) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:400) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421) > at > 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843) > ... 3 more{noformat} > > The topology for both instances: > {code:java} > // instance1 > Topologies: > Sub-topology: 0 > Source: KSTREAM-SOURCE-00 (topics: [topicA]) > --> KSTREAM-SINK-01 > Sink: KSTREAM-SINK-01 (topic: topicA-repartitioned) > <-- KSTREAM-SOURCE-00 > Sub-topology: 1 >
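The root cause above — two instances iterating the same topic map in different orders and therefore generating differently numbered operators — can be demonstrated with a toy Python model of Streams' auto-generated `KSTREAM-...-000000000N` names (simplified; the naming scheme is mimicked, not Kafka's actual generator). Sorting the map before building the topology is the application-side fix:

```python
def build_topology(topic_pairs):
    """Build an operator list whose auto-generated names depend on
    declaration order, mimicking Streams' numbered-suffix naming."""
    return [f"KSTREAM-SOURCE-{i:010d}:{src}->{dst}"
            for i, (src, dst) in enumerate(topic_pairs)]

pairs = [("topicA", "outA"), ("topicB", "outB")]
reversed_pairs = list(reversed(pairs))

# Two instances with different iteration orders diverge:
assert build_topology(pairs) != build_topology(reversed_pairs)

# Sorting before building makes every instance generate the same topology:
assert build_topology(sorted(pairs)) == build_topology(sorted(reversed_pairs))
print("deterministic after sorting")
```

In the reporter's Java code this corresponds to iterating a `TreeMap` (or sorting the keys) instead of an unordered `HashMap` when wiring the topology.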
[jira] [Commented] (KAFKA-7669) Stream topology definition is not robust to the ordering changes
[ https://issues.apache.org/jira/browse/KAFKA-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032512#comment-17032512 ] John Roesler commented on KAFKA-7669: - This issue has been lurking for a while, and has been reported a number of different ways. It seems to take two forms: 1. changing the topology at all (in apparently compatible ways) can renumber operators and corrupt the application upon restart 2. changing the topology in combination with a rolling bounce results in members executing a different topology than the leader, which leads to extra problems (such as NPEs) https://issues.apache.org/jira/browse/KAFKA-7669 is related, and seems to be more about just changing the topology at all https://issues.apache.org/jira/browse/KAFKA-8307 proposes a fix https://issues.apache.org/jira/browse/KAFKA-8810 seems to be a duplicate of KAFKA-8307 and https://issues.apache.org/jira/browse/KAFKA-9518 reports an exception that results from a rolling-bounce topology change. > Stream topology definition is not robust to the ordering changes > > > Key: KAFKA-7669 > URL: https://issues.apache.org/jira/browse/KAFKA-7669 > Project: Kafka > Issue Type: Wish > Components: streams >Affects Versions: 2.0.0 >Reporter: Mateusz Owczarek >Priority: Major > > It seems that if the user does not guarantee the order of the stream topology > definition, he may end up with multiple stream branches having the same > internal changelog (and repartition, if created) topic. 
> Let's assume: > {code:java} > val initialStream = new StreamsBuilder().stream(sth); > val someStrings = (1 to 10).map(_.toString) > val notGuaranteedOrderOfStreams: Map[String, KStream[...]] = > someStrings.map(s => s -> initialStream.filter(...)).toMap{code} > When the user defines now common aggregation logic for the > notGuaranteedOrderOfStreams, and runs multiple instances of the application > the KSTREAM-AGGREGATE-STATE-STORE topics names will not be unique and will > contain results of the different streams from notGuaranteedOrderOfStreams map. > All of this without a single warning that the topology (or just the order of > the topology definition) differs in different instances of the Kafka Streams > application. > Also, I am concerned that ids in "KSTREAM-AGGREGATE-STATE-STORE-id-changelog > " match so well for the different application instances (and different > topologies). > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message
[ https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032488#comment-17032488 ] Randall Hauch commented on KAFKA-7052: -- Thanks for the discussion, everyone. As soon as we introduce a configuration option on a Connect-provided SMT, we need a KIP and we can't backport the change. I would suggest the following: # One PR to improve the error message per [~rmoff]'s earlier comment (e.g., "ExtractField : field 'id' not found in Key") but otherwise not change the behavior. Or, we make the message better and we decide that "skip" is an appropriate fix rather than an NPE; this may need more discussion, but I think NPE is a bad UX and probably rarely useful. This would be able to be merged on trunk and backported to 2-3 branches (standard practice). # Create a new Jira issue and small KIP and another PR to add the proposed ExtractField configuration property, such as "{{behavior.on.non.existent.field}} = {{(fail|drop|skip)}}". Note I'm suggesting using "skip" rather than "passon" to keep things simple. [~gunnar.morling]'s PR would apply to this new Jira issue and KIP rather than to this issue. # Create a new Jira issue and KIP to define an optional topic filter property for each transformation that would specify a regex pattern matching the topics to which the SMT should apply, and defaulting to ".*" to maintain the current behavior. I'm not sure how feasible this will be to ensure it never clashes with SMT-specific properties, but it's worth investigating. Important: We've already missed the KIP acceptance deadline for AK 2.5, so the earliest #2 and #3 could appear is AK 2.6. WDYT? 
> ExtractField SMT throws NPE - needs clearer error message > - > > Key: KAFKA-7052 > URL: https://issues.apache.org/jira/browse/KAFKA-7052 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Robin Moffatt >Priority: Major > > With the following Single Message Transform: > {code:java} > "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key", > "transforms.ExtractId.field":"id"{code} > Kafka Connect errors with : > {code:java} > java.lang.NullPointerException > at > org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61) > at > org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code} > There should be a better error message here, identifying the reason for the > NPE. > Version: Confluent Platform 4.1.1 -- This message was sent by Atlassian Jira (v8.3.4#803005)
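The proposed fail/drop/skip property from point #2 above can be sketched as a simple dispatch. Note this is a hypothetical illustration of the *proposal* — the property name comes from the comment, not from a shipped Connect release:

```python
def apply_extract(record, field, behavior="fail"):
    """Sketch of ExtractField under the proposed
    behavior.on.non.existent.field = (fail|drop|skip) property.
    'fail' raises with a clear message (default, backward compatible),
    'drop' filters the record out, 'skip' forwards it unchanged."""
    if field in record:
        return record[field]
    if behavior == "fail":
        raise ValueError(f"ExtractField: field '{field}' not found in {record!r}")
    if behavior == "drop":
        return None          # Connect drops records mapped to null
    return record            # 'skip': pass through untouched

print(apply_extract({"id": 7}, "id"))                  # 7
print(apply_extract({"x": 1}, "id", behavior="skip"))  # {'x': 1}
```

Defaulting to "fail" preserves the current behavior (including the DLQ routing for sink connectors discussed earlier), which is exactly why the comment argues a new property is safer than silently changing the semantics.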
[jira] [Resolved] (KAFKA-9177) Pause completed partitions on restore consumer
[ https://issues.apache.org/jira/browse/KAFKA-9177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-9177. -- Fix Version/s: 2.6.0 Assignee: Guozhang Wang Resolution: Fixed As part of KAFKA-9113 fix, we will pause the restore consumer once the corresponding partition has completed restoration, so I'm resolving this ticket now. > Pause completed partitions on restore consumer > -- > > Key: KAFKA-9177 > URL: https://issues.apache.org/jira/browse/KAFKA-9177 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Sophie Blee-Goldman >Assignee: Guozhang Wang >Priority: Major > Fix For: 2.6.0 > > > The StoreChangelogReader is responsible for tracking and restoring active > tasks, but once a store has finished restoring it will continue polling for > records on that partition. > Ordinarily this doesn't make a difference as a store is not completely > restored until its entire changelog has been read, so there are no more > records for poll to return anyway. But if the restoring state is actually an > optimized source KTable, the changelog is just the source topic and poll will > keep returning records for that partition until all stores have been restored. > Note that this isn't a correctness issue since it's just the restore > consumer, but it is wasteful to be polling for records and throwing them > away. We should pause completed partitions in StoreChangelogReader so we > don't slow down the restore consumer in reading from the unfinished changelog > topics, and avoid wasted network. -- This message was sent by Atlassian Jira (v8.3.4#803005)
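The behavior described in the resolution — pause a changelog partition on the restore consumer as soon as it finishes restoring — can be sketched with a toy model (Python, invented class shape; the real logic lives in Java's StoreChangelogReader and uses `Consumer#pause`):

```python
class RestoreConsumerSketch:
    """Toy model of pausing completed changelog partitions (KAFKA-9177)."""
    def __init__(self, end_offsets):
        self.end_offsets = end_offsets            # partition -> changelog end
        self.positions = {p: 0 for p in end_offsets}
        self.paused = set()

    def poll_and_restore(self, partition, max_records):
        """Return how many records were restored from this partition."""
        if partition in self.paused:
            return 0                              # no wasted fetching
        remaining = self.end_offsets[partition] - self.positions[partition]
        take = min(max_records, remaining)
        self.positions[partition] += take
        if self.positions[partition] >= self.end_offsets[partition]:
            # Restoration complete: stop fetching this changelog. Matters
            # most for optimized source KTables, whose "changelog" is the
            # source topic and would otherwise keep returning records.
            self.paused.add(partition)
        return take

rc = RestoreConsumerSketch({"store-changelog-0": 5})
rc.poll_and_restore("store-changelog-0", 10)          # restores 5, pauses
print(rc.poll_and_restore("store-changelog-0", 10))   # 0
```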
[jira] [Commented] (KAFKA-9517) KTable Joins Without Materialized Argument Yield Results That Further Joins NPE On
[ https://issues.apache.org/jira/browse/KAFKA-9517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032426#comment-17032426 ] Paul Snively commented on KAFKA-9517: - [~vvcephei], that's great news! First, let me thank you again for your prompt attention. It's done a great deal to restore my confidence in Kafka Streams. Second, and I hate to ask because I know it's a big project with many other customers with various needs, but do you happen to have some idea when a 2.4.1 might be available with fixes for these, and is there anything I can do to help with the process? Thanks again! > KTable Joins Without Materialized Argument Yield Results That Further Joins > NPE On > -- > > Key: KAFKA-9517 > URL: https://issues.apache.org/jira/browse/KAFKA-9517 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Paul Snively >Priority: Critical > > The `KTable` API implemented > [here|https://github.com/apache/kafka/blob/2.4.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842-L844] > calls `doJoinOnForeignKey` with an argument of > `Materialized.with(null, null)`, as apparently do several other APIs. As the > comment spanning [these lines|#L1098-L1099] makes clear, the result is a > `KTable` whose `valueSerde` (as a `KTableImpl`) is `null`. Therefore, > attempts to `join` etc. on the resulting `KTable` fail with a > `NullPointerException`. > While there is an obvious workaround—explicitly construct the required > `Materialized` and use the APIs that take it as an argument—I have to admit I > find the existence of public APIs with this sort of bug, particularly when > the bug is literally documented as a comment in the source code, astonishing > to the point of incredulity. 
It calls the quality and trustworthiness of > Kafka Streams into serious question, and if a resolution is not forthcoming > within a week, we will be left with no other option but to consider technical > alternatives. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032285#comment-17032285 ] Evan Williams edited comment on KAFKA-4084 at 2/7/20 1:07 PM: -- [~sql_consulting] We are using min.insync.replicas=1. And have replication.factor=3 or above for most topics (6 brokers). As a side note, one interesting thing I've seen reported now from the owners of the clients (streams) is that, for certain topics/partitions - they had no leader elected, even if there was a clean shutdown of the bootstrapping broker. So something is quite weird there. What might cause that ? But yes, I think there is a clear case for KIP-491 in this scenario, to just blacklist a broker from becoming leader until x factor is satisfied, or it's manually removed. was (Author: blodsbror): [~sql_consulting] We are using min.insync.replicas=1. And have replication.factor=3 or above for most topics (6 brokers). As a side note, one interesting thing I've seen reported now from the owners of the clients (streams) is that, for certain topics/partitions - they had no leader elected, even if there was a clean shutdown of the bootstrapping broker. So something is quite weird there. What might cause that ? But yes, I think there is a clear case for KIP-491 in this scenario, of URP to just blacklist a broker from becoming leader until x factor is satisfied, or it's manually removed. > automated leader rebalance causes replication downtime for clusters with too > many partitions > > > Key: KAFKA-4084 > URL: https://issues.apache.org/jira/browse/KAFKA-4084 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Tom Crayford >Priority: Major > Labels: reliability > Fix For: 1.1.0 > > > If you enable {{auto.leader.rebalance.enable}} (which is on by default), and > you have a cluster with many partitions, there is a severe amount of > replication downtime following a restart. 
This causes > `UnderReplicatedPartitions` to fire, and replication is paused. > This is because the current automated leader rebalance mechanism changes > leaders for *all* imbalanced partitions at once, instead of doing it > gradually. This effectively stops all replica fetchers in the cluster > (assuming there are enough imbalanced partitions), and restarts them. This > can take minutes on busy clusters, during which no replication is happening > and user data is at risk. Clients with {{acks=-1}} also see issues at this > time, because replication is effectively stalled. > To quote Todd Palino from the mailing list: > bq. There is an admin CLI command to trigger the preferred replica election > manually. There is also a broker configuration “auto.leader.rebalance.enable” > which you can set to have the broker automatically perform the PLE when > needed. DO NOT USE THIS OPTION. There are serious performance issues when > doing so, especially on larger clusters. It needs some development work that > has not been fully identified yet. > This setting is extremely useful for smaller clusters, but with high > partition counts causes the huge issues stated above. > One potential fix could be adding a new configuration for the number of > partitions to do automated leader rebalancing for at once, and *stop* once > that number of leader rebalances are in flight, until they're done. There may > be better mechanisms, and I'd love to hear if anybody has any ideas. -- This message was sent by Atlassian Jira (v8.3.4#803005)
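The fix proposed at the end of the description — cap how many preferred-leader elections are in flight at once — is, at its core, batching. A minimal sketch (Python; the batching parameter is hypothetical, no such broker configuration shipped as part of this ticket):

```python
def rebalance_in_batches(imbalanced_partitions, max_in_flight):
    """Yield preferred-leader elections in bounded batches instead of
    triggering all of them at once. 'max_in_flight' stands in for the
    proposed (hypothetical) configuration limiting concurrent elections."""
    for i in range(0, len(imbalanced_partitions), max_in_flight):
        # The controller would wait for this batch's leader changes to
        # complete before starting the next one, so replica fetchers are
        # never all restarted simultaneously.
        yield imbalanced_partitions[i:i + max_in_flight]

partitions = [f"topic-{i}" for i in range(7)]
batches = list(rebalance_in_batches(partitions, 3))
print([len(b) for b in batches])  # [3, 3, 1]
```

The key property is that replication downtime is bounded by the batch size rather than by the total number of imbalanced partitions.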
[jira] [Updated] (KAFKA-9522) kafka-connect failed when the topic can not be accessed
[ https://issues.apache.org/jira/browse/KAFKA-9522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Deblock updated KAFKA-9522: --- Description: Kafka Connect fails if the topic cannot be accessed (permission issue, or the topic doesn't exist). This happened while using Debezium CDC: [https://issues.redhat.com/browse/DBZ-1770] The topic can be chosen using a column in the database. If the topic for a database row has an issue (permission issue, or the topic doesn't exist), the connector stops working, and new events will not be sent. The exception is thrown in `WorkerSourceTask` by the method `maybeThrowProducerSendException`. Maybe add a parameter to not fail on exception. was: Kafka Connect fails if the topic cannot be accessed (permission issue, or the topic doesn't exist). This happened while using Debezium CDC: [https://issues.redhat.com/browse/DBZ-1770] The topic can be chosen using a column in the database. If the topic for a database row has an issue (permission issue, or the topic doesn't exist), the connector stops working, and new events will not be sent. > kafka-connect failed when the topic can not be accessed > --- > > Key: KAFKA-9522 > URL: https://issues.apache.org/jira/browse/KAFKA-9522 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.3.1 >Reporter: Deblock >Priority: Major > > Kafka Connect fails if the topic cannot be accessed (permission issue, or > the topic doesn't exist). > > This happened while using Debezium CDC: > [https://issues.redhat.com/browse/DBZ-1770] > > The topic can be chosen using a column in the database. If the topic for a > database row has an issue (permission issue, or the topic doesn't exist), the > connector stops working, and new events will not be sent. > > The exception is thrown in `WorkerSourceTask` by the method > `maybeThrowProducerSendException`. Maybe add a parameter to not fail on > exception. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
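The "parameter to not fail on exception" suggested above would make the task log and continue past a failed send instead of dying. A toy Python sketch of that idea (the `fail_on_send_error` toggle is hypothetical — it mimics the role of `maybeThrowProducerSendException` but is not a real Connect option):

```python
def dispatch(records, send, fail_on_send_error=True):
    """Deliver records via 'send'. With fail_on_send_error=True the first
    failure propagates and kills the task (today's behavior); with False
    the record is recorded as skipped and the task stays alive."""
    delivered, skipped = [], []
    for record in records:
        try:
            send(record)
            delivered.append(record)
        except Exception as exc:
            if fail_on_send_error:
                raise                      # current WorkerSourceTask behavior
            skipped.append((record, str(exc)))  # log and keep going
    return delivered, skipped

def send(record):
    # Simulated broker: one topic is not writable (authorization failure).
    if record["topic"] == "forbidden":
        raise PermissionError("not authorized to write to 'forbidden'")

ok, bad = dispatch([{"topic": "a"}, {"topic": "forbidden"}], send,
                   fail_on_send_error=False)
print(len(ok), len(bad))  # 1 1
```

Silently skipping records trades data loss for availability, which is why such a toggle would need to default to failing, mirroring Connect's `errors.tolerance` design.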
[jira] [Updated] (KAFKA-9522) kafka-connect failed when the topic can not be accessed
[ https://issues.apache.org/jira/browse/KAFKA-9522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Deblock updated KAFKA-9522: --- Description: Kafka Connect fails if the topic cannot be accessed (permission issue, or the topic doesn't exist). This happened while using Debezium CDC: [https://issues.redhat.com/browse/DBZ-1770] The topic can be chosen using a column in the database. If the topic for a database row has an issue (permission issue, or the topic doesn't exist), the connector stops working, and new events will not be sent. was: Kafka Connect fails if the topic cannot be accessed (permission issue, or the topic doesn't exist). This happened while using Debezium CDC: [https://issues.redhat.com/browse/DBZ-1770] The topic can be chosen using a column in the database. If the topic for a database row has an issue (permission issue, or the topic doesn't exist), the connector stops working, and new events will not be sent. > kafka-connect failed when the topic can not be accessed > --- > > Key: KAFKA-9522 > URL: https://issues.apache.org/jira/browse/KAFKA-9522 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.3.1 >Reporter: Deblock >Priority: Major > > Kafka Connect fails if the topic cannot be accessed (permission issue, or > the topic doesn't exist). > > This happened while using Debezium CDC: > [https://issues.redhat.com/browse/DBZ-1770] > > The topic can be chosen using a column in the database. If the topic for a > database row has an issue (permission issue, or the topic doesn't exist), the > connector stops working, and new events will not be sent. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032285#comment-17032285 ] Evan Williams edited comment on KAFKA-4084 at 2/7/20 11:58 AM: --- [~sql_consulting] We are using min.insync.replicas=1, and have replication.factor=3 or above for most topics (6 brokers). As a side note, one interesting thing I've now seen reported by the owners of the clients (streams) is that, for certain topics/partitions, they had no leader elected even after a clean shutdown of the bootstrapping broker. So something is quite weird there. What might cause that? But yes, I think there is a clear case for KIP-491 in this URP scenario: just blacklist a broker from becoming leader until some factor is satisfied, or it is manually removed. was (Author: blodsbror): [~sql_consulting] We are using min.insync.replicas=1, and have replication.factor=3 or above for most topics (6 brokers). As a side note, one interesting thing I've now seen reported by the owners of the clients (streams) is that, for certain topics/partitions, they had no leader even after a clean shutdown of the bootstrapping broker. So something is quite weird there. What might cause that? 
Topic was marked for deletion, without any user requesting this: Topic: data.vehicle-topic.journey-dpi PartitionCount: 6 ReplicationFactor: 3 Configs: cleanup.policy=delete MarkedForDeletion: true Topic: topic.name: 0 Leader: none Replicas: 54,52,53 Isr: 54 MarkedForDeletion: true Topic: topic.name: 1 Leader: none Replicas: 82,53,54 Isr: 82 MarkedForDeletion: true Topic: topic.name: 2 Leader: none Replicas: 83,54,82 Isr: 83 MarkedForDeletion: true Topic: topic.name: 3 Leader: none Replicas: 84,82,83 Isr: 83 MarkedForDeletion: true Topic: topic.name: 4 Leader: none Replicas: 52,83,84 Isr: 83 MarkedForDeletion: true Topic: topic.name: 5 Leader: none Replicas: 53,84,52 Isr: 53 MarkedForDeletion: true But yes, I think there is a clear case for KIP-491 in this scenario, of URP to just blacklist a broker from becoming leader until x factor is satisfied, or it's manually removed. > automated leader rebalance causes replication downtime for clusters with too > many partitions > > > Key: KAFKA-4084 > URL: https://issues.apache.org/jira/browse/KAFKA-4084 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Tom Crayford >Priority: Major > Labels: reliability > Fix For: 1.1.0 > > > If you enable {{auto.leader.rebalance.enable}} (which is on by default), and > you have a cluster with many partitions, there is a severe amount of > replication downtime following a restart. This causes > `UnderReplicatedPartitions` to fire, and replication is paused. > This is because the current automated leader rebalance mechanism changes > leaders for *all* imbalanced partitions at once, instead of doing it > gradually. This effectively stops all replica fetchers in the cluster > (assuming there are enough imbalanced partitions), and restarts them. This > can take minutes on busy clusters, during which no replication is happening > and user data is at risk. 
Clients with {{acks=-1}} also see issues at this > time, because replication is effectively stalled. > To quote Todd Palino from the mailing list: > bq. There is an admin CLI command to trigger the preferred replica election > manually. There is also a broker configuration “auto.leader.rebalance.enable” > which you can set to have the broker automatically perform the PLE when > needed. DO NOT USE THIS OPTION. There are serious performance issues when > doing so, especially on larger clusters. It needs some development work that > has not been fully identified yet. > This setting is extremely useful for smaller clusters, but with high > partition counts causes the huge issues stated above. > One potential fix could be adding a new configuration for the number of > partitions to do automated leader rebalancing for at once, and *stop* once > that number of leader rebalances are in flight, until they're done. There may > be better mechanisms, and I'd love to hear if anybody has any ideas. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-7052) ExtractField SMT throws NPE - needs clearer error message
[ https://issues.apache.org/jira/browse/KAFKA-7052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032325#comment-17032325 ] ASF GitHub Bot commented on KAFKA-7052: --- gunnarmorling commented on pull request #8059: KAFKA-7052 Adding option to ExtractField SMT for controlling behavior… URL: https://github.com/apache/kafka/pull/8059 … in case of non-existent fields https://issues.apache.org/jira/browse/KAFKA-7052 *More detailed description of your change n/a *Summary of testing strategy (including rationale) Added JUnit test ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) @rhauch, hey, that's a quick first attempt for making the `ExtractField` SMT more flexible when it comes to encountering a record that doesn't contain the specified field. It's a common situation for connectors like Debezium which produce different "kinds" of records/topics; in our case e.g. actual change event topics and meta-topics such as TX data or schema history. One might want to apply the `ExtractField` SMT to the CDC records but not to those others. Also see [KAFKA-7052](https://issues.apache.org/jira/browse/KAFKA-7052) for some backgrounds. The proposal is to add a new option `behavior.on.non.existent.field` to the SMT which makes the behavior configurable. Its supported values are: * fail: raise an exception (default for records with schema) * return-null: return null (default for records without schema) * pass-on: pass on the unmodified original record I did a quick implementation of that proposal to foster feedback. Happy to adjust and expand as needed, e.g. to adjust with existing naming patterns for the option and/or its values as well as docs (not sure where that'd go). Thanks! CC @rmoff, @big-andy-coates. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > ExtractField SMT throws NPE - needs clearer error message > - > > Key: KAFKA-7052 > URL: https://issues.apache.org/jira/browse/KAFKA-7052 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Robin Moffatt >Priority: Major > > With the following Single Message Transform: > {code:java} > "transforms.ExtractId.type":"org.apache.kafka.connect.transforms.ExtractField$Key", > "transforms.ExtractId.field":"id"{code} > Kafka Connect errors with : > {code:java} > java.lang.NullPointerException > at > org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61) > at > org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38){code} > There should be a better error message here, identifying the reason for the > NPE. > Version: Confluent Platform 4.1.1 -- This message was sent by Atlassian Jira (v8.3.4#803005)
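The behavior proposed in PR #8059 above can be sketched as follows. The option name `behavior.on.non.existent.field` and its values (`fail`, `return-null`, `pass-on`) come from the PR; the Python below is only an illustration of the logic, not the actual Java SMT code:

```python
# Sketch (in Python) of the option proposed in PR #8059 for the ExtractField
# SMT: configurable handling when the configured field is missing.
FAIL = "fail"                # raise an exception (PR default for records with schema)
RETURN_NULL = "return-null"  # return null/None (PR default for schemaless records)
PASS_ON = "pass-on"          # pass the original record through unmodified

def extract_field(record: dict, field: str, behavior: str = FAIL):
    """Return record[field], handling a missing field according to `behavior`."""
    if field in record:
        return record[field]
    if behavior == FAIL:
        raise KeyError(f"field '{field}' does not exist in record")
    if behavior == RETURN_NULL:
        return None
    if behavior == PASS_ON:
        return record
    raise ValueError(f"unknown behavior: {behavior}")
```

For example, applying the transform with `pass-on` to a record that lacks the configured field (such as a Debezium schema-history record without an `id`) would leave the record untouched instead of failing the connector with an NPE.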
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032285#comment-17032285 ] Evan Williams commented on KAFKA-4084: -- [~sql_consulting] We are using min.insync.replicas=1. And have replication.factor=3 or above for most topics (6 brokers). As a side note, one interesting thing I've seen reported now from the owners of the clients (streams) is that, for certain topics/partitions - they had no leader, even if there was a clean shutdown of the bootstrapping broker. So something is quite weird there.. But yes, I think there is a clear case for KIP-491 in this scenario, of URP to just blacklist a broker from becoming leader until x factor is satisfied, or it's manually removed. > automated leader rebalance causes replication downtime for clusters with too > many partitions > > > Key: KAFKA-4084 > URL: https://issues.apache.org/jira/browse/KAFKA-4084 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Tom Crayford >Priority: Major > Labels: reliability > Fix For: 1.1.0 > > > If you enable {{auto.leader.rebalance.enable}} (which is on by default), and > you have a cluster with many partitions, there is a severe amount of > replication downtime following a restart. This causes > `UnderReplicatedPartitions` to fire, and replication is paused. > This is because the current automated leader rebalance mechanism changes > leaders for *all* imbalanced partitions at once, instead of doing it > gradually. This effectively stops all replica fetchers in the cluster > (assuming there are enough imbalanced partitions), and restarts them. This > can take minutes on busy clusters, during which no replication is happening > and user data is at risk. Clients with {{acks=-1}} also see issues at this > time, because replication is effectively stalled. > To quote Todd Palino from the mailing list: > bq. 
There is an admin CLI command to trigger the preferred replica election > manually. There is also a broker configuration “auto.leader.rebalance.enable” > which you can set to have the broker automatically perform the PLE when > needed. DO NOT USE THIS OPTION. There are serious performance issues when > doing so, especially on larger clusters. It needs some development work that > has not been fully identified yet. > This setting is extremely useful for smaller clusters, but with high > partition counts causes the huge issues stated above. > One potential fix could be adding a new configuration for the number of > partitions to do automated leader rebalancing for at once, and *stop* once > that number of leader rebalances are in flight, until they're done. There may > be better mechanisms, and I'd love to hear if anybody has any ideas. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032267#comment-17032267 ] GEORGE LI edited comment on KAFKA-4084 at 2/7/20 10:02 AM: --- [~blodsbror] [~junrao] [~sriharsha] With `auto.leader.rebalance.enable=true`, even if we check that the broker is in the ISR before rebalancing leadership to it, the partitions it leads might still be impacted, because the broker is still trying to catch up. [~blodsbror] Does your cluster have a lossless setting like min.insync.replicas > 1? For lossless, we experience high producer latency during catch-up or reassignments. The leader deprioritized list feature is convenient in this case: it simply gives a broker that is catching up the lowest priority when considering it for leadership. Another useful case is when the current controller is very busy with metadata requests; "blacklisting" it so that it only serves as a follower can give 10-15% CPU back to the controller (without bouncing it). In a cluster without any down brokers and no URP (Under Replicated Partitions), there is a workaround: run reassignments to move that broker to the end of the partition assignment. E.g. broker_id 100 is down, then partition assignment (100, 101, 102) => (101, 102, 100). The reassignment should complete fast because all replicas are in the ISR; then running a preferred leader election will change the leader from 100 => 101. The downside is that it's more work to roll back or rebalance again. KIP-491 is very easy to roll back: just unset the dynamic config. In the case of a cluster with URP, the reassignment approach of moving the broker to the end of the assignment might not work, because the broker is not in the ISR; the reassignments will stay pending until it catches up and rejoins the ISR. So maybe we do have a good use-case for KIP-491? 
was (Author: sql_consulting): [~blodsbror] [~junrao] [~sriharsha] With `auto.leader.rebalance.enable=true`, even if we check that the broker is in the ISR before rebalancing leadership to it, the partitions it leads might still be impacted, because the broker is still trying to catch up. [~blodsbror] Does your cluster have a lossless setting like min.insync.replicas > 1? For lossless, we experience high producer latency during catch-up or reassignments. The leader deprioritized list feature is convenient in this case: it simply gives a broker that is catching up the lowest priority when considering it for leadership. Another useful case is when the current controller is very busy with metadata requests; "blacklisting" it so that it only serves as a follower can give 10-15% CPU back to the controller (without bouncing it). In a cluster without any down brokers and no URP (Under Replicated Partitions), there is a workaround: run reassignments to move that broker to the end of the partition assignment. E.g. broker_id 100 is down, then partition assignment (100, 101, 102) => (101, 102, 100). The reassignment should complete fast because all replicas are in the ISR; then running a preferred leader election will change the leader from 100 => 101. The downside is that it's more work to roll back or rebalance again. KIP-491 is very easy to roll back: just unset the dynamic config. In the case of a cluster with URP, the reassignment approach of moving the broker to the end of the assignment might not work, because the broker is not in the ISR; the reassignments will stay pending until it catches up and rejoins the ISR. So maybe we do have a good use-case for KIP-491? 
> automated leader rebalance causes replication downtime for clusters with too > many partitions > > > Key: KAFKA-4084 > URL: https://issues.apache.org/jira/browse/KAFKA-4084 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Tom Crayford >Priority: Major > Labels: reliability > Fix For: 1.1.0 > > > If you enable {{auto.leader.rebalance.enable}} (which is on by default), and > you have a cluster with many partitions, there is a severe amount of > replication downtime following a restart. This causes > `UnderReplicatedPartitions` to fire, and replication is paused. > This is because the current automated leader rebalance mechanism changes > leaders for *all* imbalanced partitions at once, instead of doing it > gradually. This effectively stops all replica fetchers in the cluster > (assuming there are enough imbalanced partitions), and restarts them. This > can take minutes on busy clusters, during which no replication is happening > and user data is at risk.
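The reassignment workaround described in the comment above (move the affected broker to the end of each partition's replica list, then run a preferred leader election) amounts to a simple rotation of the replica list. A minimal sketch, with a hypothetical helper name:

```python
def demote_broker(assignment, broker_id):
    """Move broker_id to the end of the replica list, so that a preferred
    leader election picks another in-sync replica as leader.
    E.g. demoting 100: (100, 101, 102) -> (101, 102, 100)."""
    if broker_id not in assignment:
        return list(assignment)  # broker not a replica: nothing to change
    return [b for b in assignment if b != broker_id] + [broker_id]
```

As noted in the comment, this only helps when the broker is in the ISR; otherwise the reassignment stays pending, which is the gap KIP-491 aims to close.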
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032267#comment-17032267 ] GEORGE LI commented on KAFKA-4084: -- [~blodsbror] [~junrao] [~sriharsha] With `auto.leader.rebalance.enable=true`, even if we check that the broker is in the ISR before rebalancing leadership to it, the partitions it leads might still be impacted, because the broker is still trying to catch up. [~blodsbror] Does your cluster have a lossless setting like min.insync.replicas > 1? For lossless, we experience high producer latency during catch-up or reassignments. The leader deprioritized list feature is convenient in this case: it simply gives a broker that is catching up the lowest priority when considering it for leadership. Another useful case is when the current controller is very busy with metadata requests; "blacklisting" it so that it only serves as a follower can give 10-15% CPU back to the controller (without bouncing it). In a cluster without any down brokers and no URP (Under Replicated Partitions), there is a workaround: run reassignments to move that broker to the end of the partition assignment. E.g. broker_id 100 is down, then partition assignment (100, 101, 102) => (101, 102, 100). The reassignment should complete fast because all replicas are in the ISR; then running a preferred leader election will change the leader from 100 => 101. The downside is that it's more work to roll back or rebalance again. KIP-491 is very easy to roll back: just unset the dynamic config. In the case of a cluster with URP, the reassignment approach of moving the broker to the end of the assignment might not work, because the broker is not in the ISR; the reassignments will stay pending until it catches up and rejoins the ISR. So maybe we do have a good use-case for KIP-491? 
> automated leader rebalance causes replication downtime for clusters with too > many partitions > > > Key: KAFKA-4084 > URL: https://issues.apache.org/jira/browse/KAFKA-4084 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Tom Crayford >Priority: Major > Labels: reliability > Fix For: 1.1.0 > > > If you enable {{auto.leader.rebalance.enable}} (which is on by default), and > you have a cluster with many partitions, there is a severe amount of > replication downtime following a restart. This causes > `UnderReplicatedPartitions` to fire, and replication is paused. > This is because the current automated leader rebalance mechanism changes > leaders for *all* imbalanced partitions at once, instead of doing it > gradually. This effectively stops all replica fetchers in the cluster > (assuming there are enough imbalanced partitions), and restarts them. This > can take minutes on busy clusters, during which no replication is happening > and user data is at risk. Clients with {{acks=-1}} also see issues at this > time, because replication is effectively stalled. > To quote Todd Palino from the mailing list: > bq. There is an admin CLI command to trigger the preferred replica election > manually. There is also a broker configuration “auto.leader.rebalance.enable” > which you can set to have the broker automatically perform the PLE when > needed. DO NOT USE THIS OPTION. There are serious performance issues when > doing so, especially on larger clusters. It needs some development work that > has not been fully identified yet. > This setting is extremely useful for smaller clusters, but with high > partition counts causes the huge issues stated above. > One potential fix could be adding a new configuration for the number of > partitions to do automated leader rebalancing for at once, and *stop* once > that number of leader rebalances are in flight, until they're done. 
There may > be better mechanisms, and I'd love to hear if anybody has any ideas. -- This message was sent by Atlassian Jira (v8.3.4#803005)
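The potential fix quoted in the ticket (do only a bounded number of leader rebalances at once, and stop until they complete) can be sketched as simple batching; `max_in_flight` is a hypothetical parameter here, not an existing Kafka configuration:

```python
def plan_leader_rebalance_batches(imbalanced_partitions, max_in_flight):
    """Split the imbalanced partitions into batches of at most max_in_flight
    leader changes, so the controller never has more than that many
    rebalances in flight at once (instead of moving *all* leaders
    simultaneously and stalling replica fetchers cluster-wide)."""
    return [imbalanced_partitions[i:i + max_in_flight]
            for i in range(0, len(imbalanced_partitions), max_in_flight)]
```

Each batch would be submitted only after the previous batch's leader elections have completed, keeping most replica fetchers running throughout the rebalance.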
[jira] [Created] (KAFKA-9522) kafka-connect failed when the topic can not be accessed
Deblock created KAFKA-9522: -- Summary: kafka-connect failed when the topic can not be accessed Key: KAFKA-9522 URL: https://issues.apache.org/jira/browse/KAFKA-9522 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 2.3.1 Reporter: Deblock Kafka Connect fails if the topic cannot be accessed (permission issue, or the topic does not exist). This issue happened using Debezium CDC: [https://issues.redhat.com/browse/DBZ-1770] The topic can be chosen using a column in the database. If the topic for a database row has an issue (permission issue, or the topic does not exist), the connector stops working and new events will not be sent. -- This message was sent by Atlassian Jira (v8.3.4#803005)
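The "parameter to not fail on exception" suggested for this issue could look like the sketch below. `maybeThrowProducerSendException` is the real `WorkerSourceTask` method named in the report; the `tolerate_errors` switch and this Python rendering are hypothetical:

```python
def maybe_throw_producer_send_exception(producer_send_error, tolerate_errors, log):
    """Sketch of WorkerSourceTask.maybeThrowProducerSendException with a
    hypothetical switch: when tolerate_errors is True, record the failed
    send (e.g. a topic authorization failure) and keep the task running
    instead of failing it and stopping all new events."""
    if producer_send_error is None:
        return  # no pending send failure: nothing to do
    if tolerate_errors:
        log.append(f"skipping record, send failed: {producer_send_error}")
    else:
        raise RuntimeError(f"unrecoverable send failure: {producer_send_error}")
```

With such a switch, a single unwritable topic (permission issue, or a topic that does not exist) would be logged and skipped rather than halting the whole connector.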
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17032200#comment-17032200 ] Evan Williams commented on KAFKA-4084: -- [~junrao] Yes, I can see that it's been throttled to exactly 10MB/sec, so it is working. I'm not sure how the fetcher threads work, or whether num.fetcher.threads=1 can still spawn multiple child threads that can take over the CPU (and how that interacts with the configured number of IO/network threads). But yes, 10MB/s on a 4vcpu server shouldn't cause this. Anyway, didn't mean to turn this into a support thread :) But it just shows that I could very much use a 'blacklist' feature, to stop a broker from becoming leader while it replicates. Then it can hit higher CPU limits without affecting any clients. It seems that it's not easy to control load, even when trying to apply throttles. I am open to any other ideas to try, but yes - thanks for all your help. > automated leader rebalance causes replication downtime for clusters with too > many partitions > > > Key: KAFKA-4084 > URL: https://issues.apache.org/jira/browse/KAFKA-4084 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Tom Crayford >Priority: Major > Labels: reliability > Fix For: 1.1.0 > > > If you enable {{auto.leader.rebalance.enable}} (which is on by default), and > you have a cluster with many partitions, there is a severe amount of > replication downtime following a restart. This causes > `UnderReplicatedPartitions` to fire, and replication is paused. > This is because the current automated leader rebalance mechanism changes > leaders for *all* imbalanced partitions at once, instead of doing it > gradually. This effectively stops all replica fetchers in the cluster > (assuming there are enough imbalanced partitions), and restarts them. This > can take minutes on busy clusters, during which no replication is happening > and user data is at risk. 
Clients with {{acks=-1}} also see issues at this > time, because replication is effectively stalled. > To quote Todd Palino from the mailing list: > bq. There is an admin CLI command to trigger the preferred replica election > manually. There is also a broker configuration “auto.leader.rebalance.enable” > which you can set to have the broker automatically perform the PLE when > needed. DO NOT USE THIS OPTION. There are serious performance issues when > doing so, especially on larger clusters. It needs some development work that > has not been fully identified yet. > This setting is extremely useful for smaller clusters, but with high > partition counts causes the huge issues stated above. > One potential fix could be adding a new configuration for the number of > partitions to do automated leader rebalancing for at once, and *stop* once > that number of leader rebalances are in flight, until they're done. There may > be better mechanisms, and I'd love to hear if anybody has any ideas. -- This message was sent by Atlassian Jira (v8.3.4#803005)