[jira] [Created] (KAFKA-14128) Kafka Streams terminates on topic check

2022-08-01 Thread Patrik Kleindl (Jira)
Patrik Kleindl created KAFKA-14128:
--

 Summary: Kafka Streams terminates on topic check
 Key: KAFKA-14128
 URL: https://issues.apache.org/jira/browse/KAFKA-14128
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.0.0
 Environment: Any
Reporter: Patrik Kleindl


Our streams application shut down unexpectedly after some network issues that 
should have been easily recoverable.

Logs:

 
{code:java}
2022-07-29 13:39:37.854  INFO 25843 --- [348aefeff-admin] 
org.apache.kafka.clients.NetworkClient   : [AdminClient 
clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Disconnecting from 
node 3 due to request timeout.
2022-07-29 13:39:37.854  INFO 25843 --- [348aefeff-admin] 
org.apache.kafka.clients.NetworkClient   : [AdminClient 
clientId=L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-admin] Cancelled 
in-flight METADATA request with correlation id 985 due to node 3 being 
disconnected (elapsed time since creation: 60023ms, elapsed time since send: 
60023ms, request timeout: 3ms)
2022-07-29 13:39:37.867 ERROR 25843 --- [-StreamThread-1] 
o.a.k.s.p.i.InternalTopicManager : stream-thread [main] Unexpected 
error during topic description for 
L.DII.A-COGROUPKSTREAM-AGGREGATE-STATE-STORE-03-changelog.
Error message was: org.apache.kafka.common.errors.TimeoutException: 
Call(callName=describeTopics, deadlineMs=1659101977830, tries=1, 
nextAllowedTryMs=1659101977955) timed out at 1659101977855 after 1 attempt(s)
2022-07-29 13:39:37.869  INFO 25843 --- [-StreamThread-1] 
o.a.k.s.p.internals.StreamThread     : stream-thread 
[L.DII.A-b1355e4a-b909-4da1-a832-dd3348aefeff-StreamThread-1] State transition 
from RUNNING to PENDING_SHUTDOWN
{code}
I think the relevant code is in 
[https://github.com/apache/kafka/blob/31ff6d7f8af57e8c39061f31774a61bd1728904e/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L524|https://github.com/apache/kafka/blob/31ff6d7f8af57e8c39061f31774a61bd1728904e/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java#L523-L550]
{code:java}
topicFuture.getValue().get();{code}
without a timeout value cannot itself throw a TimeoutException, so the AdminClient's TimeoutException surfaces wrapped in an ExecutionException and hits the last else branch, where the StreamsException is thrown.

Possible fix:

Use the KafkaFuture method with timeout:
{code:java}
public abstract T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException,
TimeoutException;{code}
instead of 
{code:java}
public abstract T get() throws InterruptedException, ExecutionException;{code}
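A minimal sketch of the difference, using a plain java.util.concurrent.Future in place of KafkaFuture (the class name, the retry message, and the 100 ms bound are illustrative, not Streams code):

```java
import java.util.concurrent.*;

public class TimeoutDemo {
    // Waits on the future with a bounded timeout, mirroring the proposed fix:
    // a stalled call surfaces as a TimeoutException (retryable), whereas a plain
    // get() can only ever report failures as an ExecutionException wrapping the cause.
    static String waitBounded(Future<String> f) throws InterruptedException {
        try {
            return f.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timed-out-retryable";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // A future that never completes in time, standing in for the describeTopics result.
        Future<String> stalled = pool.submit(() -> { Thread.sleep(60_000); return "desc"; });
        System.out.println(waitBounded(stalled)); // prints "timed-out-retryable"
        pool.shutdownNow();
    }
}
```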
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-13213) StreamThread: State transition from PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED

2021-08-18 Thread Patrik Kleindl (Jira)
Patrik Kleindl created KAFKA-13213:
--

 Summary: StreamThread: State transition from PARTITIONS_ASSIGNED 
to PARTITIONS_ASSIGNED
 Key: KAFKA-13213
 URL: https://issues.apache.org/jira/browse/KAFKA-13213
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.0
Reporter: Patrik Kleindl


One of our streams applications sometimes enters a strange looping behaviour.

It is deployed on two pods in a Kubernetes cluster, but only one of them shows this:
{code:java}
2021-08-18 11:27:20,402 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED
2021-08-18 11:27:20,402 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED
2021-08-18 11:37:23,516 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1-consumer, groupId=app] Attempt to heartbeat failed since group is rebalancing
2021-08-18 11:37:23,518 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1-consumer, groupId=app] (Re-)joining group
2021-08-18 11:37:23,676 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1-consumer, groupId=app] Successfully joined group with generation 135...
2021-08-18 11:37:23,678 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.s.p.i.TaskManager - stream-thread [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] Handle new assignment with: New active tasks: [0_0, 0_1, 0_2, 0_3, 0_4, 3_1, 0_5, 0_6, 3_3, 0_7, 0_8, 3_5, 0_9, 0_10, 3_7, 0_11, 3_9, 3_11] New standby tasks: [1_3, 1_1] Existing active tasks: [0_0, 0_1, 0_2, 0_3, 0_4, 3_1, 0_5, 0_6, 3_3, 0_7, 0_8, 3_5, 0_9, 0_10, 3_7, 0_11, 3_9, 3_11] Existing standby tasks: [1_1, 1_3]
2021-08-18 11:37:23,678 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1-consumer, groupId=app] Adding newly assigned partitions:
2021-08-18 11:37:23,679 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PARTITIONS_ASSIGNED
2021-08-18 11:47:26,768 [app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=app-53da371b-98fb-4c73-ad08-5b93b9c0d1d4-StreamThread-1-consumer, groupId=app] Attempt to heartbeat failed since group is rebalancing{code}
The other one shows:
{code:java}
2021-08-18 11:37:23,710 [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1] INFO o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1-consumer] Requested to schedule probing rebalance for 1629280043706 ms.
2021-08-18 11:37:23,710 [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1] INFO o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1-consumer] Requested to schedule probing rebalance for 1629280043706 ms.
2021-08-18 11:37:23,710 [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1] INFO o.a.k.s.p.i.TaskManager - stream-thread [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1] Handle new assignment with: New active tasks: [1_0, 2_0, 1_1, 2_1, 3_0, 1_2, 2_2, 1_3, 2_3, 3_2, 1_4, 2_4, 1_5, 2_5, 3_4, 1_6, 2_6, 1_7, 2_7, 3_6, 1_8, 2_8, 1_9, 2_9, 3_8, 1_10, 2_10, 1_11, 2_11, 3_10] New standby tasks: [] Existing active tasks: [1_0, 1_1, 2_0, 1_2, 2_1, 3_0, 1_3, 2_2, 1_4, 2_3, 3_2, 1_5, 2_4, 1_6, 2_5, 3_4, 1_7, 2_6, 1_8, 2_7, 3_6, 1_9, 2_8, 1_10, 2_9, 3_8, 1_11, 2_10, 2_11, 3_10] Existing standby tasks: []
2021-08-18 11:37:23,711 [app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=app-1c2d16b0-7456-409d-89c5-792026f08647-StreamThread-1-consumer, groupId=app] Adding newly assigned partitions:
2021-08-18 11:37:23,711
{code}

[jira] [Resolved] (KAFKA-8838) Allow consumer group tool to work with non-existing consumer groups

2019-09-12 Thread Patrik Kleindl (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrik Kleindl resolved KAFKA-8838.
---
Resolution: Not A Problem

> Allow consumer group tool to work with non-existing consumer groups
> ---
>
> Key: KAFKA-8838
> URL: https://issues.apache.org/jira/browse/KAFKA-8838
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Patrik Kleindl
>Priority: Minor
>
> The streams application reset tool works for non-existing consumer groups and 
> allows one to "pre-set" offsets before a new deployment.
> The consumer group tool does not allow the same, which would be a nice 
> enhancement.
> If this is supposed to work and the NullPointerException is not expected, this 
> can be converted to a bug.
>  
> {code:java}
> ./kafka-consumer-groups --bootstrap-server broker:9092 --group applicationId --reset-offsets --by-duration P60D --topic topic1 --execute
> Error: Executing consumer group command failed due to null
> java.lang.NullPointerException
>   at scala.collection.convert.Wrappers$JListWrapper.iterator(Wrappers.scala:88)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$anonfun$parseTopicPartitionsToReset$1.apply(ConsumerGroupCommand.scala:477)
>   at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$anonfun$parseTopicPartitionsToReset$1.apply(ConsumerGroupCommand.scala:471)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>   at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.parseTopicPartitionsToReset(ConsumerGroupCommand.scala:471)
>   at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.getPartitionsToReset(ConsumerGroupCommand.scala:486)
>   at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:310)
>   at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:64)
>   at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala){code}





[jira] [Created] (KAFKA-8838) Allow consumer group tool to work with non-existing consumer groups

2019-08-27 Thread Patrik Kleindl (Jira)
Patrik Kleindl created KAFKA-8838:
-

 Summary: Allow consumer group tool to work with non-existing 
consumer groups
 Key: KAFKA-8838
 URL: https://issues.apache.org/jira/browse/KAFKA-8838
 Project: Kafka
  Issue Type: Improvement
  Components: tools
Reporter: Patrik Kleindl


The streams application reset tool works for non-existing consumer groups and 
allows one to "pre-set" offsets before a new deployment.

The consumer group tool does not allow the same, which would be a nice 
enhancement.

If this is supposed to work and the NullPointerException is not expected, this 
can be converted to a bug.

 
{code:java}
./kafka-consumer-groups --bootstrap-server broker:9092 --group applicationId --reset-offsets --by-duration P60D --topic topic1 --execute
Error: Executing consumer group command failed due to null
java.lang.NullPointerException
  at scala.collection.convert.Wrappers$JListWrapper.iterator(Wrappers.scala:88)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$anonfun$parseTopicPartitionsToReset$1.apply(ConsumerGroupCommand.scala:477)
  at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$anonfun$parseTopicPartitionsToReset$1.apply(ConsumerGroupCommand.scala:471)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
  at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.parseTopicPartitionsToReset(ConsumerGroupCommand.scala:471)
  at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.getPartitionsToReset(ConsumerGroupCommand.scala:486)
  at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:310)
  at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:64)
  at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala){code}
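As an illustration, the timestamp arithmetic behind --by-duration is simple ISO-8601 subtraction; the commented AdminClient call at the end is a hypothetical sketch of how a 2.4+ client could pre-set offsets directly (group, topic, and offset names are illustrative):

```java
import java.time.Duration;
import java.time.Instant;

public class OffsetResetSketch {
    // Mirrors what --reset-offsets --by-duration P60D computes: the cutoff
    // timestamp whose earliest offsets the group should be reset to.
    static Instant cutoff(Instant now, String isoDuration) {
        return now.minus(Duration.parse(isoDuration));
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2019-08-27T00:00:00Z");
        System.out.println(cutoff(now, "P60D")); // prints 2019-06-28T00:00:00Z
        // Hypothetical follow-up with a Kafka 2.4+ AdminClient, which can set
        // offsets even for a group that does not exist yet:
        // admin.alterConsumerGroupOffsets("applicationId",
        //         Map.of(new TopicPartition("topic1", 0),
        //                new OffsetAndMetadata(offsetForTimestamp)));
    }
}
```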





[jira] [Created] (KAFKA-8766) Allow a custom offset policy for Kafka Streams applications

2019-08-07 Thread Patrik Kleindl (JIRA)
Patrik Kleindl created KAFKA-8766:
-

 Summary: Allow a custom offset policy for Kafka Streams 
applications 
 Key: KAFKA-8766
 URL: https://issues.apache.org/jira/browse/KAFKA-8766
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Patrik Kleindl


Currently when starting a new streams application (= new consumer group) you 
can only choose between starting from the beginning of all topics or only 
processing newly arriving records.

To start processing at any given point in the past (e.g. only processing data of 
the last month) the application has to be started (so the consumer group 
exists), stopped, the offsets reset, and then restarted.

It would be helpful if this could be configured up front via some kind of 
user-provided "offset reset strategy".
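A hedged sketch of what such a user-provided strategy could look like; the interface name and the resolution step in the comment are hypothetical, not an existing Streams API:

```java
import java.time.Duration;
import java.time.Instant;

public class OffsetPolicySketch {
    // Hypothetical user-provided strategy: given "now", decide where in a
    // partition's history a fresh consumer group should start.
    interface OffsetResetStrategy {
        Instant startingPoint(Instant now);
    }

    // "Only process the last month" expressed as one such strategy.
    static final OffsetResetStrategy LAST_MONTH = now -> now.minus(Duration.ofDays(30));

    public static void main(String[] args) {
        Instant now = Instant.parse("2019-08-07T12:00:00Z");
        System.out.println(LAST_MONTH.startingPoint(now)); // prints 2019-07-08T12:00:00Z
        // A runtime honouring such a strategy would resolve this instant to
        // offsets via offsetsForTimes() and seek() on the group's first start.
    }
}
```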





[jira] [Resolved] (KAFKA-8093) Fix JavaDoc markup

2019-05-23 Thread Patrik Kleindl (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrik Kleindl resolved KAFKA-8093.
---
Resolution: Fixed

Fixed by other commits, see PR for discussion.

> Fix JavaDoc markup
> --
>
> Key: KAFKA-8093
> URL: https://issues.apache.org/jira/browse/KAFKA-8093
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Matthias J. Sax
>Assignee: Patrik Kleindl
>Priority: Trivial
>
> Running `./gradlew install` gives the following warning
> {code:java}
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java:87:
>  warning - Tag @link: reference not found: java.nio.channels.Selector
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java:87:
>  warning - Tag @link: reference not found: 
> java.nio.channels.Selector#wakeup() wakeup()
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java:34:
>  warning - Tag @link: reference not found: 
> org.apache.kafka.clients.producer.ProducerRecord ProducerRecord
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> /Users/matthias/IdeaProjects/kafka/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java:261:
>  warning - @Header is an unknown tag.
> {code}
> Those should be fixed





[jira] [Resolved] (KAFKA-8200) TopologyTestDriver should offer an iterable signature of readOutput

2019-04-26 Thread Patrik Kleindl (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Patrik Kleindl resolved KAFKA-8200.
---
Resolution: Won't Do

Discarded in favor of 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-456%3A+Helper+classes+to+make+it+simpler+to+write+test+logic+with+TopologyTestDriver]

> TopologyTestDriver should offer an iterable signature of readOutput
> ---
>
> Key: KAFKA-8200
> URL: https://issues.apache.org/jira/browse/KAFKA-8200
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Drogalis
>Assignee: Patrik Kleindl
>Priority: Minor
>  Labels: needs-kip
>
> When using the TopologyTestDriver, one examines the output on a topic with 
> the readOutput method. This method returns one record at a time, until no 
> more records can be found, at which point it returns null.
> Many times, the usage pattern around readOutput will involve writing a loop 
> to extract a number of records from the topic, building up a list of records, 
> until it returns null.
> It would be helpful to offer an iterable signature of readOutput, which 
> returns either an iterator or list over the records that are currently 
> available in the topic. This would effectively remove the loop that a user 
> needs to write for him/herself each time.
> Such a signature might look like:
> {code:java}
> public Iterable<ProducerRecord<K, V>> readOutput(java.lang.String topic);
> {code}
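The loop users currently write around readOutput can be sketched generically; here a Supplier stands in for the driver.readOutput(topic, keyDeserializer, valueDeserializer) call, so the sketch has no Streams test dependencies:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class DrainSketch {
    // Generic form of the boilerplate loop: keep polling a null-terminated
    // source until it is exhausted, collecting the results into a list.
    static <T> List<T> drain(Supplier<T> readOutput) {
        List<T> records = new ArrayList<>();
        for (T r = readOutput.get(); r != null; r = readOutput.get()) {
            records.add(r);
        }
        return records;
    }

    public static void main(String[] args) {
        Iterator<String> it = List.of("a", "b", "c").iterator();
        System.out.println(drain(() -> it.hasNext() ? it.next() : null)); // prints [a, b, c]
    }
}
```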





[jira] [Created] (KAFKA-8024) UtilsTest.testFormatBytes fails with german locale

2019-03-01 Thread Patrik Kleindl (JIRA)
Patrik Kleindl created KAFKA-8024:
-

 Summary: UtilsTest.testFormatBytes fails with german locale
 Key: KAFKA-8024
 URL: https://issues.apache.org/jira/browse/KAFKA-8024
 Project: Kafka
  Issue Type: Bug
Reporter: Patrik Kleindl


The unit test fails when the default locale is not English (in my case, de_AT):

assertEquals("1.1 MB", formatBytes((long) (1.1 * 1024 * 1024)));

 
org.apache.kafka.common.utils.UtilsTest > testFormatBytes FAILED
    org.junit.ComparisonFailure: expected:<1[.]1 MB> but was:<1[,]1 MB>
        at org.junit.Assert.assertEquals(Assert.java:115)
        at org.junit.Assert.assertEquals(Assert.java:144)
        at 
org.apache.kafka.common.utils.UtilsTest.testFormatBytes(UtilsTest.java:106)
 

The easiest fix in this case should be adding
{code:java}
jvmArgs '-Duser.language=en -Duser.country=US'{code}
to the test configuration 

[https://github.com/apache/kafka/blob/b03e8c234a8aeecd10c2c96b683cfb39b24b548a/build.gradle#L270]
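The failure itself is plain JDK behaviour, independent of Kafka; a small demonstration:

```java
import java.util.Locale;

public class LocaleFormatDemo {
    public static void main(String[] args) {
        // The assertion in UtilsTest bakes in the English decimal separator;
        // under a German locale the same value formats with a comma instead.
        System.out.println(String.format(Locale.US, "%.1f MB", 1.1));      // prints 1.1 MB
        System.out.println(String.format(Locale.GERMANY, "%.1f MB", 1.1)); // prints 1,1 MB
    }
}
```

Pinning user.language/user.country for the test JVM, as proposed above, makes the expected separator deterministic.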

 





[jira] [Created] (KAFKA-8023) Improve global state store restoration by using multiple update threads

2019-03-01 Thread Patrik Kleindl (JIRA)
Patrik Kleindl created KAFKA-8023:
-

 Summary: Improve global state store restoration by using multiple 
update threads
 Key: KAFKA-8023
 URL: https://issues.apache.org/jira/browse/KAFKA-8023
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Patrik Kleindl


Currently global state stores are restored sequentially and the partitions of 
each global state store are restored sequentially too.

Loop over stores:

https://github.com/apache/kafka/blob/b03e8c234a8aeecd10c2c96b683cfb39b24b548a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L155

Loop over partitions:

https://github.com/apache/kafka/blob/b03e8c234a8aeecd10c2c96b683cfb39b24b548a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L256

It would be a great improvement if one or both of those loops could be 
processed in parallel.

A possibly related task is https://issues.apache.org/jira/browse/KAFKA-6721

Mail discussion: 
https://lists.apache.org/thread.html/6fc4772eb8635c04b0ee6682003a99a5ef37ebccffea6c89752e96b1@%3Cusers.kafka.apache.org%3E
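A rough sketch of parallelising the outer (per-store) loop with a thread pool; the store names and the restore body are placeholders, not the actual GlobalStateManagerImpl internals:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;

public class ParallelRestoreSketch {
    // Restores each global store on its own thread instead of one after the
    // other, then blocks until all are done (as the sequential code does today).
    static Set<String> restoreAll(List<String> stores, int threads) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        Set<String> restored = ConcurrentHashMap.newKeySet();
        try {
            CountDownLatch done = new CountDownLatch(stores.size());
            for (String store : stores) {
                pool.execute(() -> {
                    restored.add(store); // placeholder for the per-partition restore loop
                    done.countDown();
                });
            }
            done.await(); // startup still waits for every store, just not serially
        } finally {
            pool.shutdown();
        }
        return restored;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(restoreAll(List.of("store-a", "store-b", "store-c"), 2));
    }
}
```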





[jira] [Created] (KAFKA-7996) KafkaStreams does not pass timeout when closing Producer

2019-02-25 Thread Patrik Kleindl (JIRA)
Patrik Kleindl created KAFKA-7996:
-

 Summary: KafkaStreams does not pass timeout when closing Producer
 Key: KAFKA-7996
 URL: https://issues.apache.org/jira/browse/KAFKA-7996
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.1.0
Reporter: Patrik Kleindl


[https://confluentcommunity.slack.com/messages/C48AHTCUQ/convo/C48AHTCUQ-1550831721.026100/]

We are running 2.1 and have a case where the shutdown of a streams application 
takes several minutes. I noticed that although we call streams.close with a 
timeout of 30 seconds, the log says:

[Producer clientId=…-8be49feb-8a2e-4088-bdd7-3c197f6107bb-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

Matthias J Sax [3 days ago]
I just checked the code, and yes, we don't provide a timeout for the producer 
on close()...
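For context, the timeout value in that log line is not arbitrary; a tiny runnable note (plain Java, no Kafka dependencies) pins it down:

```java
public class CloseTimeoutNote {
    public static void main(String[] args) {
        // The timeoutMillis in the log is simply Long.MAX_VALUE: the no-arg
        // producer.close() waits indefinitely, so the 30-second bound passed to
        // streams.close() never reaches the producer.
        System.out.println(9223372036854775807L == Long.MAX_VALUE); // prints true
    }
}
```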





[jira] [Created] (KAFKA-7660) Stream Metrics - Memory Analysis

2018-11-20 Thread Patrik Kleindl (JIRA)
Patrik Kleindl created KAFKA-7660:
-

 Summary: Stream Metrics - Memory Analysis
 Key: KAFKA-7660
 URL: https://issues.apache.org/jira/browse/KAFKA-7660
 Project: Kafka
  Issue Type: Bug
  Components: metrics, streams
Affects Versions: 2.0.0
Reporter: Patrik Kleindl
 Attachments: Mem_Collections.jpeg, Mem_DuplicateStrings.jpeg, 
Mem_DuplicateStrings2.jpeg, Mem_Hotspots.jpeg, Mem_KeepAliveSet.jpeg, 
Mem_References.jpeg

During an analysis of JVM memory, two possible issues showed up which I would 
like to bring to your attention:
1) Duplicate strings
Top findings: 
string_content="stream-processor-node-metrics" count="534,277"
string_content="processor-node-id" count="148,437"
string_content="stream-rocksdb-state-metrics" count="41,832"
string_content="punctuate-latency-avg" count="29,681" 
 
"stream-processor-node-metrics" seems to be used in Sensors.java as a literal 
and not interned.
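A minimal illustration of the duplicate-string effect (plain JDK; interning is shown only as the general mechanism, not as the proposed Streams fix):

```java
public class InternDemo {
    public static void main(String[] args) {
        // Two distinct heap copies of the same metric-name text...
        String a = new String("stream-processor-node-metrics");
        String b = new String("stream-processor-node-metrics");
        System.out.println(a == b);                   // prints false: duplicates in memory
        // ...collapse to one canonical instance once interned (or when the
        // code consistently reuses a single constant instead of building new strings).
        System.out.println(a.intern() == b.intern()); // prints true
    }
}
```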
 
2) The HashMap parentSensors from 
org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
 was reported multiple times as suspicious for potentially keeping alive a lot 
of objects. In our case the reported size was 40-50MB each.
I haven't looked too deeply into the code but noticed that the class 
Sensor.java, which is used as a key in the HashMap, does not override the 
equals or hashCode methods. Not sure this is a problem though.
 
The analysis was done with Dynatrace 7.0. We are running Confluent 5.0 / Kafka 
2.0-cp1 (brokers as well as clients).
 
Screenshots are attached.





[jira] [Created] (KAFKA-7332) Improve error message when trying to produce message without key for compacted topic

2018-08-23 Thread Patrik Kleindl (JIRA)
Patrik Kleindl created KAFKA-7332:
-

 Summary: Improve error message when trying to produce message 
without key for compacted topic
 Key: KAFKA-7332
 URL: https://issues.apache.org/jira/browse/KAFKA-7332
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 1.1.0
Reporter: Patrik Kleindl


Goal:

Return a specific error message, e.g. "Message without a key is not valid for a 
compacted topic", when trying to produce such a message, instead of a 
CorruptRecordException.
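A hypothetical pre-send validation sketch showing the proposed message; the helper and its parameters are invented for illustration and are not producer API:

```java
public class CompactedTopicCheck {
    // Hypothetical check producing the clearer error message this issue asks
    // for, instead of the misleading CRC/CorruptRecordException path.
    static String validate(boolean topicIsCompacted, Object key) {
        if (topicIsCompacted && key == null) {
            return "Message without a key is not valid for a compacted topic";
        }
        return "ok";
    }

    public static void main(String[] args) {
        System.out.println(validate(true, null));   // the proposed error text
        System.out.println(validate(true, "id-1")); // ok
    }
}
```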

 

> Yesterday we had the following exception:
> 
> Exception thrown when sending a message with key='null' and payload='...'
> to topic sometopic:: org.apache.kafka.common.errors.CorruptRecordException:
> This message has failed its CRC checksum, exceeds the valid size, or is
> otherwise corrupt.
> 
> The cause was identified with the help of
> 
>[https://stackoverflow.com/questions/49098274/kafka-stream-get-corruptrecordexception]
> 
> Is it possible / would it make sense to open an issue to improve the error
> message for this case?
> A simple "Message without a key is not valid for a compacted topic" would
> suffice and point a user in the right direction.





[jira] [Created] (KAFKA-7281) Fix documentation and error message regarding cleanup.policy=[compact,delete]

2018-08-13 Thread Patrik Kleindl (JIRA)
Patrik Kleindl created KAFKA-7281:
-

 Summary: Fix documentation and error message regarding 
cleanup.policy=[compact,delete]
 Key: KAFKA-7281
 URL: https://issues.apache.org/jira/browse/KAFKA-7281
 Project: Kafka
  Issue Type: Task
  Components: config
Affects Versions: 1.1.0
Reporter: Patrik Kleindl


Issue as requested in: 
https://lists.apache.org/thread.html/621821e321b9ae5a8af623f5918edc4ceee564e0561009317fc705af@%3Cusers.kafka.apache.org%3E

1) The documentation at [https://kafka.apache.org/documentation/] is missing 
the updated information regarding the "compact,delete" cleanup policy on topic 
level.
log.cleanup.policy on broker level:
The default cleanup policy for segments beyond the retention window. A comma 
separated list of valid policies. Valid policies are: "delete" and "compact"

cleanup.policy on topic level:
A string that is either "delete" or "compact".
 
2) The special list notation for the command-line client should also be noted:

./kafka-configs --zookeeper broker0:2181 --alter --entity-type topics --entity-name test --add-config cleanup.policy=[compact,delete]

Completed Updating config for entity: topic 'test'.

3) The config command does not show this new notation in the error message:

./kafka-configs --zookeeper broker0:2181 --alter --entity-type topics --entity-name test --add-config cleanup.policy=test

Error while executing config command with args '--zookeeper broker0:2181 --alter --entity-type topics --entity-name test --add-config cleanup.policy=test'
org.apache.kafka.common.config.ConfigException: Invalid value test for configuration cleanup.policy: String must be one of: compact, delete
  at org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:930)
  at org.apache.kafka.common.config.ConfigDef$ValidList.ensureValid(ConfigDef.java:906)
  at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:478)
  at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:462)
  at kafka.log.LogConfig$.validate(LogConfig.scala:299)
  at kafka.zk.AdminZkClient.validateTopicConfig(AdminZkClient.scala:336)
  at kafka.zk.AdminZkClient.changeTopicConfig(AdminZkClient.scala:348)
  at kafka.zk.AdminZkClient.changeConfigs(AdminZkClient.scala:285)
  at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:133)
  at kafka.admin.ConfigCommand$.processCommandWithZk(ConfigCommand.scala:100)
  at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:77)
  at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
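A small sketch of why the bracket notation passes validation while "test" fails: the config value is a list whose elements are each checked against the allowed set. This mirrors, but is not, the actual ConfigDef ValidList/ValidString code:

```java
import java.util.Arrays;
import java.util.List;

public class CleanupPolicyCheck {
    // Sketch of list-valued config validation: strip optional brackets, split
    // on commas, and require every element to be "compact" or "delete".
    static boolean isValid(String value) {
        String stripped = value.replaceAll("^\\[|\\]$", "");
        List<String> allowed = List.of("compact", "delete");
        return Arrays.stream(stripped.split(",")).allMatch(allowed::contains);
    }

    public static void main(String[] args) {
        System.out.println(isValid("[compact,delete]")); // prints true
        System.out.println(isValid("compact"));          // prints true
        System.out.println(isValid("test"));             // prints false
    }
}
```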


