mjsax commented on code in PR #16791:
URL: https://github.com/apache/kafka/pull/16791#discussion_r1704716420


##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -820,110 +778,6 @@ public synchronized <KIn, VIn> Topology addReadOnlyStateStore(final StoreBuilder
         );
     }
 
-    /**
-     * Adds a global {@link StateStore} to the topology.
-     * The {@link StateStore} sources its data from all partitions of the provided input topic.
-     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
-     * <p>
-     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
-     * of the input topic.
-     * <p>
-     * The provided {@link org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
-     * records forwarded from the {@link SourceNode}.
-     * The supplier should always generate a new instance each time
-     * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} gets called. Creating a single
-     * {@link org.apache.kafka.streams.processor.Processor} object and returning the same object reference in
-     * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} would be a violation of the supplier pattern
-     * and leads to runtime exceptions.
-     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
-     *
-     * @param storeBuilder          user defined state store builder
-     * @param sourceName            name of the {@link SourceNode} that will be automatically added
-     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
-     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
-     * @param topic                 the topic to source the data from
-     * @param processorName         the name of the {@link org.apache.kafka.streams.processor.ProcessorSupplier}
-     * @param stateUpdateSupplier   the instance of {@link org.apache.kafka.streams.processor.ProcessorSupplier}
-     * @return itself
-     * @throws TopologyException if the processor of state is already registered
-     * @deprecated Since 2.7.0. Use {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier)} instead.
-     */
-    @Deprecated
-    public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
-                                                       final String sourceName,
-                                                       final Deserializer<K> keyDeserializer,
-                                                       final Deserializer<V> valueDeserializer,
-                                                       final String topic,
-                                                       final String processorName,
-                                                       final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier) {
-        internalTopologyBuilder.addGlobalStore(
-            new StoreBuilderWrapper(storeBuilder),
-            sourceName,
-            null,
-            keyDeserializer,
-            valueDeserializer,
-            topic,
-            processorName,
-            () -> ProcessorAdapter.adapt(stateUpdateSupplier.get()),
-            true
-        );
-        return this;
-    }
-
-    /**
-     * Adds a global {@link StateStore} to the topology.
-     * The {@link StateStore} sources its data from all partitions of the provided input topic.
-     * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance.
-     * <p>
-     * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions
-     * of the input topic.
-     * <p>
-     * The provided {@link org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all
-     * records forwarded from the {@link SourceNode}.
-     * The supplier should always generate a new instance each time
-     * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} gets called. Creating a single
-     * {@link org.apache.kafka.streams.processor.Processor} object and returning the same object reference in
-     * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} would be a violation of the supplier pattern
-     * and leads to runtime exceptions.
-     * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
-     *
-     * @param storeBuilder          user defined key value store builder
-     * @param sourceName            name of the {@link SourceNode} that will be automatically added
-     * @param timestampExtractor    the stateless timestamp extractor used for this source,
-     *                              if not specified the default extractor defined in the configs will be used
-     * @param keyDeserializer       the {@link Deserializer} to deserialize keys with
-     * @param valueDeserializer     the {@link Deserializer} to deserialize values with
-     * @param topic                 the topic to source the data from
-     * @param processorName         the name of the {@link org.apache.kafka.streams.processor.ProcessorSupplier}
-     * @param stateUpdateSupplier   the instance of {@link org.apache.kafka.streams.processor.ProcessorSupplier}
-     * @return itself
-     * @throws TopologyException if the processor of state is already registered
-     * @deprecated Since 2.7.0. Use {@link #addGlobalStore(StoreBuilder, String, TimestampExtractor, Deserializer, Deserializer, String, String, ProcessorSupplier)} instead.
-     */
-    @Deprecated
-    public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?> storeBuilder,
-                                                       final String sourceName,
-                                                       final TimestampExtractor timestampExtractor,
-                                                       final Deserializer<K> keyDeserializer,
-                                                       final Deserializer<V> valueDeserializer,
-                                                       final String topic,
-                                                       final String processorName,
-                                                       final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier) {
-        internalTopologyBuilder.addGlobalStore(
-            new StoreBuilderWrapper(storeBuilder),
-            sourceName,
-            timestampExtractor,
-            keyDeserializer,
-            valueDeserializer,
-            topic,
-            processorName,
-            () -> ProcessorAdapter.adapt(stateUpdateSupplier.get()),
Review Comment:
   Do we use `ProcessorAdapter.adapt(...)` elsewhere? If not, we can remove it, too. (Maybe in a follow-up PR if this PR becomes too big -- it's simple, but this PR is already more than 700 LOC...)
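
For context, here is a rough sketch of the pattern the removed code relies on: `ProcessorAdapter.adapt(...)` wraps an old-API processor so it can be driven through the new API, and the wrapping happens inside the lambda (`() -> ProcessorAdapter.adapt(stateUpdateSupplier.get())`) so that every `get()` still yields a fresh instance, as the deleted javadoc requires. The interfaces below are simplified hypothetical stand-ins, not the real Kafka Streams APIs (the real new API processes `Record<KIn, VIn>` objects):

```java
import java.util.function.Supplier;

// Hypothetical, simplified stand-ins for the old and new Processor APIs.
interface OldProcessor<K, V> {
    void process(K key, V value);
}

interface NewProcessor<KIn, VIn> {
    void process(KIn key, VIn value);
}

public class AdapterSketch {

    // Sketch of what an adapter like ProcessorAdapter.adapt(...) does:
    // wrap an old-API processor behind the new-API interface.
    static <K, V> NewProcessor<K, V> adapt(final OldProcessor<K, V> old) {
        return new NewProcessor<K, V>() {
            @Override
            public void process(final K key, final V value) {
                old.process(key, value);
            }
        };
    }

    // The supplier-pattern contract from the deleted javadoc: adapt lazily
    // inside the lambda so every get() returns a freshly adapted instance.
    static <K, V> Supplier<NewProcessor<K, V>> adaptSupplier(final Supplier<OldProcessor<K, V>> oldSupplier) {
        return () -> adapt(oldSupplier.get());
    }

    public static void main(final String[] args) {
        final StringBuilder log = new StringBuilder();
        final Supplier<OldProcessor<String, Integer>> oldSupplier =
            () -> (k, v) -> log.append(k).append('=').append(v).append(';');

        final Supplier<NewProcessor<String, Integer>> adapted = adaptSupplier(oldSupplier);

        // Each get() must return a distinct object; returning the same
        // reference would violate the supplier pattern from the javadoc.
        final NewProcessor<String, Integer> p1 = adapted.get();
        final NewProcessor<String, Integer> p2 = adapted.get();
        if (p1 == p2) {
            throw new AssertionError("supplier must return fresh instances");
        }

        p1.process("a", 1);
        p2.process("b", 2);
        if (!log.toString().equals("a=1;b=2;")) {
            throw new AssertionError("unexpected log: " + log);
        }
        System.out.println("ok");
    }
}
```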



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
