mjsax commented on code in PR #16791: URL: https://github.com/apache/kafka/pull/16791#discussion_r1704716420
########## streams/src/main/java/org/apache/kafka/streams/Topology.java: ########## @@ -820,110 +778,6 @@ public synchronized <KIn, VIn> Topology addReadOnlyStateStore(final StoreBuilder ); } - /** - * Adds a global {@link StateStore} to the topology. - * The {@link StateStore} sources its data from all partitions of the provided input topic. - * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance. - * <p> - * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions - * of the input topic. - * <p> - * The provided {@link org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all - * records forwarded from the {@link SourceNode}. - * The supplier should always generate a new instance each time - * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} gets called. Creating a single - * {@link org.apache.kafka.streams.processor.Processor} object and returning the same object reference in - * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} would be a violation of the supplier pattern - * and leads to runtime exceptions. - * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date. - * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. 
- * - * @param storeBuilder user defined state store builder - * @param sourceName name of the {@link SourceNode} that will be automatically added - * @param keyDeserializer the {@link Deserializer} to deserialize keys with - * @param valueDeserializer the {@link Deserializer} to deserialize values with - * @param topic the topic to source the data from - * @param processorName the name of the {@link org.apache.kafka.streams.processor.ProcessorSupplier} - * @param stateUpdateSupplier the instance of {@link org.apache.kafka.streams.processor.ProcessorSupplier} - * @return itself - * @throws TopologyException if the processor of state is already registered - * @deprecated Since 2.7.0. Use {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier)} instead. - */ - @Deprecated - public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?> storeBuilder, - final String sourceName, - final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDeserializer, - final String topic, - final String processorName, - final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier) { - internalTopologyBuilder.addGlobalStore( - new StoreBuilderWrapper(storeBuilder), - sourceName, - null, - keyDeserializer, - valueDeserializer, - topic, - processorName, - () -> ProcessorAdapter.adapt(stateUpdateSupplier.get()), - true - ); - return this; - } - - /** - * Adds a global {@link StateStore} to the topology. - * The {@link StateStore} sources its data from all partitions of the provided input topic. - * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance. - * <p> - * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions - * of the input topic. 
- * <p> - * The provided {@link org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all - * records forwarded from the {@link SourceNode}. - * The supplier should always generate a new instance each time - * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} gets called. Creating a single - * {@link org.apache.kafka.streams.processor.Processor} object and returning the same object reference in - * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} would be a violation of the supplier pattern - * and leads to runtime exceptions. - * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date. - * - * @param storeBuilder user defined key value store builder - * @param sourceName name of the {@link SourceNode} that will be automatically added - * @param timestampExtractor the stateless timestamp extractor used for this source, - * if not specified the default extractor defined in the configs will be used - * @param keyDeserializer the {@link Deserializer} to deserialize keys with - * @param valueDeserializer the {@link Deserializer} to deserialize values with - * @param topic the topic to source the data from - * @param processorName the name of the {@link org.apache.kafka.streams.processor.ProcessorSupplier} - * @param stateUpdateSupplier the instance of {@link org.apache.kafka.streams.processor.ProcessorSupplier} - * @return itself - * @throws TopologyException if the processor of state is already registered - * @deprecated Since 2.7.0. Use {@link #addGlobalStore(StoreBuilder, String, TimestampExtractor, Deserializer, Deserializer, String, String, ProcessorSupplier)} instead. 
- */ - @Deprecated - public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?> storeBuilder, - final String sourceName, - final TimestampExtractor timestampExtractor, - final Deserializer<K> keyDeserializer, - final Deserializer<V> valueDeserializer, - final String topic, - final String processorName, - final org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier) { - internalTopologyBuilder.addGlobalStore( - new StoreBuilderWrapper(storeBuilder), - sourceName, - timestampExtractor, - keyDeserializer, - valueDeserializer, - topic, - processorName, - () -> ProcessorAdapter.adapt(stateUpdateSupplier.get()), Review Comment: Do we use `ProcessorAdapter.adapt(...)` elsewhere? If not, we can remove it, too. (Maybe in a follow-up PR if this PR becomes too big -- it's simple, but it's already more than 700 LOC...) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org