mjsax commented on code in PR #15619:
URL: https://github.com/apache/kafka/pull/15619#discussion_r1543277915


##########
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:
##########
@@ -596,7 +597,9 @@ public synchronized <K, V> StreamsBuilder addGlobalStore(final StoreBuilder<?> s
      * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
      * @return itself
      * @throws TopologyException if the processor of state is already registered
+     * @deprecated Since 3.7.0; use {@link #addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier, boolean)} or  {@link #addGlobalStore(StoreBuilder, String, String)}  instead.

Review Comment:
   ```suggestion
        * @deprecated Since 3.8.0; use {@link #addGlobalStore(StoreBuilder, String, Consumed, ProcessorSupplier, boolean)} or {@link #addGlobalStore(StoreBuilder, String, String)}  instead.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:
##########
@@ -613,6 +616,74 @@ public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<
         return this;
     }
 
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an
+     * {@link Processor} that will receive all records forwarded from the {@link SourceNode}.
+     * The supplier should always generate a new instance. Creating a single {@link Processor} object
+     * and returning the same object reference in {@link ProcessorSupplier#get()} is a
+     * violation of the supplier pattern and leads to runtime exceptions.
+     * This {@link Processor} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     * <p>
+     * It is not required to connect a global store to the {@link Processor Processors},
+     * {@link Transformer Transformers}, or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
+     *
+     * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
+     * @param topic                 the topic to source the data from
+     * @param consumed              the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @param reprocessOnRestore    restore by reprocessing the data using a processor supplied by stateUpdateSupplier or loads the data in byte for byte
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                                 final String topic,
+                                                                 final Consumed<KIn, VIn> consumed,
+                                                                 final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier,
+                                                                 final boolean reprocessOnRestore) {
+        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
+        internalStreamsBuilder.addGlobalStore(
+            new StoreBuilderWrapper(storeBuilder),
+            topic,
+            new ConsumedInternal<>(consumed),
+            stateUpdateSupplier,
+            reprocessOnRestore
+        );
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     *
+     * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
+     * @param topic                 the topic to source the data from
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized  <KIn, VIn>  StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                      final String topic,
+                                                      final Consumed<KIn, VIn> consumed) {

Review Comment:
   nit: fix indentation



##########
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:
##########
@@ -613,6 +616,74 @@ public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<
         return this;
     }
 
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an
+     * {@link Processor} that will receive all records forwarded from the {@link SourceNode}.
+     * The supplier should always generate a new instance. Creating a single {@link Processor} object
+     * and returning the same object reference in {@link ProcessorSupplier#get()} is a
+     * violation of the supplier pattern and leads to runtime exceptions.
+     * This {@link Processor} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     * <p>
+     * It is not required to connect a global store to the {@link Processor Processors},
+     * {@link Transformer Transformers}, or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
+     *
+     * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
+     * @param topic                 the topic to source the data from
+     * @param consumed              the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @param reprocessOnRestore    restore by reprocessing the data using a processor supplied by stateUpdateSupplier or loads the data in byte for byte
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                                 final String topic,
+                                                                 final Consumed<KIn, VIn> consumed,
+                                                                 final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier,
+                                                                 final boolean reprocessOnRestore) {
+        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");
+        internalStreamsBuilder.addGlobalStore(
+            new StoreBuilderWrapper(storeBuilder),
+            topic,
+            new ConsumedInternal<>(consumed),
+            stateUpdateSupplier,
+            reprocessOnRestore
+        );
+        return this;
+    }
+
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     *
+     * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
+     * @param topic                 the topic to source the data from
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized  <KIn, VIn>  StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                      final String topic,
+                                                      final Consumed<KIn, VIn> consumed) {
+        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");

Review Comment:
   As above. Missing null checks?



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -902,6 +940,7 @@ public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?> storeBu
      * @deprecated Since 2.7.0. Use {@link #addGlobalStore(StoreBuilder, String, TimestampExtractor, Deserializer, Deserializer, String, String, ProcessorSupplier)} instead.
      */
     @Deprecated
+    @SuppressWarnings("overloads")

Review Comment:
   Why do we need this?



##########
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java:
##########
@@ -613,6 +616,74 @@ public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<
         return this;
     }
 
+    /**
+     * Adds a global {@link StateStore} to the topology.
+     * The {@link StateStore} sources its data from all partitions of the provided input topic.
+     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
+     * <p>
+     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
+     * of the input topic.
+     * <p>
+     * The provided {@link ProcessorSupplier} will be used to create an
+     * {@link Processor} that will receive all records forwarded from the {@link SourceNode}.
+     * The supplier should always generate a new instance. Creating a single {@link Processor} object
+     * and returning the same object reference in {@link ProcessorSupplier#get()} is a
+     * violation of the supplier pattern and leads to runtime exceptions.
+     * This {@link Processor} should be used to keep the {@link StateStore} up-to-date.
+     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     * <p>
+     * It is not required to connect a global store to the {@link Processor Processors},
+     * {@link Transformer Transformers}, or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
+     *
+     * @param storeBuilder          user defined {@link StoreBuilder}; can't be {@code null}
+     * @param topic                 the topic to source the data from
+     * @param consumed              the instance of {@link Consumed} used to define optional parameters; can't be {@code null}
+     * @param stateUpdateSupplier   the instance of {@link ProcessorSupplier}
+     * @param reprocessOnRestore    restore by reprocessing the data using a processor supplied by stateUpdateSupplier or loads the data in byte for byte
+     * @return itself
+     * @throws TopologyException if the processor of state is already registered
+     */
+    public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<?> storeBuilder,
+                                                                 final String topic,
+                                                                 final Consumed<KIn, VIn> consumed,
+                                                                 final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier,
+                                                                 final boolean reprocessOnRestore) {
+        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
+        Objects.requireNonNull(consumed, "consumed can't be null");

Review Comment:
   Seems to be an issue in existing code already, but it seems `topic` and `stateUpdateSupplier` cannot be `null` either.
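
A minimal sketch of what checking all four arguments could look like, assuming the same message style as the two existing checks. `GlobalStoreArgChecks` and `requireAllNonNull` are hypothetical names used only for illustration; they are not part of Kafka Streams, and the parameters are typed as `Object`/`String` so the snippet compiles without the Kafka classes.

```java
import java.util.Objects;

// Hypothetical helper for illustration only; not part of Kafka Streams.
final class GlobalStoreArgChecks {
    private GlobalStoreArgChecks() { }

    // Fails fast with a parameter-specific message, mirroring the two
    // Objects.requireNonNull calls already present in the quoted hunk and
    // extending them to `topic` and `stateUpdateSupplier`.
    static void requireAllNonNull(final Object storeBuilder,
                                  final String topic,
                                  final Object consumed,
                                  final Object stateUpdateSupplier) {
        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
        Objects.requireNonNull(topic, "topic can't be null");
        Objects.requireNonNull(consumed, "consumed can't be null");
        Objects.requireNonNull(stateUpdateSupplier, "stateUpdateSupplier can't be null");
    }
}
```

In the PR itself the two extra calls would presumably sit inline next to the existing ones rather than in a separate helper; the helper only keeps this example self-contained.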



##########
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala:
##########
@@ -200,7 +200,7 @@ class StreamsBuilder(inner: StreamsBuilderJ = new StreamsBuilderJ) {
     consumed: Consumed[K, V],
     stateUpdateSupplier: org.apache.kafka.streams.processor.api.ProcessorSupplier[K, V, Void, Void]
   ): StreamsBuilderJ =
-    inner.addGlobalStore(storeBuilder, topic, consumed, stateUpdateSupplier)
+    inner.addGlobalStore(storeBuilder, topic, consumed, stateUpdateSupplier, true)

Review Comment:
   Seems we would need to change `StreamsBuilder.scala`, too, and add an overload taking the new parameter? If so, the KIP will need to cover it.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java:
##########
@@ -43,8 +44,24 @@ public static <KIn, VIn, KOut, VOut> Processor<KIn, VIn, KOut, VOut> adaptRaw(fi
         }
     }
 
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public static <KIn, VIn, KOut, VOut> Processor<KIn, VIn, KOut, VOut> adaptRaw(final Processor delegate) {

Review Comment:
   Why do we need this? If we have an `api.Processor` at hand, there should be no need for wrapping it?
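
To illustrate the point with stand-in types: an adapter is only needed to bridge an old-style interface to the new one, while an object that already implements the new interface can be used directly. `AdapterSketch`, `OldProcessor`, `NewProcessor`, and `Rec` below are hypothetical simplifications, not the real Kafka Streams `Processor` interfaces or `ProcessorAdapter`.

```java
// Hypothetical stand-ins for illustration only; not the Kafka Streams API.
final class AdapterSketch {
    private AdapterSketch() { }

    // Stand-in for an old-style processor that receives key and value separately.
    interface OldProcessor<K, V> {
        void process(K key, V value);
    }

    // Stand-in for a new-style processor that receives a single record object.
    interface NewProcessor<K, V> {
        void process(Rec<K, V> record);
    }

    // Minimal record type carrying a key and a value.
    static final class Rec<K, V> {
        final K key;
        final V value;

        Rec(final K key, final V value) {
            this.key = key;
            this.value = value;
        }
    }

    // Wrapping is required only for the old-style interface: the adapter
    // unpacks the record and forwards key and value to the delegate.
    static <K, V> NewProcessor<K, V> adapt(final OldProcessor<K, V> delegate) {
        return record -> delegate.process(record.key, record.value);
    }
}
```

A `NewProcessor` instance needs no `adapt` call at all and can be handed to the consumer as-is, which is the situation the question above describes.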


