mjsax commented on code in PR #20605:
URL: https://github.com/apache/kafka/pull/20605#discussion_r2388371695


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java:
##########
@@ -128,47 +128,105 @@ public void process(final Record<KLeft, Change<VLeft>> 
record) {
         }
 
         private void leftJoinInstructions(final Record<KLeft, Change<VLeft>> 
record) {
-            if (record.value().oldValue != null) {
-                final KRight oldForeignKey = 
foreignKeyExtractor.extract(record.key(), record.value().oldValue);
-                final KRight newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
-                if (oldForeignKey != null && 
!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
+            final VLeft oldValue = record.value().oldValue;
+            final VLeft newValue = record.value().newValue;
+
+            if (oldValue == null && newValue == null) {
+                // no output for idempotent left hand side deletes
+                return;
+            }
+
+            final KRight oldForeignKey = oldValue == null ? null : 
foreignKeyExtractor.extract(record.key(), oldValue);
+            final KRight newForeignKey = newValue == null ? null : 
foreignKeyExtractor.extract(record.key(), newValue);
+
+            final boolean maybeUnsubscribe = oldForeignKey != null;
+            if (maybeUnsubscribe) {
+                // delete old subscription only if FK changed
+                //
+                // if FK did change, we need to explicitly delete the old 
subscription,
+                // because the new subscription goes to a different partition
+                final boolean foreignKeyChanged = 
!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey));
+
+                if (foreignKeyChanged) {
+                    // this may lead to unnecessary tombstones if the old FK 
did not join;
+                    // however, we cannot avoid it as we have no means to know 
if the old FK joined or not
                     forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
                 }
-                forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
-            } else if (record.value().newValue != null) {
-                final KRight newForeignKey = 
foreignKeyExtractor.extract(record.key(), record.value().newValue);
-                forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
             }
+
+            // for all cases (insert, update, and delete), we send a new 
subscription;
+            // we need to get a response back for all cases to always produce 
a left-join result
+            //
+            // note: for delete, `newForeignKey` is null, what is a "hack"
+            // no actual subscription will be added for null-FK on the right 
hand sice, but we still get the response back we need
+            //
+            // this may lead to unnecessary tombstones if the old FK did not 
join;
+            // however, we cannot avoid it as we have no means to know if the 
old FK joined or not
+            forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
         }
 
         private void defaultJoinInstructions(final Record<KLeft, 
Change<VLeft>> record) {
-            if (record.value().oldValue != null) {
-                final KRight oldForeignKey = 
foreignKeyExtractor.extract(record.key(), record.value().oldValue);
-                final KRight newForeignKey = record.value().newValue == null ? 
null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
+            final VLeft oldValue = record.value().oldValue;
+            final VLeft newValue = record.value().newValue;
+
+            final KRight oldForeignKey = oldValue == null ? null : 
foreignKeyExtractor.extract(record.key(), oldValue);
+            final boolean needToUnsubscribe = oldForeignKey != null;
 
-                if (oldForeignKey == null && newForeignKey == null) {
+            // if left row is inserted or updated, subscribe to new FK (if new 
FK is valid)
+            if (newValue != null) {
+                final KRight newForeignKey = 
foreignKeyExtractor.extract(record.key(), newValue);
+
+                if (newForeignKey == null) { // invalid FK
                     logSkippedRecordDueToNullForeignKey();
-                } else if (oldForeignKey == null) {
-                    forward(record, newForeignKey, 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
-                } else if (newForeignKey == null) {
-                    forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
-                } else if (!Arrays.equals(serialize(newForeignKey), 
serialize(oldForeignKey))) {
-                    //Different Foreign Key - delete the old key value and 
propagate the new one.
-                    //Delete it from the oldKey's state store
-                    forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
-                    //Add to the newKey's state store. Additionally, propagate 
null if no FK is found there,
-                    //since we must "unset" any output set by the previous 
FK-join. This is true for both INNER
-                    //and LEFT join.
-                    forward(record, newForeignKey, 
PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
-                } else { // unchanged FK
-                    forward(record, newForeignKey, 
PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
+                    if (needToUnsubscribe) {
+                        // delete old subscription
+                        //
+                        // this may lead to unnecessary tombstones if the old 
FK did not join;
+                        // however, we cannot avoid it as we have no means to 
know if the old FK joined or not
+                        forward(record, oldForeignKey, 
DELETE_KEY_AND_PROPAGATE);
+                    }
+                } else { // valid FK

Review Comment:
   Not sure if I can follow?
   ```
   if (newForeignKey = null) {
   ...
   } else { // what would we add here? -> `else if (newForeignKey != null)` 
would be redundant 
   ...
   }



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to