aliehsaeedii commented on code in PR #22570:
URL: https://github.com/apache/kafka/pull/22570#discussion_r3460259495


##########
streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java:
##########
@@ -53,17 +54,23 @@
 @FunctionalInterface
 public interface StreamPartitioner<K, V> {
 
+    @Deprecated(since = "4.3", forRemoval = true)

Review Comment:
   `@Deprecated(since = "4.3", forRemoval = true)` — trunk is currently 
`4.4.0-SNAPSHOT`, so this deprecation will ship in 4.4, not 4.3. Should this be 
`since = "4.4"`? Worth confirming against the KIP's target release.



##########
streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java:
##########
@@ -53,17 +54,23 @@
 @FunctionalInterface
 public interface StreamPartitioner<K, V> {
 
+    @Deprecated(since = "4.3", forRemoval = true)
+    Optional<Set<Integer>> partitions(String topic, K key, V value, int 
numPartitions);
+
     /**
-     * Determine the number(s) of the partition(s) to which a record with the 
given key and value should be sent, 
+     * Determine the number(s) of the partition(s) to which a record with the 
given key and value should be sent,
      * for the given topic and current partition count
      * @param topic the topic name this record is sent to
      * @param key the key of the record
      * @param value the value of the record
+     * @param headers the record headers
      * @param numPartitions the total number of partitions
      * @return an Optional of Set of integers between 0 and {@code 
numPartitions-1},
      * Empty optional means using default partitioner
      * Optional of an empty set means the record won't be sent to any 
partitions i.e drop it.
      * Optional of Set of integers means the partitions to which the record 
should be sent to.
      * */
-    Optional<Set<Integer>> partitions(String topic, K key, V value, int 
numPartitions);
+    default Optional<Set<Integer>> partitions(final String topic, final K key, 
final V value, final Headers headers, final int numPartitions) {

Review Comment:
   Keeping the deprecated 4-arg method abstract while the new headers-aware 
method is the default means new implementers must still implement the 
deprecated method (as the tests do by throwing `AssertionError`). Was making 
the new method abstract and the deprecated one a default considered? It would 
break the `@FunctionalInterface`/existing-lambda compatibility, so this is 
likely intentional — flagging only to confirm.



##########
streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java:
##########
@@ -53,17 +54,23 @@
 @FunctionalInterface
 public interface StreamPartitioner<K, V> {
 
+    @Deprecated(since = "4.3", forRemoval = true)
+    Optional<Set<Integer>> partitions(String topic, K key, V value, int 
numPartitions);

Review Comment:
   This method is now `@Deprecated(forRemoval = true)` but has no Javadoc. A 
removal-bound deprecation should carry a `@deprecated` tag pointing readers to 
the replacement, e.g. `@deprecated Use {@link #partitions(String, Object, 
Object, Headers, int)} instead.`



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -137,8 +138,21 @@ public class RecordCollectorTest {
     private final StringSerializer stringSerializer = new StringSerializer();
     private final ByteArraySerializer byteArraySerializer = new 
ByteArraySerializer();
 
-    private final StreamPartitioner<String, Object> streamPartitioner =
-        (topic, key, value, numPartitions) -> 
Optional.of(Collections.singleton(Integer.parseInt(key) % numPartitions));
+    private final StreamPartitioner<String, Object> streamPartitioner = new 
StreamPartitioner<String, Object>() {
+        @SuppressWarnings("removal")
+        @Override
+        public Optional<Set<Integer>> partitions(final String topic, final 
String key, final Object value, final int numPartitions) {
+            throw new AssertionError("Deprecated 4-argument partitions method 
was called instead of 5-argument method containing headers.");
+        }
+
+        @Override
+        public Optional<Set<Integer>> partitions(final String topic, final 
String key, final Object value, final Headers headers, final int numPartitions) 
{
+            if (headers != null && headers.lastHeader("key") != null) {

Review Comment:
   What is the main purpose of this condition? `if (headers != null && 
headers.lastHeader("key") != null)` assertion only runs when the header is 
already present, so it can't fail in the scenario it appears to guard: if a 
regression dropped headers, `lastHeader("key")` would be null, the branch would 
be skipped, and the test would still pass. The guard is needed here because 
these tests fire a mix of null-header and populated-header sends through the 
same partitioner, but that also makes the assertion toothless for verifying 
propagation. Since propagation is already covered strictly by 
`shouldPropagateHeadersToPartitioner`, consider dropping this conditional 
assertion. Same pattern repeats in the other partitioners in this file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to