vvcephei commented on a change in pull request #8574:
URL: https://github.com/apache/kafka/pull/8574#discussion_r416764158



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionWrapperSerde.java
##########
@@ -76,6 +78,10 @@ public void setIfUnset(final Serializer<K> 
defaultSerializer) {
                 throw new UnsupportedVersionException("SubscriptionWrapper 
version is larger than maximum supported 0x7F");
             }
 
+            if (primaryKeySerializationPseudoTopic == null) {
+                primaryKeySerializationPseudoTopic = 
primaryKeySerializationPseudoTopicSupplier.get();
+            }

Review comment:
       This (and below) is a bit awkward.
   
   Our requirement is not to call the supplier until after the app starts, but 
we can call it any time after the app starts.
   
   The natural place would be in `configure`, but unfortunately, that method is 
basically useless for our internal serdes. The reason is that we previously 
decided that `configure` should be called externally to the DSL, but our 
internal serdes are constructed _internal_ to the DSL. Plus, `configure` must 
be called at run time (when the config is available), but by run time, we can 
no longer tell whether our serde is "internal" or not. So, there's no good 
place where we can call `configure` for our internal serdes.
   
   I'm side-stepping the problem here by just invoking the supplier when we 
first need to use it, which is also at run time.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -972,13 +974,26 @@ boolean sendingOldValueEnabled() {
         //This occurs whenever the extracted foreignKey changes values.
         enableSendingOldValues();
 
+        final NamedInternal renamed = new NamedInternal(joinName);
+
+        final String subscriptionTopicName = renamed.suffixWithOrElseGet(
+            "-subscription-registration",
+            builder,
+            SUBSCRIPTION_REGISTRATION
+        ) + TOPIC_SUFFIX;
 
+        // the decoration can't be performed until we have the configuration 
available when the app runs,
+        // so we pass Suppliers into the components, which they can call at 
run time

Review comment:
       Hopefully, this explains what's going on here.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -1224,6 +1224,10 @@ private static Pattern buildPattern(final 
Collection<String> sourceTopics,
         return decoratedTopics;
     }
 
+    public String decoratePseudoTopic(final String topic) {

Review comment:
       I'm adding a new public method for our specific use case here, to 
document that we should _only_ need to invoke this method publicly for "pseudo" 
topics.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
##########
@@ -218,19 +220,19 @@ public void shouldUseExpectedTopicsWithSerde() {
         }
         // verifying primarily that no extra pseudo-topics were used, but it's 
nice to also verify the rest of the
         // topics our serdes serialize data for
-        assertThat(serdeScope.registeredTopics(), CoreMatchers.is(mkSet(
+        assertThat(serdeScope.registeredTopics(), is(mkSet(
             // expected pseudo-topics
-            
"KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-fk--key",
-            
"KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-pk--key",
-            
"KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-vh--value",
+            applicationId + 
"-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-fk--key",
+            applicationId + 
"-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-pk--key",
+            applicationId + 
"-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic-vh--value",

Review comment:
       This verifies the fix: the pseudo topics should also be prefixed. I 
should have noticed before that they weren't.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to