vvcephei commented on a change in pull request #9708:
URL: https://github.com/apache/kafka/pull/9708#discussion_r540375321



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/StreamJoined.java
##########
@@ -267,7 +296,49 @@ private StreamJoined(final Serde<K> keySerde,
             thisStoreSupplier,
             otherStoreSupplier,
             name,
-            storeName
+            storeName,
+            loggingEnabled,
+            topicConfig
+        );
+    }
+
+    /**
+     * Configures logging for both state stores. The changelog will be created with the provided configs.
+     * <p>
+     * Note: Any unrecognized configs will be ignored
+     * @param config  configs applied to the changelog topic
+     * @return            a new {@link StreamJoined} configured with logging enabled
+     */
+    public StreamJoined<K, V1, V2> withLoggingEnabled(final Map<String, String> config) {
+
+        return new StreamJoined<>(
+            keySerde,
+            valueSerde,
+            otherValueSerde,
+            thisStoreSupplier,
+            otherStoreSupplier,
+            name,
+            storeName,
+            true,
+            config
+        );
+    }
+
+    /**
+     * Disable change logging for both state stores.
+     * @return            a new {@link StreamJoined} configured with logging disabled
+     */
+    public StreamJoined<K, V1, V2> withLoggingDisabled() {
+        return new StreamJoined<>(
+            keySerde,
+            valueSerde,
+            otherValueSerde,
+            thisStoreSupplier,
+            otherStoreSupplier,
+            name,
+            storeName,
+            false,
+            null

Review comment:
       I think you meant this here:
   
   ```suggestion
               new HashMap<>()
   ```
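
One possible reason to prefer an empty map over `null` here, sketched under an assumption the diff does not confirm: if any downstream code copies or iterates the topic config, a `null` value fails at that point, while an empty map passes through harmlessly. `NullConfigSketch` and `copyConfig` below are hypothetical stand-ins, not part of `StreamJoined` or any Kafka API.

```java
import java.util.HashMap;
import java.util.Map;

public class NullConfigSketch {

    // Hypothetical stand-in for any code that defensively copies the topic config.
    // HashMap's copy constructor throws NullPointerException when given null.
    public static Map<String, String> copyConfig(final Map<String, String> topicConfig) {
        return new HashMap<>(topicConfig);
    }

    // Returns true if copying the given config throws a NullPointerException.
    public static boolean copyThrowsNpe(final Map<String, String> topicConfig) {
        try {
            copyConfig(topicConfig);
            return false;
        } catch (final NullPointerException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        // An empty map is copied without error.
        System.out.println("empty map throws: " + copyThrowsNpe(new HashMap<>()));
        // A null config fails as soon as something tries to copy it.
        System.out.println("null throws: " + copyThrowsNpe(null));
    }
}
```

Whether `StreamJoined` or its consumers actually copy the map is not visible in this hunk, so treat this only as an illustration of the general hazard.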

##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -152,6 +155,52 @@ private void shouldLogAndMeterOnSkippedRecordsWithNullValue(final String builtIn
         }
     }
 
+    @Test
+    public void shouldDisableLoggingOnStreamJoined() {
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("store").withLoggingDisabled();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);
+
+        final Topology topology = builder.build();
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+
+        assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(), equalTo(false));
+        assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(), equalTo(false));
+    }
+
+    @Test
+    public void shouldEnableLoggingWithCustomConfigOnStreamJoined() {
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100)).grace(Duration.ofMillis(50));
+        final StreamJoined<String, Integer, Integer> streamJoined = StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("store").withLoggingEnabled(Collections.emptyMap());
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<String, Integer> left = builder.stream("left", Consumed.with(Serdes.String(), Serdes.Integer()));
+        final KStream<String, Integer> right = builder.stream("right", Consumed.with(Serdes.String(), Serdes.Integer()));
+
+        left.join(
+            right,
+            (value1, value2) -> value1 + value2,
+            joinWindows,
+            streamJoined);
+
+        final Topology topology = builder.build();
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+
+        assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(), equalTo(true));
+        assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(), equalTo(true));

Review comment:
       It seems like these would pass by default. Maybe we should set some log configs and then check that they got propagated?





