[jira] [Created] (KAFKA-14128) Kafka Streams terminates on topic check
Patrik Kleindl created KAFKA-14128:
-----------------------------------

             Summary: Kafka Streams terminates on topic check
                 Key: KAFKA-14128
                 URL: https://issues.apache.org/jira/browse/KAFKA-14128
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.0.0
         Environment: Any
            Reporter: Patrik Kleindl

Our streams application shut down unexpectedly after some network issues that should have been easily recoverable.

Logs:
{code:java}
2022-07-29 13:39:37.854 INFO 25843 --- [348aefeff-admin] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Disconnecting from node 3 due to request timeout.
2022-07-29 13:39:37.854 INFO 25843 --- [348aefeff-admin] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Cancelled in-flight METADATA request with correlation id 985 due to node 3 being disconnected (elapsed time since creation: 60023ms, elapsed time since send: 60023ms, request timeout: 3ms)
2022-07-29 13:39:37.867 ERROR 25843 --- [-StreamThread-1] o.a.k.s.p.i.InternalTopicManager : stream-thread [main] Unexpected error during topic description for L.DII.A-COGROUPKSTREAM-AGGREGATE-STATE-STORE-03-changelog.
Error message was: org.apache.kafka.common.errors.TimeoutException: Call(callName=describeTopics, deadlineMs=1659101977830, tries=1, nextAllowedTryMs=1659101977955) timed out at 1659101977855 after 1 attempt(s)
2022-07-29 13:39:37.869 INFO 25843 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
{code}

I think the relevant code is in [https://github.com/apache/kafka/blob/31ff6d7f8af57e8c39061f31774a61bd1728904e/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L524|https://github.com/apache/kafka/blob/31ff6d7f8af57e8c39061f31774a61bd1728904e/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L523-L550]

{code:java}
topicFuture.getValue().get();
{code}

A get() call without a timeout value can never throw a TimeoutException itself, so the TimeoutException from the AdminClient surfaces as the cause of an ExecutionException and hits the last else branch, where the StreamsException is thrown.

Possible fix: use the KafkaFuture method with a timeout:
{code:java}
public abstract T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
{code}
instead of
{code:java}
public abstract T get() throws InterruptedException, ExecutionException;
{code}

--
This message was sent by Atlassian Jira (v8.20.10#820010)
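The wrapping behaviour described in the ticket can be reproduced with a plain JDK CompletableFuture standing in for the KafkaFuture returned by describeTopics. The sketch below is illustrative only; the class and method names are made up, not Kafka code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class WrappedTimeoutDemo {

    // Returns the class of the failure seen by a timeout-less get() on a
    // future that the client completed with a TimeoutException, mirroring
    // how the AdminClient fails its describeTopics future internally.
    static Class<?> failureSeenByGet() throws InterruptedException {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new TimeoutException("describeTopics timed out"));
        try {
            future.get(); // no timeout parameter: can never throw TimeoutException itself
            return null;
        } catch (ExecutionException e) {
            // The TimeoutException arrives only as the *cause* of an
            // ExecutionException, so a catch block keyed on TimeoutException
            // never fires and the generic error path runs instead.
            return e.getCause().getClass();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(failureSeenByGet()); // class java.util.concurrent.TimeoutException
    }
}
```

This is exactly why the retry logic keyed on TimeoutException is bypassed: the exception type the code checks for never reaches the catch clause directly.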
[jira] [Created] (KAFKA-13213) StreamThread: State transition from PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED
Patrik Kleindl created KAFKA-13213:
-----------------------------------

             Summary: StreamThread: State transition from PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED
                 Key: KAFKA-13213
                 URL: https://issues.apache.org/jira/browse/KAFKA-13213
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.6.0
            Reporter: Patrik Kleindl

One of our streams applications sometimes enters a strange looping behaviour. It is deployed on two pods in a Kubernetes cluster, but only one of them shows this:

{code:java}
2021-08-18 11:27:20,402 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED
2021-08-18 11:27:20,402 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED
2021-08-18 11:37:23,516 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1-consumer, groupId=app] Attempt to heartbeat failed since group is rebalancing
2021-08-18 11:37:23,518 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1-consumer, groupId=app] (Re-)joining group
2021-08-18 11:37:23,676 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1-consumer, groupId=app] Successfully joined group with generation 135...
2021-08-18 11:37:23,678 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.s.p.i.TaskManager - stream-thread [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] Handle new assignment with:
	New active tasks: [0_0, 0_1, 0_2, 0_3, 0_4, 3_1, 0_5, 0_6, 3_3, 0_7, 0_8, 3_5, 0_9, 0_10, 3_7, 0_11, 3_9, 3_11]
	New standby tasks: [1_3, 1_1]
	Existing active tasks: [0_0, 0_1, 0_2, 0_3, 0_4, 3_1, 0_5, 0_6, 3_3, 0_7, 0_8, 3_5, 0_9, 0_10, 3_7, 0_11, 3_9, 3_11]
	Existing standby tasks: [1_1, 1_3]
2021-08-18 11:37:23,678 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1-consumer, groupId=app] Adding newly assigned partitions:
2021-08-18 11:37:23,679 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED
2021-08-18 11:47:26,768 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1-consumer, groupId=app] Attempt to heartbeat failed since group is rebalancing
{code}

The other one shows:

{code:java}
2021-08-18 11:37:23,710 [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1] INFO o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1-consumer] Requested to schedule probing rebalance for 1629280043706 ms.
2021-08-18 11:37:23,710 [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1] INFO o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1-consumer] Requested to schedule probing rebalance for 1629280043706 ms.
2021-08-18 11:37:23,710 [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1] INFO o.a.k.s.p.i.TaskManager - stream-thread [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1] Handle new assignment with:
	New active tasks: [1_0, 2_0, 1_1, 2_1, 3_0, 1_2, 2_2, 1_3, 2_3, 3_2, 1_4, 2_4, 1_5, 2_5, 3_4, 1_6, 2_6, 1_7, 2_7, 3_6, 1_8, 2_8, 1_9, 2_9, 3_8, 1_10, 2_10, 1_11, 2_11, 3_10]
	New standby tasks: []
	Existing active tasks: [1_0, 1_1, 2_0, 1_2, 2_1, 3_0, 1_3, 2_2, 1_4, 2_3, 3_2, 1_5, 2_4, 1_6, 2_5, 3_4, 1_7, 2_6, 1_8, 2_7, 3_6, 1_9, 2_8, 1_10, 2_9, 3_8, 1_11, 2_10, 2_11, 3_10]
	Existing standby tasks: []
2021-08-18 11:37:23,711 [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1-consumer, groupId=app] Adding newly assigned partitions:
2021-08-18 11:37:23,711
{code}
[jira] [Resolved] (KAFKA-8838) Allow consumer group tool to work with non-existing consumer groups
     [ https://issues.apache.org/jira/browse/KAFKA-8838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Patrik Kleindl resolved KAFKA-8838.
-----------------------------------
    Resolution: Not A Problem

> Allow consumer group tool to work with non-existing consumer groups
> -------------------------------------------------------------------
>
>                 Key: KAFKA-8838
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8838
>             Project: Kafka
>          Issue Type: Improvement
>          Components: tools
>            Reporter: Patrik Kleindl
>            Priority: Minor
>
> The streams application reset tool works for non-existing consumer groups and
> allows one to "pre-set" offsets before a new deployment.
> The consumer group tool does not allow the same, which would be a nice
> enhancement.
> If this should work and the NullPointerException is not expected, this can be
> converted to a bug.
>
> {code:java}
> ./kafka-consumer-groups --bootstrap-server broker:9092 --group applicationId --reset-offsets --by-duration P60D --topic topic1 --execute
> Error: Executing consumer group command failed due to null
> java.lang.NullPointerException
> 	at scala.collection.convert.Wrappers$JListWrapper.iterator(Wrappers.scala:88)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$anonfun$parseTopicPartitionsToReset$1.apply(ConsumerGroupCommand.scala:477)
> 	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$anonfun$parseTopicPartitionsToReset$1.apply(ConsumerGroupCommand.scala:471)
> 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> 	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> 	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.parseTopicPartitionsToReset(ConsumerGroupCommand.scala:471)
> 	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.getPartitionsToReset(ConsumerGroupCommand.scala:486)
> 	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:310)
> 	at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:64)
> 	at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
> {code}

--
This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (KAFKA-8838) Allow consumer group tool to work with non-existing consumer groups
Patrik Kleindl created KAFKA-8838:
----------------------------------

             Summary: Allow consumer group tool to work with non-existing consumer groups
                 Key: KAFKA-8838
                 URL: https://issues.apache.org/jira/browse/KAFKA-8838
             Project: Kafka
          Issue Type: Improvement
          Components: tools
            Reporter: Patrik Kleindl

The streams application reset tool works for non-existing consumer groups and allows one to "pre-set" offsets before a new deployment.
The consumer group tool does not allow the same, which would be a nice enhancement.
If this should work and the NullPointerException is not expected, this can be converted to a bug.

{code:java}
./kafka-consumer-groups --bootstrap-server broker:9092 --group applicationId --reset-offsets --by-duration P60D --topic topic1 --execute
Error: Executing consumer group command failed due to null
java.lang.NullPointerException
	at scala.collection.convert.Wrappers$JListWrapper.iterator(Wrappers.scala:88)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$anonfun$parseTopicPartitionsToReset$1.apply(ConsumerGroupCommand.scala:477)
	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$anonfun$parseTopicPartitionsToReset$1.apply(ConsumerGroupCommand.scala:471)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.parseTopicPartitionsToReset(ConsumerGroupCommand.scala:471)
	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.getPartitionsToReset(ConsumerGroupCommand.scala:486)
	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:310)
	at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:64)
	at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
{code}

--
This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (KAFKA-8766) Allow a custom offset policy for Kafka Streams applications
Patrik Kleindl created KAFKA-8766:
----------------------------------

             Summary: Allow a custom offset policy for Kafka Streams applications
                 Key: KAFKA-8766
                 URL: https://issues.apache.org/jira/browse/KAFKA-8766
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Patrik Kleindl

Currently, when starting a new streams application (= new consumer group), you can only choose between starting from the beginning of all topics or only processing newly arriving records.
To start processing at any given point in the past (e.g. only processing data of the last month), the application has to be started (so the consumer group exists), stopped, the offsets reset, and then restarted.
It would be helpful if this could be passed in with the help of some kind of "offset reset strategy" which could be provided by the user.

--
This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Resolved] (KAFKA-8093) Fix JavaDoc markup
     [ https://issues.apache.org/jira/browse/KAFKA-8093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Patrik Kleindl resolved KAFKA-8093.
-----------------------------------
    Resolution: Fixed

Fixed by other commits, see PR for discussion.

> Fix JavaDoc markup
> ------------------
>
>                 Key: KAFKA-8093
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8093
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Matthias J. Sax
>            Assignee: Patrik Kleindl
>            Priority: Trivial
>
> Running `./gradlew install` gives the following warnings:
> {code:java}
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java:87: warning - Tag @link: reference not found: java.nio.channels.Selector
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java:87: warning - Tag @link: reference not found: java.nio.channels.Selector#wakeup() wakeup()
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java:34: warning - Tag @link: reference not found: org.apache.kafka.clients.producer.ProducerRecord ProducerRecord
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261: warning - @Header is an unknown tag.
> {code}
> Those should be fixed.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-8200) TopologyTestDriver should offer an iterable signature of readOutput
     [ https://issues.apache.org/jira/browse/KAFKA-8200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Patrik Kleindl resolved KAFKA-8200.
-----------------------------------
    Resolution: Won't Do

Discarded in favor of [https://cwiki.apache.org/confluence/display/KAFKA/KIP-456%3A+Helper+classes+to+make+it+simpler+to+write+test+logic+with+TopologyTestDriver]

> TopologyTestDriver should offer an iterable signature of readOutput
> -------------------------------------------------------------------
>
>                 Key: KAFKA-8200
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8200
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Michael Drogalis
>            Assignee: Patrik Kleindl
>            Priority: Minor
>              Labels: needs-kip
>
> When using the TopologyTestDriver, one examines the output on a topic with
> the readOutput method. This method returns one record at a time, until no
> more records can be found, at which point it returns null.
> Many times, the usage pattern around readOutput will involve writing a loop
> to extract a number of records from the topic, building up a list of records,
> until it returns null.
> It would be helpful to offer an iterable signature of readOutput, which
> returns either an iterator or a list over the records that are currently
> available in the topic. This would effectively remove the loop that a user
> needs to write for him/herself each time.
> Such a signature might look like:
> {code:java}
> public Iterable<ProducerRecord<K, V>> readOutput(java.lang.String topic);
> {code}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
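The loop the ticket describes generalizes to a small drain helper. The sketch below uses a plain Supplier in place of TopologyTestDriver.readOutput (an assumption for illustration; DrainDemo and drain are made-up names, not the proposed Kafka API):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class DrainDemo {

    // Polls a source that yields one record per call until it returns null,
    // collecting everything into a list -- the boilerplate loop that an
    // iterable readOutput signature would eliminate.
    static <T> List<T> drain(Supplier<T> readOne) {
        List<T> records = new ArrayList<>();
        for (T record = readOne.get(); record != null; record = readOne.get()) {
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) {
        // Simulate a topic holding three records.
        Iterator<String> topic = List.of("a", "b", "c").iterator();
        System.out.println(drain(() -> topic.hasNext() ? topic.next() : null)); // [a, b, c]
    }
}
```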
[jira] [Created] (KAFKA-8024) UtilsTest.testFormatBytes fails with german locale
Patrik Kleindl created KAFKA-8024:
----------------------------------

             Summary: UtilsTest.testFormatBytes fails with german locale
                 Key: KAFKA-8024
                 URL: https://issues.apache.org/jira/browse/KAFKA-8024
             Project: Kafka
          Issue Type: Bug
            Reporter: Patrik Kleindl

The unit test fails when the default locale is not English (in my case, de_AT):

{code:java}
assertEquals("1.1 MB", formatBytes((long) (1.1 * 1024 * 1024)));

org.apache.kafka.common.utils.UtilsTest > testFormatBytes FAILED
    org.junit.ComparisonFailure: expected:<1[.]1 MB> but was:<1[,]1 MB>
        at org.junit.Assert.assertEquals(Assert.java:115)
        at org.junit.Assert.assertEquals(Assert.java:144)
        at org.apache.kafka.common.utils.UtilsTest.testFormatBytes(UtilsTest.java:106)
{code}

The easiest fix in this case should be adding
{code:java}
jvmArgs '-Duser.language=en -Duser.country=US'
{code}
to the test configuration:
[https://github.com/apache/kafka/blob/b03e8c234a8aeecd10c2c96b683cfb39b24b548a/build.gradle#L270]

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
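The locale sensitivity behind this failure is easy to reproduce in isolation: String.format with a decimal conversion picks the decimal separator from the locale. This is a minimal sketch of the mechanism, not the actual Utils.formatBytes code:

```java
import java.util.Locale;

public class LocaleFormatDemo {

    // Formats a number the way a locale-sensitive "%.1f" does: the decimal
    // separator depends on the locale, which is what breaks the assertion
    // when the JVM default locale is German/Austrian.
    static String format(double value, Locale locale) {
        return String.format(locale, "%.1f MB", value);
    }

    public static void main(String[] args) {
        System.out.println(format(1.1, Locale.US));      // 1.1 MB
        System.out.println(format(1.1, Locale.GERMANY)); // 1,1 MB
    }
}
```

Pinning `-Duser.language=en -Duser.country=US` for the test JVM makes the default-locale formatting match the hard-coded expected string.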
[jira] [Created] (KAFKA-8023) Improve global state store restoration by using multiple update threads
Patrik Kleindl created KAFKA-8023:
----------------------------------

             Summary: Improve global state store restoration by using multiple update threads
                 Key: KAFKA-8023
                 URL: https://issues.apache.org/jira/browse/KAFKA-8023
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Patrik Kleindl

Currently, global state stores are restored sequentially, and the partitions of each global state store are restored sequentially too.

Loop over stores:
https://github.com/apache/kafka/blob/b03e8c234a8aeecd10c2c96b683cfb39b24b548a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L155
Loop over partitions:
https://github.com/apache/kafka/blob/b03e8c234a8aeecd10c2c96b683cfb39b24b548a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L256

It would be a great improvement if one or both of those loops could be processed in parallel.

Possible related task: https://issues.apache.org/jira/browse/KAFKA-6721
Mail discussion: https://lists.apache.org/thread.html/6fc4772eb8635c04b0ee6682003a99a5ef37ebccffea6c89752e96b1@%3Cusers.kafka.apache.org%3E

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
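The shape of the requested change can be sketched with a plain ExecutorService: submit each (store, partition) restore as an independent task instead of nesting two sequential loops. Everything below is hypothetical (restorePartition is a placeholder, not GlobalStateManagerImpl code); it only illustrates the parallelization pattern:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelRestoreSketch {

    // Hypothetical stand-in for restoring one partition of one global store;
    // returns the number of records restored (placeholder arithmetic).
    static long restorePartition(String store, int partition) {
        return 1_000L * (partition + 1);
    }

    // Submits every (store, partition) restore as an independent task and
    // waits for all of them, replacing the nested sequential loops.
    static long restoreAll(List<String> stores, int partitions, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (String store : stores) {
                for (int p = 0; p < partitions; p++) {
                    final int partition = p;
                    futures.add(pool.submit(() -> restorePartition(store, partition)));
                }
            }
            long totalRestored = 0;
            for (Future<Long> f : futures) {
                try {
                    totalRestored += f.get();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }
            return totalRestored;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Two stores with two partitions each, restored on four threads.
        System.out.println(restoreAll(List.of("store-a", "store-b"), 2, 4)); // 6000
    }
}
```

A real implementation would additionally need per-store consumers (or partitioned fetches), since a single restore consumer cannot safely be shared across threads.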
[jira] [Created] (KAFKA-7996) KafkaStreams does not pass timeout when closing Producer
Patrik Kleindl created KAFKA-7996:
----------------------------------

             Summary: KafkaStreams does not pass timeout when closing Producer
                 Key: KAFKA-7996
                 URL: https://issues.apache.org/jira/browse/KAFKA-7996
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.1.0
            Reporter: Patrik Kleindl

[https://confluentcommunity.slack.com/messages/C48AHTCUQ/convo/C48AHTCUQ-1550831721.026100/]

We are running 2.1 and have a case where the shutdown of a streams application takes several minutes. I noticed that although we call streams.close with a timeout of 30 seconds, the log says:

{code:java}
[Producer clientId=…-8be49feb-8a2e-4088-bdd7-3c197f6107bb-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
{code}

Matthias J. Sax [3 days ago]: I just checked the code, and yes, we don't provide a timeout for the producer on close()...

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
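The suspicious timeoutMillis in the log is not arbitrary: it is exactly Long.MAX_VALUE, the value the producer logs when it is asked to wait indefinitely. A one-line check (illustrative only, not Kafka code):

```java
public class CloseTimeoutDemo {
    public static void main(String[] args) {
        // The timeout from the log line is precisely Long.MAX_VALUE,
        // i.e. "no timeout was passed down to producer.close()".
        System.out.println(9223372036854775807L == Long.MAX_VALUE); // true
    }
}
```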
[jira] [Created] (KAFKA-7660) Stream Metrics - Memory Analysis
Patrik Kleindl created KAFKA-7660:
----------------------------------

             Summary: Stream Metrics - Memory Analysis
                 Key: KAFKA-7660
                 URL: https://issues.apache.org/jira/browse/KAFKA-7660
             Project: Kafka
          Issue Type: Bug
          Components: metrics, streams
    Affects Versions: 2.0.0
            Reporter: Patrik Kleindl
         Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, Mem_References.jpeg

During an analysis of JVM memory, two possible issues showed up which I would like to bring to your attention:

1) Duplicate strings

Top findings:
string_content="stream-processor-node-metrics" count="534,277"
string_content="processor-node-id" count="148,437"
string_content="stream-rocksdb-state-metrics" count="41,832"
string_content="punctuate-latency-avg" count="29,681"

"stream-processor-node-metrics" seems to be used in Sensors.java as a literal and not interned.

2) The HashMap parentSensors from org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl was reported multiple times as suspicious for potentially keeping a lot of objects alive. In our case the reported size was 40-50 MB each.
I haven't looked too deeply into the code, but noticed that the class Sensor.java, which is used as a key in the HashMap, does not implement the equals or hashCode methods. Not sure this is a problem, though.

The analysis was done with Dynatrace 7.0.
We are running Confluent 5.0 / Kafka 2.0-cp1 (brokers as well as clients).
Screenshots are attached.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
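The duplicate-string finding comes down to how the JVM handles string identity: runtime-constructed copies of the same character sequence are distinct objects unless interned. A minimal sketch of that mechanism (not Kafka's Sensors.java code):

```java
public class InternDemo {
    public static void main(String[] args) {
        // Two runtime-constructed copies of the same text are separate heap
        // objects; this is how hundreds of thousands of identical
        // "stream-processor-node-metrics" strings can accumulate.
        String a = new String("stream-processor-node-metrics");
        String b = new String("stream-processor-node-metrics");
        System.out.println(a == b);                   // false: two objects
        System.out.println(a.intern() == b.intern()); // true: one pooled copy
    }
}
```

Interning (or reusing a single constant) would collapse the duplicates into one pooled instance, at the cost of an intern-pool lookup on each call.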
[jira] [Created] (KAFKA-7332) Improve error message when trying to produce message without key for compacted topic
Patrik Kleindl created KAFKA-7332:
----------------------------------

             Summary: Improve error message when trying to produce message without key for compacted topic
                 Key: KAFKA-7332
                 URL: https://issues.apache.org/jira/browse/KAFKA-7332
             Project: Kafka
          Issue Type: Improvement
          Components: producer
    Affects Versions: 1.1.0
            Reporter: Patrik Kleindl

Goal: Return a specific error message, e.g. "Message without a key is not valid for a compacted topic", when trying to produce such a message, instead of a CorruptRecordException.

> Yesterday we had the following exception:
>
> Exception thrown when sending a message with key='null' and payload='...'
> to topic sometopic: org.apache.kafka.common.errors.CorruptRecordException:
> This message has failed its CRC checksum, exceeds the valid size, or is
> otherwise corrupt.
>
> The cause was identified with the help of
> [https://stackoverflow.com/questions/49098274/kafka-stream-get-corruptrecordexception]
>
> Is it possible / would it make sense to open an issue to improve the error
> message for this case?
> A simple "Message without a key is not valid for a compacted topic" would
> suffice and point a user in the right direction.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7281) Fix documentation and error message regarding cleanup.policy=[compact,delete]
Patrik Kleindl created KAFKA-7281:
----------------------------------

             Summary: Fix documentation and error message regarding cleanup.policy=[compact,delete]
                 Key: KAFKA-7281
                 URL: https://issues.apache.org/jira/browse/KAFKA-7281
             Project: Kafka
          Issue Type: Task
          Components: config
    Affects Versions: 1.1.0
            Reporter: Patrik Kleindl

Issue as requested in:
https://lists.apache.org/thread.html/621821e321b9ae5a8af623f5918edc4ceee564e0561009317fc705af@%3Cusers.kafka.apache.org%3E

1) The documentation at [https://kafka.apache.org/documentation/] is missing the updated information regarding the "compact,delete" cleanup policy on topic level.

log.cleanup.policy on broker level:
The default cleanup policy for segments beyond the retention window. A comma separated list of valid policies. Valid policies are: "delete" and "compact"

cleanup.policy on topic level:
A string that is either "delete" or "compact".

2) Also the special notation for the command-line client should be noted:

{code:java}
./kafka-configs --zookeeper broker0:2181 --alter --entity-type topics --entity-name test --add-config cleanup.policy=[compact,delete]
Completed Updating config for entity: topic 'test'.
{code}

3) The config command does not show this new notation in the error message:

{code:java}
./kafka-configs --zookeeper broker0:2181 --alter --entity-type topics --entity-name test --add-config cleanup.policy=test
Error while executing config command with args '--zookeeper broker0:2181 --alter --entity-type topics --entity-name test --add-config cleanup.policy=test'
org.apache.kafka.common.config.ConfigException: Invalid value test for configuration cleanup.policy: String must be one of: compact, delete
	at org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:930)
	at org.apache.kafka.common.config.ConfigDef$ValidList.ensureValid(ConfigDef.java:906)
	at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:478)
	at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:462)
	at kafka.log.LogConfig$.validate(LogConfig.scala:299)
	at kafka.zk.AdminZkClient.validateTopicConfig(AdminZkClient.scala:336)
	at kafka.zk.AdminZkClient.changeTopicConfig(AdminZkClient.scala:348)
	at kafka.zk.AdminZkClient.changeConfigs(AdminZkClient.scala:285)
	at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:133)
	at kafka.admin.ConfigCommand$.processCommandWithZk(ConfigCommand.scala:100)
	at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:77)
	at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
{code}

--
This message was sent by Atlassian JIRA (v7.6.3#76005)