aliehsaeedii commented on code in PR #22570:
URL: https://github.com/apache/kafka/pull/22570#discussion_r3460259495
##########
streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java:
##########
@@ -53,17 +54,23 @@
@FunctionalInterface
public interface StreamPartitioner<K, V> {
+ @Deprecated(since = "4.3", forRemoval = true)
Review Comment:
`@Deprecated(since = "4.3", forRemoval = true)` — trunk is currently
`4.4.0-SNAPSHOT`, so this deprecation will ship in 4.4, not 4.3. Should this be
`since = "4.4"`? Worth confirming against the KIP's target release.
##########
streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java:
##########
@@ -53,17 +54,23 @@
@FunctionalInterface
public interface StreamPartitioner<K, V> {
+ @Deprecated(since = "4.3", forRemoval = true)
+ Optional<Set<Integer>> partitions(String topic, K key, V value, int
numPartitions);
+
/**
- * Determine the number(s) of the partition(s) to which a record with the
given key and value should be sent,
+ * Determine the number(s) of the partition(s) to which a record with the
given key and value should be sent,
* for the given topic and current partition count
* @param topic the topic name this record is sent to
* @param key the key of the record
* @param value the value of the record
+ * @param headers the record headers
* @param numPartitions the total number of partitions
* @return an Optional of Set of integers between 0 and {@code
numPartitions-1},
* Empty optional means using default partitioner
* Optional of an empty set means the record won't be sent to any
partitions i.e drop it.
* Optional of Set of integers means the partitions to which the record
should be sent to.
* */
- Optional<Set<Integer>> partitions(String topic, K key, V value, int
numPartitions);
+ default Optional<Set<Integer>> partitions(final String topic, final K key,
final V value, final Headers headers, final int numPartitions) {
Review Comment:
Keeping the deprecated 4-arg method abstract while the new headers-aware
method is a default method means new implementers must still implement the
deprecated method (as the tests do by throwing `AssertionError`). Was making
the new method abstract and the deprecated one a default considered? Doing so
would break `@FunctionalInterface` compatibility for existing 4-arg lambdas,
so the current shape is likely intentional. Flagging only to confirm.
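
For reference, a minimal sketch of the shape I understand the PR to have, not
the actual Kafka code. `SketchHeaders`, `SketchPartitioner`, and `SketchDemo`
are hypothetical stand-ins, and the delegating default body is my assumption,
since the hunk above does not show the method body.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for org.apache.kafka.common.header.Headers,
// so the sketch compiles without Kafka on the classpath.
interface SketchHeaders { }

@FunctionalInterface
interface SketchPartitioner<K, V> {
    // Still the single abstract method, so existing 4-argument lambdas compile.
    @Deprecated
    Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions);

    // New headers-aware overload. ASSUMPTION: the default ignores the headers
    // and delegates to the deprecated method.
    @SuppressWarnings("deprecation")
    default Optional<Set<Integer>> partitions(String topic, K key, V value,
                                              SketchHeaders headers, int numPartitions) {
        return partitions(topic, key, value, numPartitions);
    }
}

class SketchDemo {
    // An old-style lambda still satisfies the interface; callers use the 5-arg overload.
    static Optional<Set<Integer>> route(String key) {
        SketchPartitioner<String, Object> p =
            (topic, k, v, n) -> Optional.of(Set.of(Integer.parseInt(k) % n));
        return p.partitions("topic", key, null, null, 4);
    }
}
```

If the abstract/default roles were swapped, the lambda in `route` would no
longer compile, which is the compatibility break mentioned above.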
##########
streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java:
##########
@@ -53,17 +54,23 @@
@FunctionalInterface
public interface StreamPartitioner<K, V> {
+ @Deprecated(since = "4.3", forRemoval = true)
+ Optional<Set<Integer>> partitions(String topic, K key, V value, int
numPartitions);
Review Comment:
This method is now `@Deprecated(forRemoval = true)` but has no Javadoc. A
removal-bound deprecation should carry a `@deprecated` tag pointing readers to
the replacement, e.g. `@deprecated Use {@link #partitions(String, Object,
Object, Headers, int)} instead.`
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -137,8 +138,21 @@ public class RecordCollectorTest {
private final StringSerializer stringSerializer = new StringSerializer();
private final ByteArraySerializer byteArraySerializer = new
ByteArraySerializer();
- private final StreamPartitioner<String, Object> streamPartitioner =
- (topic, key, value, numPartitions) ->
Optional.of(Collections.singleton(Integer.parseInt(key) % numPartitions));
+ private final StreamPartitioner<String, Object> streamPartitioner = new
StreamPartitioner<String, Object>() {
+ @SuppressWarnings("removal")
+ @Override
+ public Optional<Set<Integer>> partitions(final String topic, final
String key, final Object value, final int numPartitions) {
+ throw new AssertionError("Deprecated 4-argument partitions method
was called instead of 5-argument method containing headers.");
+ }
+
+ @Override
+ public Optional<Set<Integer>> partitions(final String topic, final
String key, final Object value, final Headers headers, final int numPartitions)
{
+ if (headers != null && headers.lastHeader("key") != null) {
Review Comment:
What is the purpose of this condition? Because of the `if (headers != null &&
headers.lastHeader("key") != null)` guard, the assertion only runs when the
header is already present, so it can't fail in the scenario it appears to
guard against: if a regression dropped headers, `lastHeader("key")` would
return null, the branch would be skipped, and the test would still pass. The
guard is needed here because these tests send a mix of null-header and
populated-header records through the same partitioner, but that also makes the
assertion toothless for verifying propagation. Since propagation is already
covered strictly by `shouldPropagateHeadersToPartitioner`, consider dropping
this conditional assertion. The same pattern repeats in the other partitioners
in this file.
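
To illustrate the concern, here is a minimal sketch rather than the PR's test
code. A plain `Map` stands in for Kafka's `Headers`, and `GuardSketch`,
`guardedCheckPasses`, and `strictCheckPasses` are hypothetical names chosen
for this example.

```java
import java.util.Map;

class GuardSketch {
    // Mirrors the guarded pattern: the check only runs when the header exists.
    // A null or empty map models a regression that dropped the headers.
    static boolean guardedCheckPasses(Map<String, String> headers) {
        if (headers != null && headers.get("key") != null) {
            return "value".equals(headers.get("key"));
        }
        return true; // branch skipped, nothing was verified
    }

    // Strict variant: a missing header counts as a failure.
    static boolean strictCheckPasses(Map<String, String> headers) {
        return headers != null && "value".equals(headers.get("key"));
    }
}
```

With dropped headers, the guarded check still reports success while the
strict one fails, which is why only a strict test can catch a propagation
regression.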