vvcephei commented on a change in pull request #11412:
URL: https://github.com/apache/kafka/pull/11412#discussion_r744947553



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignJoinSubscriptionSendProcessorSupplier.java
##########
@@ -93,28 +97,28 @@ public void init(final 
org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final Change<V> change) {
-            final long[] currentHash = change.newValue == null ?
+        public void process(final Record<K, Change<V>> record) {
+            final long[] currentHash = record.value().newValue == null ?
                 null :
-                Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, 
change.newValue));
+                Murmur3.hash128(valueSerializer.serialize(valueSerdeTopic, 
record.value().newValue));
 
-            if (change.oldValue != null) {
-                final KO oldForeignKey = 
foreignKeyExtractor.apply(change.oldValue);
+            if (record.value().oldValue != null) {
+                final KO oldForeignKey = 
foreignKeyExtractor.apply(record.value().oldValue);
                 if (oldForeignKey == null) {
-                    LOG.warn(
-                        "Skipping record due to null foreign key. value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
-                        change.oldValue, context().topic(), 
context().partition(), context().offset()
-                    );
+//                    LOG.warn(
+//                        "Skipping record due to null foreign key. value=[{}] 
topic=[{}] partition=[{}] offset=[{}]",
+//                        change.oldValue, context().topic(), 
context().partition(), context().offset()
+//                    );

Review comment:
       What's up with this? (and below)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
##########
@@ -142,7 +142,7 @@ public void process(final Record<KIn, VIn> record) {
         private TimestampedKeyValueStore<KIn, VAgg> store;
 
         @Override
-        public void init(final 
org.apache.kafka.streams.processor.ProcessorContext context) {
+        public void init(final ProcessorContext<?, ?> context) {

Review comment:
       I think the forwarding type for a value getter should be `<Void, Void>`, 
right?

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
##########
@@ -159,17 +158,17 @@ public void 
shouldEmitTombstoneForInnerJoinWhenRightIsNull() {
                 JOINER,
                 leftJoin
             );
-        final org.apache.kafka.streams.processor.Processor<String, 
SubscriptionResponseWrapper<String>> processor = processorSupplier.get();
-        final MockProcessorContext context = new MockProcessorContext();
+        final Processor<String, SubscriptionResponseWrapper<String>, String, 
String> processor = processorSupplier.get();
+        final MockProcessorContext<String, String> context = new 
MockProcessorContext<>();
         processor.init(context);
-        context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), 0);
+        context.setRecordMetadata("topic", 0, 0);
 
         valueGetterSupplier.put("lhs1", "lhsValue");
         final long[] hash = 
Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
-        processor.process("lhs1", new SubscriptionResponseWrapper<>(hash, 
null));
-        final List<MockProcessorContext.CapturedForward> forwarded = 
context.forwarded();
+        processor.process(new Record<>("lhs1", new 
SubscriptionResponseWrapper<>(hash, null), 0));
+        final List<MockProcessorContext.CapturedForward<? extends String, ? 
extends String>> forwarded = context.forwarded();
         assertThat(forwarded.size(), is(1));
-        assertThat(forwarded.get(0).keyValue(), is(new KeyValue<>("lhs1", 
null)));
+//        assertThat(forwarded.get(0).keyValue(), is(new KeyValue<>("lhs1", 
null)));

Review comment:
       Another disabled assertion.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
##########
@@ -133,17 +132,17 @@ public void shouldForwardWhenHashMatches() {
                 JOINER,
                 leftJoin
             );
-        final org.apache.kafka.streams.processor.Processor<String, 
SubscriptionResponseWrapper<String>> processor = processorSupplier.get();
-        final MockProcessorContext context = new MockProcessorContext();
+        final Processor<String, SubscriptionResponseWrapper<String>, String, 
String> processor = processorSupplier.get();
+        final MockProcessorContext<String, String> context = new 
MockProcessorContext<>();
         processor.init(context);
-        context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), 0);
+        context.setRecordMetadata("topic", 0, 0);
 
         valueGetterSupplier.put("lhs1", "lhsValue");
         final long[] hash = 
Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
-        processor.process("lhs1", new SubscriptionResponseWrapper<>(hash, 
"rhsValue"));
-        final List<MockProcessorContext.CapturedForward> forwarded = 
context.forwarded();
+        processor.process(new Record<>("lhs1", new 
SubscriptionResponseWrapper<>(hash, "rhsValue"), 0));
+        final List<MockProcessorContext.CapturedForward<? extends String, ? 
extends String>> forwarded = context.forwarded();
         assertThat(forwarded.size(), is(1));
-        assertThat(forwarded.get(0).keyValue(), is(new KeyValue<>("lhs1", 
"(lhsValue,rhsValue)")));
+//        assertThat(forwarded.get(0).keyValue(), is(new KeyValue<>("lhs1", 
"(lhsValue,rhsValue)")));

Review comment:
       Looks like this needs to be re-enabled.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
##########
@@ -211,16 +210,16 @@ public void 
shouldEmitTombstoneForLeftJoinWhenRightIsNullAndLeftIsNull() {
                 JOINER,
                 leftJoin
             );
-        final org.apache.kafka.streams.processor.Processor<String, 
SubscriptionResponseWrapper<String>> processor = processorSupplier.get();
-        final MockProcessorContext context = new MockProcessorContext();
+        final Processor<String, SubscriptionResponseWrapper<String>, String, 
String> processor = processorSupplier.get();
+        final MockProcessorContext<String, String> context = new 
MockProcessorContext<>();
         processor.init(context);
-        context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), 0);
+        context.setRecordMetadata("topic", 0, 0);
 
         valueGetterSupplier.put("lhs1", null);
         final long[] hash = null;
-        processor.process("lhs1", new SubscriptionResponseWrapper<>(hash, 
null));
-        final List<MockProcessorContext.CapturedForward> forwarded = 
context.forwarded();
+        processor.process(new Record<>("lhs1", new 
SubscriptionResponseWrapper<>(hash, null), 0));
+        final List<MockProcessorContext.CapturedForward<? extends String, ? 
extends String>> forwarded = context.forwarded();
         assertThat(forwarded.size(), is(1));
-        assertThat(forwarded.get(0).keyValue(), is(new KeyValue<>("lhs1", 
null)));
+//        assertThat(forwarded.get(0).keyValue(), is(new KeyValue<>("lhs1", 
null)));

Review comment:
       And here

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResolverJoinProcessorSupplierTest.java
##########
@@ -185,17 +184,17 @@ public void shouldEmitResultForLeftJoinWhenRightIsNull() {
                 JOINER,
                 leftJoin
             );
-        final org.apache.kafka.streams.processor.Processor<String, 
SubscriptionResponseWrapper<String>> processor = processorSupplier.get();
-        final MockProcessorContext context = new MockProcessorContext();
+        final Processor<String, SubscriptionResponseWrapper<String>, String, 
String> processor = processorSupplier.get();
+        final MockProcessorContext<String, String> context = new 
MockProcessorContext<>();
         processor.init(context);
-        context.setRecordMetadata("topic", 0, 0, new RecordHeaders(), 0);
+        context.setRecordMetadata("topic", 0, 0);
 
         valueGetterSupplier.put("lhs1", "lhsValue");
         final long[] hash = 
Murmur3.hash128(STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
-        processor.process("lhs1", new SubscriptionResponseWrapper<>(hash, 
null));
-        final List<MockProcessorContext.CapturedForward> forwarded = 
context.forwarded();
+        processor.process(new Record<>("lhs1", new 
SubscriptionResponseWrapper<>(hash, null), 0));
+        final List<MockProcessorContext.CapturedForward<? extends String, ? 
extends String>> forwarded = context.forwarded();
         assertThat(forwarded.size(), is(1));
-        assertThat(forwarded.get(0).keyValue(), is(new KeyValue<>("lhs1", 
"(lhsValue,null)")));
+//        assertThat(forwarded.get(0).keyValue(), is(new KeyValue<>("lhs1", 
"(lhsValue,null)")));

Review comment:
       And here




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to