fonsdant commented on code in PR #18811:
URL: https://github.com/apache/kafka/pull/18811#discussion_r2195259197


##########
docs/streams/developer-guide/draft.md:
##########
@@ -0,0 +1,209 @@
+# Optimizing Kafka Streams with skipRepartition
+
+## Introduction
+
+Apache Kafka Streams automatically triggers repartitioning when operations 
change the message key before a stateful
+operation like `groupByKey()`, `aggregate()`, or `count()`. This behavior 
ensures that data is correctly distributed
+across partitions to guarantee accurate calculations.
+
+However, in many cases, the data is already partitioned correctly, making 
repartitioning unnecessary and inefficient. To
+address this, we introduce `skipRepartition()`, an API that allows developers 
to bypass the repartitioning step when it
+is safe to do so, resulting in reduced latency and lower infrastructure costs.
+
+## Motivation
+
+Imagine a streaming e-commerce application where events are partitioned by 
`customerId`. We want to calculate the total
+amount spent by each customer:
+
+```java
+KStream<String, Order> orders = builder.stream("orders-topic")
+        .selectKey((key, order) -> order.customerId) // Already correctly 
partitioned!
+        .groupByKey() // By default, this triggers a repartition
+        .aggregate(
+                () -> 0,
+                (key, order, total) -> total + order.amount,
+                Materialized.with(Serdes.String(), Serdes.Integer())
+        );
+```
+
+* **The problem:** Even though the stream is already partitioned correctly by 
customerId, Kafka Streams will create an
+  unnecessary repartition topic, leading to increased latency and resource 
consumption.
+* **The solution:** We can use `skipRepartition()` to prevent this:
+
+```java
+KStream<String, Order> orders = builder.stream("orders-topic")
+        .selectKey((key, order) -> order.customerId)
+        .skipRepartition() // Avoids unnecessary repartitioning
+        .groupByKey()
+        .aggregate(
+                () -> 0,
+                (key, order, total) -> total + order.amount,
+                Materialized.with(Serdes.String(), Serdes.Integer())
+        );
+```
+
+With `skipRepartition()`, Kafka Streams will skip the repartitioning step and 
process the aggregation directly,
+optimizing performance.
+
+## When NOT to Use skipRepartition
+
+Although `skipRepartition()` is a powerful optimization tool, it should not be 
used indiscriminately. Here are some
+cases where it must be avoided:
+
+### Stream Joins
+
+Kafka Streams relies on repartitioning during stream joins to align records by 
key.
+
+```java
+KStream<String, Order> orders = builder.stream("orders-topic")
+        .selectKey((key, order) -> order.customerId)
+        .skipRepartition()
+        .join(
+                builder.table("customers-topic"),
+                (order, customer) -> order.amount + customer.discount,
+                Joined.with(Serdes.String(), Serdes.Integer(), Serdes.String())
+        ); // May produce incorrect results!
+```
+
+Joins expect a composite key produced during repartitioning. Skipping this 
step may cause misaligned records.

Review Comment:
   Sure! I think this except from KIP can help answering this question:
   
   > The usage of this operation complicates the usage of IQ(Interactive Query) 
and joins. For reasons that when repartitions happen, records are physically 
shuffled by a composite key defined in the stateful operation. However, when 
the repartitions are canceled, records stayed in their original partition by 
their original key. IQ assumes and uses the composite key instead of the 
original key. That's when IQ can break downstream. The same applies to joins. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to