[ https://issues.apache.org/jira/browse/KAFKA-7456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16634766#comment-16634766 ]
ASF GitHub Bot commented on KAFKA-7456: --------------------------------------- mjsax closed pull request #5521: KAFKA-7456: Serde Inheritance in DSL URL: https://github.com/apache/kafka/pull/5521 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java index 6a851a10d9d..2474860a049 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowedSerdes.java @@ -24,7 +24,7 @@ static public class TimeWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>> { // Default constructor needed for reflection object creation public TimeWindowedSerde() { - super(new TimeWindowedSerializer<T>(), new TimeWindowedDeserializer<T>()); + super(new TimeWindowedSerializer<>(), new TimeWindowedDeserializer<>()); } public TimeWindowedSerde(final Serde<T> inner) { @@ -35,7 +35,7 @@ public TimeWindowedSerde(final Serde<T> inner) { static public class SessionWindowedSerde<T> extends Serdes.WrapperSerde<Windowed<T>> { // Default constructor needed for reflection object creation public SessionWindowedSerde() { - super(new SessionWindowedSerializer<T>(), new SessionWindowedDeserializer<T>()); + super(new SessionWindowedSerializer<>(), new SessionWindowedDeserializer<>()); } public SessionWindowedSerde(final Serde<T> inner) { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java index a0724ebc6d6..e8707513a07 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.kstream.ValueMapperWithKey; @@ -31,32 +32,48 @@ import java.util.Objects; import java.util.Set; -public abstract class AbstractStream<K> { +/* + * Any classes (KTable, KStream, etc) extending this class should follow the serde specification precedence ordering as: + * + * 1) Overridden values via control objects (e.g. Materialized, Serialized, Consumed, etc) + * 2) Serdes that can be inferred from the operator itself (e.g. groupBy().count(), where value serde can default to `LongSerde`). + * 3) Serde inherited from parent operator if possible (note if the key / value types have been changed, then the corresponding serde cannot be inherited). + * 4) Default serde specified in the config. + */ +public abstract class AbstractStream<K, V> { - protected final InternalStreamsBuilder builder; protected final String name; + protected final Serde<K> keySerde; + protected final Serde<V> valSerde; protected final Set<String> sourceNodes; protected final StreamsGraphNode streamsGraphNode; + protected final InternalStreamsBuilder builder; // This copy-constructor will allow to extend KStream // and KTable APIs with new methods without impacting the public interface. - public AbstractStream(final AbstractStream<K> stream) { - this.builder = stream.builder; + public AbstractStream(final AbstractStream<K, V> stream) { this.name = stream.name; + this.builder = stream.builder; + this.keySerde = stream.keySerde; + this.valSerde = stream.valSerde; this.sourceNodes = stream.sourceNodes; this.streamsGraphNode = stream.streamsGraphNode; } - AbstractStream(final InternalStreamsBuilder builder, - final String name, + AbstractStream(final String name, + final Serde<K> keySerde, + final Serde<V> valSerde, final Set<String> sourceNodes, - final StreamsGraphNode streamsGraphNode) { + final StreamsGraphNode streamsGraphNode, + final InternalStreamsBuilder builder) { if (sourceNodes == null || sourceNodes.isEmpty()) { throw new IllegalArgumentException("parameter <sourceNodes> must not be null or empty"); } - this.builder = builder; this.name = name; + this.builder = builder; + this.keySerde = keySerde; + this.valSerde = valSerde; this.sourceNodes = sourceNodes; this.streamsGraphNode = streamsGraphNode; } @@ -67,7 +84,7 @@ protected InternalTopologyBuilder internalTopologyBuilder() { return builder.internalTopologyBuilder; } - Set<String> ensureJoinableWith(final AbstractStream<K> other) { + Set<String> ensureJoinableWith(final AbstractStream<K, ?> other) { final Set<String> allSourceNodes = new HashSet<>(); allSourceNodes.addAll(sourceNodes); allSourceNodes.addAll(other.sourceNodes); @@ -122,4 +139,13 @@ public void close() { } }; } + + // for testing only + public Serde<K> keySerde() { + return keySerde; + } + + public Serde<V> valueSerde() { + return valSerde; + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java index a191c5ad19c..9791db6495d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java @@ -63,11 +63,12 @@ this.streamsGraphNode = streamsGraphNode; } - - <KR, T> KTable<KR, T> build(final KStreamAggProcessorSupplier<K, KR, V, T> aggregateSupplier, - final String functionName, + <KR, T> KTable<KR, T> build(final String functionName, final StoreBuilder<? extends StateStore> storeBuilder, - final boolean isQueryable) { + final KStreamAggProcessorSupplier<K, KR, V, T> aggregateSupplier, + final boolean isQueryable, + final Serde<KR> keySerde, + final Serde<T> valSerde) { final String aggFunctionName = builder.newProcessorName(functionName); @@ -95,13 +96,15 @@ builder.addGraphNode(parentNode, statefulProcessorNode); - return new KTableImpl<>(builder, - aggFunctionName, - aggregateSupplier, + return new KTableImpl<>(aggFunctionName, + keySerde, + valSerde, sourceName.equals(this.name) ? sourceNodes : Collections.singleton(sourceName), storeBuilder.name(), isQueryable, - statefulProcessorNode); + aggregateSupplier, + statefulProcessorNode, + builder); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index 49f49d0ee32..8f767408fb4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -79,30 +79,33 @@ public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuil public <K, V> KStream<K, V> stream(final Collection<String> topics, final ConsumedInternal<K, V> consumed) { final String name = newProcessorName(KStreamImpl.SOURCE_NAME); + final StreamSourceNode<K, V> streamSourceNode = new StreamSourceNode<>(name, topics, consumed); - final StreamSourceNode<K, V> streamSourceNode = new StreamSourceNode<>(name, - topics, - consumed); addGraphNode(root, streamSourceNode); - return new KStreamImpl<>(this, name, Collections.singleton(name), false, streamSourceNode); + return new KStreamImpl<>(name, + consumed.keySerde(), + consumed.valueSerde(), + Collections.singleton(name), + false, + streamSourceNode, + this); } public <K, V> KStream<K, V> stream(final Pattern topicPattern, final ConsumedInternal<K, V> consumed) { final String name = newProcessorName(KStreamImpl.SOURCE_NAME); - - final StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name, - topicPattern, - consumed); + final StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name, topicPattern, consumed); addGraphNode(root, streamPatternSourceNode); - return new KStreamImpl<>(this, - name, + return new KStreamImpl<>(name, + consumed.keySerde(), + consumed.valueSerde(), Collections.singleton(name), false, - streamPatternSourceNode); + streamPatternSourceNode, + this); } @SuppressWarnings("unchecked") @@ -129,15 +132,15 @@ public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuil addGraphNode(root, tableSourceNode); - return new KTableImpl<>(this, - name, - processorSupplier, + return new KTableImpl<>(name, consumed.keySerde(), consumed.valueSerde(), Collections.singleton(source), storeBuilder.name(), materialized.isQueryable(), - tableSourceNode); + processorSupplier, + tableSourceNode, + this); } @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index 5d4f9f3dd3e..da2eeb6ab8f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -36,24 +36,22 @@ import java.util.Objects; import java.util.Set; -class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStream<K, V> { +class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedStream<K, V> { static final String REDUCE_NAME = "KSTREAM-REDUCE-"; static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-"; - private final Serde<K> keySerde; - private final Serde<V> valSerde; private final boolean repartitionRequired; private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder; - KGroupedStreamImpl(final InternalStreamsBuilder builder, - final String name, - final Set<String> sourceNodes, + KGroupedStreamImpl(final String name, final Serde<K> keySerde, final Serde<V> valSerde, + final Set<String> sourceNodes, final boolean repartitionRequired, - final StreamsGraphNode streamsGraphNode) { - super(builder, name, sourceNodes, streamsGraphNode); + final StreamsGraphNode streamsGraphNode, + final InternalStreamsBuilder builder) { + super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder); this.aggregateBuilder = new GroupedStreamAggregateBuilder<>( builder, keySerde, @@ -63,8 +61,6 @@ name, streamsGraphNode ); - this.keySerde = keySerde; - this.valSerde = valSerde; this.repartitionRequired = repartitionRequired; } @@ -189,14 +185,15 @@ ); } - private <KR, T> KTable<KR, T> doAggregate(final KStreamAggProcessorSupplier<K, KR, V, T> aggregateSupplier, - final String functionName, - final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) { + private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, K, V, T> aggregateSupplier, + final String functionName, + final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) { return aggregateBuilder.build( - aggregateSupplier, functionName, new KeyValueStoreMaterializer<>(materializedInternal).materialize(), - materializedInternal.isQueryable() - ); + aggregateSupplier, + materializedInternal.isQueryable(), + materializedInternal.keySerde(), + materializedInternal.valueSerde()); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index 08fb605ef99..6ec3c0d1481 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -34,6 +34,7 @@ import java.util.Collections; import java.util.Objects; +import java.util.Set; /** * The implementation class of {@link KGroupedTable}. @@ -41,14 +42,12 @@ * @param <K> the key type * @param <V> the value type */ -public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroupedTable<K, V> { +public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGroupedTable<K, V> { private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-"; private static final String REDUCE_NAME = "KTABLE-REDUCE-"; - protected final Serde<K> keySerde; - protected final Serde<V> valSerde; private final Initializer<Long> countInitializer = () -> 0L; private final Aggregator<K, V, Long> countAdder = (aggKey, value, aggregate) -> aggregate + 1L; @@ -57,16 +56,13 @@ KGroupedTableImpl(final InternalStreamsBuilder builder, final String name, - final String sourceName, + final Set<String> sourceNodes, final Serde<K> keySerde, final Serde<V> valSerde, final StreamsGraphNode streamsGraphNode) { - super(builder, name, Collections.singleton(sourceName), streamsGraphNode); - this.keySerde = keySerde; - this.valSerde = valSerde; + super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder); } - @SuppressWarnings("unchecked") private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier, final String functionName, final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) { @@ -75,9 +71,7 @@ final String funcName = builder.newProcessorName(functionName); final String topic = materialized.storeName() + KStreamImpl.REPARTITION_TOPIC_SUFFIX; - final StreamsGraphNode repartitionNode = createRepartitionNode(sinkName, - sourceName, - topic); + final StreamsGraphNode repartitionNode = createRepartitionNode(sinkName, sourceName, topic); // the passed in StreamsGraphNode must be the parent of the repartition node builder.addGraphNode(this.streamsGraphNode, repartitionNode); @@ -90,34 +84,34 @@ builder.addGraphNode(repartitionNode, statefulProcessorNode); // return the KTable representation with the intermediate topic as the sources - return new KTableImpl<>(builder, - funcName, - aggregateSupplier, + return new KTableImpl<>(funcName, + materialized.keySerde(), + materialized.valueSerde(), Collections.singleton(sourceName), materialized.storeName(), materialized.isQueryable(), - statefulProcessorNode); + aggregateSupplier, + statefulProcessorNode, + builder); } - @SuppressWarnings("unchecked") private <T> StatefulProcessorNode getStatefulProcessorNode(final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized, final String functionName, - final ProcessorSupplier aggregateSupplier) { + final ProcessorSupplier<K, Change<V>> aggregateSupplier) { - final ProcessorParameters aggregateFunctionProcessorParams = new ProcessorParameters<>(aggregateSupplier, functionName); + final ProcessorParameters<K, Change<V>> aggregateFunctionProcessorParams = new ProcessorParameters<>(aggregateSupplier, functionName); return StatefulProcessorNode.statefulProcessorNodeBuilder() .withNodeName(functionName) .withProcessorParameters(aggregateFunctionProcessorParams) - .withStoreBuilder(new KeyValueStoreMaterializer(materialized).materialize()).build(); + .withStoreBuilder(new KeyValueStoreMaterializer<>(materialized).materialize()).build(); } - @SuppressWarnings("unchecked") - private GroupedTableOperationRepartitionNode createRepartitionNode(final String sinkName, - final String sourceName, - final String topic) { + private GroupedTableOperationRepartitionNode<K, V> createRepartitionNode(final String sinkName, + final String sourceName, + final String topic) { - return GroupedTableOperationRepartitionNode.groupedTableOperationNodeBuilder() + return GroupedTableOperationRepartitionNode.<K, V>groupedTableOperationNodeBuilder() .withRepartitionTopic(topic) .withSinkName(sinkName) .withSourceName(sourceName) @@ -151,7 +145,7 @@ private GroupedTableOperationRepartitionNode createRepartitionNode(final String @Override public KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor) { - return reduce(adder, subtractor, Materialized.<K, V, KeyValueStore<Bytes, byte[]>>with(keySerde, valSerde)); + return reduce(adder, subtractor, Materialized.with(keySerde, valSerde)); } @Override @@ -176,7 +170,7 @@ private GroupedTableOperationRepartitionNode createRepartitionNode(final String @Override public KTable<K, Long> count() { - return count(Materialized.<K, Long, KeyValueStore<Bytes, byte[]>>with(keySerde, Serdes.Long())); + return count(Materialized.with(keySerde, Serdes.Long())); } @Override @@ -206,7 +200,7 @@ private GroupedTableOperationRepartitionNode createRepartitionNode(final String public <T> KTable<K, T> aggregate(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> adder, final Aggregator<? super K, ? super V, T> subtractor) { - return aggregate(initializer, adder, subtractor, Materialized.<K, T, KeyValueStore<Bytes, byte[]>>with(keySerde, null)); + return aggregate(initializer, adder, subtractor, Materialized.with(keySerde, null)); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 42c20a52f37..2a3bc8f9230 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -59,7 +59,7 @@ import java.util.Objects; import java.util.Set; -public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V> { +public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K, V> { static final String SOURCE_NAME = "KSTREAM-SOURCE-"; @@ -113,12 +113,14 @@ private final boolean repartitionRequired; - KStreamImpl(final InternalStreamsBuilder builder, - final String name, + KStreamImpl(final String name, + final Serde<K> keySerde, + final Serde<V> valueSerde, final Set<String> sourceNodes, final boolean repartitionRequired, - final StreamsGraphNode streamsGraphNode) { - super(builder, name, sourceNodes, streamsGraphNode); + final StreamsGraphNode streamsGraphNode, + final InternalStreamsBuilder builder) { + super(name, keySerde, valueSerde, sourceNodes, streamsGraphNode, builder); this.repartitionRequired = repartitionRequired; } @@ -129,12 +131,16 @@ final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFilter<>(predicate, false), name); - final ProcessorGraphNode<? super K, ? super V> filterProcessorNode = new ProcessorGraphNode<>(name, - processorParameters, - repartitionRequired); + final ProcessorGraphNode<? super K, ? super V> filterProcessorNode = new ProcessorGraphNode<>(name, processorParameters, repartitionRequired); builder.addGraphNode(this.streamsGraphNode, filterProcessorNode); - return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, filterProcessorNode); + return new KStreamImpl<>(name, + keySerde, + valSerde, + sourceNodes, + repartitionRequired, + filterProcessorNode, + builder); } @Override @@ -143,46 +149,41 @@ final String name = builder.newProcessorName(FILTER_NAME); final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFilter<>(predicate, true), name); - - - final ProcessorGraphNode<? super K, ? super V> filterNotProcessorNode = new ProcessorGraphNode<>(name, - processorParameters, - repartitionRequired); + final ProcessorGraphNode<? super K, ? super V> filterNotProcessorNode = new ProcessorGraphNode<>(name, processorParameters, repartitionRequired); builder.addGraphNode(this.streamsGraphNode, filterNotProcessorNode); - return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, filterNotProcessorNode); + return new KStreamImpl<>(name, + keySerde, + valSerde, + sourceNodes, + repartitionRequired, + filterNotProcessorNode, + builder); } @Override public <K1> KStream<K1, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) { Objects.requireNonNull(mapper, "mapper can't be null"); - final ProcessorGraphNode<K, V> selectKeyProcessorNode = internalSelectKey(mapper); selectKeyProcessorNode.keyChangingOperation(true); builder.addGraphNode(this.streamsGraphNode, selectKeyProcessorNode); - return new KStreamImpl<>(builder, selectKeyProcessorNode.nodeName(), sourceNodes, true, selectKeyProcessorNode); + + // key serde cannot be preserved + return new KStreamImpl<>(selectKeyProcessorNode.nodeName(), null, valSerde, sourceNodes, true, selectKeyProcessorNode, builder); } private <K1> ProcessorGraphNode<K, V> internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) { final String name = builder.newProcessorName(KEY_SELECT_NAME); - - final KStreamMap<K, V, K1, V> kStreamMap = new KStreamMap<>( - (KeyValueMapper<K, V, KeyValue<K1, V>>) (key, value) -> new KeyValue<>(mapper.apply(key, value), value)); - + final KStreamMap<K, V, K1, V> kStreamMap = new KStreamMap<>((key, value) -> new KeyValue<>(mapper.apply(key, value), value)); final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(kStreamMap, name); - return new ProcessorGraphNode<>( - name, - processorParameters, - repartitionRequired - ); - + return new ProcessorGraphNode<>(name, processorParameters, repartitionRequired); } @Override @@ -192,14 +193,19 @@ final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamMap<>(mapper), name); - final ProcessorGraphNode<? super K, ? super V> mapProcessorNode = new ProcessorGraphNode<>(name, - processorParameters, - true); + final ProcessorGraphNode<? super K, ? super V> mapProcessorNode = new ProcessorGraphNode<>(name, processorParameters, true); mapProcessorNode.keyChangingOperation(true); builder.addGraphNode(this.streamsGraphNode, mapProcessorNode); - return new KStreamImpl<>(builder, name, sourceNodes, true, mapProcessorNode); + // key and value serde cannot be preserved + return new KStreamImpl<>(name, + null, + null, + sourceNodes, + true, + mapProcessorNode, + builder); } @@ -214,15 +220,19 @@ final String name = builder.newProcessorName(MAPVALUES_NAME); final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamMapValues<>(mapper), name); + final ProcessorGraphNode<? super K, ? super V> mapValuesProcessorNode = new ProcessorGraphNode<>(name, processorParameters, repartitionRequired); - - final ProcessorGraphNode<? super K, ? super V> mapValuesProcessorNode = new ProcessorGraphNode<>(name, - processorParameters, - repartitionRequired); mapValuesProcessorNode.setValueChangingOperation(true); builder.addGraphNode(this.streamsGraphNode, mapValuesProcessorNode); - return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, mapValuesProcessorNode); + // value serde cannot be preserved + return new KStreamImpl<>(name, + keySerde, + null, + sourceNodes, + repartitionRequired, + mapValuesProcessorNode, + builder); } @Override @@ -232,31 +242,30 @@ public void print(final Printed<K, V> printed) { final String name = builder.newProcessorName(PRINTING_NAME); final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(printedInternal.build(this.name), name); + final ProcessorGraphNode<? super K, ? super V> printNode = new ProcessorGraphNode<>(name, processorParameters, false); - - final ProcessorGraphNode<? super K, ? super V> printNode = new ProcessorGraphNode<>(name, - processorParameters, - false); builder.addGraphNode(this.streamsGraphNode, printNode); } @Override - public <K1, V1> KStream<K1, V1> flatMap( - final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) { + public <K1, V1> KStream<K1, V1> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) { Objects.requireNonNull(mapper, "mapper can't be null"); final String name = builder.newProcessorName(FLATMAP_NAME); final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFlatMap<>(mapper), name); - - - final ProcessorGraphNode<? super K, ? super V> flatMapNode = new ProcessorGraphNode<>(name, - processorParameters, - true); + final ProcessorGraphNode<? super K, ? super V> flatMapNode = new ProcessorGraphNode<>(name, processorParameters, true); flatMapNode.keyChangingOperation(true); builder.addGraphNode(this.streamsGraphNode, flatMapNode); - return new KStreamImpl<>(builder, name, sourceNodes, true, flatMapNode); + // key and value serde cannot be preserved + return new KStreamImpl<>(name, + null, + null, + sourceNodes, + true, + flatMapNode, + builder); } @Override @@ -270,15 +279,13 @@ public void print(final Printed<K, V> printed) { final String name = builder.newProcessorName(FLATMAPVALUES_NAME); final ProcessorParameters<? super K, ? super V> processorParameters = new ProcessorParameters<>(new KStreamFlatMapValues<>(mapper), name); + final ProcessorGraphNode<? super K, ? super V> flatMapValuesNode = new ProcessorGraphNode<>(name, processorParameters, repartitionRequired); - - final ProcessorGraphNode<? super K, ? super V> flatMapValuesNode = new ProcessorGraphNode<>(name, - processorParameters, - repartitionRequired); flatMapValuesNode.setValueChangingOperation(true); builder.addGraphNode(this.streamsGraphNode, flatMapValuesNode); - return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, flatMapValuesNode); + // value serde cannot be preserved + return new KStreamImpl<>(name, keySerde, null, sourceNodes, this.repartitionRequired, flatMapValuesNode, builder); } @Override @@ -298,25 +305,18 @@ public void print(final Printed<K, V> printed) { childNames[i] = builder.newProcessorName(BRANCHCHILD_NAME); } - final ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamBranch(predicates.clone(), childNames), branchName); - - final ProcessorGraphNode<K, V> branchNode = new ProcessorGraphNode<>(branchName, - processorParameters, - false); + final ProcessorGraphNode<K, V> branchNode = new ProcessorGraphNode<>(branchName, processorParameters, false); builder.addGraphNode(this.streamsGraphNode, branchNode); final KStream<K, V>[] branchChildren = (KStream<K, V>[]) Array.newInstance(KStream.class, predicates.length); for (int i = 0; i < predicates.length; i++) { final ProcessorParameters innerProcessorParameters = new ProcessorParameters<>(new KStreamPassThrough<K, V>(), childNames[i]); - - final ProcessorGraphNode<K, V> branchChildNode = new ProcessorGraphNode<>(childNames[i], - innerProcessorParameters, - repartitionRequired); + final ProcessorGraphNode<K, V> branchChildNode = new ProcessorGraphNode<>(childNames[i], innerProcessorParameters, repartitionRequired); builder.addGraphNode(branchNode, branchChildNode); - branchChildren[i] = new KStreamImpl<>(builder, childNames[i], sourceNodes, this.repartitionRequired, branchChildNode); + branchChildren[i] = new KStreamImpl<>(childNames[i], keySerde, valSerde, sourceNodes, repartitionRequired, branchChildNode, builder); } return branchChildren; @@ -347,22 +347,9 @@ public void print(final Printed<K, V> printed) { mergeNode.setMergeNode(true); builder.addGraphNode(Arrays.asList(this.streamsGraphNode, streamImpl.streamsGraphNode), mergeNode); - return new KStreamImpl<>(builder, name, allSourceNodes, requireRepartitioning, mergeNode); - } - @Override - public KStream<K, V> through(final String topic, final Produced<K, V> produced) { - final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced); - to(topic, producedInternal); - return builder.stream( - Collections.singleton(topic), - new ConsumedInternal<>( - producedInternal.keySerde(), - producedInternal.valueSerde(), - new FailOnInvalidTimestamp(), - null - ) - ); + // drop the serde as we cannot safely use either one to represent both streams + return new KStreamImpl<>(name, null, null, allSourceNodes, requireRepartitioning, mergeNode, builder); } @Override @@ -375,7 +362,6 @@ public void foreach(final ForeachAction<? super K, ? super V> action) { name ); - final ProcessorGraphNode<? super K, ? super V> foreachNode = new ProcessorGraphNode<>(name, processorParameters, repartitionRequired); @@ -394,12 +380,11 @@ public void foreach(final ForeachAction<? super K, ? super V> action) { final ProcessorGraphNode<? super K, ? super V> peekNode = new ProcessorGraphNode<>(name, processorParameters, - repartitionRequired - ); + repartitionRequired); builder.addGraphNode(this.streamsGraphNode, peekNode); - return new KStreamImpl<>(builder, name, sourceNodes, repartitionRequired, peekNode); + return new KStreamImpl<>(name, keySerde, valSerde, sourceNodes, repartitionRequired, peekNode, builder); } @Override @@ -407,6 +392,21 @@ public void foreach(final ForeachAction<? super K, ? super V> action) { return through(topic, Produced.with(null, null, null)); } + @Override + public KStream<K, V> through(final String topic, final Produced<K, V> produced) { + final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced); + to(topic, producedInternal); + return builder.stream( + Collections.singleton(topic), + new ConsumedInternal<>( + producedInternal.keySerde() != null ? producedInternal.keySerde() : keySerde, + producedInternal.valueSerde() != null ? producedInternal.valueSerde() : valSerde, + new FailOnInvalidTimestamp(), + null + ) + ); + } + @Override public void to(final String topic) { to(topic, Produced.with(null, null, null)); @@ -431,7 +431,6 @@ public void to(final TopicNameExtractor<K, V> topicExtractor, final Produced<K, to(topicExtractor, new ProducedInternal<>(produced)); } - @SuppressWarnings("unchecked") private void to(final TopicNameExtractor<K, V> topicExtractor, final ProducedInternal<K, V> produced) { final String name = builder.newProcessorName(SINK_NAME); @@ -461,8 +460,8 @@ private void to(final TopicNameExtractor<K, V> topicExtractor, final ProducedInt transformNode.keyChangingOperation(true); builder.addGraphNode(this.streamsGraphNode, transformNode); - - return new KStreamImpl<>(builder, name, sourceNodes, true, transformNode); + // cannot inherit key and value serde + return new KStreamImpl<>(name, null, null, sourceNodes, true, transformNode, builder); } @Override @@ -496,7 +495,8 @@ private void to(final TopicNameExtractor<K, V> topicExtractor, final ProducedInt transformNode.setValueChangingOperation(true); builder.addGraphNode(this.streamsGraphNode, transformNode); - return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, transformNode); + // cannot inherit value serde + return new KStreamImpl<>(name, keySerde, null, sourceNodes, this.repartitionRequired, transformNode, builder); } @Override @@ -565,11 +565,11 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl KStreamImpl<K, V1> joinOther = (KStreamImpl<K, V1>) other; if (joinThis.repartitionRequired) { - joinThis = joinThis.repartitionForJoin(joined.keySerde(), joined.valueSerde()); + joinThis = joinThis.repartitionForJoin(joined); } if (joinOther.repartitionRequired) { - joinOther = joinOther.repartitionForJoin(joined.keySerde(), joined.otherValueSerde()); + joinOther = joinOther.repartitionForJoin(Joined.with(joined.keySerde(), joined.otherValueSerde(), joined.valueSerde())); } joinThis.ensureJoinableWith(joinOther); @@ -587,17 +587,16 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl * Repartition a stream. This is required on join operations occurring after * an operation that changes the key, i.e, selectKey, map(..), flatMap(..). * - * @param keySerde Serdes for serializing the keys - * @param valSerde Serdes for serializing the values + * @param joined joined control object * @return a new {@link KStreamImpl} */ - private KStreamImpl<K, V> repartitionForJoin(final Serde<K> keySerde, - final Serde<V> valSerde) { - + private KStreamImpl<K, V> repartitionForJoin(final Joined<K, V, ?> joined) { + final Serde<K> repartitionKeySerde = joined.keySerde() != null ? joined.keySerde() : keySerde; + final Serde<V> repartitionValueSerde = joined.valueSerde() != null ? joined.valueSerde() : valSerde; final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder<K, V> optimizableRepartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder(); final String repartitionedSourceName = createRepartitionedSource(builder, - keySerde, - valSerde, + repartitionKeySerde, + repartitionValueSerde, null, name, optimizableRepartitionNodeBuilder); @@ -605,7 +604,7 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl final OptimizableRepartitionNode<K, V> optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build(); builder.addGraphNode(this.streamsGraphNode, optimizableRepartitionNode); - return new KStreamImpl<>(builder, repartitionedSourceName, Collections.singleton(repartitionedSourceName), false, optimizableRepartitionNode); + return new KStreamImpl<>(repartitionedSourceName, repartitionKeySerde, repartitionValueSerde, Collections.singleton(repartitionedSourceName), false, optimizableRepartitionNode, builder); } static <K1, V1> String createRepartitionedSource(final InternalStreamsBuilder builder, @@ -678,18 +677,31 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl Objects.requireNonNull(joiner, "joiner can't be null"); Objects.requireNonNull(joined, "joined can't be null"); if (repartitionRequired) { - final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(joined.keySerde(), joined.valueSerde()); - return thisStreamRepartitioned.doStreamTableJoin(other, joiner, false); + final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(joined); + return thisStreamRepartitioned.doStreamTableJoin(other, joiner, joined, false); } else { - return doStreamTableJoin(other, joiner, false); + return doStreamTableJoin(other, joiner, joined, false); } } @Override - public <K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> globalTable, - final KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper, - final ValueJoiner<? super V, ? super V1, ? extends R> joiner) { - return globalTableJoin(globalTable, keyMapper, joiner, true); + public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) { + return leftJoin(other, joiner, Joined.with(null, null, null)); + } + + @Override + public <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other, + final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, + final Joined<K, V, VT> joined) { + Objects.requireNonNull(other, "other can't be null"); + Objects.requireNonNull(joiner, "joiner can't be null"); + Objects.requireNonNull(joined, "joined can't be null"); + if (repartitionRequired) { + final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(joined); + return thisStreamRepartitioned.doStreamTableJoin(other, joiner, joined, true); + } else { + return doStreamTableJoin(other, joiner, joined, true); + } } @Override @@ -699,6 +711,13 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl return globalTableJoin(globalTable, keyMapper, joiner, false); } + @Override + public <K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> globalTable, + final KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper, + final ValueJoiner<? super V, ? super V1, ? extends R> joiner) { + return globalTableJoin(globalTable, keyMapper, joiner, true); + } + private <K1, V1, V2> KStream<K, V2> globalTableJoin(final GlobalKTable<K1, V1> globalTable, final KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper, final ValueJoiner<? super V, ? super V1, ? extends V2> joiner, @@ -724,17 +743,19 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl null); builder.addGraphNode(this.streamsGraphNode, streamTableJoinNode); - return new KStreamImpl<>(builder, name, sourceNodes, false, streamTableJoinNode); + // do not have serde for joined result + return new KStreamImpl<>(name, keySerde, null, sourceNodes, false, streamTableJoinNode, builder); } @SuppressWarnings("unchecked") private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner, + final Joined<K, V, V1> joined, final boolean leftJoin) { Objects.requireNonNull(other, "other KTable can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); - final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); + final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K, V1>) other); final String name = builder.newProcessorName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME); final ProcessorSupplier<K, V> processorSupplier = new KStreamKTableJoin<>( @@ -743,7 +764,6 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl leftJoin ); - final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(processorSupplier, name); final StreamTableJoinNode<K, V> streamTableJoinNode = new StreamTableJoinNode<>( name, @@ -754,27 +774,8 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl builder.addGraphNode(this.streamsGraphNode, streamTableJoinNode); - return new KStreamImpl<>(builder, name, allSourceNodes, false, streamTableJoinNode); - } - - @Override - public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) { - return leftJoin(other, joiner, Joined.with(null, null, null)); - } - - @Override - public <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other, - final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, - final Joined<K, V, VT> joined) { - Objects.requireNonNull(other, "other can't be null"); - Objects.requireNonNull(joiner, "joiner can't be null"); - Objects.requireNonNull(joined, "joined can't be null"); - if (repartitionRequired) { - final KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(joined.keySerde(), joined.valueSerde()); - return thisStreamRepartitioned.doStreamTableJoin(other, joiner, true); - } else { - return doStreamTableJoin(other, joiner, true); - } + // do not have serde for joined result + return new KStreamImpl<>(name, joined.keySerde() != null ? joined.keySerde() : keySerde, null, allSourceNodes, false, streamTableJoinNode, builder); } @Override @@ -792,16 +793,13 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl selectKeyMapNode.keyChangingOperation(true); builder.addGraphNode(this.streamsGraphNode, selectKeyMapNode); - return new KGroupedStreamImpl<>( - builder, - selectKeyMapNode.nodeName(), - sourceNodes, - serializedInternal.keySerde(), - serializedInternal.valueSerde(), - true, - selectKeyMapNode - ); - + return new KGroupedStreamImpl<>(selectKeyMapNode.nodeName(), + serializedInternal.keySerde(), + serializedInternal.valueSerde() != null ? serializedInternal.valueSerde() : valSerde, + sourceNodes, + true, + selectKeyMapNode, + builder); } @Override @@ -812,14 +810,13 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl @Override public KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized) { final SerializedInternal<K, V> serializedInternal = new SerializedInternal<>(serialized); - return new KGroupedStreamImpl<>(builder, - this.name, + return new KGroupedStreamImpl<>(this.name, + serializedInternal.keySerde() != null ? serializedInternal.keySerde() : keySerde, + serializedInternal.valueSerde() != null ? serializedInternal.valueSerde() : valSerde, sourceNodes, - serializedInternal.keySerde(), - serializedInternal.valueSerde(), this.repartitionRequired, - streamsGraphNode); - + streamsGraphNode, + builder); } @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode @@ -851,7 +848,6 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl this.rightOuter = rightOuter; } - @SuppressWarnings("unchecked") public <K1, R, V1, V2> KStream<K1, R> join(final KStream<K1, V1> lhs, final KStream<K1, V2> other, final ValueJoiner<? super V1, ? super V2, ? extends R> joiner, @@ -874,13 +870,13 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl final KStreamJoinWindow<K1, V1> thisWindowedStream = new KStreamJoinWindow<>(thisWindowStore.name()); - final ProcessorParameters thisWindowStreamProcessorParams = new ProcessorParameters(thisWindowedStream, thisWindowStreamName); + final ProcessorParameters<K1, V1> thisWindowStreamProcessorParams = new ProcessorParameters<>(thisWindowedStream, thisWindowStreamName); final ProcessorGraphNode<K1, V1> thisWindowedStreamsNode = new ProcessorGraphNode<>(thisWindowStreamName, thisWindowStreamProcessorParams); builder.addGraphNode(thisStreamsGraphNode, thisWindowedStreamsNode); final KStreamJoinWindow<K1, V2> otherWindowedStream = new KStreamJoinWindow<>(otherWindowStore.name()); - final ProcessorParameters otherWindowStreamProcessorParams = new ProcessorParameters(otherWindowedStream, otherWindowStreamName); + final ProcessorParameters<K1, V2> otherWindowStreamProcessorParams = new ProcessorParameters<>(otherWindowedStream, otherWindowStreamName); final ProcessorGraphNode<K1, V2> otherWindowedStreamsNode = new ProcessorGraphNode<>(otherWindowStreamName, otherWindowStreamProcessorParams); builder.addGraphNode(otherStreamsGraphNode, otherWindowedStreamsNode); @@ -902,11 +898,11 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl final KStreamPassThrough<K1, R> joinMerge = new KStreamPassThrough<>(); - final StreamStreamJoinNode.StreamStreamJoinNodeBuilder<K, V1, V2, R> joinBuilder = StreamStreamJoinNode.streamStreamJoinNodeBuilder(); + final StreamStreamJoinNode.StreamStreamJoinNodeBuilder<K1, V1, V2, R> joinBuilder = StreamStreamJoinNode.streamStreamJoinNodeBuilder(); - final ProcessorParameters joinThisProcessorParams = new ProcessorParameters(joinThis, joinThisName); - final ProcessorParameters joinOtherProcessorParams = new ProcessorParameters(joinOther, joinOtherName); - final ProcessorParameters joinMergeProcessorParams = new ProcessorParameters(joinMerge, joinMergeName); + final ProcessorParameters<K1, V1> joinThisProcessorParams = new ProcessorParameters<>(joinThis, joinThisName); + final ProcessorParameters<K1, V2> joinOtherProcessorParams = new ProcessorParameters<>(joinOther, joinOtherName); + final ProcessorParameters<K1, R> joinMergeProcessorParams = new ProcessorParameters<>(joinMerge, joinMergeName); joinBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParams) .withJoinThisProcessorParameters(joinThisProcessorParams) @@ -922,9 +918,12 @@ public void process(final ProcessorSupplier<? super K, ? super V> processorSuppl builder.addGraphNode(Arrays.asList(thisStreamsGraphNode, otherStreamsGraphNode), joinGraphNode); - final Set<String> allSourceNodes = new HashSet<>(((AbstractStream<K>) lhs).sourceNodes); + final Set<String> allSourceNodes = new HashSet<>(((KStreamImpl<K1, V1>) lhs).sourceNodes); allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes); - return new KStreamImpl<>(builder, joinMergeName, allSourceNodes, false, joinGraphNode); + + // do not have serde for joined result; + // also for key serde we do not inherit from either since we cannot tell if these two serdes are different + return new KStreamImpl<>(joinMergeName, joined.keySerde(), null, allSourceNodes, false, joinGraphNode, builder); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index ea5c3049e7f..c5b29702c7c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -57,7 +57,7 @@ * @param <S> the source's (parent's) value type * @param <V> the value type */ -public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, V> { +public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<K, V> { static final String SOURCE_NAME = "KTABLE-SOURCE-"; @@ -87,38 +87,19 @@ private final boolean isQueryable; private boolean sendOldValues = false; - private final Serde<K> keySerde; - private final Serde<V> valSerde; - public KTableImpl(final InternalStreamsBuilder builder, - final String name, - final ProcessorSupplier<?, ?> processorSupplier, - final Set<String> sourceNodes, - final String queryableStoreName, - final boolean isQueryable, - final StreamsGraphNode streamsGraphNode) { - super(builder, name, sourceNodes, streamsGraphNode); - this.processorSupplier = processorSupplier; - this.queryableStoreName = queryableStoreName; - this.keySerde = null; - this.valSerde = null; - this.isQueryable = isQueryable; - } - - public KTableImpl(final InternalStreamsBuilder builder, - final String name, - final ProcessorSupplier<?, ?> processorSupplier, + public KTableImpl(final String name, final Serde<K> keySerde, final Serde<V> valSerde, final Set<String> sourceNodes, final String queryableStoreName, final boolean isQueryable, - final StreamsGraphNode streamsGraphNode) { - super(builder, name, sourceNodes, streamsGraphNode); + final ProcessorSupplier<?, ?> processorSupplier, + final StreamsGraphNode streamsGraphNode, + final InternalStreamsBuilder builder) { + super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder); this.processorSupplier = processorSupplier; this.queryableStoreName = queryableStoreName; - this.keySerde = keySerde; - this.valSerde = valSerde; this.isQueryable = isQueryable; } @@ -159,18 +140,18 @@ public String queryableStoreName() { builder.addGraphNode(this.streamsGraphNode, tableNode); - - return new KTableImpl<>( - builder, - name, - processorSupplier, - this.keySerde, - this.valSerde, - sourceNodes, - shouldMaterialize ? materializedInternal.storeName() : this.queryableStoreName, - shouldMaterialize, - tableNode - ); + // we can inherit parent key and value serde if user do not provide specific overrides, more specifically: + // we preserve the key following the order of 1) materialized, 2) parent + // we preserve the value following the order of 1) materialized, 2) parent + return new KTableImpl<>(name, + materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde, + materializedInternal != null && materializedInternal.valueSerde() != null ? materializedInternal.valueSerde() : valSerde, + sourceNodes, + shouldMaterialize ? materializedInternal.storeName() : this.queryableStoreName, + shouldMaterialize, + processorSupplier, + tableNode, + builder); } @Override @@ -234,14 +215,19 @@ public String queryableStoreName() { builder.addGraphNode(this.streamsGraphNode, tableNode); + // don't inherit parent value serde, since this operation may change the value type, more specifically: + // we preserve the key following the order of 1) materialized, 2) parent, 3) null + // we preserve the value following the order of 1) materialized, 2) null return new KTableImpl<>( - builder, name, - processorSupplier, + materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde, + materializedInternal != null ? materializedInternal.valueSerde() : null, sourceNodes, shouldMaterialize ? materializedInternal.storeName() : this.queryableStoreName, shouldMaterialize, - tableNode + processorSupplier, + tableNode, + builder ); } @@ -325,14 +311,19 @@ public String queryableStoreName() { builder.addGraphNode(this.streamsGraphNode, tableNode); + // don't inherit parent value serde, since this operation may change the value type, more specifically: + // we preserve the key following the order of 1) materialized, 2) parent, 3) null + // we preserve the value following the order of 1) materialized, 2) null return new KTableImpl<>( - builder, name, - processorSupplier, + materialized != null && materialized.keySerde() != null ? materialized.keySerde() : keySerde, + materialized != null ? materialized.valueSerde() : null, sourceNodes, shouldMaterialize ? materialized.storeName() : this.queryableStoreName, shouldMaterialize, - tableNode); + processorSupplier, + tableNode, + builder); } @Override @@ -352,7 +343,8 @@ public String queryableStoreName() { builder.addGraphNode(this.streamsGraphNode, toStreamNode); - return new KStreamImpl<>(builder, name, sourceNodes, false, toStreamNode); + // we can inherit parent key and value serde + return new KStreamImpl<>(name, keySerde, valSerde, sourceNodes, false, toStreamNode, builder); } @Override @@ -382,15 +374,15 @@ public String queryableStoreName() { builder.addGraphNode(streamsGraphNode, node); return new KTableImpl<K, S, V>( - builder, name, - suppressionSupplier, keySerde, valSerde, Collections.singleton(this.name), null, false, - node + suppressionSupplier, + node, + builder ); } @@ -475,7 +467,7 @@ public String queryableStoreName() { final String joinMergeName = builder.newProcessorName(MERGE_NAME); return buildJoin( - (AbstractStream<K>) other, + (AbstractStream<K, VO>) other, joiner, leftOuter, rightOuter, @@ -485,8 +477,7 @@ public String queryableStoreName() { ); } - @SuppressWarnings("unchecked") - private <V1, R> KTable<K, R> buildJoin(final AbstractStream<K> other, + private <V1, R> KTable<K, R> buildJoin(final AbstractStream<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner, final boolean leftOuter, final boolean rightOuter, @@ -520,29 +511,9 @@ public String queryableStoreName() { joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner)); } - final KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>( - new KTableImpl<K, V, R>( - builder, - joinThisName, - joinThis, - sourceNodes, - this.queryableStoreName, - false, - null - ), - new KTableImpl<K, V1, R>( - builder, - joinOtherName, - joinOther, - ((KTableImpl<K, ?, ?>) other).sourceNodes, - ((KTableImpl<K, ?, ?>) other).queryableStoreName, - false, - null - ), - internalQueryableName - ); + final KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>(joinThis, joinOther, internalQueryableName); - final KTableKTableJoinNode.KTableKTableJoinNodeBuilder kTableJoinNodeBuilder = KTableKTableJoinNode.kTableKTableJoinNodeBuilder(); + final KTableKTableJoinNode.KTableKTableJoinNodeBuilder<K, Change<V>, Change<V1>, Change<R>> kTableJoinNodeBuilder = KTableKTableJoinNode.kTableKTableJoinNodeBuilder(); // only materialize if specified in Materialized if (materializedInternal != null) { @@ -550,9 +521,9 @@ public String queryableStoreName() { } kTableJoinNodeBuilder.withNodeName(joinMergeName); - final ProcessorParameters joinThisProcessorParameters = new ProcessorParameters(joinThis, joinThisName); - final ProcessorParameters joinOtherProcessorParameters = new ProcessorParameters(joinOther, joinOtherName); - final ProcessorParameters joinMergeProcessorParameters = new ProcessorParameters(joinMerge, joinMergeName); + final ProcessorParameters<K, Change<V>> joinThisProcessorParameters = new ProcessorParameters<>(joinThis, joinThisName); + final ProcessorParameters<K, Change<V1>> joinOtherProcessorParameters = new ProcessorParameters<>(joinOther, joinOtherName); + final ProcessorParameters<K, Change<R>> joinMergeProcessorParameters = new ProcessorParameters<>(joinMerge, joinMergeName); kTableJoinNodeBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParameters) .withJoinOtherProcessorParameters(joinOtherProcessorParameters) @@ -562,23 +533,26 @@ public String queryableStoreName() { .withOtherJoinSideNodeName(((KTableImpl) other).name) .withThisJoinSideNodeName(name); - final KTableKTableJoinNode kTableKTableJoinNode = kTableJoinNodeBuilder.build(); + final KTableKTableJoinNode<K, Change<V>, Change<V1>, Change<R>> kTableKTableJoinNode = kTableJoinNodeBuilder.build(); builder.addGraphNode(this.streamsGraphNode, kTableKTableJoinNode); - return new KTableImpl<>( - builder, + // we can inherit parent key serde if user do not provide specific overrides + return new KTableImpl<K, Change<R>, R>( joinMergeName, - joinMerge, + materializedInternal != null && materializedInternal.keySerde() != null ? materializedInternal.keySerde() : keySerde, + materializedInternal != null ? materializedInternal.valueSerde() : null, allSourceNodes, internalQueryableName, internalQueryableName != null, - kTableKTableJoinNode + joinMerge, + kTableKTableJoinNode, + builder ); } @Override public <K1, V1> KGroupedTable<K1, V1> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector) { - return this.groupBy(selector, Serialized.with(null, null)); + return groupBy(selector, Serialized.with(null, null)); } @Override @@ -592,20 +566,20 @@ public String queryableStoreName() { final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>(selectSupplier, selectName); // select the aggregate key and values (old and new), it would require parent to send old values - final ProcessorGraphNode<K, Change<V>> groupByMapNode = new ProcessorGraphNode<>( - selectName, - processorParameters, - false - ); + final ProcessorGraphNode<K, Change<V>> groupByMapNode = new ProcessorGraphNode<>(selectName, processorParameters, false); builder.addGraphNode(this.streamsGraphNode, groupByMapNode); this.enableSendingOldValues(); + final SerializedInternal<K1, V1> serializedInternal = new SerializedInternal<>(serialized); + + // we cannot inherit parent key and value serdes since both of them may have changed; + // we can only inherit from what serialized specified here return new KGroupedTableImpl<>( builder, selectName, - this.name, + sourceNodes, serializedInternal.keySerde(), serializedInternal.valueSerde(), groupByMapNode diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java index 5c464b9b79c..2ed70bd46a2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java @@ -27,13 +27,13 @@ class KTableKTableJoinMerger<K, V> implements KTableProcessorSupplier<K, V, V> { - private final KTableImpl<K, ?, V> parent1; - private final KTableImpl<K, ?, V> parent2; + private final KTableProcessorSupplier<K, ?, V> parent1; + private final KTableProcessorSupplier<K, ?, V> parent2; private final String queryableName; private boolean sendOldValues = false; - KTableKTableJoinMerger(final KTableImpl<K, ?, V> parent1, - final KTableImpl<K, ?, V> parent2, + KTableKTableJoinMerger(final KTableProcessorSupplier<K, ?, V> parent1, + final KTableProcessorSupplier<K, ?, V> parent2, final String queryableName) { this.parent1 = parent1; this.parent2 = parent2; @@ -55,13 +55,13 @@ return new KTableValueGetterSupplier<K, V>() { public KTableValueGetter<K, V> get() { - return parent1.valueGetterSupplier().get(); + return parent1.view().get(); } @Override public String[] storeNames() { - final String[] storeNames1 = parent1.valueGetterSupplier().storeNames(); - final String[] storeNames2 = parent2.valueGetterSupplier().storeNames(); + final String[] storeNames1 = parent1.view().storeNames(); + final String[] storeNames2 = parent2.view().storeNames(); final Set<String> stores = new HashSet<>(storeNames1.length + storeNames2.length); Collections.addAll(stores, storeNames1); Collections.addAll(stores, storeNames2); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java index 98076e068ac..e185f4d926d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.SessionWindowedKStream; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.WindowedSerdes; import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode; import org.apache.kafka.streams.state.SessionBytesStoreSupplier; import org.apache.kafka.streams.state.SessionStore; @@ -40,10 +41,8 @@ import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME; import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME; -public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K> implements SessionWindowedKStream<K, V> { +public class SessionWindowedKStreamImpl<K, V> extends AbstractStream<K, V> implements SessionWindowedKStream<K, V> { private final SessionWindows windows; - private final Serde<K> keySerde; - private final Serde<V> valSerde; private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder; private final Merger<K, Long> countMerger = (aggKey, aggOne, aggTwo) -> aggOne + aggTwo; @@ -55,11 +54,9 @@ final Serde<V> valSerde, final GroupedStreamAggregateBuilder<K, V> aggregateBuilder, final StreamsGraphNode streamsGraphNode) { - super(builder, name, sourceNodes, streamsGraphNode); + super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder); Objects.requireNonNull(windows, "windows can't be null"); this.windows = windows; - this.keySerde = keySerde; - this.valSerde = valSerde; this.aggregateBuilder = aggregateBuilder; } @@ -92,16 +89,17 @@ } return aggregateBuilder.build( + AGGREGATE_NAME, + materialize(materializedInternal), new KStreamSessionWindowAggregate<>( - windows, materializedInternal.storeName(), + windows, + materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator, - countMerger - ), - AGGREGATE_NAME, - materialize(materializedInternal), - materializedInternal.isQueryable() - ); + countMerger), + materializedInternal.isQueryable(), + materializedInternal.keySerde() != null ? new WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null, + materializedInternal.valueSerde()); } @Override @@ -125,6 +123,8 @@ } return aggregateBuilder.build( + REDUCE_NAME, + materialize(materializedInternal), new KStreamSessionWindowAggregate<>( windows, materializedInternal.storeName(), @@ -132,10 +132,9 @@ reduceAggregator, mergerForAggregator(reduceAggregator) ), - REDUCE_NAME, - materialize(materializedInternal), - materializedInternal.isQueryable() - ); + materializedInternal.isQueryable(), + materializedInternal.keySerde() != null ? new WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null, + materializedInternal.valueSerde()); } @Override @@ -160,18 +159,19 @@ if (materializedInternal.keySerde() == null) { materializedInternal.withKeySerde(keySerde); } + return aggregateBuilder.build( + AGGREGATE_NAME, + materialize(materializedInternal), new KStreamSessionWindowAggregate<>( windows, materializedInternal.storeName(), initializer, aggregator, - sessionMerger - ), - AGGREGATE_NAME, - materialize(materializedInternal), - materializedInternal.isQueryable() - ); + sessionMerger), + materializedInternal.isQueryable(), + materializedInternal.keySerde() != null ? new WindowedSerdes.SessionWindowedSerde<>(materializedInternal.keySerde()) : null, + materializedInternal.valueSerde()); } @SuppressWarnings("deprecation") // continuing to support SessionWindows#maintainMs in fallback mode diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java index 5c5cfb2bed7..2ee8f7c5958 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.TimeWindowedKStream; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.WindowedSerdes; import org.apache.kafka.streams.kstream.Windows; import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode; import org.apache.kafka.streams.state.StoreBuilder; @@ -40,11 +41,9 @@ import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.AGGREGATE_NAME; import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDUCE_NAME; -public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<K> implements TimeWindowedKStream<K, V> { +public class TimeWindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<K, V> implements TimeWindowedKStream<K, V> { private final Windows<W> windows; - private final Serde<K> keySerde; - private final Serde<V> valSerde; private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder; TimeWindowedKStreamImpl(final Windows<W> windows, @@ -55,9 +54,7 @@ final Serde<V> valSerde, final boolean repartitionRequired, final StreamsGraphNode streamsGraphNode) { - super(builder, name, sourceNodes, streamsGraphNode); - this.valSerde = valSerde; - this.keySerde = keySerde; + super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder); this.windows = Objects.requireNonNull(windows, "windows can't be null"); this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, keySerde, valSerde, repartitionRequired, sourceNodes, name, streamsGraphNode); } @@ -92,19 +89,14 @@ } return aggregateBuilder.build( - new KStreamWindowAggregate<>( - windows, - materializedInternal.storeName(), - aggregateBuilder.countInitializer, - aggregateBuilder.countAggregator - ), AGGREGATE_NAME, materialize(materializedInternal), - materializedInternal.isQueryable() - ); + new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), + materializedInternal.isQueryable(), + materializedInternal.keySerde() != null ? new WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null, + materializedInternal.valueSerde()); } - @Override public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator) { @@ -124,15 +116,12 @@ materializedInternal.withKeySerde(keySerde); } return aggregateBuilder.build( - new KStreamWindowAggregate<>( - windows, - materializedInternal.storeName(), - initializer, - aggregator - ), AGGREGATE_NAME, materialize(materializedInternal), - materializedInternal.isQueryable()); + new KStreamWindowAggregate<>(windows, materializedInternal.storeName(), initializer, aggregator), + materializedInternal.isQueryable(), + materializedInternal.keySerde() != null ? new WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null, + materializedInternal.valueSerde()); } @Override @@ -156,11 +145,12 @@ } return aggregateBuilder.build( - new KStreamWindowReduce<>(windows, materializedInternal.storeName(), reducer), REDUCE_NAME, materialize(materializedInternal), - materializedInternal.isQueryable() - ); + new KStreamWindowReduce<>(windows, materializedInternal.storeName(), reducer), + materializedInternal.isQueryable(), + materializedInternal.keySerde() != null ? new WindowedSerdes.TimeWindowedSerde<>(materializedInternal.keySerde()) : null, + materializedInternal.valueSerde()); } @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java index 97fb69d7c2a..4d1b67dbc33 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GroupedTableOperationRepartitionNode.java @@ -102,11 +102,10 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { } - public static GroupedTableOperationRepartitionNodeBuilder groupedTableOperationNodeBuilder() { - return new GroupedTableOperationRepartitionNodeBuilder(); + public static <K1, V1> GroupedTableOperationRepartitionNodeBuilder<K1, V1> groupedTableOperationNodeBuilder() { + return new GroupedTableOperationRepartitionNodeBuilder<>(); } - public static final class GroupedTableOperationRepartitionNodeBuilder<K, V> { private Serde<K> keySerde; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java index b63b66de38a..41c27bac5f9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java @@ -99,7 +99,7 @@ public String toString() { "} " + super.toString(); } - public static <K, V, V1, V2, VR> KTableKTableJoinNodeBuilder<K, V1, V2, VR> kTableKTableJoinNodeBuilder() { + public static <K, V1, V2, VR> KTableKTableJoinNodeBuilder<K, V1, V2, VR> kTableKTableJoinNodeBuilder() { return new KTableKTableJoinNodeBuilder<>(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java index eb3d9f691fd..4b767b41493 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java @@ -28,17 +28,17 @@ */ public class ProcessorParameters<K, V> { - private final ProcessorSupplier<K, V> processorSupplier; + private final ProcessorSupplier<? super K, ? super V> processorSupplier; private final String processorName; - public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier, + public ProcessorParameters(final ProcessorSupplier<? super K, ? super V> processorSupplier, final String processorName) { this.processorSupplier = processorSupplier; this.processorName = processorName; } - public ProcessorSupplier<K, V> processorSupplier() { + public ProcessorSupplier<? super K, ? super V> processorSupplier() { return processorSupplier; } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java index 4360d0846ec..ff180d173b5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/WindowedSerdesTest.java @@ -17,15 +17,37 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.junit.Assert; import org.junit.Test; +import static org.junit.Assert.assertTrue; + public class WindowedSerdesTest { private final String topic = "sample"; + @Test + public void shouldWrapForTimeWindowedSerde() { + final Serde<Windowed<String>> serde = WindowedSerdes.timeWindowedSerdeFrom(String.class); + assertTrue(serde.serializer() instanceof TimeWindowedSerializer); + assertTrue(serde.deserializer() instanceof TimeWindowedDeserializer); + assertTrue(((TimeWindowedSerializer) serde.serializer()).innerSerializer() instanceof StringSerializer); + assertTrue(((TimeWindowedDeserializer) serde.deserializer()).innerDeserializer() instanceof StringDeserializer); + } + + @Test + public void shouldWrapForSessionWindowedSerde() { + final Serde<Windowed<String>> serde = WindowedSerdes.sessionWindowedSerdeFrom(String.class); + assertTrue(serde.serializer() instanceof SessionWindowedSerializer); + assertTrue(serde.deserializer() instanceof SessionWindowedDeserializer); + assertTrue(((SessionWindowedSerializer) serde.serializer()).innerSerializer() instanceof StringSerializer); + assertTrue(((SessionWindowedDeserializer) serde.deserializer()).innerDeserializer() instanceof StringDeserializer); + } + @Test public void testTimeWindowSerdeFrom() { final Windowed<Integer> timeWindowed = new Windowed<>(10, new TimeWindow(0, Long.MAX_VALUE)); @@ -43,5 +65,4 @@ public void testSessionWindowedSerdeFrom() { final Windowed<Integer> windowed = sessionWindowedSerde.deserializer().deserialize(topic, bytes); Assert.assertEquals(sessionWindowed, windowed); } - } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java index d98fd796aef..b360ceced03 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java @@ -96,7 +96,7 @@ public void testShouldBeExtensible() { assertTrue(supplier.theCapturedProcessor().processed.size() <= expectedKeys.length); } - private class ExtendedKStream<K, V> extends AbstractStream<K> { + private class ExtendedKStream<K, V> extends AbstractStream<K, V> { ExtendedKStream(final KStream<K, V> stream) { super((KStreamImpl<K, V>) stream); @@ -104,11 +104,12 @@ public void testShouldBeExtensible() { KStream<K, V> randomFilter() { final String name = builder.newProcessorName("RANDOM-FILTER-"); - final ProcessorGraphNode processorNode = new ProcessorGraphNode(name, - new ProcessorParameters<>(new ExtendedKStreamDummy<>(), name), - false); + final ProcessorGraphNode<K, V> processorNode = new ProcessorGraphNode<>( + name, + new ProcessorParameters<>(new ExtendedKStreamDummy<>(), name), + false); builder.addGraphNode(this.streamsGraphNode, processorNode); - return new KStreamImpl<>(builder, name, sourceNodes, false, processorNode); + return new KStreamImpl<K, V>(name, null, null, sourceNodes, false, processorNode, builder); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index bce7fc80a40..119a7b72d98 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Utils; @@ -32,12 +33,17 @@ import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Serialized; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.kstream.ValueMapperWithKey; +import org.apache.kafka.streams.kstream.ValueTransformer; import org.apache.kafka.streams.kstream.ValueTransformerSupplier; import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; +import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.SourceNode; @@ -76,6 +82,8 @@ private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); + private Serde<String> mySerde = new Serdes.StringSerde(); + @Before public void before() { builder = new StreamsBuilder(); @@ -180,6 +188,121 @@ public Integer apply(final Integer value1, final Integer value2) { TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build(null).processors().size()); } + @Test + public void shouldPreserveSerdesForOperators() { + final StreamsBuilder builder = new StreamsBuilder(); + final KStream<String, String> stream1 = builder.stream(Collections.singleton("topic-1"), stringConsumed); + final KTable<String, String> table1 = builder.table("topic-2", stringConsumed); + final GlobalKTable<String, String> table2 = builder.globalTable("topic-2", stringConsumed); + final ConsumedInternal<String, String> consumedInternal = new ConsumedInternal<>(stringConsumed); + + final KeyValueMapper<String, String, String> selector = (key, value) -> key; + final KeyValueMapper<String, String, Iterable<KeyValue<String, String>>> flatSelector = (key, value) -> Collections.singleton(new KeyValue<>(key, value)); + final ValueMapper<String, String> mapper = value -> value; + final ValueMapper<String, Iterable<String>> flatMapper = Collections::singleton; + final ValueJoiner<String, String, String> joiner = (value1, value2) -> value1; + final TransformerSupplier<String, String, KeyValue<String, String>> transformerSupplier = () -> new Transformer<String, String, KeyValue<String, String>>() { + @Override + public void init(final ProcessorContext context) {} + + @Override + public KeyValue<String, String> transform(final String key, final String value) { + return new KeyValue<>(key, value); + } + + @Override + public void close() {} + }; + final ValueTransformerSupplier<String, String> valueTransformerSupplier = () -> new ValueTransformer<String, String>() { + @Override + public void init(final ProcessorContext context) {} + + @Override + public String transform(final String value) { + return value; + } + + @Override + public void close() {} + }; + + assertEquals(((AbstractStream) stream1.filter((key, value) -> false)).keySerde(), consumedInternal.keySerde()); + assertEquals(((AbstractStream) stream1.filter((key, value) -> false)).valueSerde(), consumedInternal.valueSerde()); + + assertEquals(((AbstractStream) stream1.filterNot((key, value) -> false)).keySerde(), consumedInternal.keySerde()); + assertEquals(((AbstractStream) stream1.filterNot((key, value) -> false)).valueSerde(), consumedInternal.valueSerde()); + + assertNull(((AbstractStream) stream1.selectKey(selector)).keySerde()); + assertEquals(((AbstractStream) stream1.selectKey(selector)).valueSerde(), consumedInternal.valueSerde()); + + assertNull(((AbstractStream) stream1.map(KeyValue::new)).keySerde()); + assertNull(((AbstractStream) stream1.map(KeyValue::new)).valueSerde()); + + assertEquals(((AbstractStream) stream1.mapValues(mapper)).keySerde(), consumedInternal.keySerde()); + assertNull(((AbstractStream) stream1.mapValues(mapper)).valueSerde()); + + assertNull(((AbstractStream) stream1.flatMap(flatSelector)).keySerde()); + assertNull(((AbstractStream) stream1.flatMap(flatSelector)).valueSerde()); + + assertEquals(((AbstractStream) stream1.flatMapValues(flatMapper)).keySerde(), consumedInternal.keySerde()); + assertNull(((AbstractStream) stream1.flatMapValues(flatMapper)).valueSerde()); + + assertNull(((AbstractStream) stream1.transform(transformerSupplier)).keySerde()); + assertNull(((AbstractStream) stream1.transform(transformerSupplier)).valueSerde()); + + assertEquals(((AbstractStream) stream1.transformValues(valueTransformerSupplier)).keySerde(), consumedInternal.keySerde()); + assertNull(((AbstractStream) stream1.transformValues(valueTransformerSupplier)).valueSerde()); + + assertNull(((AbstractStream) stream1.merge(stream1)).keySerde()); + assertNull(((AbstractStream) stream1.merge(stream1)).valueSerde()); + + assertEquals(((AbstractStream) stream1.through("topic-3")).keySerde(), consumedInternal.keySerde()); + assertEquals(((AbstractStream) stream1.through("topic-3")).valueSerde(), consumedInternal.valueSerde()); + assertEquals(((AbstractStream) stream1.through("topic-3", Produced.with(mySerde, mySerde))).keySerde(), mySerde); + assertEquals(((AbstractStream) stream1.through("topic-3", Produced.with(mySerde, mySerde))).valueSerde(), mySerde); + + assertEquals(((AbstractStream) stream1.groupByKey()).keySerde(), consumedInternal.keySerde()); + assertEquals(((AbstractStream) stream1.groupByKey()).valueSerde(), consumedInternal.valueSerde()); + assertEquals(((AbstractStream) stream1.groupByKey(Serialized.with(mySerde, mySerde))).keySerde(), mySerde); + assertEquals(((AbstractStream) stream1.groupByKey(Serialized.with(mySerde, mySerde))).valueSerde(), mySerde); + + assertEquals(((AbstractStream) stream1.groupBy(selector)).keySerde(), null); + assertEquals(((AbstractStream) stream1.groupBy(selector)).valueSerde(), consumedInternal.valueSerde()); + assertEquals(((AbstractStream) stream1.groupBy(selector, Serialized.with(mySerde, mySerde))).keySerde(), mySerde); + assertEquals(((AbstractStream) stream1.groupBy(selector, Serialized.with(mySerde, mySerde))).valueSerde(), mySerde); + + assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L))).keySerde(), null); + assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L))).valueSerde(), null); + assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde); + assertNull(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde()); + + assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L))).keySerde(), null); + assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L))).valueSerde(), null); + assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde); + assertNull(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde()); + + assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L))).keySerde(), null); + assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L))).valueSerde(), null); + assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde); + assertNull(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde()); + + assertEquals(((AbstractStream) stream1.join(table1, joiner)).keySerde(), consumedInternal.keySerde()); + assertEquals(((AbstractStream) stream1.join(table1, joiner)).valueSerde(), null); + assertEquals(((AbstractStream) stream1.join(table1, joiner, Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde); + assertEquals(((AbstractStream) stream1.join(table1, joiner, Joined.with(mySerde, mySerde, mySerde))).valueSerde(), null); + + assertEquals(((AbstractStream) stream1.leftJoin(table1, joiner)).keySerde(), consumedInternal.keySerde()); + assertEquals(((AbstractStream) stream1.leftJoin(table1, joiner)).valueSerde(), null); + assertEquals(((AbstractStream) stream1.leftJoin(table1, joiner, Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde); + assertEquals(((AbstractStream) stream1.leftJoin(table1, joiner, Joined.with(mySerde, mySerde, mySerde))).valueSerde(), null); + + assertEquals(((AbstractStream) stream1.join(table2, selector, joiner)).keySerde(), consumedInternal.keySerde()); + assertEquals(((AbstractStream) stream1.join(table2, selector, joiner)).valueSerde(), null); + + assertEquals(((AbstractStream) stream1.leftJoin(table2, selector, joiner)).keySerde(), consumedInternal.keySerde()); + assertEquals(((AbstractStream) stream1.leftJoin(table2, selector, joiner)).valueSerde(), null); + } + @Test public void shouldUseRecordMetadataTimestampExtractorWithThrough() { final StreamsBuilder builder = new StreamsBuilder(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java index 2692c17f021..10a27b9f2bb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.StreamsTestUtils; @@ -43,20 +42,11 @@ @Test public void testMap() { final StreamsBuilder builder = new StreamsBuilder(); - - final KeyValueMapper<Integer, String, KeyValue<String, Integer>> mapper = - new KeyValueMapper<Integer, String, KeyValue<String, Integer>>() { - @Override - public KeyValue<String, Integer> apply(final Integer key, final String value) { - return KeyValue.pair(value, key); - } - }; - final int[] expectedKeys = new int[]{0, 1, 2, 3}; final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>(); final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String())); - stream.map(mapper).process(supplier); + stream.map((key, value) -> KeyValue.pair(value, key)).process(supplier); try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { for (final int expectedKey : expectedKeys) { @@ -75,16 +65,9 @@ public void testMap() { @Test public void testTypeVariance() { - final KeyValueMapper<Number, Object, KeyValue<Number, String>> stringify = new KeyValueMapper<Number, Object, KeyValue<Number, String>>() { - @Override - public KeyValue<Number, String> apply(final Number key, final Object value) { - return KeyValue.pair(key, key + ":" + value); - } - }; - new StreamsBuilder() .<Integer, String>stream("numbers") - .map(stringify) + .map((key, value) -> KeyValue.pair(key, key + ":" + value)) .to("strings"); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index eb586e60de1..6e666c94691 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -16,10 +16,12 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyDescription; @@ -28,13 +30,17 @@ import org.apache.kafka.streams.TopologyWrapper; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.kstream.ValueMapperWithKey; +import org.apache.kafka.streams.kstream.ValueTransformerWithKey; import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; +import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.SinkNode; import org.apache.kafka.streams.processor.internals.SourceNode; @@ -62,6 +68,7 @@ public class KTableImplTest { + private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String()); private final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String()); private final Produced<String, String> produced = Produced.with(Serdes.String(), Serdes.String()); private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String()); @@ -70,6 +77,8 @@ private StreamsBuilder builder; private KTable<String, String> table; + private Serde<String> mySerde = new Serdes.StringSerde(); + @Before public void setUp() { builder = new StreamsBuilder(); @@ -125,6 +134,74 @@ public boolean test(final String key, final Integer value) { assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), processors.get(3).processed); } + @Test + public void shouldPreserveSerdesForOperators() { + final StreamsBuilder builder = new StreamsBuilder(); + final KTable<String, String> table1 = builder.table("topic-2", stringConsumed); + final ConsumedInternal<String, String> consumedInternal = new ConsumedInternal<>(stringConsumed); + + final KeyValueMapper<String, String, String> selector = (key, value) -> key; + final ValueMapper<String, String> mapper = value -> value; + final ValueJoiner<String, String, String> joiner = (value1, value2) -> value1; + final ValueTransformerWithKeySupplier<String, String, String> valueTransformerWithKeySupplier = () -> new ValueTransformerWithKey<String, String, String>() { + @Override + public void init(final ProcessorContext context) {} + + @Override + public String transform(final String key, final String value) { + return value; + } + + @Override + public void close() {} + }; + + assertEquals(((AbstractStream) table1.filter((key, value) -> false)).keySerde(), consumedInternal.keySerde()); + assertEquals(((AbstractStream) table1.filter((key, value) -> false)).valueSerde(), consumedInternal.valueSerde()); + assertEquals(((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); + assertEquals(((AbstractStream) table1.filter((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); + + assertEquals(((AbstractStream) table1.filterNot((key, value) -> false)).keySerde(), consumedInternal.keySerde()); + assertEquals(((AbstractStream) table1.filterNot((key, value) -> false)).valueSerde(), consumedInternal.valueSerde()); + assertEquals(((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); + assertEquals(((AbstractStream) table1.filterNot((key, value) -> false, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); + + assertEquals(((AbstractStream) table1.mapValues(mapper)).keySerde(), consumedInternal.keySerde()); + assertNull(((AbstractStream) table1.mapValues(mapper)).valueSerde()); + assertEquals(((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); + assertEquals(((AbstractStream) table1.mapValues(mapper, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); + + assertEquals(((AbstractStream) table1.toStream()).keySerde(), consumedInternal.keySerde()); + assertEquals(((AbstractStream) table1.toStream()).valueSerde(), consumedInternal.valueSerde()); + assertNull(((AbstractStream) table1.toStream(selector)).keySerde()); + assertEquals(((AbstractStream) table1.toStream(selector)).valueSerde(), consumedInternal.valueSerde()); + + assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).keySerde(), consumedInternal.keySerde()); + assertNull(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier)).valueSerde()); + assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); + assertEquals(((AbstractStream) table1.transformValues(valueTransformerWithKeySupplier, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); + + assertEquals(((AbstractStream) table1.groupBy(KeyValue::new)).keySerde(), null); + assertEquals(((AbstractStream) table1.groupBy(KeyValue::new)).valueSerde(), null); + assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, Serialized.with(mySerde, mySerde))).keySerde(), mySerde); + assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, Serialized.with(mySerde, mySerde))).valueSerde(), mySerde); + + assertEquals(((AbstractStream) table1.join(table1, joiner)).keySerde(), consumedInternal.keySerde()); + assertEquals(((AbstractStream) table1.join(table1, joiner)).valueSerde(), null); + assertEquals(((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); + assertEquals(((AbstractStream) table1.join(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); + + assertEquals(((AbstractStream) table1.leftJoin(table1, joiner)).keySerde(), consumedInternal.keySerde()); + assertEquals(((AbstractStream) table1.leftJoin(table1, joiner)).valueSerde(), null); + assertEquals(((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); + assertEquals(((AbstractStream) table1.leftJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); + + assertEquals(((AbstractStream) table1.outerJoin(table1, joiner)).keySerde(), consumedInternal.keySerde()); + assertEquals(((AbstractStream) table1.outerJoin(table1, joiner)).valueSerde(), null); + assertEquals(((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).keySerde(), mySerde); + assertEquals(((AbstractStream) table1.outerJoin(table1, joiner, Materialized.with(mySerde, mySerde))).valueSerde(), mySerde); + } + @Test public void testValueGetter() { final StreamsBuilder builder = new StreamsBuilder(); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java index d65f27e2b19..75e9f5120b0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java @@ -38,9 +38,7 @@ public class StreamsGraphTest { - final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition"); - - + private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition"); // Test builds topology in succesive manner but only graph node not yet processed written to topology ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Serde Inheritance in Streams DSL > -------------------------------- > > Key: KAFKA-7456 > URL: https://issues.apache.org/jira/browse/KAFKA-7456 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Guozhang Wang > Assignee: Guozhang Wang > Priority: Major > Fix For: 2.1.0 > > > This is a prerequisite for further topology optimization in the Streams DSL: > we should let different operators inside the DSL to be able to pass along key > and value serdes if they are not explicitly specified by users. The serde > specification precedence should generally be: > 1) Overridden values via control objects (e.g. Materialized, Serialized, > Consumed, etc) > 2) Serdes that can be inferred from the operator itself (e.g. > groupBy().count(), where value serde can default to `LongSerde`). > 3) Serde inherited from parent operator if possible (note if the key / value > types have been changed, then the corresponding serde cannot be inherited). > 4) Default serde specified in the config. -- This message was sent by Atlassian JIRA (v7.6.3#76005)