Repository: kafka Updated Branches: refs/heads/trunk c58790595 -> 37ec15e96
KAFKA-5931; deprecate KTable#through and KTable#to Author: Damian Guy <damian....@gmail.com> Reviewers: Matthias J. Sax <matth...@confluent.io>, Bill Bejeck <b...@confluent.io>, Guozhang Wang <wangg...@gmail.com> Closes #3903 from dguy/deprectate-to-through Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/37ec15e9 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/37ec15e9 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/37ec15e9 Branch: refs/heads/trunk Commit: 37ec15e9627e2fe68d78eb6d95e9a117e3bca320 Parents: c587905 Author: Damian Guy <damian....@gmail.com> Authored: Wed Sep 20 12:04:13 2017 +0100 Committer: Damian Guy <damian....@gmail.com> Committed: Wed Sep 20 12:04:13 2017 +0100 ---------------------------------------------------------------------- .../apache/kafka/streams/kstream/KTable.java | 84 +++++++++++++++----- 1 file changed, 64 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/37ec15e9/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 6d1d85d..66ec0d7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -38,7 +38,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; * {@code KTable} is an abstraction of a <i>changelog stream</i> from a primary-keyed table. * Each record in this changelog stream is an update on the primary-keyed table with the record key as the primary key. * <p> - * A {@code KTable} is either {@link StreamsBuilder#table(String, String) defined from a single Kafka topic} that is + * A {@code KTable} is either {@link StreamsBuilder#table(String) defined from a single Kafka topic} that is * consumed message by message or the result of a {@code KTable} transformation. * An aggregation of a {@link KStream} also yields a {@code KTable}. * <p> @@ -66,7 +66,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; * @see KStream * @see KGroupedTable * @see GlobalKTable - * @see StreamsBuilder#table(String, String) + * @see StreamsBuilder#table(String) */ @InterfaceStability.Evolving public interface KTable<K, V> { @@ -763,17 +763,20 @@ public interface KTable<K, V> { * started). * <p> * This is equivalent to calling {@link #to(String) #to(someTopicName)} and - * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}. + * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. - * {@link StreamsBuilder#table(String, String)}) + * {@link StreamsBuilder#table(String, Materialized)}) * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * * @param topic the topic name * @param queryableStoreName the state store name used for the result {@code KTable}; valid characters are ASCII * alphanumerics, '.', '_' and '-'. If {@code null} this is the equivalent of {@link KTable#through(String)()} * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} + * @deprecated use {@link #toStream()} followed by {@link KStream#to(String)} + * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} */ + @Deprecated KTable<K, V> through(final String topic, final String queryableStoreName); @@ -784,16 +787,19 @@ public interface KTable<K, V> { * started). * <p> * This is equivalent to calling {@link #to(String) #to(someTopicName)} and - * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}. + * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. - * {@link StreamsBuilder#table(String, String)}) + * {@link StreamsBuilder#table(String, Materialized)}) * The store name must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. * * @param topic the topic name * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} + * @deprecated use {@link #toStream()} followed by {@link KStream#to(String)} + * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} */ + @Deprecated KTable<K, V> through(final String topic, final StateStoreSupplier<KeyValueStore> storeSupplier); @@ -804,14 +810,17 @@ public interface KTable<K, V> { * started). * <p> * This is equivalent to calling {@link #to(String) #to(someTopicName)} and - * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName)}. + * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf. * {@link StreamsBuilder#table(String)}) * * @param topic the topic name * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} + * @deprecated use {@link #toStream()} followed by {@link KStream#to(String)} + * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} */ + @Deprecated KTable<K, V> through(final String topic); /** @@ -822,7 +831,7 @@ public interface KTable<K, V> { * started). * <p> * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and - * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName)}. + * {@link StreamsBuilder#table(String) StreamsBuilder#table(someTopicName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with an internal store name (cf. * {@link StreamsBuilder#table(String)}) @@ -831,7 +840,10 @@ public interface KTable<K, V> { * if not specified producer's {@link DefaultPartitioner} will be used * @param topic the topic name * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} + * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} + * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} */ + @Deprecated KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner, final String topic); @@ -843,10 +855,10 @@ public interface KTable<K, V> { * started). * <p> * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and - * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}. + * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. - * {@link StreamsBuilder#table(String, String)}) + * {@link StreamsBuilder#table(String, Materialized)}) * * @param partitioner the function used to determine how records are distributed among partitions of the topic, * if not specified producer's {@link DefaultPartitioner} will be used @@ -854,7 +866,10 @@ public interface KTable<K, V> { * @param queryableStoreName the state store name used for the result {@code KTable}. * If {@code null} this is the equivalent of {@link KTable#through(StreamPartitioner, String)} * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} + * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} + * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} */ + @Deprecated KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner, final String topic, final String queryableStoreName); @@ -867,17 +882,20 @@ public interface KTable<K, V> { * started). * <p> * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and - * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}. + * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. - * {@link StreamsBuilder#table(String, String)}) + * {@link StreamsBuilder#table(String, Materialized)}) * * @param partitioner the function used to determine how records are distributed among partitions of the topic, * if not specified producer's {@link DefaultPartitioner} will be used * @param topic the topic name * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} + * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} + * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} */ + @Deprecated KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner, final String topic, final StateStoreSupplier<KeyValueStore> storeSupplier); @@ -891,10 +909,10 @@ public interface KTable<K, V> { * used—otherwise producer's {@link DefaultPartitioner} is used. * <p> * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and - * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}. + * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. - * {@link StreamsBuilder#table(String, String)}) + * {@link StreamsBuilder#table(String, Materialized)}) * * @param keySerde key serde used to send key-value pairs, * if not specified the default key serde defined in the configuration will be used @@ -904,7 +922,10 @@ public interface KTable<K, V> { * @param queryableStoreName the state store name used for the result {@code KTable}. * If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, String)()} * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} + * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} + * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} */ + @Deprecated KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde, final String topic, final String queryableStoreName); @@ -918,10 +939,10 @@ public interface KTable<K, V> { * used—otherwise producer's {@link DefaultPartitioner} is used. * <p> * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and - * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}. + * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. - * {@link StreamsBuilder#table(String, String)}) + * {@link StreamsBuilder#table(String, Materialized)}) * * @param keySerde key serde used to send key-value pairs, * if not specified the default key serde defined in the configuration will be used @@ -930,7 +951,10 @@ public interface KTable<K, V> { * @param topic the topic name * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} + * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} + * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} */ + @Deprecated KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde, final String topic, final StateStoreSupplier<KeyValueStore> storeSupplier); @@ -955,7 +979,10 @@ public interface KTable<K, V> { * if not specified the default value serde defined in the configuration will be used * @param topic the topic name * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} + * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} + * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} */ + @Deprecated KTable<K, V> through(final Serde<K> keySerde, Serde<V> valSerde, final String topic); @@ -967,10 +994,10 @@ public interface KTable<K, V> { * <p> * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String) * #to(keySerde, valueSerde, partitioner, someTopicName)} and - * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}. + * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. - * {@link StreamsBuilder#table(String, String)}) + * {@link StreamsBuilder#table(String, Materialized)}) * * @param keySerde key serde used to send key-value pairs, * if not specified the default key serde defined in the configuration will be used @@ -984,7 +1011,10 @@ public interface KTable<K, V> { * @param queryableStoreName the state store name used for the result {@code KTable}. * If {@code null} this is the equivalent of {@link KTable#through(Serde, Serde, StreamPartitioner, String)()} * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} + * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} + * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} */ + @Deprecated KTable<K, V> through(final Serde<K> keySerde, final Serde<V> valSerde, final StreamPartitioner<? super K, ? super V> partitioner, @@ -999,10 +1029,10 @@ public interface KTable<K, V> { * <p> * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String) * #to(keySerde, valueSerde, partitioner, someTopicName)} and - * {@link StreamsBuilder#table(String, String) StreamsBuilder#table(someTopicName, queryableStoreName)}. + * {@link StreamsBuilder#table(String, Materialized) StreamsBuilder#table(someTopicName, queryableStoreName)}. * <p> * The resulting {@code KTable} will be materialized in a local state store with the given store name (cf. - * {@link StreamsBuilder#table(String, String)}) + * {@link StreamsBuilder#table(String, Materialized)}) * * @param keySerde key serde used to send key-value pairs, * if not specified the default key serde defined in the configuration will be used @@ -1015,7 +1045,10 @@ public interface KTable<K, V> { * @param topic the topic name * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} + * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} + * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} */ + @Deprecated KTable<K, V> through(final Serde<K> keySerde, final Serde<V> valSerde, final StreamPartitioner<? super K, ? super V> partitioner, @@ -1045,7 +1078,10 @@ public interface KTable<K, V> { * be used * @param topic the topic name * @return a {@code KTable} that contains the exact same (and potentially repartitioned) records as this {@code KTable} + * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} + * and {@link StreamsBuilder#table(String)} to read back as a {@code KTable} */ + @Deprecated KTable<K, V> through(final Serde<K> keySerde, final Serde<V> valSerde, final StreamPartitioner<? super K, ? super V> partitioner, @@ -1058,7 +1094,9 @@ public interface KTable<K, V> { * started). * * @param topic the topic name + * @deprecated use {@link #toStream()} followed by {@link KStream#to(String)} */ + @Deprecated void to(final String topic); /** @@ -1070,7 +1108,9 @@ public interface KTable<K, V> { * @param partitioner the function used to determine how records are distributed among partitions of the topic, * if not specified producer's {@link DefaultPartitioner} will be used * @param topic the topic name + * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} */ + @Deprecated void to(final StreamPartitioner<? super K, ? super V> partitioner, final String topic); @@ -1087,7 +1127,9 @@ public interface KTable<K, V> { * @param valSerde value serde used to send key-value pairs, * if not specified the default value serde defined in the configuration will be used * @param topic the topic name + * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} */ + @Deprecated void to(final Serde<K> keySerde, final Serde<V> valSerde, final String topic); @@ -1107,7 +1149,9 @@ public interface KTable<K, V> { * {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will * be used * @param topic the topic name + * @deprecated use {@link #toStream()} followed by {@link KStream#to(String, Produced)} */ + @Deprecated void to(final Serde<K> keySerde, final Serde<V> valSerde, final StreamPartitioner<? super K, ? super V> partitioner,