[jira] [Resolved] (KAFKA-8365) Protocol and consumer support for follower fetching
[ https://issues.apache.org/jira/browse/KAFKA-8365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-8365. Resolution: Fixed > Protocol and consumer support for follower fetching > --- > > Key: KAFKA-8365 > URL: https://issues.apache.org/jira/browse/KAFKA-8365 > Project: Kafka > Issue Type: New Feature > Components: consumer >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > Fix For: 2.3.0 > > > Add the consumer client changes and implement the protocol support for > [KIP-392|https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
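KIP-392 is surfaced to users through the consumer's `client.rack` setting and a broker-side replica selector. A minimal configuration sketch, assuming the property names from the KIP (values are illustrative; the rack-aware selector is the broker-side half of the KIP and may require a later broker release than the consumer support tracked here):

```properties
# Broker: pick the "closest" replica by matching the consumer's rack
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector

# Consumer: advertise which rack / availability zone this client runs in
client.rack=us-east-1a
```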
[jira] [Commented] (KAFKA-8365) Protocol and consumer support for follower fetching
[ https://issues.apache.org/jira/browse/KAFKA-8365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843092#comment-16843092 ] ASF GitHub Bot commented on KAFKA-8365: --- hachikuji commented on pull request #6731: KAFKA-8365 Consumer support for follower fetch URL: https://github.com/apache/kafka/pull/6731 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Protocol and consumer support for follower fetching > --- > > Key: KAFKA-8365 > URL: https://issues.apache.org/jira/browse/KAFKA-8365 > Project: Kafka > Issue Type: New Feature > Components: consumer >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > Fix For: 2.3.0 > > > Add the consumer client changes and implement the protocol support for > [KIP-392|https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4834) Kafka cannot delete topic with ReplicaStateMachine went wrong
[ https://issues.apache.org/jira/browse/KAFKA-4834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16843082#comment-16843082 ]

zhangxiong commented on KAFKA-4834:
-----------------------------------

Same problem found in production environment on version 0.10.0.1.

> Kafka cannot delete topic with ReplicaStateMachine went wrong
> -------------------------------------------------------------
>
> Key: KAFKA-4834
> URL: https://issues.apache.org/jira/browse/KAFKA-4834
> Project: Kafka
> Issue Type: Bug
> Components: controller
> Affects Versions: 0.10.1.1
> Reporter: Dan
> Priority: Major
> Labels: reliability
>
> It happened several times that some topics could not be deleted in our production environment. By analyzing the log, we found ReplicaStateMachine went wrong. Here are the error messages:
>
> In state-change.log:
> ERROR Controller 2 epoch 201 initiated state change of replica 1 for partition [test_create_topic1,1] from OnlineReplica to ReplicaDeletionStarted failed (state.change.logger)
> java.lang.AssertionError: assertion failed: Replica [Topic=test_create_topic1,Partition=1,Replica=1] should be in the OfflineReplica states before moving to ReplicaDeletionStarted state. Instead it is in OnlineReplica state
>   at scala.Predef$.assert(Predef.scala:179)
>   at kafka.controller.ReplicaStateMachine.assertValidPreviousStates(ReplicaStateMachine.scala:309)
>   at kafka.controller.ReplicaStateMachine.handleStateChange(ReplicaStateMachine.scala:190)
>   at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
>   at kafka.controller.ReplicaStateMachine$$anonfun$handleStateChanges$2.apply(ReplicaStateMachine.scala:114)
>   at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
>   at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
>   at kafka.controller.ReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:114)
>   at kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:344)
>   at kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2.apply(TopicDeletionManager.scala:334)
>   at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>   at kafka.controller.TopicDeletionManager.startReplicaDeletion(TopicDeletionManager.scala:334)
>   at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onPartitionDeletion(TopicDeletionManager.scala:367)
>   at kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:313)
>   at kafka.controller.TopicDeletionManager$$anonfun$kafka$controller$TopicDeletionManager$$onTopicDeletion$2.apply(TopicDeletionManager.scala:312)
>   at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>   at kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$onTopicDeletion(TopicDeletionManager.scala:312)
>   at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:431)
>   at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1$$anonfun$apply$mcV$sp$4.apply(TopicDeletionManager.scala:403)
>   at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
>   at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
>   at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
>   at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply$mcV$sp(TopicDeletionManager.scala:403)
>   at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
>   at kafka.controller.TopicDeletionManager$DeleteTopicsThread$$anonfun$doWork$1.apply(TopicDeletionManager.scala:397)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at kafka.controller.TopicDeletionManager$DeleteTopicsThread.doWork(TopicDeletionManager.scala:397)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
>
> In controller.log:
> INFO Leader not yet assigned for partition [test_create_topic1,1]. Skip sending UpdateMetadataRequest. (kafka.controller.ControllerBrokerRequestBatch)
>
> There may exist two controllers in the cluster, because creating a new topic may trigger two machines to change the state of the same partition, e.g. NonExistentPartition -> NewPartition.
> On the other controller, we found the following messages in controller.log from several days earlier:
> [2017-02-25 16:51:22,353] INFO [Topic
[jira] [Updated] (KAFKA-8388) Add methods to query for entries in KTable using timestamp
[ https://issues.apache.org/jira/browse/KAFKA-8388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Richard Yu updated KAFKA-8388:
------------------------------
    Description: 
Currently, in Kafka Streams, the KTable API does not allow users to query for entries with a specific timestamp. The goal of this ticket is to allow a couple of things:
* Retrieve entries in a KTable that were inserted in the last {{K}} milliseconds, where {{K}} could be determined by the user.
* Find key-value pairs which were put into the KTable in a specified time interval.

KTable already has methods like {{filter}} and {{filterNot}}, but the information available to the user in these methods does not include the publish timestamp. A couple of things we could do to attack this issue:
# Modify the current {{filter}} and {{filterNot}} methods so that the timestamp information is also available to the user for reference.
# Add new methods which create a new KTable based solely on timestamp.

  was:
Currently, in Kafka Streams, the KTable API does not allow users to query for entries with a specific timestamp. The goal of this ticket is to allow a couple of things:
* Retrieve entries in a KTable that were inserted in the last {{K}} milliseconds, where {{K}} could be determined by the user.
* Find key-value pairs which were put into the KTable in a specified time interval.

KTable already has methods like {{filter}} and {{filterNot}}, but the information given to the user does not include the publish timestamp. A couple of things we could do to attack this issue:
# Modify the current {{filter}} and {{filterNot}} methods so that the timestamp information is also available to the user for reference.
# Add new methods which create a new KTable based solely on timestamp.
> Add methods to query for entries in KTable using timestamp
> ----------------------------------------------------------
>
> Key: KAFKA-8388
> URL: https://issues.apache.org/jira/browse/KAFKA-8388
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Richard Yu
> Priority: Major
> Labels: needs-kip
>
> Currently, in Kafka Streams, the KTable API does not allow users to query for entries with a specific timestamp. The goal of this ticket is to allow a couple of things:
> * Retrieve entries in a KTable that were inserted in the last {{K}} milliseconds, where {{K}} could be determined by the user.
> * Find key-value pairs which were put into the KTable in a specified time interval.
> KTable already has methods like {{filter}} and {{filterNot}}, but the information available to the user in these methods does not include the publish timestamp.
> A couple of things we could do to attack this issue:
> # Modify the current {{filter}} and {{filterNot}} methods so that the timestamp information is also available to the user for reference.
> # Add new methods which create a new KTable based solely on timestamp.
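The second option above (new methods that select entries by timestamp) can be sketched with plain Java over an in-memory table. This is a hypothetical illustration only: `TimestampedValue` and `filterByTime` are illustrative names, not part of the Kafka Streams API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiPredicate;

public class TimestampFilterSketch {
    // Illustrative stand-in for a table value that carries its publish timestamp.
    static final class TimestampedValue<V> {
        final V value;
        final long timestampMs;
        TimestampedValue(V value, long timestampMs) {
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    // Keep only entries whose (key, timestamp) satisfy the predicate,
    // e.g. "inserted in the last K milliseconds".
    static <K, V> Map<K, TimestampedValue<V>> filterByTime(
            Map<K, TimestampedValue<V>> table,
            BiPredicate<K, Long> predicate) {
        Map<K, TimestampedValue<V>> out = new LinkedHashMap<>();
        for (Map.Entry<K, TimestampedValue<V>> e : table.entrySet()) {
            if (predicate.test(e.getKey(), e.getValue().timestampMs)) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }
}
```

Passing a predicate like `(key, ts) -> now - ts <= K` expresses the "last K milliseconds" query from the ticket.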
[jira] [Updated] (KAFKA-8388) Add methods to query for entries in KTable using timestamp
[ https://issues.apache.org/jira/browse/KAFKA-8388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Richard Yu updated KAFKA-8388:
------------------------------
    Description: 
Currently, in Kafka Streams, the KTable API does not allow users to query for entries with a specific timestamp. The goal of this ticket is to allow a couple of things:
* Retrieve entries in a KTable that were inserted in the last {{K}} milliseconds, where {{K}} could be determined by the user.
* Find key-value pairs which were put into the KTable in a specified time interval.

KTable already has methods like {{filter}} and {{filterNot}}, but the information given to the user does not include the publish timestamp. A couple of things we could do to attack this issue:
# Modify the current {{filter}} and {{filterNot}} methods so that the timestamp information is also available to the user for reference.
# Add new methods which create a new KTable based solely on timestamp.

  was: Currently, in Kafka Streams, KTable API does not allow users to query for entries with a specific timestamp. The goal of this ticket is to allow users to

> Add methods to query for entries in KTable using timestamp
> ----------------------------------------------------------
>
> Key: KAFKA-8388
> URL: https://issues.apache.org/jira/browse/KAFKA-8388
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Richard Yu
> Priority: Major
> Labels: needs-kip
>
> Currently, in Kafka Streams, the KTable API does not allow users to query for entries with a specific timestamp. The goal of this ticket is to allow a couple of things:
> * Retrieve entries in a KTable that were inserted in the last {{K}} milliseconds, where {{K}} could be determined by the user.
> * Find key-value pairs which were put into the KTable in a specified time interval.
> KTable already has methods like {{filter}} and {{filterNot}}, but the information given to the user does not include the publish timestamp.
> A couple of things we could do to attack this issue:
> # Modify the current {{filter}} and {{filterNot}} methods so that the timestamp information is also available to the user for reference.
> # Add new methods which create a new KTable based solely on timestamp.
[jira] [Created] (KAFKA-8389) Duplicated MockProcessorSupplier / MockProcessor in TopologyTestDriverTest
Guozhang Wang created KAFKA-8389:
------------------------------------

Summary: Duplicated MockProcessorSupplier / MockProcessor in TopologyTestDriverTest
Key: KAFKA-8389
URL: https://issues.apache.org/jira/browse/KAFKA-8389
Project: Kafka
Issue Type: Improvement
Reporter: Guozhang Wang

We have stand-alone MockProcessorSupplier / MockProcessor classes, yet TopologyTestDriverTest defines its own copies as well. We should consider removing the duplicates.
[jira] [Updated] (KAFKA-8388) Add methods to query for entries in KTable using timestamp
[ https://issues.apache.org/jira/browse/KAFKA-8388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-8388: -- Labels: needs-kip (was: ) > Add methods to query for entries in KTable using timestamp > -- > > Key: KAFKA-8388 > URL: https://issues.apache.org/jira/browse/KAFKA-8388 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Richard Yu >Priority: Major > Labels: needs-kip > > Currently, in Kafka Streams, KTable API does not allow users to query for > entries with a specific timestamp. The goal of this ticket is to allow users > to -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8388) Add methods to query for entries in KTable using timestamp
[ https://issues.apache.org/jira/browse/KAFKA-8388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu updated KAFKA-8388: -- Description: Currently, in Kafka Streams, KTable API does not allow users to query for entries with a specific timestamp. The goal of this ticket is to allow users to > Add methods to query for entries in KTable using timestamp > -- > > Key: KAFKA-8388 > URL: https://issues.apache.org/jira/browse/KAFKA-8388 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Richard Yu >Priority: Major > > Currently, in Kafka Streams, KTable API does not allow users to query for > entries with a specific timestamp. The goal of this ticket is to allow users > to -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8388) Add methods to query for entries in KTable using timestamp
Richard Yu created KAFKA-8388: - Summary: Add methods to query for entries in KTable using timestamp Key: KAFKA-8388 URL: https://issues.apache.org/jira/browse/KAFKA-8388 Project: Kafka Issue Type: Improvement Components: streams Reporter: Richard Yu -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8387) Add `Fenced` state to AbstractCoordinator
Boyang Chen created KAFKA-8387:
--------------------------------

Summary: Add `Fenced` state to AbstractCoordinator
Key: KAFKA-8387
URL: https://issues.apache.org/jira/browse/KAFKA-8387
Project: Kafka
Issue Type: Improvement
Reporter: Boyang Chen
Assignee: Boyang Chen

Right now, some requests such as async commit or heartbeat could encounter a fencing exception, which should fail the consumer application entirely. It is better to track this within MemberState by adding a new `Fenced` state so that the main thread can shut down immediately.
[jira] [Commented] (KAFKA-6455) Improve timestamp propagation at DSL level
[ https://issues.apache.org/jira/browse/KAFKA-6455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842810#comment-16842810 ] ASF GitHub Bot commented on KAFKA-6455: --- bbejeck commented on pull request #6725: KAFKA-6455: Improve DSL operator timestamp semantics URL: https://github.com/apache/kafka/pull/6725 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve timestamp propagation at DSL level > -- > > Key: KAFKA-6455 > URL: https://issues.apache.org/jira/browse/KAFKA-6455 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Major > Labels: needs-kip > > At DSL level, we inherit the timestamp propagation "contract" from the > Processor API. This contract in not optimal at DSL level, and we should > define a DSL level contract that matches the semantics of the corresponding > DSL operator. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8286) KIP-460 Admin Leader Election RPC
[ https://issues.apache.org/jira/browse/KAFKA-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jose Armando Garcia Sancio updated KAFKA-8286:
----------------------------------------------
    Description: 
Tracking issue for implementing KIP-460. Tasks:
# [Done] Design KIP
# [Done] Review KIP
# [Done] Approve KIP
# [Done] Update RPC to support KIP
# [Done] Update controller to support KIP
# [Done] Create CLI command (kafka-leader-election) that implements the KIP
# [Done] Search and replace any usage of “preferred” in the code
# Add test for command
# [Done] Add test for controller functionality
# Revisit all of the documentation - generate and audit the new javadocs
# "Deprecated since..." notes need to be updated
# Review PR
# Merge PR
# Update the KIP based on the latest implementation

  was:
Tracking issue for implementing KIP-460. Tasks:
# [Done] Design KIP
# [Done] Review KIP
# [Done] Approve KIP
# [Done] Update RPC to support KIP
# [Done] Update controller to support KIP
# [Done] Create CLI command (kafka-leader-election) that implements the KIP
# [Done] Search and replace any usage of “preferred” in the code
# Add test for command
# [Done] Add test for controller functionality
# Revisit all of the documentation - generate and audit the new javadocs
# "Deprecated since..." notes need to be updated
# Review PR
# Merge PR

> KIP-460 Admin Leader Election RPC
> ---------------------------------
>
> Key: KAFKA-8286
> URL: https://issues.apache.org/jira/browse/KAFKA-8286
> Project: Kafka
> Issue Type: New Feature
> Components: admin, clients, core
> Reporter: Jose Armando Garcia Sancio
> Assignee: Jose Armando Garcia Sancio
> Priority: Major
>
> Tracking issue for implementing KIP-460. Tasks:
> # [Done] Design KIP
> # [Done] Review KIP
> # [Done] Approve KIP
> # [Done] Update RPC to support KIP
> # [Done] Update controller to support KIP
> # [Done] Create CLI command (kafka-leader-election) that implements the KIP
> # [Done] Search and replace any usage of “preferred” in the code
> # Add test for command
> # [Done] Add test for controller functionality
> # Revisit all of the documentation - generate and audit the new javadocs
> # "Deprecated since..." notes need to be updated
> # Review PR
> # Merge PR
> # Update the KIP based on the latest implementation
[jira] [Comment Edited] (KAFKA-8367) Non-heap memory leak in Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842778#comment-16842778 ] Sophie Blee-Goldman edited comment on KAFKA-8367 at 5/17/19 11:11 PM: -- Bummer. Thanks for helping with the investigation – it'll be helpful to know if this affects 2.1 as well. Was your 2.0.1 app basically the same as your 2.2 app? Actually, would you be able to share your code? (No chance this is just an unclosed iterator pinning resources I assume) was (Author: ableegoldman): Bummer. Thanks for helping with the investigation – it'll be helpful to know if this affects 2.1 as well. Was your 2.0.1 app basically the same as your 2.2 app? No chance of a new iterator that isn't being closed causing this? > Non-heap memory leak in Kafka Streams > - > > Key: KAFKA-8367 > URL: https://issues.apache.org/jira/browse/KAFKA-8367 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.2.0 >Reporter: Pavel Savov >Priority: Major > Attachments: memory-prod.png, memory-test.png > > > We have been observing a non-heap memory leak after upgrading to Kafka > Streams 2.2.0 from 2.0.1. We suspect the source to be around RocksDB as the > leak only happens when we enable stateful stream operations (utilizing > stores). We are aware of *KAFKA-8323* and have created our own fork of 2.2.0 > and ported the fix scheduled for release in 2.2.1 to our fork. It did not > stop the leak, however. > We are having this memory leak in our production environment where the > consumer group is auto-scaled in and out in response to changes in traffic > volume, and in our test environment where we have two consumers, no > autoscaling and relatively constant traffic. 
> Below is some information I'm hoping will be of help: > * RocksDB Config: > Block cache size: 4 MiB > Write buffer size: 2 MiB > Block size: 16 KiB > Cache index and filter blocks: true > Manifest preallocation size: 64 KiB > Max write buffer number: 3 > Max open files: 6144 > > * Memory usage in production > The attached graph (memory-prod.png) shows memory consumption for each > instance as a separate line. The horizontal red line at 6 GiB is the memory > limit. > As illustrated on the attached graph from production, memory consumption in > running instances goes up around autoscaling events (scaling the consumer > group either in or out) and associated rebalancing. It stabilizes until the > next autoscaling event but it never goes back down. > An example of scaling out can be seen from around 21:00 hrs where three new > instances are started in response to a traffic spike. > Just after midnight traffic drops and some instances are shut down. Memory > consumption in the remaining running instances goes up. > Memory consumption climbs again from around 6:00AM due to increased traffic > and new instances are being started until around 10:30AM. Memory consumption > never drops until the cluster is restarted around 12:30. > > * Memory usage in test > As illustrated by the attached graph (memory-test.png) we have a fixed number > of two instances in our test environment and no autoscaling. Memory > consumption rises linearly until it reaches the limit (around 2:00 AM on > 5/13) and Mesos restarts the offending instances, or we restart the cluster > manually. > > * No heap leaks observed > * Window retention: 2 or 11 minutes (depending on operation type) > * Issue not present in Kafka Streams 2.0.1 > * No memory leak for stateless stream operations (when no RocksDB stores are > used) > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
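The "unclosed iterator" question raised in the comments above refers to a well-known off-heap leak pattern: RocksDB-backed store iterators hold native resources until they are explicitly closed. A stdlib-only sketch of the pattern, where `NativeIterator` is an illustrative stand-in for a RocksDB-backed store iterator (not the Kafka Streams `KeyValueIterator` itself):

```java
import java.util.Iterator;
import java.util.List;

public class IteratorCloseSketch {
    static int openIterators = 0;  // stands in for off-heap memory held by native handles

    // Illustrative stand-in for a store iterator backed by native resources.
    static final class NativeIterator implements Iterator<String>, AutoCloseable {
        private final Iterator<String> inner;
        NativeIterator(List<String> data) { inner = data.iterator(); openIterators++; }
        public boolean hasNext() { return inner.hasNext(); }
        public String next() { return inner.next(); }
        @Override public void close() { openIterators--; }  // releases the native handle
    }

    // try-with-resources guarantees the handle is freed even if iteration
    // throws; forgetting close() would leak non-heap memory, invisible to
    // heap profiling, matching the symptoms described in this ticket.
    static long countEntries(List<String> data) {
        long n = 0;
        try (NativeIterator it = new NativeIterator(data)) {
            while (it.hasNext()) { it.next(); n++; }
        }
        return n;
    }
}
```

Because the leaked memory is native, a heap dump shows nothing, which is consistent with the "no heap leaks observed" note above.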
[jira] [Commented] (KAFKA-3522) Consider adding version information into rocksDB storage format
[ https://issues.apache.org/jira/browse/KAFKA-3522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842789#comment-16842789 ]

ASF GitHub Bot commented on KAFKA-3522:
---------------------------------------

bbejeck commented on pull request #6756: KAFKA-3522: TopologyTestDriver should only return custom stores via untyped getStateStore() method
URL: https://github.com/apache/kafka/pull/6756

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Consider adding version information into rocksDB storage format
> ---------------------------------------------------------------
>
> Key: KAFKA-3522
> URL: https://issues.apache.org/jira/browse/KAFKA-3522
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Matthias J. Sax
> Priority: Major
> Labels: architecture
> Fix For: 2.3.0
>
> Kafka Streams does not introduce any modifications to the data format in the underlying Kafka protocol, but it does use RocksDB for persistent state storage, and currently its data format is fixed and hard-coded. We want to consider the evolution path for the future when we change the data format, and hence having some version info stored along with the storage file / directory would be useful.
> This information could even live outside the storage file; for example, we could just use a small "version indicator" file in the rocksdb directory for this purpose. Thoughts? [~enothereska] [~jkreps]
[jira] [Commented] (KAFKA-8367) Non-heap memory leak in Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842778#comment-16842778 ] Sophie Blee-Goldman commented on KAFKA-8367: Bummer. Thanks for helping with the investigation – it'll be helpful to know if this affects 2.1 as well. Was your 2.0.1 app basically the same as your 2.2 app? No chance of a new iterator that isn't being closed causing this? > Non-heap memory leak in Kafka Streams > - > > Key: KAFKA-8367 > URL: https://issues.apache.org/jira/browse/KAFKA-8367 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.2.0 >Reporter: Pavel Savov >Priority: Major > Attachments: memory-prod.png, memory-test.png > > > We have been observing a non-heap memory leak after upgrading to Kafka > Streams 2.2.0 from 2.0.1. We suspect the source to be around RocksDB as the > leak only happens when we enable stateful stream operations (utilizing > stores). We are aware of *KAFKA-8323* and have created our own fork of 2.2.0 > and ported the fix scheduled for release in 2.2.1 to our fork. It did not > stop the leak, however. > We are having this memory leak in our production environment where the > consumer group is auto-scaled in and out in response to changes in traffic > volume, and in our test environment where we have two consumers, no > autoscaling and relatively constant traffic. > Below is some information I'm hoping will be of help: > * RocksDB Config: > Block cache size: 4 MiB > Write buffer size: 2 MiB > Block size: 16 KiB > Cache index and filter blocks: true > Manifest preallocation size: 64 KiB > Max write buffer number: 3 > Max open files: 6144 > > * Memory usage in production > The attached graph (memory-prod.png) shows memory consumption for each > instance as a separate line. The horizontal red line at 6 GiB is the memory > limit. 
> As illustrated on the attached graph from production, memory consumption in > running instances goes up around autoscaling events (scaling the consumer > group either in or out) and associated rebalancing. It stabilizes until the > next autoscaling event but it never goes back down. > An example of scaling out can be seen from around 21:00 hrs where three new > instances are started in response to a traffic spike. > Just after midnight traffic drops and some instances are shut down. Memory > consumption in the remaining running instances goes up. > Memory consumption climbs again from around 6:00AM due to increased traffic > and new instances are being started until around 10:30AM. Memory consumption > never drops until the cluster is restarted around 12:30. > > * Memory usage in test > As illustrated by the attached graph (memory-test.png) we have a fixed number > of two instances in our test environment and no autoscaling. Memory > consumption rises linearly until it reaches the limit (around 2:00 AM on > 5/13) and Mesos restarts the offending instances, or we restart the cluster > manually. > > * No heap leaks observed > * Window retention: 2 or 11 minutes (depending on operation type) > * Issue not present in Kafka Streams 2.0.1 > * No memory leak for stateless stream operations (when no RocksDB stores are > used) > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8386) Use COORDINATOR_NOT_AVAILABLE to replace UNKNOWN_MEMBER_ID when the group is not available
Boyang Chen created KAFKA-8386:
--------------------------------

Summary: Use COORDINATOR_NOT_AVAILABLE to replace UNKNOWN_MEMBER_ID when the group is not available
Key: KAFKA-8386
URL: https://issues.apache.org/jira/browse/KAFKA-8386
Project: Kafka
Issue Type: Sub-task
Reporter: Boyang Chen
Assignee: Boyang Chen

When the group is dead or unavailable on the coordinator, the current approach is to return `UNKNOWN_MEMBER_ID` to let the member reset its generation and rejoin. This is not particularly safe for static members, since resetting the `member.id` could delay the detection of a duplicate `instance.id`. Also, considering that group unavailability is mostly caused by coordinator migration, it is preferable to trigger a coordinator rediscovery immediately rather than require one more bounce. Thus, we decided to treat `COORDINATOR_NOT_AVAILABLE` as the first-class response.
[jira] [Commented] (KAFKA-8346) Improve replica fetcher behavior in handling partition failures
[ https://issues.apache.org/jira/browse/KAFKA-8346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842762#comment-16842762 ] ASF GitHub Bot commented on KAFKA-8346: --- hachikuji commented on pull request #6716: KAFKA-8346; Improve replica fetcher behavior for handling partition failure URL: https://github.com/apache/kafka/pull/6716 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve replica fetcher behavior in handling partition failures > --- > > Key: KAFKA-8346 > URL: https://issues.apache.org/jira/browse/KAFKA-8346 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Aishwarya Gune >Assignee: Aishwarya Gune >Priority: Major > > The replica fetcher thread terminates in case one of the partitions being > monitored fails. This leads to under-replicated partitions. The thread > behavior can be improved by dropping that particular partition and continuing > with the rest of the partitions. > KIP-461: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-461+-+Improve+Replica+Fetcher+behavior+at+handling+partition+failure] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8324) User constructed RocksObjects leak memory
[ https://issues.apache.org/jira/browse/KAFKA-8324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-8324: --- Affects Version/s: 0.10.1.0 > User constructed RocksObjects leak memory > - > > Key: KAFKA-8324 > URL: https://issues.apache.org/jira/browse/KAFKA-8324 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.0 >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > Labels: kip > Fix For: 2.3.0 > > > Some of the RocksDB options a user can set when extending RocksDBConfigSetter > take Rocks objects as parameters. Many of these – including potentially large > objects like Cache and Filter – inherit from AbstractNativeReference and must > be closed explicitly in order to free the memory of the backing C++ object. > However the user has no way of closing any objects they have created in > RocksDBConfigSetter, and we do not ever close them for them. > KIP-453: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-453%3A+Add+close%28%29+method+to+RocksDBConfigSetter] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
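KIP-453 (linked above) fixes this by giving `RocksDBConfigSetter` a `close()` hook so users can free the Cache/Filter objects they created in `setConfig`. A stdlib-only sketch of that pattern, under the assumption stated here: `NativeCache` and `ConfigSetter` are illustrative stand-ins, not the actual Kafka or RocksDB classes.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseHookSketch {
    static int liveNativeObjects = 0;  // stands in for backing C++ allocations

    // Illustrative stand-in for a RocksDB object backed by native memory
    // (e.g. a block cache or bloom filter).
    static final class NativeCache implements AutoCloseable {
        NativeCache() { liveNativeObjects++; }
        @Override public void close() { liveNativeObjects--; }
    }

    // The pattern the KIP introduces: the setter keeps references to the
    // objects it creates, and a close() hook frees them when the store closes.
    // Before the KIP there was simply no hook from which to call close().
    static final class ConfigSetter {
        private final List<NativeCache> created = new ArrayList<>();

        void setConfig() {
            created.add(new NativeCache());  // e.g. a block cache
            created.add(new NativeCache());  // e.g. a bloom filter
        }

        void close() {
            created.forEach(NativeCache::close);
            created.clear();
        }
    }
}
```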
[jira] [Assigned] (KAFKA-1983) TestEndToEndLatency can be unreliable after hard kill
[ https://issues.apache.org/jira/browse/KAFKA-1983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Grayson Chao reassigned KAFKA-1983: --- Assignee: (was: Grayson Chao) > TestEndToEndLatency can be unreliable after hard kill > - > > Key: KAFKA-1983 > URL: https://issues.apache.org/jira/browse/KAFKA-1983 > Project: Kafka > Issue Type: Improvement >Reporter: Jun Rao >Priority: Major > Labels: newbie > > If you hard kill TestEndToEndLatency, the committed offset remains the last > checkpointed one. However, more messages are now appended after the last > checkpointed offset. When restarting TestEndToEndLatency, the consumer > resumes from the last checkpointed offset and will report really low latency > since it doesn't need to wait for a new message to be produced to read the > next message. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8347) Choose next record to process by timestamp
[ https://issues.apache.org/jira/browse/KAFKA-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-8347: --- Fix Version/s: 2.2.1 2.1.2 > Choose next record to process by timestamp > -- > > Key: KAFKA-8347 > URL: https://issues.apache.org/jira/browse/KAFKA-8347 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.1.0 >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.3.0, 2.1.2, 2.2.1 > > > Currently PartitionGroup will determine the next record to process by > choosing the partition with the lowest stream time. However if a partition > contains out of order data its stream time may be significantly larger than > the timestamp of the next record. The next record should instead be chosen as > the record with the lowest timestamp across all partitions, regardless of > which partition it comes from or what its partition time is. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
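The KAFKA-8347 change above can be illustrated with a small sketch. The data structures are hypothetical, not Kafka's PartitionGroup: the point is that with out-of-order data, choosing by partition stream time skips over the genuinely oldest record, while choosing by the head record's timestamp does not.

```python
def next_by_stream_time(queues):
    """Old behavior: choose the partition with the lowest stream time
    (the max timestamp it has seen so far), then take its head record."""
    part = min(queues, key=lambda p: queues[p]["stream_time"])
    return queues[part]["records"][0]

def next_by_head_timestamp(queues):
    """Proposed behavior: choose the record with the lowest timestamp
    across all partition heads, regardless of partition stream time."""
    part = min(queues, key=lambda p: queues[p]["records"][0]["ts"])
    return queues[part]["records"][0]

# p0 saw out-of-order data: its stream time (30) is far ahead of its
# next record (ts=5); p1 is well ordered.
queues = {
    "p0": {"stream_time": 30, "records": [{"ts": 5,  "value": "late"}]},
    "p1": {"stream_time": 10, "records": [{"ts": 10, "value": "on-time"}]},
}

print(next_by_stream_time(queues)["value"])     # on-time  (picks p1, wrongly)
print(next_by_head_timestamp(queues)["value"])  # late     (picks p0's ts=5 record)
```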
[jira] [Created] (KAFKA-8385) Fix leader election RPC for all partitions so that only partitions that had elections are returned
Jose Armando Garcia Sancio created KAFKA-8385: - Summary: Fix leader election RPC for all partitions so that only partitions that had elections are returned Key: KAFKA-8385 URL: https://issues.apache.org/jira/browse/KAFKA-8385 Project: Kafka Issue Type: Task Components: core Reporter: Jose Armando Garcia Sancio Assignee: Jose Armando Garcia Sancio Fix For: 2.3.0 Currently the elect leaders RPC returns all the partitions when an election across all of the partitions is requested, even if some of the partitions already have a leader (for unclean) or a preferred leader (for preferred). Change this behavior so that only partitions that changed leader are returned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
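The proposed response filtering can be sketched as follows. The shapes here are hypothetical (a simple dict, not the actual ElectLeaders RPC types): when an election is requested across all partitions, only entries whose leader actually changed survive the filter.

```python
def filter_election_results(results):
    """results maps partition -> (old_leader, new_leader); keep only
    partitions whose leader changed as a result of the election."""
    return {p: new for p, (old, new) in results.items() if old != new}

results = {
    "t-0": (1, 2),    # leader moved: include in response
    "t-1": (3, 3),    # already had the desired leader: drop
    "t-2": (None, 4), # had no leader, one was elected: include
}
print(filter_election_results(results))  # {'t-0': 2, 't-2': 4}
```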
[jira] [Created] (KAFKA-8384) Write integration tests for leader election command
Jose Armando Garcia Sancio created KAFKA-8384: - Summary: Write integration tests for leader election command Key: KAFKA-8384 URL: https://issues.apache.org/jira/browse/KAFKA-8384 Project: Kafka Issue Type: Task Components: core Reporter: Jose Armando Garcia Sancio Assignee: Jose Armando Garcia Sancio Fix For: 2.3.0 Add integration tests for the leader election command. Some cases that we should cover: # Topic doesn't exist => UNKNOWN_TOPIC_OR_PARTITION # Unclean/preferred election not needed => ELECTION_NOT_NEEDED # Unclean/preferred election not possible => ELIGIBLE_LEADERS_NOT_AVAILABLE # Election succeeded => NONE # All partitions (partitions==null) => NONE for every performed election. The result map should only contain NONE. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8383) Write integration test for electLeaders
Jose Armando Garcia Sancio created KAFKA-8383: - Summary: Write integration test for electLeaders Key: KAFKA-8383 URL: https://issues.apache.org/jira/browse/KAFKA-8383 Project: Kafka Issue Type: Task Components: core Reporter: Jose Armando Garcia Sancio Assignee: Jose Armando Garcia Sancio Fix For: 2.3.0 Add tests for electLeaders in AdminClientIntegrationTest. Some cases that we should cover: # Topic doesn't exist => UNKNOWN_TOPIC_OR_PARTITION # Unclean/preferred election not needed => ELECTION_NOT_NEEDED # Unclean/preferred election not possible => ELIGIBLE_LEADERS_NOT_AVAILABLE # Election succeeded => NONE # All partitions (partitions==null) => NONE for every performed election. The result map should only contain NONE. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
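The case list above maps each precondition to an expected error code. A tiny sketch of that mapping (a hypothetical helper for reasoning about the test matrix, not Kafka code):

```python
def expected_election_error(topic_exists, election_needed, leader_eligible):
    """Return the error code an electLeaders test should assert on,
    given the state of the target partition."""
    if not topic_exists:
        return "UNKNOWN_TOPIC_OR_PARTITION"
    if not election_needed:
        return "ELECTION_NOT_NEEDED"
    if not leader_eligible:
        return "ELIGIBLE_LEADERS_NOT_AVAILABLE"
    return "NONE"

print(expected_election_error(False, True, True))   # UNKNOWN_TOPIC_OR_PARTITION
print(expected_election_error(True, False, True))   # ELECTION_NOT_NEEDED
print(expected_election_error(True, True, False))   # ELIGIBLE_LEADERS_NOT_AVAILABLE
print(expected_election_error(True, True, True))    # NONE
```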
[jira] [Commented] (KAFKA-8347) Choose next record to process by timestamp
[ https://issues.apache.org/jira/browse/KAFKA-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842620#comment-16842620 ] Bill Bejeck commented on KAFKA-8347: cherry-picked to 2.2 and 2.1 > Choose next record to process by timestamp > -- > > Key: KAFKA-8347 > URL: https://issues.apache.org/jira/browse/KAFKA-8347 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.1.0 >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.3.0 > > > Currently PartitionGroup will determine the next record to process by > choosing the partition with the lowest stream time. However if a partition > contains out of order data its stream time may be significantly larger than > the timestamp of the next record. The next record should instead be chosen as > the record with the lowest timestamp across all partitions, regardless of > which partition it comes from or what its partition time is. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8315) Historical join issues
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842568#comment-16842568 ] Matthias J. Sax commented on KAFKA-8315: Agreed. As mentioned above: https://issues.apache.org/jira/browse/KAFKA-8315?focusedCommentId=16841378=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16841378 It's a known issue tracked as KAFKA-7458 > Historical join issues > -- > > Key: KAFKA-8315 > URL: https://issues.apache.org/jira/browse/KAFKA-8315 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Andrew >Assignee: John Roesler >Priority: Major > Attachments: code.java > > > The problem we are experiencing is that we cannot reliably perform simple > joins over pre-populated kafka topics. This seems more apparent where one > topic has records at less frequent record timestamp intervals than the other. > An example of the issue is provided in this repository : > [https://github.com/the4thamigo-uk/join-example] > > The only way to increase the period of historically joined records is to > increase the grace period for the join windows, and this has repercussions > when you extend it to a large period, e.g. 2 years of minute-by-minute records. > Related slack conversations : > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900] > > Research on this issue has gone through a few phases : > 1) This issue was initially thought to be due to the inability to set the > retention period for a join window via {{Materialized}}, i.e. > The documentation says to use `Materialized` not `JoinWindows.until()` > ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), > but there is nowhere to pass a `Materialized` instance to the join > operation; it seems only the group operation supports it. 
> This was considered to be a problem with the documentation, not with the API, > and is addressed in [https://github.com/apache/kafka/pull/6664] > 2) We then found an apparent issue in the code which would affect the > partition that is selected to deliver the next record to the join. This would > only be a problem for data that is out-of-order, and join-example uses data > that is in order of timestamp in both topics. So this fix is thought not to > affect join-example. > This was considered to be an issue and is being addressed in > [https://github.com/apache/kafka/pull/6719] > 3) Further investigation using a crafted unit test seems to show that the > partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok > [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b] > 4) The current assumption is that the issue is rooted in the way records are > consumed from the topics : > We have tried to set various options to suppress reads from the source topics > but it doesn't seem to make any difference : > [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8315) Historical join issues
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842566#comment-16842566 ] John Roesler commented on KAFKA-8315: - The existing behavior of the consumer is effectively to round-robin the inputs. If you're subscribed to A,B, and C, and you request 20 records, and it gets 15 records from A and 5 from B, then the next time around, it should give you maybe 10 more from B and then 10 from C. I actually think the problem might just be on startup (but would need to verify), since we have no visibility into which partitions have been polled at all. After startup, the Consumer behavior in addition to the existing pause logic should take care of preferring to poll partitions that are empty. If a partition is actually empty (we are caught up), then this is what the max.idle.time is for. But this ticket seems different, since we never even tried to poll all the inputs before starting work on just one side of the join Or maybe I'm not thinking about it clearly. The point is, as a functional requirement, it seems like historical joins should function properly even with a zero idle time. > Historical join issues > -- > > Key: KAFKA-8315 > URL: https://issues.apache.org/jira/browse/KAFKA-8315 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Andrew >Assignee: John Roesler >Priority: Major > Attachments: code.java > > > The problem we are experiencing is that we cannot reliably perform simple > joins over pre-populated kafka topics. This seems more apparent where one > topic has records at less frequent record timestamp intervals that the other. > An example of the issue is provided in this repository : > [https://github.com/the4thamigo-uk/join-example] > > The only way to increase the period of historically joined records is to > increase the grace period for the join windows, and this has repercussions > when you extend it to a large period e.g. 2 years of minute-by-minute records. 
> Related slack conversations : > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1557733979453900] > > Research on this issue has gone through a few phases : > 1) This issue was initially thought to be due to the inability to set the > retention period for a join window via {{Materialized}}, i.e. > The documentation says to use `Materialized` not `JoinWindows.until()` > ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), > but there is nowhere to pass a `Materialized` instance to the join > operation; it seems only the group operation supports it. > This was considered to be a problem with the documentation, not with the API, > and is addressed in [https://github.com/apache/kafka/pull/6664] > 2) We then found an apparent issue in the code which would affect the > partition that is selected to deliver the next record to the join. This would > only be a problem for data that is out-of-order, and join-example uses data > that is in order of timestamp in both topics. So this fix is thought not to > affect join-example. > This was considered to be an issue and is being addressed in > [https://github.com/apache/kafka/pull/6719] > 3) Further investigation using a crafted unit test seems to show that the > partition-selection and ordering (PartitionGroup/RecordQueue) seems to work ok > [https://github.com/the4thamigo-uk/kafka/commit/5121851491f2fd0471d8f3c49940175e23a26f1b] > 4) The current assumption is that the issue is rooted in the way records are > consumed from the topics : > We have tried to set various options to suppress reads from the source topics > but it doesn't seem to make any difference : > [https://github.com/the4thamigo-uk/join-example/commit/c674977fd0fdc689152695065d9277abea6bef63] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-8215) Limit memory usage of RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-8215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-8215: -- Assignee: Sophie Blee-Goldman (was: Bill Bejeck) > Limit memory usage of RocksDB > - > > Key: KAFKA-8215 > URL: https://issues.apache.org/jira/browse/KAFKA-8215 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.3.0 > > > The memory usage of Streams is currently unbounded in part because of > RocksDB, which consumes memory on a per-instance basis. Each instance (ie > each persistent state store) will have its own write buffer, index blocks, > and block cache. The size of these can be configured individually, but there > is currently no way for a Streams app to limit the total memory available > across instances. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-8215) Limit memory usage of RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-8215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-8215: -- Assignee: Bill Bejeck > Limit memory usage of RocksDB > - > > Key: KAFKA-8215 > URL: https://issues.apache.org/jira/browse/KAFKA-8215 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Sophie Blee-Goldman >Assignee: Bill Bejeck >Priority: Major > Fix For: 2.3.0 > > > The memory usage of Streams is currently unbounded in part because of > RocksDB, which consumes memory on a per-instance basis. Each instance (ie > each persistent state store) will have its own write buffer, index blocks, > and block cache. The size of these can be configured individually, but there > is currently no way for a Streams app to limit the total memory available > across instances. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8215) Limit memory usage of RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-8215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-8215: --- Fix Version/s: 2.3.0 > Limit memory usage of RocksDB > - > > Key: KAFKA-8215 > URL: https://issues.apache.org/jira/browse/KAFKA-8215 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Major > Fix For: 2.3.0 > > > The memory usage of Streams is currently unbounded in part because of > RocksDB, which consumes memory on a per-instance basis. Each instance (ie > each persistent state store) will have its own write buffer, index blocks, > and block cache. The size of these can be configured individually, but there > is currently no way for a Streams app to limit the total memory available > across instances. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-8215) Limit memory usage of RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-8215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman resolved KAFKA-8215. Resolution: Fixed > Limit memory usage of RocksDB > - > > Key: KAFKA-8215 > URL: https://issues.apache.org/jira/browse/KAFKA-8215 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Major > Fix For: 2.3.0 > > > The memory usage of Streams is currently unbounded in part because of > RocksDB, which consumes memory on a per-instance basis. Each instance (ie > each persistent state store) will have its own write buffer, index blocks, > and block cache. The size of these can be configured individually, but there > is currently no way for a Streams app to limit the total memory available > across instances. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8215) Limit memory usage of RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-8215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842558#comment-16842558 ] ASF GitHub Bot commented on KAFKA-8215: --- bbejeck commented on pull request #6743: KAFKA-8215: Upgrade Rocks to v5.18.3 URL: https://github.com/apache/kafka/pull/6743 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Limit memory usage of RocksDB > - > > Key: KAFKA-8215 > URL: https://issues.apache.org/jira/browse/KAFKA-8215 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Major > > The memory usage of Streams is currently unbounded in part because of > RocksDB, which consumes memory on a per-instance basis. Each instance (ie > each persistent state store) will have its own write buffer, index blocks, > and block cache. The size of these can be configured individually, but there > is currently no way for a Streams app to limit the total memory available > across instances. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
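The "unbounded" claim in KAFKA-8215 follows from simple arithmetic: each persistent store carries its own write buffers and block cache, so total memory grows linearly with store count. A back-of-envelope sketch (the sizes below are illustrative assumptions, not Kafka's or RocksDB's actual defaults):

```python
def total_rocksdb_memory(num_stores, write_buffer_mb=16, num_write_buffers=3,
                         block_cache_mb=50):
    """Rough per-instance accounting: every store pays for its own
    write buffers plus its own block cache, with no shared cap."""
    per_store = write_buffer_mb * num_write_buffers + block_cache_mb
    return num_stores * per_store  # in MB

# e.g. 40 store instances (10 sub-topologies x 4 partitions) at ~98 MB each:
print(total_rocksdb_memory(40))  # 3920 MB, and nothing bounds it globally
```

This is exactly why a shared, app-wide budget (a single Cache/write-buffer pool reused across instances) is the natural fix rather than tuning each store individually.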
[jira] [Resolved] (KAFKA-8354) Replace SyncGroup request/response with automated protocol
[ https://issues.apache.org/jira/browse/KAFKA-8354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-8354. Resolution: Fixed > Replace SyncGroup request/response with automated protocol > -- > > Key: KAFKA-8354 > URL: https://issues.apache.org/jira/browse/KAFKA-8354 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8347) Choose next record to process by timestamp
[ https://issues.apache.org/jira/browse/KAFKA-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8347: --- Affects Version/s: (was: 2.2.0) > Choose next record to process by timestamp > -- > > Key: KAFKA-8347 > URL: https://issues.apache.org/jira/browse/KAFKA-8347 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.1.0 >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.3.0 > > > Currently PartitionGroup will determine the next record to process by > choosing the partition with the lowest stream time. However if a partition > contains out of order data its stream time may be significantly larger than > the timestamp of the next record. The next record should instead be chosen as > the record with the lowest timestamp across all partitions, regardless of > which partition it comes from or what its partition time is. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8347) Choose next record to process by timestamp
[ https://issues.apache.org/jira/browse/KAFKA-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-8347: --- Affects Version/s: (was: 2.1.1) 2.1.0 > Choose next record to process by timestamp > -- > > Key: KAFKA-8347 > URL: https://issues.apache.org/jira/browse/KAFKA-8347 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.1.0, 2.2.0 >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.3.0 > > > Currently PartitionGroup will determine the next record to process by > choosing the partition with the lowest stream time. However if a partition > contains out of order data its stream time may be significantly larger than > the timestamp of the next record. The next record should instead be chosen as > the record with the lowest timestamp across all partitions, regardless of > which partition it comes from or what its partition time is. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8052) Intermittent INVALID_FETCH_SESSION_EPOCH error on FETCH request
[ https://issues.apache.org/jira/browse/KAFKA-8052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842386#comment-16842386 ] Andrew Olson commented on KAFKA-8052: - [~rsivaram] There are some comments on your pull request, as well as a merge conflict that needs to be resolved. > Intermittent INVALID_FETCH_SESSION_EPOCH error on FETCH request > > > Key: KAFKA-8052 > URL: https://issues.apache.org/jira/browse/KAFKA-8052 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.1.0 >Reporter: Bartek Jakub >Assignee: Rajini Sivaram >Priority: Major > > I noticed some weird behavior in my logs. I see an intermittent log message: > {noformat} > 2019-03-06 14:02:13.024 INFO 1 --- [container-1-C-1] > o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-4, > groupId=service-main] Node 2 was unable to process the fetch request with > (sessionId=1321134604, epoch=125730): INVALID_FETCH_SESSION_EPOCH.{noformat} > which happens every ~1 hour. > > I was wondering if it was my Kafka provider's fault, so I decided to investigate > the problem and I tried to reproduce the issue on my local setup - with success. My > configuration is: > * Kafka Clients version - 2.0.1 > * Kafka - 2.12_2.1.0 > > I enabled trace logs for 'org.apache.kafka.clients' and that's what I get: > {noformat} > 2019-03-05 21:04:16.161 DEBUG 3052 --- [container-0-C-1] > o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-3, > groupId=service-main] Built incremental fetch (sessionId=197970881, > epoch=525) for node 1001. 
Added (), altered (), removed () out of > (itunes-command-19, itunes-command-18, itunes-command-11, itunes-command-10, > itunes-command-13, itunes-command-12, itunes-command-15, itunes-command-14, > itunes-command-17, itunes-command-16) > 2019-03-05 21:04:16.161 DEBUG 3052 --- [container-0-C-1] > o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-3, > groupId=service-main] Sending READ_UNCOMMITTED > IncrementalFetchRequest(toSend=(), toForget=(), implied=(itunes-command-19, > itunes-command-18, itunes-command-11, itunes-command-10, itunes-command-13, > itunes-command-12, itunes-command-15, itunes-command-14, itunes-command-17, > itunes-command-16)) to broker localhost:9092 (id: 1001 rack: null) > 2019-03-05 21:04:16.161 TRACE 3052 --- [container-0-C-1] > org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-3, > groupId=service-main] Sending FETCH > {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=197970881,epoch=525,topics=[],forgotten_topics_data=[]} > with correlation id 629 to node 1001 > 2019-03-05 21:04:16.664 TRACE 3052 --- [container-0-C-1] > org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-3, > groupId=service-main] Completed receive from node 1001 for FETCH with > correlation id 629, received > {throttle_time_ms=0,error_code=0,session_id=197970881,responses=[]} > 2019-03-05 21:04:16.664 DEBUG 3052 --- [container-0-C-1] > o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-3, > groupId=service-main] Node 1001 sent an incremental fetch response for > session 197970881 with response=(), implied=(itunes-command-19, > itunes-command-18, itunes-command-11, itunes-command-10, itunes-command-13, > itunes-command-12, itunes-command-15, itunes-command-14, itunes-command-17, > itunes-command-16) > 2019-03-05 21:04:16.665 DEBUG 3052 --- [container-0-C-1] > o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=consumer-3, > groupId=service-main] Built 
incremental fetch (sessionId=197970881, > epoch=526) for node 1001. Added (), altered (), removed () out of > (itunes-command-19, itunes-command-18, itunes-command-11, itunes-command-10, > itunes-command-13, itunes-command-12, itunes-command-15, itunes-command-14, > itunes-command-17, itunes-command-16) > 2019-03-05 21:04:16.665 DEBUG 3052 --- [container-0-C-1] > o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-3, > groupId=service-main] Sending READ_UNCOMMITTED > IncrementalFetchRequest(toSend=(), toForget=(), implied=(itunes-command-19, > itunes-command-18, itunes-command-11, itunes-command-10, itunes-command-13, > itunes-command-12, itunes-command-15, itunes-command-14, itunes-command-17, > itunes-command-16)) to broker localhost:9092 (id: 1001 rack: null) > 2019-03-05 21:04:16.665 TRACE 3052 --- [container-0-C-1] > org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-3, > groupId=service-main - F630] Sending FETCH > {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=197970881,epoch=526,topics=[],forgotten_topics_data=[]} > with correlation id 630 to node 1001 > 2019-03-05
[jira] [Commented] (KAFKA-8381) SSL factory for inter-broker listener is broken
[ https://issues.apache.org/jira/browse/KAFKA-8381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842381#comment-16842381 ] ASF GitHub Bot commented on KAFKA-8381: --- rajinisivaram commented on pull request #6757: KAFKA-8381; Disable hostname validation when verifying inter-broker SSL URL: https://github.com/apache/kafka/pull/6757 - Make endpoint validation configurable on SslEngineBuilder when creating an engine - Disable endpoint validation for engines created for inter-broker SSL validation since it is unsafe to use `localhost` - Use empty hostname in validation engine to ensure tests fail if validation is re-enabled by mistake - Add tests to verify inter-broker SSL validation ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > SSL factory for inter-broker listener is broken > --- > > Key: KAFKA-8381 > URL: https://issues.apache.org/jira/browse/KAFKA-8381 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 2.3.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Blocker > Fix For: 2.3.0 > > > From a system test failure: > {code} > [2019-05-17 15:48:12,453] ERROR [KafkaServer id=1] Fatal error during > KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) > org.apache.kafka.common.KafkaException: > org.apache.kafka.common.config.ConfigException: Invalid value > javax.net.ssl.SSLHandshakeException: General SSLEngine problem for > configuration A client SSLEngine created with the provided settings can't > connect to a server SSLEngine created with those settings. 
> at > org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:162) > at > org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146) > at > org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:85) > at kafka.network.Processor.(SocketServer.scala:747) > at kafka.network.SocketServer.newProcessor(SocketServer.scala:388) > at > kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:282) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158) > at > kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:281) > at > kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:244) > at > kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:241) > at > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > at > kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:241) > at kafka.network.SocketServer.startup(SocketServer.scala:120) > at kafka.server.KafkaServer.startup(KafkaServer.scala:293) > {code} > Looks like the changes under > https://github.com/apache/kafka/commit/0494cd329f3aaed94b3b46de0abe495f80faaedd > added validation for inter-broker SSL factory with hostname verification > enabled and `localhost` as the hostname. As a result, integration tests pass, > but system tests fail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8347) Choose next record to process by timestamp
[ https://issues.apache.org/jira/browse/KAFKA-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-8347: --- Affects Version/s: (was: 2.3.0) 2.2.0 2.1.1 > Choose next record to process by timestamp > -- > > Key: KAFKA-8347 > URL: https://issues.apache.org/jira/browse/KAFKA-8347 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.2.0, 2.1.1 >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.3.0 > > > Currently PartitionGroup will determine the next record to process by > choosing the partition with the lowest stream time. However if a partition > contains out of order data its stream time may be significantly larger than > the timestamp of the next record. The next record should instead be chosen as > the record with the lowest timestamp across all partitions, regardless of > which partition it comes from or what its partition time is. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-3522) Consider adding version information into rocksDB storage format
[ https://issues.apache.org/jira/browse/KAFKA-3522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842343#comment-16842343 ] ASF GitHub Bot commented on KAFKA-3522: --- mjsax commented on pull request #6756: KAFKA-3522: TopologyTestDriver should only return custom stores via untyped getStateStore() method URL: https://github.com/apache/kafka/pull/6756 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Consider adding version information into rocksDB storage format > --- > > Key: KAFKA-3522 > URL: https://issues.apache.org/jira/browse/KAFKA-3522 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Matthias J. Sax >Priority: Major > Labels: architecture > Fix For: 2.3.0 > > > Kafka Streams does not introduce any modifications to the data format in the > underlying Kafka protocol, but it does use RocksDB for persistent state > storage, and currently its data format is fixed and hard-coded. We want to > consider the evolution path in the future when we change the data format, and > hence having some version info stored along with the storage file / directory > would be useful. > And this information could even be outside of the storage file; for example, we > can just use a small "version indicator" file in the rocksdb directory for > this purpose. Thoughts? [~enothereska] [~jkreps] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8265) Connect Client Config Override policy
[ https://issues.apache.org/jira/browse/KAFKA-8265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842322#comment-16842322 ] ASF GitHub Bot commented on KAFKA-8265: --- mageshn commented on pull request #6755: KAFKA-8265 : Fix config name to match KIP-458, Return a copy of the ConfigDef in Client Configs. URL: https://github.com/apache/kafka/pull/6755 In the initial implementation for KIP-458 https://github.com/apache/kafka/pull/6624, the config name was incorrect and not consistent with what was specified in the KIP. This PR fixes the inconsistency. There was also a concern raised about the mutability of `ConfigDef` in https://github.com/apache/kafka/pull/6624#pullrequestreview-238877899. I have made an attempt to fix it by returning a copy every time. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Connect Client Config Override policy > - > > Key: KAFKA-8265 > URL: https://issues.apache.org/jira/browse/KAFKA-8265 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Magesh kumar Nandakumar >Assignee: Magesh kumar Nandakumar >Priority: Major > Fix For: 2.3.0 > > > Right now, each source connector and sink connector inherit their client > configurations from the worker properties. Within the worker properties, all > configurations that have a prefix of "producer." or "consumer." are applied > to all source connectors and sink connectors respectively. > We should allow the "producer." or "consumer." configurations to be overridden in > accordance with an override policy determined by the administrator. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
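For context on what the override policy in KAFKA-8265 enables, here is a sketch based on KIP-458 and the 2.3 config names; the specific override values (`lz4`, `500`) are illustrative, not from this thread:

```properties
# Worker configuration: choose which client-config overrides connectors may request.
# Built-in policies per KIP-458: None (the default), Principal, All.
connector.client.config.override.policy=All

# Connector configuration: override a single client property by prefixing it.
# Source connectors use "producer.override.", sink connectors "consumer.override.".
producer.override.compression.type=lz4
consumer.override.max.poll.records=500
```

Without an override prefix, connectors keep inheriting the worker-wide `producer.`/`consumer.` settings as before.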
[jira] [Created] (KAFKA-8382) Add TimestampedSessionStore
Matthias J. Sax created KAFKA-8382: -- Summary: Add TimestampedSessionStore Key: KAFKA-8382 URL: https://issues.apache.org/jira/browse/KAFKA-8382 Project: Kafka Issue Type: New Feature Components: streams Reporter: Matthias J. Sax Follow up to KIP-258, to complete the KIP by adding TimestampedSessionStores. [https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8381) SSL factory for inter-broker listener is broken
Rajini Sivaram created KAFKA-8381: - Summary: SSL factory for inter-broker listener is broken Key: KAFKA-8381 URL: https://issues.apache.org/jira/browse/KAFKA-8381 Project: Kafka Issue Type: Bug Components: security Affects Versions: 2.3.0 Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 2.3.0 From a system test failure:
{code}
[2019-05-17 15:48:12,453] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.KafkaException: org.apache.kafka.common.config.ConfigException: Invalid value javax.net.ssl.SSLHandshakeException: General SSLEngine problem for configuration A client SSLEngine created with the provided settings can't connect to a server SSLEngine created with those settings.
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:162)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
    at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:85)
    at kafka.network.Processor.<init>(SocketServer.scala:747)
    at kafka.network.SocketServer.newProcessor(SocketServer.scala:388)
    at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:282)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
    at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:281)
    at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:244)
    at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:241)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:241)
    at kafka.network.SocketServer.startup(SocketServer.scala:120)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:293)
{code}
Looks like the changes under https://github.com/apache/kafka/commit/0494cd329f3aaed94b3b46de0abe495f80faaedd added validation for the inter-broker SSL factory with hostname verification enabled and `localhost` as the hostname. As a result, integration tests pass, but system tests fail. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-8324) User constructed RocksObjects leak memory
[ https://issues.apache.org/jira/browse/KAFKA-8324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-8324. Resolution: Fixed Fix Version/s: 2.3.0 > User constructed RocksObjects leak memory > - > > Key: KAFKA-8324 > URL: https://issues.apache.org/jira/browse/KAFKA-8324 > Project: Kafka > Issue Type: Bug >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.3.0 > > > Some of the RocksDB options a user can set when extending RocksDBConfigSetter > take Rocks objects as parameters. Many of these – including potentially large > objects like Cache and Filter – inherit from AbstractNativeReference and must > be closed explicitly in order to free the memory of the backing C++ object. > However the user has no way of closing any objects they have created in > RocksDBConfigSetter, and we do not ever close them for them. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8324) User constructed RocksObjects leak memory
[ https://issues.apache.org/jira/browse/KAFKA-8324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8324: --- Description: Some of the RocksDB options a user can set when extending RocksDBConfigSetter take Rocks objects as parameters. Many of these – including potentially large objects like Cache and Filter – inherit from AbstractNativeReference and must be closed explicitly in order to free the memory of the backing C++ object. However the user has no way of closing any objects they have created in RocksDBConfigSetter, and we do not ever close them for them. KIP-453: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-453%3A+Add+close%28%29+method+to+RocksDBConfigSetter] was: Some of the RocksDB options a user can set when extending RocksDBConfigSetter take Rocks objects as parameters. Many of these – including potentially large objects like Cache and Filter – inherit from AbstractNativeReference and must be closed explicitly in order to free the memory of the backing C++ object. However the user has no way of closing any objects they have created in RocksDBConfigSetter, and we do not ever close them for them. > User constructed RocksObjects leak memory > - > > Key: KAFKA-8324 > URL: https://issues.apache.org/jira/browse/KAFKA-8324 > Project: Kafka > Issue Type: Bug >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > Labels: kip > Fix For: 2.3.0 > > > Some of the RocksDB options a user can set when extending RocksDBConfigSetter > take Rocks objects as parameters. Many of these – including potentially large > objects like Cache and Filter – inherit from AbstractNativeReference and must > be closed explicitly in order to free the memory of the backing C++ object. > However the user has no way of closing any objects they have created in > RocksDBConfigSetter, and we do not ever close them for them. 
> KIP-453: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-453%3A+Add+close%28%29+method+to+RocksDBConfigSetter] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
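The shape of the KIP-453 fix referenced above can be sketched as follows. This is an untested illustration against the kafka-streams and rocksdbjni APIs (the `setConfig`/`close` callback names follow the KIP; `BoundedCacheConfigSetter` and the 4 MiB value are invented for the example):

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

// A config setter that keeps a reference to the Cache it creates so the
// backing native C++ memory can be released in close() -- the hook that
// KIP-453 adds; previously there was no place to free such objects.
public class BoundedCacheConfigSetter implements RocksDBConfigSetter {

    private Cache cache;  // inherits from AbstractNativeReference

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        cache = new LRUCache(4 * 1024 * 1024L);  // 4 MiB block cache (illustrative)
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCache(cache);
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        cache.close();  // frees the native memory when the store is closed
    }
}
```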
[jira] [Updated] (KAFKA-8324) User constructed RocksObjects leak memory
[ https://issues.apache.org/jira/browse/KAFKA-8324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8324: --- Labels: kip (was: ) > User constructed RocksObjects leak memory > - > > Key: KAFKA-8324 > URL: https://issues.apache.org/jira/browse/KAFKA-8324 > Project: Kafka > Issue Type: Bug >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > Labels: kip > Fix For: 2.3.0 > > > Some of the RocksDB options a user can set when extending RocksDBConfigSetter > take Rocks objects as parameters. Many of these – including potentially large > objects like Cache and Filter – inherit from AbstractNativeReference and must > be closed explicitly in order to free the memory of the backing C++ object. > However the user has no way of closing any objects they have created in > RocksDBConfigSetter, and we do not ever close them for them. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-8324) User constructed RocksObjects leak memory
[ https://issues.apache.org/jira/browse/KAFKA-8324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-8324: -- Assignee: Sophie Blee-Goldman > User constructed RocksObjects leak memory > - > > Key: KAFKA-8324 > URL: https://issues.apache.org/jira/browse/KAFKA-8324 > Project: Kafka > Issue Type: Bug >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > > Some of the RocksDB options a user can set when extending RocksDBConfigSetter > take Rocks objects as parameters. Many of these – including potentially large > objects like Cache and Filter – inherit from AbstractNativeReference and must > be closed explicitly in order to free the memory of the backing C++ object. > However the user has no way of closing any objects they have created in > RocksDBConfigSetter, and we do not ever close them for them. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7190) Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID
[ https://issues.apache.org/jira/browse/KAFKA-7190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-7190. -- Resolution: Fixed Fix Version/s: 2.3.0 > Under low traffic conditions purging repartition topics cause WARN statements > about UNKNOWN_PRODUCER_ID > - > > Key: KAFKA-7190 > URL: https://issues.apache.org/jira/browse/KAFKA-7190 > Project: Kafka > Issue Type: Improvement > Components: core, streams >Affects Versions: 1.1.0, 1.1.1 >Reporter: Bill Bejeck >Assignee: lambdaliu >Priority: Major > Fix For: 2.3.0 > > > When a streams application has little traffic, then it is possible that > consumer purging would delete > even the last message sent by a producer (i.e., all the messages sent by > this producer have been consumed and committed), and as a result, the broker > would delete that producer's ID. The next time when this producer tries to > send, it will get this UNKNOWN_PRODUCER_ID error code, but in this case, > this error is retriable: the producer would just get a new producer id and > retries, and then this time it will succeed. > > Possible fixes could be on the broker side, i.e., delaying the deletion of > the producerIds for a more extended period or on the streams side developing > a more conservative approach to deleting offsets from repartition topics > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-8034) Replace DeleteTopics request/response with automated protocol
[ https://issues.apache.org/jira/browse/KAFKA-8034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-8034. --- Resolution: Fixed > Replace DeleteTopics request/response with automated protocol > - > > Key: KAFKA-8034 > URL: https://issues.apache.org/jira/browse/KAFKA-8034 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8034) Replace DeleteTopics request/response with automated protocol
[ https://issues.apache.org/jira/browse/KAFKA-8034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842285#comment-16842285 ] Mickael Maison commented on KAFKA-8034: --- The PR was merged, marking as Resolved > Replace DeleteTopics request/response with automated protocol > - > > Key: KAFKA-8034 > URL: https://issues.apache.org/jira/browse/KAFKA-8034 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-8379) Flaky test KafkaAdminClientTest.testUnreachableBootstrapServer
[ https://issues.apache.org/jira/browse/KAFKA-8379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-8379. --- Resolution: Fixed Reviewer: Ismael Juma > Flaky test KafkaAdminClientTest.testUnreachableBootstrapServer > -- > > Key: KAFKA-8379 > URL: https://issues.apache.org/jira/browse/KAFKA-8379 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.3.0 > > > Test failed with: > {code:java} > org.junit.runners.model.TestTimedOutException: test timed out after 12 > milliseconds > at java.lang.Object.wait(Native Method) > at java.lang.Object.wait(Object.java:502) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > org.apache.kafka.clients.admin.KafkaAdminClientTest.testUnreachableBootstrapServer(KafkaAdminClientTest.java:303) > {code} > Standard output shows: > {code} > [2019-05-17 06:38:01,854] ERROR Uncaught exception in thread > 'kafka-admin-client-thread | adminclient-35': > (org.apache.kafka.common.utils.KafkaThread:51) > java.lang.IllegalStateException: Cannot send > ClientRequest(expectResponse=true, callback=null, destination=-1, > correlationId=0, clientId=mockClientId, createdTimeMs=1558075081853, > requestBuilder=MetadataRequestData(topics=[], allowAutoTopicCreation=true, > includeClusterAuthorizedOperations=false, > includeTopicAuthorizedOperations=false)) since the destination is not ready > at org.apache.kafka.clients.MockClient.send(MockClient.java:186) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:943) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1140) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8367) Non-heap memory leak in Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-8367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842266#comment-16842266 ] Pavel Savov commented on KAFKA-8367: Still no joy, I'm afraid. I built from that branch but the leak is still there. None of the settings in our RocksDBConfigSetter changed when we upgraded to 2.2.0. I'll also try downgrading Kafka Streams to 2.1.0 and let you know how it goes. Thanks! > Non-heap memory leak in Kafka Streams > - > > Key: KAFKA-8367 > URL: https://issues.apache.org/jira/browse/KAFKA-8367 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.2.0 >Reporter: Pavel Savov >Priority: Major > Attachments: memory-prod.png, memory-test.png > > > We have been observing a non-heap memory leak after upgrading to Kafka > Streams 2.2.0 from 2.0.1. We suspect the source to be around RocksDB as the > leak only happens when we enable stateful stream operations (utilizing > stores). We are aware of *KAFKA-8323* and have created our own fork of 2.2.0 > and ported the fix scheduled for release in 2.2.1 to our fork. It did not > stop the leak, however. > We are having this memory leak in our production environment where the > consumer group is auto-scaled in and out in response to changes in traffic > volume, and in our test environment where we have two consumers, no > autoscaling and relatively constant traffic. > Below is some information I'm hoping will be of help: > * RocksDB Config: > Block cache size: 4 MiB > Write buffer size: 2 MiB > Block size: 16 KiB > Cache index and filter blocks: true > Manifest preallocation size: 64 KiB > Max write buffer number: 3 > Max open files: 6144 > > * Memory usage in production > The attached graph (memory-prod.png) shows memory consumption for each > instance as a separate line. The horizontal red line at 6 GiB is the memory > limit. 
> As illustrated on the attached graph from production, memory consumption in > running instances goes up around autoscaling events (scaling the consumer > group either in or out) and associated rebalancing. It stabilizes until the > next autoscaling event but it never goes back down. > An example of scaling out can be seen from around 21:00 hrs where three new > instances are started in response to a traffic spike. > Just after midnight traffic drops and some instances are shut down. Memory > consumption in the remaining running instances goes up. > Memory consumption climbs again from around 6:00AM due to increased traffic > and new instances are being started until around 10:30AM. Memory consumption > never drops until the cluster is restarted around 12:30. > > * Memory usage in test > As illustrated by the attached graph (memory-test.png) we have a fixed number > of two instances in our test environment and no autoscaling. Memory > consumption rises linearly until it reaches the limit (around 2:00 AM on > 5/13) and Mesos restarts the offending instances, or we restart the cluster > manually. > > * No heap leaks observed > * Window retention: 2 or 11 minutes (depending on operation type) > * Issue not present in Kafka Streams 2.0.1 > * No memory leak for stateless stream operations (when no RocksDB stores are > used) > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8380) We can not create a topic, immediately write to it and then read.
[ https://issues.apache.org/jira/browse/KAFKA-8380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Darya Merkureva updated KAFKA-8380: --- Summary: We can not create a topic, immediately write to it and then read. (was: I can not create a topic, immediately write to it and then read.) > We can not create a topic, immediately write to it and then read. > - > > Key: KAFKA-8380 > URL: https://issues.apache.org/jira/browse/KAFKA-8380 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.2.0 >Reporter: Darya Merkureva >Priority: Blocker > > We are trying to create a topic, immediately write to it and read. > For some reason, we read nothing in spite of the fact that we are waiting for > the completion of KafkaFuture.
> {code:java}
> public class main {
>     private static final String TOPIC_NAME = "topic";
>     private static final String KEY_NAME = "key";
>
>     public static void main(String[] args) {
>         final Properties prodProps = new Properties();
>         prodProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         prodProps.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
>         prodProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
>         prodProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         prodProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         final Producer<String, String> prod = new KafkaProducer<>(prodProps);
>
>         final Properties admProps = new Properties();
>         admProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         final AdminClient adm = KafkaAdminClient.create(admProps);
>
>         final Properties consProps = new Properties();
>         consProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         consProps.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
>         consProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
>         consProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
>         consProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "3");
>         consProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
>         consProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
>         final Consumer<String, String> cons = new KafkaConsumer<>(consProps);
>
>         try {
>             final NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, (short) 1);
>             val createTopicsResult = adm.createTopics(Collections.singleton(newTopic));
>             createTopicsResult.values().get(TOPIC_NAME).get();
>         } catch (InterruptedException | ExecutionException e) {
>             if (!(e.getCause() instanceof TopicExistsException)) {
>                 throw new RuntimeException(e.getMessage(), e);
>             }
>         }
>
>         final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, KEY_NAME, "data");
>         prod.send(producerRecord);
>         prod.send(producerRecord);
>         prod.send(producerRecord);
>         prod.send(producerRecord);
>
>         cons.subscribe(Arrays.asList(TOPIC_NAME));
>         val records = cons.poll(Duration.ofSeconds(10));
>         for (var record : records) {
>             System.out.println(record.value());
>         }
>     }
> }
> {code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8380) I can not create a topic, immediately write to it and then read.
Darya Merkureva created KAFKA-8380: -- Summary: I can not create a topic, immediately write to it and then read. Key: KAFKA-8380 URL: https://issues.apache.org/jira/browse/KAFKA-8380 Project: Kafka Issue Type: Bug Affects Versions: 2.2.0 Reporter: Darya Merkureva We are trying to create a topic, immediately write to it and read. For some reason, we read nothing in spite of the fact that we are waiting for the completion of KafkaFuture.
{code:java}
public class main {
    private static final String TOPIC_NAME = "topic";
    private static final String KEY_NAME = "key";

    public static void main(String[] args) {
        final Properties prodProps = new Properties();
        prodProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        prodProps.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
        prodProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
        prodProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prodProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        final Producer<String, String> prod = new KafkaProducer<>(prodProps);

        final Properties admProps = new Properties();
        admProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        final AdminClient adm = KafkaAdminClient.create(admProps);

        final Properties consProps = new Properties();
        consProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consProps.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
        consProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        consProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        consProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "3");
        consProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        consProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        final Consumer<String, String> cons = new KafkaConsumer<>(consProps);

        try {
            final NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, (short) 1);
            val createTopicsResult = adm.createTopics(Collections.singleton(newTopic));
            createTopicsResult.values().get(TOPIC_NAME).get();
        } catch (InterruptedException | ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }

        final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, KEY_NAME, "data");
        prod.send(producerRecord);
        prod.send(producerRecord);
        prod.send(producerRecord);
        prod.send(producerRecord);

        cons.subscribe(Arrays.asList(TOPIC_NAME));
        val records = cons.poll(Duration.ofSeconds(10));
        for (var record : records) {
            System.out.println(record.value());
        }
    }
}
{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
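For readers hitting the same symptom as KAFKA-8380, two plausible causes, neither confirmed in this thread, are that `send()` is asynchronous (the four records may still be sitting in the producer's batch when `poll()` runs) and that a consumer group with no committed offsets starts from the log end by default (`auto.offset.reset` defaults to `latest`), skipping records produced before the subscription. A hedged adjustment to the reporter's code:

```java
// Untested sketch, not a confirmed diagnosis of KAFKA-8380.
// Start from the beginning of the log when the group has no committed offsets:
consProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// ...
// After the four send() calls, block until the records actually reach the broker:
prod.flush();
```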
[jira] [Commented] (KAFKA-8379) Flaky test KafkaAdminClientTest.testUnreachableBootstrapServer
[ https://issues.apache.org/jira/browse/KAFKA-8379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842165#comment-16842165 ] ASF GitHub Bot commented on KAFKA-8379: --- rajinisivaram commented on pull request #6753: KAFKA-8379; Fix flaky test KafkaAdminClientTest.testUnreachableBootstrapServer URL: https://github.com/apache/kafka/pull/6753 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flaky test KafkaAdminClientTest.testUnreachableBootstrapServer > -- > > Key: KAFKA-8379 > URL: https://issues.apache.org/jira/browse/KAFKA-8379 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.3.0 > > > Test failed with: > {code:java} > org.junit.runners.model.TestTimedOutException: test timed out after 12 > milliseconds > at java.lang.Object.wait(Native Method) > at java.lang.Object.wait(Object.java:502) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > org.apache.kafka.clients.admin.KafkaAdminClientTest.testUnreachableBootstrapServer(KafkaAdminClientTest.java:303) > {code} > Standard output shows: > {code} > [2019-05-17 06:38:01,854] ERROR Uncaught exception in thread > 'kafka-admin-client-thread | adminclient-35': > (org.apache.kafka.common.utils.KafkaThread:51) > java.lang.IllegalStateException: Cannot send > ClientRequest(expectResponse=true, callback=null, destination=-1, > correlationId=0, clientId=mockClientId, createdTimeMs=1558075081853, > requestBuilder=MetadataRequestData(topics=[], allowAutoTopicCreation=true, > includeClusterAuthorizedOperations=false, > includeTopicAuthorizedOperations=false)) since the 
destination is not ready > at org.apache.kafka.clients.MockClient.send(MockClient.java:186) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:943) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1140) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8183) Trogdor - ProduceBench should retry on UnknownTopicOrPartitionException during topic creation
[ https://issues.apache.org/jira/browse/KAFKA-8183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842109#comment-16842109 ] Stanislav Kozlovski commented on KAFKA-8183: The merged PR did not seem to fix the issue. I am seeing it again > Trogdor - ProduceBench should retry on UnknownTopicOrPartitionException > during topic creation > - > > Key: KAFKA-8183 > URL: https://issues.apache.org/jira/browse/KAFKA-8183 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Minor > > There exists a race condition in the Trogdor produce bench worker code where > `WorkerUtils#createTopics()` [notices the topic > exists|https://github.com/apache/kafka/blob/4824dc994d7fc56b7540b643a78aadb4bdd0f14d/tools/src/main/java/org/apache/kafka/trogdor/common/WorkerUtils.java#L159] > yet when it goes on to verify the topics, the DescribeTopics call throws an > `UnknownTopicOrPartitionException`. > We should add sufficient retries such that this does not fail the task. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8379) Flaky test KafkaAdminClientTest.testUnreachableBootstrapServer
[ https://issues.apache.org/jira/browse/KAFKA-8379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842071#comment-16842071 ] ASF GitHub Bot commented on KAFKA-8379: --- rajinisivaram commented on pull request #6753: KAFKA-8379; Fix flaky test KafkaAdminClientTest.testUnreachableBootstrapServer URL: https://github.com/apache/kafka/pull/6753 The test starts an AdminClient with a MockClient. After the admin client network thread had started, it was disconnecting one of the nodes and marking it unreachable from the main thread. This interferes with the admin client network thread, causing a timing issue if the disconnect occurred while the network thread was processing the first metadata request. This PR makes the test safer by marking the node unreachable in MockClient before starting the admin client. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flaky test KafkaAdminClientTest.testUnreachableBootstrapServer > -- > > Key: KAFKA-8379 > URL: https://issues.apache.org/jira/browse/KAFKA-8379 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.3.0 > > > Test failed with: > {code:java} > org.junit.runners.model.TestTimedOutException: test timed out after 12 > milliseconds > at java.lang.Object.wait(Native Method) > at java.lang.Object.wait(Object.java:502) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > org.apache.kafka.clients.admin.KafkaAdminClientTest.testUnreachableBootstrapServer(KafkaAdminClientTest.java:303) > {code} > Standard output shows: > {code} > [2019-05-17 06:38:01,854] ERROR Uncaught exception in thread > 'kafka-admin-client-thread | adminclient-35': > (org.apache.kafka.common.utils.KafkaThread:51) > java.lang.IllegalStateException: Cannot send > ClientRequest(expectResponse=true, callback=null, destination=-1, > correlationId=0, clientId=mockClientId, createdTimeMs=1558075081853, > requestBuilder=MetadataRequestData(topics=[], allowAutoTopicCreation=true, > includeClusterAuthorizedOperations=false, > includeTopicAuthorizedOperations=false)) since the destination is not ready > at org.apache.kafka.clients.MockClient.send(MockClient.java:186) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:943) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1140) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8347) Choose next record to process by timestamp
[ https://issues.apache.org/jira/browse/KAFKA-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842063#comment-16842063 ] Matthias J. Sax commented on KAFKA-8347: [~ableegoldman] The "affects version" field seems not to be correct. We should mark the first release that was affected. Hence, I think it should be `2.1` as KIP-353 was done in this release. > Choose next record to process by timestamp > -- > > Key: KAFKA-8347 > URL: https://issues.apache.org/jira/browse/KAFKA-8347 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.3.0 >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.3.0 > > > Currently PartitionGroup will determine the next record to process by > choosing the partition with the lowest stream time. However if a partition > contains out of order data its stream time may be significantly larger than > the timestamp of the next record. The next record should instead be chosen as > the record with the lowest timestamp across all partitions, regardless of > which partition it comes from or what its partition time is. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
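The selection rule proposed in KAFKA-8347 can be sketched independently of the Streams internals: compare the head record of each partition buffer by its own timestamp, instead of comparing per-partition stream times. The names below are illustrative, not the actual PartitionGroup code:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class NextRecordChooser {

    static final class Rec {
        final long timestamp;
        final String value;
        Rec(long timestamp, String value) { this.timestamp = timestamp; this.value = value; }
    }

    // Proposed rule: return the index of the partition whose buffered head
    // record has the lowest timestamp, or -1 if every buffer is empty.
    static int chooseByHeadTimestamp(List<Deque<Rec>> partitions) {
        int best = -1;
        long bestTs = Long.MAX_VALUE;
        for (int p = 0; p < partitions.size(); p++) {
            Rec head = partitions.get(p).peekFirst();
            if (head != null && head.timestamp < bestTs) {
                bestTs = head.timestamp;
                best = p;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Partition 0 previously saw a record with ts=100, so its stream time
        // (max timestamp observed) is 100 -- but its next buffered record is a
        // late one with ts=5.
        Deque<Rec> p0 = new ArrayDeque<>();
        p0.add(new Rec(5, "late"));
        Deque<Rec> p1 = new ArrayDeque<>();
        p1.add(new Rec(50, "b"));
        // Ordering by stream time would favor p1 (50 < 100); ordering by the
        // head record's timestamp correctly picks p0's late record (5 < 50).
        System.out.println(chooseByHeadTimestamp(List.of(p0, p1)));
    }
}
```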
[jira] [Commented] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window
[ https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842047#comment-16842047 ] Matthias J. Sax commented on KAFKA-7895: As pointed out in the email John linked to, the RC artifact can be downloaded: [https://home.apache.org/~vahid/kafka-2.2.1-rc0/] > Ktable supress operator emitting more than one record for the same key per > window > - > > Key: KAFKA-7895 > URL: https://issues.apache.org/jira/browse/KAFKA-7895 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0, 2.2.0, 2.1.1 >Reporter: prasanthi >Assignee: John Roesler >Priority: Blocker > Fix For: 2.1.2, 2.2.1 > > > Hi, We are using kstreams to get the aggregated counts per vendor(key) within > a specified window. > Here's how we configured the suppress operator to emit one final record per > key/window. > {code:java} > KTable<Windowed<Integer>, Long> windowedCount = groupedStream > .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L))) > .count(Materialized.with(Serdes.Integer(),Serdes.Long())) > .suppress(Suppressed.untilWindowCloses(unbounded())); > {code} > But we are getting more than one record for the same key/window as shown > below. > {code:java} > [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039 > [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162 > [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584 > [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107 > [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315 > [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119 > [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746 > [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code} > Could you please take a look? > Thanks > > > Added by John: > Acceptance Criteria: > * add suppress to system tests, such that it's exercised with crash/shutdown > recovery, rebalance, etc. > ** [https://github.com/apache/kafka/pull/6278] > * make sure that there's some system test coverage with caching disabled. 
> ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943 > * test with tighter time bounds with windows of say 30 seconds and use > system time without adding any extra time for verification > ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
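For context on when {{untilWindowCloses}} should emit: a window only closes once stream time passes the window end plus the grace period, so a single final record per key/window is expected only after that point. Below is a small plain-Java sketch of that timing arithmetic (illustrative helper names, not Kafka Streams internals), using the 1-minute window and 5 ms grace from the snippet above:

```java
import java.time.Duration;

public class WindowClose {
    // Aligned start of the tumbling window containing an event timestamp:
    // start = ts - (ts % windowSize), which is how fixed-size windows align.
    static long windowStart(long tsMs, long sizeMs) {
        return tsMs - (tsMs % sizeMs);
    }

    // A window [start, start + size) closes once stream time exceeds
    // end + grace; only then should a final result be emitted.
    static boolean windowClosed(long streamTimeMs, long startMs, long sizeMs, long graceMs) {
        return streamTimeMs > startMs + sizeMs + graceMs;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(1).toMillis(); // 60_000 ms
        long grace = 5L;                              // ofMillis(5L) in the snippet
        long ts = 1_549_067_045_123L;                 // some event time
        long start = windowStart(ts, size);
        System.out.println(start);                    // prints 1549067040000
        // At exactly window end, grace has not elapsed yet:
        System.out.println(windowClosed(start + size, start, size, grace));          // false
        // One ms past end + grace, the window is closed:
        System.out.println(windowClosed(start + size + grace + 1, start, size, grace)); // true
    }
}
```

The bug report is precisely that results for the same key/window appear more than once even though this close condition should gate a single emission.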
[jira] [Created] (KAFKA-8379) Flaky test KafkaAdminClientTest.testUnreachableBootstrapServer
Rajini Sivaram created KAFKA-8379:
-------------------------------------

             Summary: Flaky test KafkaAdminClientTest.testUnreachableBootstrapServer
                 Key: KAFKA-8379
                 URL: https://issues.apache.org/jira/browse/KAFKA-8379
             Project: Kafka
          Issue Type: Bug
          Components: clients
            Reporter: Rajini Sivaram
            Assignee: Rajini Sivaram
             Fix For: 2.3.0

Test failed with:
{code:java}
org.junit.runners.model.TestTimedOutException: test timed out after 12 milliseconds
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at org.apache.kafka.clients.admin.KafkaAdminClientTest.testUnreachableBootstrapServer(KafkaAdminClientTest.java:303)
{code}
Standard output shows:
{code}
[2019-05-17 06:38:01,854] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-35': (org.apache.kafka.common.utils.KafkaThread:51)
java.lang.IllegalStateException: Cannot send ClientRequest(expectResponse=true, callback=null, destination=-1, correlationId=0, clientId=mockClientId, createdTimeMs=1558075081853, requestBuilder=MetadataRequestData(topics=[], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false)) since the destination is not ready
    at org.apache.kafka.clients.MockClient.send(MockClient.java:186)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:943)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1140)
    at java.lang.Thread.run(Thread.java:748)
{code}
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842015#comment-16842015 ] Mateusz Owczarek commented on KAFKA-5998:

[~guozhang] Any updates on this? I have the same issues with Kafka 2.1.1.

> /.checkpoint.tmp Not found exception
> ------------------------------------
>
>                 Key: KAFKA-5998
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5998
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>            Reporter: Yogesh BG
>            Priority: Critical
>         Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, props.txt, streams.txt
>
> I have one Kafka broker and one Kafka Streams instance running. It had been running for two days under a load of around 2,500 msgs per second. On the third day I started getting the exception below for some of the partitions; I have 16 partitions, and only 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) ~[na:1.7.0_111]
> at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) ~[na:1.7.0_111]
> at org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at
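For readers unfamiliar with the failing call: OffsetCheckpoint.write creates a sibling {{.checkpoint.tmp}} file and then renames it over the target, so a FileNotFoundException when opening the {{.tmp}} file usually means the task directory itself has gone missing (e.g. cleaned up concurrently). Below is a minimal sketch of that write-then-atomic-rename pattern with a parent-directory guard; the names are illustrative, not Kafka's actual code:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;

public class CheckpointWriter {
    // Write content durably: write a sibling ".tmp" file first, then rename it
    // over the target so readers never observe a half-written checkpoint.
    static void writeCheckpoint(Path checkpoint, String content) throws IOException {
        // Guard against the failure mode in the report: the task directory
        // has been removed, so opening the .tmp file would throw.
        Files.createDirectories(checkpoint.getParent());
        Path tmp = checkpoint.resolveSibling(checkpoint.getFileName() + ".tmp");
        Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
        try {
            Files.move(tmp, checkpoint, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Fall back to a plain replace on filesystems without atomic rename.
            Files.move(tmp, checkpoint, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("kstreams-0_1");
        Path cp = dir.resolve(".checkpoint");
        writeCheckpoint(cp, "0 1\nmy-topic 0 42\n");
        System.out.println(Files.readAllLines(cp).get(0)); // prints "0 1"
    }
}
```

The warning in the log is non-fatal precisely because the checkpoint can be rewritten on the next commit; the open question in the ticket is why the directory disappears in the first place.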
[jira] [Commented] (KAFKA-8265) Connect Client Config Override policy
[ https://issues.apache.org/jira/browse/KAFKA-8265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16842002#comment-16842002 ] ASF GitHub Bot commented on KAFKA-8265:

ijuma commented on pull request #6624: KAFKA-8265: Initial implementation for ConnectorClientConfigPolicy to enable overrides (KIP-458)
URL: https://github.com/apache/kafka/pull/6624

> Connect Client Config Override policy
> -------------------------------------
>
>                 Key: KAFKA-8265
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8265
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>    Affects Versions: 2.3.0
>            Reporter: Magesh kumar Nandakumar
>            Assignee: Magesh kumar Nandakumar
>            Priority: Major
>             Fix For: 2.3.0
>
> Right now, each source connector and sink connector inherits its client configurations from the worker properties. Within the worker properties, all configurations that have a prefix of "producer." or "consumer." are applied to all source connectors and sink connectors, respectively.
> We should allow the "producer." and "consumer." configurations to be overridden in accordance with an override policy determined by the administrator.
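As a concrete illustration of the feature described above: KIP-458 adds a worker-level `connector.client.config.override.policy` setting, after which individual connector configurations may override client settings via `producer.override.` / `consumer.override.` prefixes, subject to the chosen policy. A hedged sketch of what the relevant worker properties might look like (values are illustrative, not recommendations; consult KIP-458 for the exact semantics):

```properties
# Worker-wide client defaults, as before KIP-458: applied to every
# source connector's producer and every sink connector's consumer.
producer.compression.type=lz4
consumer.max.poll.records=500

# New in KIP-458: which per-connector overrides are allowed.
# The KIP describes policies such as None (default), Principal, and All.
connector.client.config.override.policy=All

# With policy=All, a connector's own config may then carry e.g.:
#   "producer.override.compression.type": "zstd"
#   "consumer.override.max.poll.records": "100"
```

The policy is pluggable, so administrators can also supply their own implementation restricting which client properties connectors may override.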
[jira] [Updated] (KAFKA-8378) Kafka start error when topic data error
[ https://issues.apache.org/jira/browse/KAFKA-8378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ulysses you updated KAFKA-8378:
-------------------------------

Description:

When the Kafka server starts, the controller logs the error below, but does not show which topic is at fault. Unfortunately, my Kafka cluster has 300+ topics, so it's difficult to find the broken topic.

{code:java}
[2019-05-17 13:02:28,778] ERROR [Controller 12]: Error while electing or becoming controller on broker 12 (kafka.controller.KafkaController)
kafka.common.KafkaException: Can't parse json string: null
    at kafka.utils.Json$.liftedTree1$1(Json.scala:40)
    at kafka.utils.Json$.parseFull(Json.scala:36)
    at kafka.utils.ZkUtils$$anonfun$getReplicaAssignmentForTopics$1.apply(ZkUtils.scala:726)
    at kafka.utils.ZkUtils$$anonfun$getReplicaAssignmentForTopics$1.apply(ZkUtils.scala:722)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at kafka.utils.ZkUtils.getReplicaAssignmentForTopics(ZkUtils.scala:722)
    at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:657)
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:260)
    at kafka.controller.KafkaController.elect(KafkaController.scala:1578)
    at kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1513)
    at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:50)
    at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:50)
    at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:50)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
    at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:49)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
Caused by: java.lang.NullPointerException
    at scala.util.parsing.combinator.lexical.Scanners$Scanner.<init>(Scanners.scala:44)
    at scala.util.parsing.json.JSON$.parseRaw(JSON.scala:51)
    at scala.util.parsing.json.JSON$.parseFull(JSON.scala:65)
    at kafka.utils.Json$.liftedTree1$1(Json.scala:37)
    ... 16 more
{code}

In my scenario, the broken topic has a node in ZooKeeper, e.g. `/brokers/topics/mytopic`, but the node's data is null. This patch keeps a broken topic from failing broker startup.

was: (the previous description, identical apart from the first sentence; see the original report below)

Summary: Kafka start error when topic data error (was: kafka start error when topic data error)

> Kafka start error when topic data error
>
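The root cause above is `Json.parseFull` being handed the null payload of an otherwise valid topic znode, and the exception names no topic. A minimal sketch of the defensive shape the reporter's patch aims at (illustrative names only, not the actual Kafka controller code): parse per topic, report and skip a topic whose znode data is null, instead of aborting controller election:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class TopicAssignmentParser {
    // Parse one topic's znode payload. A null or empty payload is logged with
    // the topic name and skipped, rather than surfacing as a bare
    // "Can't parse json string: null" with no topic context.
    static Optional<String> parseAssignment(String topic, String znodeData) {
        if (znodeData == null || znodeData.isEmpty()) {
            System.err.println("Ignoring topic '" + topic + "': znode exists but has no data");
            return Optional.empty();
        }
        return Optional.of(znodeData); // real code would JSON-decode the assignment here
    }

    public static void main(String[] args) {
        Map<String, String> znodes = new LinkedHashMap<>();
        znodes.put("good-topic", "{\"version\":1,\"partitions\":{\"0\":[12]}}");
        znodes.put("bad-topic", null); // e.g. /brokers/topics/bad-topic with null data
        znodes.forEach((t, d) -> parseAssignment(t, d)
                .ifPresent(json -> System.out.println(t + " -> " + json)));
    }
}
```

With 300+ topics, logging the offending topic name turns an opaque controller failure into a one-line diagnosis.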
[jira] [Created] (KAFKA-8378) kafka start error when topic data error
ulysses you created KAFKA-8378:
-------------------------------

             Summary: kafka start error when topic data error
                 Key: KAFKA-8378
                 URL: https://issues.apache.org/jira/browse/KAFKA-8378
             Project: Kafka
          Issue Type: Improvement
          Components: controller
    Affects Versions: 2.1.1, 2.2.0, 2.0.1, 1.1.1, 1.0.2
            Reporter: ulysses you
         Attachments: 89399ffe4995ea9b4c2ebdb788f5dfd55001bc80.patch

When the Kafka controller starts, it logs the error below, but does not show which topic is at fault. Unfortunately, my Kafka cluster has 300+ topics, so it's difficult to find the broken topic.

{code:java}
[2019-05-17 13:02:28,778] ERROR [Controller 12]: Error while electing or becoming controller on broker 12 (kafka.controller.KafkaController)
kafka.common.KafkaException: Can't parse json string: null
    at kafka.utils.Json$.liftedTree1$1(Json.scala:40)
    at kafka.utils.Json$.parseFull(Json.scala:36)
    at kafka.utils.ZkUtils$$anonfun$getReplicaAssignmentForTopics$1.apply(ZkUtils.scala:726)
    at kafka.utils.ZkUtils$$anonfun$getReplicaAssignmentForTopics$1.apply(ZkUtils.scala:722)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at kafka.utils.ZkUtils.getReplicaAssignmentForTopics(ZkUtils.scala:722)
    at kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:657)
    at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:260)
    at kafka.controller.KafkaController.elect(KafkaController.scala:1578)
    at kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1513)
    at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:50)
    at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:50)
    at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:50)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
    at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:49)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
Caused by: java.lang.NullPointerException
    at scala.util.parsing.combinator.lexical.Scanners$Scanner.<init>(Scanners.scala:44)
    at scala.util.parsing.json.JSON$.parseRaw(JSON.scala:51)
    at scala.util.parsing.json.JSON$.parseFull(JSON.scala:65)
    at kafka.utils.Json$.liftedTree1$1(Json.scala:37)
    ... 16 more
{code}

In my scenario, the broken topic has a node in ZooKeeper, e.g. `/brokers/topics/mytopic`, but the node's data is null. This patch keeps a broken topic from failing broker startup.