[ https://issues.apache.org/jira/browse/KAFKA-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688333#comment-16688333 ]
Bill Bejeck commented on KAFKA-7608:
------------------------------------

Hi Andy,

The fact that transform, transformValues, and process don't trigger a repartition is by design. I'm not sure we should implement automatic repartitioning when these operations use a state store. First, we would force a repartition on users who already use state stores with these operations but don't need one. Second, the state store in use could be a custom store that is not a key-value store, in which case repartitioning would not help.

But I completely understand your issue. There is an existing KIP proposal ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Repartition+Topic+Hints+in+Streams]) that looks to give users some control over repartition topics. Suppose we added features to this KIP that:

# allowed Kafka Streams to create the topic via a {{KStream.through(String name, Produced produced)}} method;
# given the appropriate information in the {{produced}} parameter, allowed Kafka Streams to manage the topic created by the {{through}} call as a repartition topic (meaning its contents are purged).

Would that suit your needs? While the repartitioning would not be automatic, it would be a simple matter of adding a single method call to the DSL, and Kafka Streams would handle topic creation and management of the contents.

Thanks,
Bill

> A Kafka Streams DSL transform or process call should potentially trigger a repartition
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7608
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7608
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Andy Bryant
>            Priority: Major
>
> Currently in Kafka Streams, if any DSL operation occurs that may modify the keys of the record stream, the stream is flagged for repartitioning.
> Currently this flag is checked prior to a stream join or an aggregation, and if it is set, the stream is piped through a transient repartition topic. This ensures messages with the same key are always co-located in the same partition, and hence in the same stream task and state store.
>
> The same mechanism should be used to trigger repartitioning prior to stream {{transform}}, {{transformValues}} and {{process}} calls that specify one or more state stores.
>
> Currently, without the forced repartitioning, there is no guarantee for streams whose key has been modified that the same keys will be processed by the same task, which is what you would expect when using a state store. Given that aggregations and joins already make this guarantee automatically, it seems inconsistent that {{transform}} and {{process}} do not provide the same guarantees.
>
> To achieve the same guarantees currently, developers must manually pipe the stream through a topic to force the repartitioning. This works, but is sub-optimal, since you don't get the handy optimisation where the repartition topic contents are purged after use.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
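The co-location argument in the issue description can be illustrated without a broker. The sketch below is a plain-Java simulation, not Kafka Streams code. `Rec`, `partitionFor`, `repartition`, `partitionsHolding` and `sumPerKey` are hypothetical names. `partitionFor` uses `String.hashCode()` modulo the partition count as a stand-in for the producer's murmur2-based default partitioner. The sketch shows two things. Re-keying leaves records in the partitions chosen by the old key, so a per-task store sees only part of a key's data. Routing every record through a topic by its new key restores co-location. In the 2.0 DSL, the manual workaround mentioned above would look roughly like `stream.selectKey(...).through("some-topic").transform(supplier, "some-store")`. The topic and store names are placeholders, and the user must create and manage that topic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class RepartitionSketch {

    // Minimal stand-in for a Kafka record: just a key and a value.
    static final class Rec {
        final String key;
        final int value;

        Rec(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical partitioner for illustration only. Kafka's default producer
    // partitioner hashes the serialized key with murmur2; any deterministic
    // key -> partition function shows the same co-location property.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Simulates piping every record through a repartition topic: each record is
    // re-routed by its *current* key, so equal keys end up in one partition.
    static List<List<Rec>> repartition(List<List<Rec>> partitions, int numPartitions) {
        List<List<Rec>> out = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            out.add(new ArrayList<>());
        }
        for (List<Rec> partition : partitions) {
            for (Rec r : partition) {
                out.get(partitionFor(r.key, numPartitions)).add(r);
            }
        }
        return out;
    }

    // Returns the indices of the partitions (i.e. stream tasks) that hold `key`.
    static Set<Integer> partitionsHolding(List<List<Rec>> partitions, String key) {
        Set<Integer> result = new TreeSet<>();
        for (int i = 0; i < partitions.size(); i++) {
            for (Rec r : partitions.get(i)) {
                if (r.key.equals(key)) {
                    result.add(i);
                }
            }
        }
        return result;
    }

    // Per-task "state store": sums values per key using only the records of one
    // partition, mirroring how a task-local store sees only its own partition.
    static Map<String, Integer> sumPerKey(List<Rec> partition) {
        Map<String, Integer> store = new HashMap<>();
        for (Rec r : partition) {
            store.merge(r.key, r.value, Integer::sum);
        }
        return store;
    }

    public static void main(String[] args) {
        // Records were partitioned by user id, then re-keyed by region (e.g. via
        // selectKey). Re-keying does not move records, so "eu" now sits in two partitions.
        List<List<Rec>> rekeyed = List.of(
                List.of(new Rec("eu", 1), new Rec("us", 2)),
                List.of(new Rec("eu", 3)));
        System.out.println("before repartition, eu in: " + partitionsHolding(rekeyed, "eu"));

        List<List<Rec>> coLocated = repartition(rekeyed, 2);
        System.out.println("after repartition, eu in:  " + partitionsHolding(coLocated, "eu"));

        int p = partitionFor("eu", 2);
        System.out.println("eu total seen by task " + p + ": " + sumPerKey(coLocated.get(p)).get("eu"));
    }
}
```

Before the simulated repartition, `"eu"` is spread across two partitions, so neither task's store can see its full total. Afterwards, a single task holds every `"eu"` record. This is the guarantee aggregations and joins get automatically and that `transform`/`process` currently leave to the user.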