This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push: new bc51d80 KAFKA-8254: Pass Changelog as Topic in Suppress Serdes (#6602) (#6641) bc51d80 is described below commit bc51d803478e90af62d242ad8cc6cdaf348d3c17 Author: John Roesler <vvcep...@users.noreply.github.com> AuthorDate: Fri Apr 26 17:15:07 2019 -0500 KAFKA-8254: Pass Changelog as Topic in Suppress Serdes (#6602) (#6641) Cherry-picked from #6602 Reviewers: Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- .../streams/kstream/internals/FullChangeSerde.java | 8 +- .../streams/kstream/internals/KStreamImpl.java | 36 +++- .../streams/kstream/internals/KTableImpl.java | 9 +- .../suppress/KTableSuppressProcessor.java | 43 ++--- .../processor/internals/ProcessorContextImpl.java | 4 +- .../internals/ProcessorRecordContext.java | 23 +-- .../InMemoryTimeOrderedKeyValueBuffer.java | 62 ++++-- .../state/internals/TimeOrderedKeyValueBuffer.java | 57 +++++- .../integration/SuppressionIntegrationTest.java | 165 ++++++++++++++-- .../kstream/internals/FullChangeSerdeTest.java | 20 +- .../suppress/KTableSuppressProcessorTest.java | 49 +++-- .../internals/AbstractProcessorContextTest.java | 4 +- .../InMemoryTimeOrderedKeyValueBufferTest.java | 4 +- .../internals/TimeOrderedKeyValueBufferTest.java | 208 +++++++++------------ 14 files changed, 448 insertions(+), 244 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java index 9bb8373..f06a428 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java @@ -25,21 +25,21 @@ import java.util.Map; import static java.util.Objects.requireNonNull; -public class FullChangeSerde<T> implements Serde<Change<T>> { +public final class FullChangeSerde<T> implements 
Serde<Change<T>> { private final Serde<T> inner; @SuppressWarnings("unchecked") - public static <T> FullChangeSerde<T> castOrWrap(final Serde<?> serde) { + public static <T> FullChangeSerde<T> castOrWrap(final Serde<T> serde) { if (serde == null) { return null; } else if (serde instanceof FullChangeSerde) { return (FullChangeSerde<T>) serde; } else { - return new FullChangeSerde<T>((Serde<T>) serde); + return new FullChangeSerde<>(serde); } } - public FullChangeSerde(final Serde<T> inner) { + private FullChangeSerde(final Serde<T> inner) { this.inner = requireNonNull(inner); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 26ea63c..88529fb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -392,18 +392,26 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @Override public KStream<K, V> through(final String topic) { - return through(topic, Produced.with(null, null, null)); + return through(topic, Produced.with(keySerde, valSerde, null)); } @Override public KStream<K, V> through(final String topic, final Produced<K, V> produced) { + Objects.requireNonNull(topic, "topic can't be null"); + Objects.requireNonNull(produced, "Produced can't be null"); final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced); + if (producedInternal.keySerde() == null) { + producedInternal.withKeySerde(keySerde); + } + if (producedInternal.valueSerde() == null) { + producedInternal.withValueSerde(valSerde); + } to(topic, producedInternal); return builder.stream( Collections.singleton(topic), new ConsumedInternal<>( - producedInternal.keySerde() != null ? producedInternal.keySerde() : keySerde, - producedInternal.valueSerde() != null ? 
producedInternal.valueSerde() : valSerde, + producedInternal.keySerde(), + producedInternal.valueSerde(), new FailOnInvalidTimestamp(), null ) @@ -412,26 +420,40 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K @Override public void to(final String topic) { - to(topic, Produced.with(null, null, null)); + to(topic, Produced.with(keySerde, valSerde, null)); } @Override public void to(final String topic, final Produced<K, V> produced) { Objects.requireNonNull(topic, "topic can't be null"); Objects.requireNonNull(produced, "Produced can't be null"); - to(new StaticTopicNameExtractor<>(topic), new ProducedInternal<>(produced)); + final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced); + if (producedInternal.keySerde() == null) { + producedInternal.withKeySerde(keySerde); + } + if (producedInternal.valueSerde() == null) { + producedInternal.withValueSerde(valSerde); + } + to(new StaticTopicNameExtractor<>(topic), producedInternal); } @Override public void to(final TopicNameExtractor<K, V> topicExtractor) { - to(topicExtractor, Produced.with(null, null, null)); + to(topicExtractor, Produced.with(keySerde, valSerde, null)); } @Override public void to(final TopicNameExtractor<K, V> topicExtractor, final Produced<K, V> produced) { Objects.requireNonNull(topicExtractor, "topic extractor can't be null"); Objects.requireNonNull(produced, "Produced can't be null"); - to(topicExtractor, new ProducedInternal<>(produced)); + final ProducedInternal<K, V> producedInternal = new ProducedInternal<>(produced); + if (producedInternal.keySerde() == null) { + producedInternal.withKeySerde(keySerde); + } + if (producedInternal.valueSerde() == null) { + producedInternal.withValueSerde(valSerde); + } + to(topicExtractor, producedInternal); } private void to(final TopicNameExtractor<K, V> topicExtractor, final ProducedInternal<K, V> produced) { diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index f49d109..25e76f6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -366,18 +366,13 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< suppressedInternal.name() != null ? suppressedInternal.name() + "-store" : builder.newStoreName(SUPPRESS_NAME); final ProcessorSupplier<K, Change<V>> suppressionSupplier = - () -> new KTableSuppressProcessor<>( - suppressedInternal, - storeName, - keySerde, - valSerde == null ? null : new FullChangeSerde<>(valSerde) - ); + () -> new KTableSuppressProcessor<>(suppressedInternal, storeName); final ProcessorGraphNode<K, Change<V>> node = new StatefulProcessorNode<>( name, new ProcessorParameters<>(suppressionSupplier, name), - new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName), + new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, FullChangeSerde.castOrWrap(valSerde)), false ); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java index 622223c..fcbc60b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java @@ -18,8 +18,6 @@ package org.apache.kafka.streams.kstream.internals.suppress; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.StreamsException; import 
org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.kstream.internals.FullChangeSerde; @@ -28,7 +26,6 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; -import org.apache.kafka.streams.state.internals.ContextualRecord; import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer; import static java.util.Objects.requireNonNull; @@ -42,22 +39,14 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> { private final boolean safeToDropTombstones; private final String storeName; - private TimeOrderedKeyValueBuffer buffer; + private TimeOrderedKeyValueBuffer<K, Change<V>> buffer; private InternalProcessorContext internalProcessorContext; - private Serde<K> keySerde; - private FullChangeSerde<V> valueSerde; - private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; - public KTableSuppressProcessor(final SuppressedInternal<K> suppress, - final String storeName, - final Serde<K> keySerde, - final FullChangeSerde<V> valueSerde) { + public KTableSuppressProcessor(final SuppressedInternal<K> suppress, final String storeName) { this.storeName = storeName; requireNonNull(suppress); - this.keySerde = keySerde; - this.valueSerde = valueSerde; maxRecords = suppress.bufferConfig().maxRecords(); maxBytes = suppress.bufferConfig().maxBytes(); suppressDurationMillis = suppress.timeToWaitForMoreEvents().toMillis(); @@ -70,9 +59,9 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> { @Override public void init(final ProcessorContext context) { internalProcessorContext = (InternalProcessorContext) context; - keySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde; - valueSerde = valueSerde == null ? 
FullChangeSerde.castOrWrap(context.valueSerde()) : valueSerde; - buffer = requireNonNull((TimeOrderedKeyValueBuffer) context.getStateStore(storeName)); + + buffer = requireNonNull((TimeOrderedKeyValueBuffer<K, Change<V>>) context.getStateStore(storeName)); + buffer.setSerdesIfNull((Serde<K>) context.keySerde(), FullChangeSerde.castOrWrap((Serde<V>) context.valueSerde())); } @Override @@ -84,12 +73,7 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> { private void buffer(final K key, final Change<V> value) { final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, key); - final ProcessorRecordContext recordContext = internalProcessorContext.recordContext(); - - final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(null, key)); - final byte[] serializedValue = valueSerde.serializer().serialize(null, value); - - buffer.put(bufferTime, serializedKey, new ContextualRecord(serializedValue, recordContext)); + buffer.put(bufferTime, key, value, internalProcessorContext.recordContext()); } private void enforceConstraints() { @@ -110,6 +94,11 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> { buffer.numRecords(), maxRecords, buffer.bufferSize(), maxBytes )); + default: + throw new UnsupportedOperationException( + "The bufferFullStrategy [" + bufferFullStrategy + + "] is not implemented. This is a bug in Kafka Streams." 
+ ); } } } @@ -118,14 +107,12 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> { return buffer.numRecords() > maxRecords || buffer.bufferSize() > maxBytes; } - private void emit(final KeyValue<Bytes, ContextualRecord> toEmit) { - final Change<V> value = valueSerde.deserializer().deserialize(null, toEmit.value.value()); - if (shouldForward(value)) { + private void emit(final TimeOrderedKeyValueBuffer.Eviction<K, Change<V>> toEmit) { + if (shouldForward(toEmit.value())) { final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext(); - internalProcessorContext.setRecordContext(toEmit.value.recordContext()); + internalProcessorContext.setRecordContext(toEmit.recordContext()); try { - final K key = keySerde.deserializer().deserialize(null, toEmit.key.get()); - internalProcessorContext.forward(key, value); + internalProcessorContext.forward(toEmit.key(), toEmit.value()); } finally { internalProcessorContext.setRecordContext(prevRecordContext); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index fa78d01..c9f2431 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -110,7 +110,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re final V value, final To to) { final ProcessorNode previousNode = currentNode(); - final long currentTimestamp = recordContext.timestamp; + final long currentTimestamp = recordContext.timestamp(); try { toInternal.update(to); @@ -138,7 +138,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re } } } finally { - recordContext.timestamp = currentTimestamp; + recordContext.setTimestamp(currentTimestamp); 
setCurrentNode(previousNode); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java index 4f991a2..d4e1594 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java @@ -29,11 +29,11 @@ import static java.nio.charset.StandardCharsets.UTF_8; public class ProcessorRecordContext implements RecordContext { - long timestamp; - final long offset; - final String topic; - final int partition; - final Headers headers; + private long timestamp; + private final long offset; + private final String topic; + private final int partition; + private final Headers headers; public ProcessorRecordContext(final long timestamp, final long offset, @@ -48,13 +48,6 @@ public class ProcessorRecordContext implements RecordContext { this.headers = headers; } - public ProcessorRecordContext(final long timestamp, - final long offset, - final int partition, - final String topic) { - this(timestamp, offset, partition, topic, null); - } - public void setTimestamp(final long timestamp) { this.timestamp = timestamp; } @@ -221,9 +214,13 @@ public class ProcessorRecordContext implements RecordContext { Objects.equals(headers, that.headers); } + /** + * Equality is implemented in support of tests, *not* for use in Hash collections, since this class is mutable. 
+ */ + @Deprecated @Override public int hashCode() { - return Objects.hash(timestamp, offset, topic, partition, headers); + throw new UnsupportedOperationException("ProcessorRecordContext is unsafe for use in Hash collections"); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java index d323d97..82d07a8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java @@ -22,8 +22,8 @@ import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.BytesSerializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; @@ -47,7 +47,7 @@ import java.util.function.Supplier; import static java.util.Objects.requireNonNull; -public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuffer { +public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyValueBuffer<K, V> { private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer(); private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer(); private static final RecordHeaders V_1_CHANGELOG_HEADERS = @@ -60,6 +60,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa private final String storeName; private final boolean loggingEnabled; 
+ private Serde<K> keySerde; + private Serde<V> valueSerde; + private long memBufferSize = 0L; private long minTimestamp = Long.MAX_VALUE; private RecordCollector collector; @@ -69,13 +72,17 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa private int partition; - public static class Builder implements StoreBuilder<StateStore> { + public static class Builder<K, V> implements StoreBuilder<StateStore> { private final String storeName; + private final Serde<K> keySerde; + private final Serde<V> valSerde; private boolean loggingEnabled = true; - public Builder(final String storeName) { + public Builder(final String storeName, final Serde<K> keySerde, final Serde<V> valSerde) { this.storeName = storeName; + this.keySerde = keySerde; + this.valSerde = valSerde; } /** @@ -114,8 +121,8 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa } @Override - public StateStore build() { - return new InMemoryTimeOrderedKeyValueBuffer(storeName, loggingEnabled); + public InMemoryTimeOrderedKeyValueBuffer<K, V> build() { + return new InMemoryTimeOrderedKeyValueBuffer<>(storeName, loggingEnabled, keySerde, valSerde); } @Override @@ -145,8 +152,12 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa @Override public boolean equals(final Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } final BufferKey bufferKey = (BufferKey) o; return time == bufferKey.time && Objects.equals(key, bufferKey.key); @@ -173,9 +184,14 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa } } - private InMemoryTimeOrderedKeyValueBuffer(final String storeName, final boolean loggingEnabled) { + private InMemoryTimeOrderedKeyValueBuffer(final String storeName, + final boolean loggingEnabled, + final Serde<K> keySerde, + final 
Serde<V> valueSerde) { this.storeName = storeName; this.loggingEnabled = loggingEnabled; + this.keySerde = keySerde; + this.valueSerde = valueSerde; } @Override @@ -190,6 +206,12 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa } @Override + public void setSerdesIfNull(final Serde<K> keySerde, final Serde<V> valueSerde) { + this.keySerde = this.keySerde == null ? keySerde : this.keySerde; + this.valueSerde = this.valueSerde == null ? valueSerde : this.valueSerde; + } + + @Override public void init(final ProcessorContext context, final StateStore root) { context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch); if (loggingEnabled) { @@ -343,7 +365,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa @Override public void evictWhile(final Supplier<Boolean> predicate, - final Consumer<KeyValue<Bytes, ContextualRecord>> callback) { + final Consumer<Eviction<K, V>> callback) { final Iterator<Map.Entry<BufferKey, ContextualRecord>> delegate = sortedMap.entrySet().iterator(); if (predicate.get()) { @@ -360,7 +382,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa next.getKey().time + "]" ); } - callback.accept(new KeyValue<>(next.getKey().key, next.getValue())); + final K key = keySerde.deserializer().deserialize(changelogTopic, next.getKey().key.get()); + final V value = valueSerde.deserializer().deserialize(changelogTopic, next.getValue().value()); + callback.accept(new Eviction<>(key, value, next.getValue().recordContext())); delegate.remove(); index.remove(next.getKey().key); @@ -383,13 +407,17 @@ public final class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyVa @Override public void put(final long time, - final Bytes key, - final ContextualRecord contextualRecord) { - requireNonNull(contextualRecord.value(), "value cannot be null"); - requireNonNull(contextualRecord.recordContext(), "recordContext cannot be null"); + final K 
key, + final V value, + final ProcessorRecordContext recordContext) { + requireNonNull(value, "value cannot be null"); + requireNonNull(recordContext, "recordContext cannot be null"); + + final Bytes serializedKey = Bytes.wrap(keySerde.serializer().serialize(changelogTopic, key)); + final byte[] serializedValue = valueSerde.serializer().serialize(changelogTopic, value); - cleanPut(time, key, contextualRecord); - dirtyKeys.add(key); + cleanPut(time, serializedKey, new ContextualRecord(serializedValue, recordContext)); + dirtyKeys.add(serializedKey); } private void cleanPut(final long time, final Bytes key, final ContextualRecord value) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java index 86a8c1e..ffa1f49 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java @@ -16,17 +16,64 @@ */ package org.apache.kafka.streams.state.internals; -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import java.util.Objects; import java.util.function.Consumer; import java.util.function.Supplier; -public interface TimeOrderedKeyValueBuffer extends StateStore { - void evictWhile(final Supplier<Boolean> predicate, final Consumer<KeyValue<Bytes, ContextualRecord>> callback); +public interface TimeOrderedKeyValueBuffer<K, V> extends StateStore { + final class Eviction<K, V> { + private final K key; + private final V value; + private final ProcessorRecordContext recordContext; - void put(final long time, final Bytes key, final ContextualRecord value); + Eviction(final K 
key, final V value, final ProcessorRecordContext recordContext) { + this.key = key; + this.value = value; + this.recordContext = recordContext; + } + + public K key() { + return key; + } + + public V value() { + return value; + } + + public ProcessorRecordContext recordContext() { + return recordContext; + } + + @Override + public String toString() { + return "Eviction{key=" + key + ", value=" + value + ", recordContext=" + recordContext + '}'; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final Eviction<?, ?> eviction = (Eviction<?, ?>) o; + return Objects.equals(key, eviction.key) && + Objects.equals(value, eviction.value) && + Objects.equals(recordContext, eviction.recordContext); + } + + @Override + public int hashCode() { + return Objects.hash(key, value, recordContext); + } + } + + void setSerdesIfNull(final Serde<K> keySerde, final Serde<V> valueSerde); + + void evictWhile(final Supplier<Boolean> predicate, final Consumer<Eviction<K, V>> callback); + + void put(long time, K key, V value, ProcessorRecordContext recordContext); int numRecords(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java index da91b91..79957d1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -17,9 +17,13 @@ package org.apache.kafka.streams.integration; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import 
org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; @@ -35,6 +39,7 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; @@ -44,6 +49,8 @@ import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.hamcrest.Matchers; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -53,6 +60,7 @@ import java.util.List; import java.util.Locale; import java.util.Optional; import java.util.Properties; +import java.util.stream.Collectors; import static java.lang.Long.MAX_VALUE; import static java.time.Duration.ofMillis; @@ -62,6 +70,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; 
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes; @@ -73,7 +82,7 @@ import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -@Category({IntegrationTest.class}) +@Category(IntegrationTest.class) public class SuppressionIntegrationTest { @ClassRule public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster( @@ -147,7 +156,7 @@ public class SuppressionIntegrationTest { } } - private KTable<String, Long> buildCountsTable(final String input, final StreamsBuilder builder) { + private static KTable<String, Long> buildCountsTable(final String input, final StreamsBuilder builder) { return builder .table( input, @@ -161,6 +170,139 @@ public class SuppressionIntegrationTest { } @Test + public void shouldUseDefaultSerdes() { + final String testId = "-shouldUseDefaultSerdes"; + final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; + final String input = "input" + testId; + final String outputSuppressed = "output-suppressed" + testId; + final String outputRaw = "output-raw" + testId; + + cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed); + + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream<String, String> inputStream = builder.stream(input); + + final KTable<String, String> valueCounts = inputStream + .groupByKey() + .aggregate(() -> "()", (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")"); + + valueCounts + .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(1L).emitEarlyWhenFull())) + .toStream() + .to(outputSuppressed); + + valueCounts + .toStream() + .to(outputRaw); + + final Properties streamsConfig = getStreamsConfig(appId); + streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + + final
KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); + try { + produceSynchronously( + input, + asList( + new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), + new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)), + new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)), + new KeyValueTimestamp<>("x", "x", scaledTime(3L)) + ) + ); + final boolean rawRecords = waitForAnyRecord(outputRaw); + final boolean suppressedRecords = waitForAnyRecord(outputSuppressed); + assertThat(rawRecords, Matchers.is(true)); + assertThat(suppressedRecords, is(true)); + } finally { + driver.close(); + cleanStateAfterTest(CLUSTER, driver); + } + } + + @Test + public void shouldInheritSerdes() { + final String testId = "-shouldInheritSerdes"; + final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; + final String input = "input" + testId; + final String outputSuppressed = "output-suppressed" + testId; + final String outputRaw = "output-raw" + testId; + + cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed); + + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream<String, String> inputStream = builder.stream(input); + + // count sets the serde to Long + final KTable<String, Long> valueCounts = inputStream + .groupByKey() + .count(); + + valueCounts + .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(1L).emitEarlyWhenFull())) + .toStream() + .to(outputSuppressed); + + valueCounts + .toStream() + .to(outputRaw); + + final Properties streamsConfig = getStreamsConfig(appId); + streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + + final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); + try { + produceSynchronously( + input, + asList( + new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), + new 
KeyValueTimestamp<>("k1", "v2", scaledTime(1L)), + new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)), + new KeyValueTimestamp<>("x", "x", scaledTime(3L)) + ) + ); + final boolean rawRecords = waitForAnyRecord(outputRaw); + final boolean suppressedRecords = waitForAnyRecord(outputSuppressed); + assertThat(rawRecords, Matchers.is(true)); + assertThat(suppressedRecords, is(true)); + } finally { + driver.close(); + cleanStateAfterTest(CLUSTER, driver); + } + } + + private static boolean waitForAnyRecord(final String topic) { + final Properties properties = new Properties(); + properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + + try (final Consumer<Object, Object> consumer = new KafkaConsumer<>(properties)) { + final List<TopicPartition> partitions = + consumer.partitionsFor(topic) + .stream() + .map(pi -> new TopicPartition(pi.topic(), pi.partition())) + .collect(Collectors.toList()); + consumer.assign(partitions); + consumer.seekToBeginning(partitions); + final long start = System.currentTimeMillis(); + while ((System.currentTimeMillis() - start) < DEFAULT_TIMEOUT) { + final ConsumerRecords<Object, Object> records = consumer.poll(ofMillis(500)); + + if (!records.isEmpty()) { + return true; + } + } + + return false; + } + } + + @Test public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() { final String testId = "-shouldNotSuppressIntermediateEventsWithZeroEmitAfter"; final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; @@ -336,7 +478,7 @@ public class SuppressionIntegrationTest { valueCounts // this is a bit brittle, but I happen to know that the entries are a little over 100 bytes in size. 
- .suppress(untilTimeLimit(Duration.ofMillis(MAX_VALUE), maxBytes(200L).emitEarlyWhenFull())) + .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxBytes(200L).emitEarlyWhenFull())) .toStream() .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long())); @@ -396,7 +538,7 @@ public class SuppressionIntegrationTest { valueCounts // this is a bit brittle, but I happen to know that the entries are a little over 100 bytes in size. - .suppress(untilTimeLimit(Duration.ofMillis(MAX_VALUE), maxBytes(200L).shutDownWhenFull())) + .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxBytes(200L).shutDownWhenFull())) .toStream() .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long())); @@ -490,17 +632,18 @@ public class SuppressionIntegrationTest { } } - private Properties getStreamsConfig(final String appId) { + private static Properties getStreamsConfig(final String appId) { return mkProperties(mkMap( mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), mkEntry(StreamsConfig.POLL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)), mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)), - mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE) + mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE), + mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()) )); } - private String scaledWindowKey(final String key, final long unscaledStart, final long unscaledEnd) { + private static String scaledWindowKey(final String key, final long unscaledStart, final long unscaledEnd) { return new Windowed<>(key, new TimeWindow(scaledTime(unscaledStart), scaledTime(unscaledEnd))).toString(); } @@ -508,11 +651,11 @@ public class SuppressionIntegrationTest { * scaling to ensure that there are commits in between the various test events, * just to exercise that everything works properly in the presence of commits. 
*/ - private long scaledTime(final long unscaledTime) { + private static long scaledTime(final long unscaledTime) { return COMMIT_INTERVAL * 2 * unscaledTime; } - private void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) { + private static void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) { final Properties producerConfig = mkProperties(mkMap( mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"), mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()), @@ -522,12 +665,12 @@ public class SuppressionIntegrationTest { IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, Optional.empty(), toProduce); } - private void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedException { + private static void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedException { waitForCondition(() -> !driver.state().isRunning(), TIMEOUT_MS, "Streams didn't shut down."); assertThat(driver.state(), is(KafkaStreams.State.ERROR)); } - private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> keyValueTimestamps) { + private static void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> keyValueTimestamps) { final Properties properties = mkProperties( mkMap( mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"), diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java index a6a8888..ddba05e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java @@ -29,7 +29,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static 
org.hamcrest.core.Is.is; public class FullChangeSerdeTest { - private final FullChangeSerde<String> serde = new FullChangeSerde<>(Serdes.String()); + private final FullChangeSerde<String> serde = FullChangeSerde.castOrWrap(Serdes.String()); @Test public void shouldRoundTripNull() { @@ -77,31 +77,28 @@ public class FullChangeSerdeTest { ); } - @SuppressWarnings("unchecked") @Test public void shouldConfigureSerde() { final Serde<Void> mock = EasyMock.mock(Serde.class); mock.configure(emptyMap(), false); EasyMock.expectLastCall(); EasyMock.replay(mock); - final FullChangeSerde<Void> serde = new FullChangeSerde<>(mock); + final FullChangeSerde<Void> serde = FullChangeSerde.castOrWrap(mock); serde.configure(emptyMap(), false); EasyMock.verify(mock); } - @SuppressWarnings("unchecked") @Test public void shouldCloseSerde() { final Serde<Void> mock = EasyMock.mock(Serde.class); mock.close(); EasyMock.expectLastCall(); EasyMock.replay(mock); - final FullChangeSerde<Void> serde = new FullChangeSerde<>(mock); + final FullChangeSerde<Void> serde = FullChangeSerde.castOrWrap(mock); serde.close(); EasyMock.verify(mock); } - @SuppressWarnings("unchecked") @Test public void shouldConfigureSerializer() { final Serde<Void> mockSerde = EasyMock.mock(Serde.class); @@ -111,13 +108,12 @@ public class FullChangeSerdeTest { mockSerializer.configure(emptyMap(), false); EasyMock.expectLastCall(); EasyMock.replay(mockSerializer); - final Serializer<Change<Void>> serializer = new FullChangeSerde<>(mockSerde).serializer(); + final Serializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).serializer(); serializer.configure(emptyMap(), false); EasyMock.verify(mockSerde); EasyMock.verify(mockSerializer); } - @SuppressWarnings("unchecked") @Test public void shouldCloseSerializer() { final Serde<Void> mockSerde = EasyMock.mock(Serde.class); @@ -127,13 +123,12 @@ public class FullChangeSerdeTest { mockSerializer.close(); EasyMock.expectLastCall(); EasyMock.replay(mockSerializer); - 
final Serializer<Change<Void>> serializer = new FullChangeSerde<>(mockSerde).serializer(); + final Serializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).serializer(); serializer.close(); EasyMock.verify(mockSerde); EasyMock.verify(mockSerializer); } - @SuppressWarnings("unchecked") @Test public void shouldConfigureDeserializer() { final Serde<Void> mockSerde = EasyMock.mock(Serde.class); @@ -143,13 +138,12 @@ public class FullChangeSerdeTest { mockDeserializer.configure(emptyMap(), false); EasyMock.expectLastCall(); EasyMock.replay(mockDeserializer); - final Deserializer<Change<Void>> serializer = new FullChangeSerde<>(mockSerde).deserializer(); + final Deserializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).deserializer(); serializer.configure(emptyMap(), false); EasyMock.verify(mockSerde); EasyMock.verify(mockDeserializer); } - @SuppressWarnings("unchecked") @Test public void shouldCloseDeserializer() { final Serde<Void> mockSerde = EasyMock.mock(Serde.class); @@ -159,7 +153,7 @@ public class FullChangeSerdeTest { mockDeserializer.close(); EasyMock.expectLastCall(); EasyMock.replay(mockDeserializer); - final Deserializer<Change<Void>> serializer = new FullChangeSerde<>(mockSerde).deserializer(); + final Deserializer<Change<Void>> serializer = FullChangeSerde.castOrWrap(mockSerde).deserializer(); serializer.close(); EasyMock.verify(mockSerde); EasyMock.verify(mockDeserializer); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java index bb1bc0f..6246459 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java @@ -72,11 +72,12 @@ public class KTableSuppressProcessorTest { final 
String storeName = "test-store"; - final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName) + final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder<>(storeName, keySerde, FullChangeSerde.castOrWrap(valueSerde)) .withLoggingDisabled() .build(); + final KTableSuppressProcessor<K, V> processor = - new KTableSuppressProcessor<>(getImpl(suppressed), storeName, keySerde, new FullChangeSerde<>(valueSerde)); + new KTableSuppressProcessor<>((SuppressedInternal<K>) suppressed, storeName); final MockInternalProcessorContext context = new MockInternalProcessorContext(); buffer.init(context, buffer); @@ -201,7 +202,6 @@ public class KTableSuppressProcessorTest { // note the record is in the past, but the window end is in the future, so we still have to buffer, // even though the grace period is 0. final long timestamp = 5L; - final long streamTime = 99L; final long windowEnd = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, windowEnd)); @@ -237,8 +237,12 @@ public class KTableSuppressProcessorTest { assertThat(capturedForward.timestamp(), is(timestamp)); } + /** + * It's desirable to drop tombstones for final-results windowed streams, since (as described in the + * {@link SuppressedInternal} javadoc), they are unnecessary to emit. 
+ */ @Test - public void finalResultsShouldSuppressTombstonesForTimeWindows() { + public void finalResultsShouldDropTombstonesForTimeWindows() { final Harness<Windowed<String>, Long> harness = new Harness<>(finalResults(ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Long()); final MockInternalProcessorContext context = harness.context; @@ -253,8 +257,13 @@ public class KTableSuppressProcessorTest { assertThat(context.forwarded(), hasSize(0)); } + + /** + * It's desirable to drop tombstones for final-results windowed streams, since (as described in the + * {@link SuppressedInternal} javadoc), they are unnecessary to emit. + */ @Test - public void finalResultsShouldSuppressTombstonesForSessionWindows() { + public void finalResultsShouldDropTombstonesForSessionWindows() { final Harness<Windowed<String>, Long> harness = new Harness<>(finalResults(ofMillis(0L)), sessionWindowedSerdeFrom(String.class), Long()); final MockInternalProcessorContext context = harness.context; @@ -269,8 +278,12 @@ public class KTableSuppressProcessorTest { assertThat(context.forwarded(), hasSize(0)); } + /** + * It's NOT OK to drop tombstones for non-final-results windowed streams, since we may have emitted some results for + * the window before getting the tombstone (see the {@link SuppressedInternal} javadoc). 
+ */ @Test - public void suppressShouldNotSuppressTombstonesForTimeWindows() { + public void suppressShouldNotDropTombstonesForTimeWindows() { final Harness<Windowed<String>, Long> harness = new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), timeWindowedSerdeFrom(String.class, 100L), Long()); final MockInternalProcessorContext context = harness.context; @@ -288,8 +301,13 @@ public class KTableSuppressProcessorTest { assertThat(capturedForward.timestamp(), is(timestamp)); } + + /** + * It's NOT OK to drop tombstones for non-final-results windowed streams, since we may have emitted some results for + * the window before getting the tombstone (see the {@link SuppressedInternal} javadoc). + */ @Test - public void suppressShouldNotSuppressTombstonesForSessionWindows() { + public void suppressShouldNotDropTombstonesForSessionWindows() { final Harness<Windowed<String>, Long> harness = new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), sessionWindowedSerdeFrom(String.class), Long()); final MockInternalProcessorContext context = harness.context; @@ -307,8 +325,13 @@ public class KTableSuppressProcessorTest { assertThat(capturedForward.timestamp(), is(timestamp)); } + + /** + * It's SUPER NOT OK to drop tombstones for non-windowed streams, since we may have emitted some results for + * the key before getting the tombstone (see the {@link SuppressedInternal} javadoc). 
+ */ @Test - public void suppressShouldNotSuppressTombstonesForKTable() { + public void suppressShouldNotDropTombstonesForKTable() { final Harness<String, Long> harness = new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), String(), Long()); final MockInternalProcessorContext context = harness.context; @@ -416,8 +439,8 @@ public class KTableSuppressProcessorTest { } } - @SuppressWarnings("unchecked") - private <K extends Windowed> SuppressedInternal<K> finalResults(final Duration grace) { + @SuppressWarnings({"unchecked", "rawtypes"}) + private static <K extends Windowed> SuppressedInternal<K> finalResults(final Duration grace) { return ((FinalResultsSuppressionBuilder) untilWindowCloses(unbounded())).buildFinalResultsSuppression(grace); } @@ -441,11 +464,7 @@ public class KTableSuppressProcessorTest { }; } - private static <K> SuppressedInternal<K> getImpl(final Suppressed<K> suppressed) { - return (SuppressedInternal<K>) suppressed; - } - - private <K> Serde<Windowed<K>> timeWindowedSerdeFrom(final Class<K> rawType, final long windowSize) { + private static <K> Serde<Windowed<K>> timeWindowedSerdeFrom(final Class<K> rawType, final long windowSize) { final Serde<K> kSerde = Serdes.serdeFrom(rawType); return new Serdes.WrapperSerde<>( new TimeWindowedSerializer<>(kSerde.serializer()), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java index 43df1d2..1864547 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java @@ -96,7 +96,7 @@ public class AbstractProcessorContextTest { @Test public void shouldReturnNullIfTopicEqualsNonExistTopic() { - context.setRecordContext(new ProcessorRecordContext(0, 0, 0, 
AbstractProcessorContext.NONEXIST_TOPIC)); + context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC, null)); assertThat(context.topic(), nullValue()); } @@ -154,7 +154,7 @@ public class AbstractProcessorContextTest { @Test public void shouldReturnNullIfHeadersAreNotSet() { - context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC)); + context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC, null)); assertThat(context.headers(), nullValue()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java index ddc4046..18f689f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBufferTest.java @@ -22,11 +22,11 @@ public class InMemoryTimeOrderedKeyValueBufferTest { @Test public void bufferShouldAllowCacheEnablement() { - new InMemoryTimeOrderedKeyValueBuffer.Builder(null).withCachingEnabled(); + new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null).withCachingEnabled(); } @Test public void bufferShouldAllowCacheDisablement() { - new InMemoryTimeOrderedKeyValueBuffer.Builder(null).withCachingDisabled(); + new InMemoryTimeOrderedKeyValueBuffer.Builder<>(null, null, null).withCachingDisabled(); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java index 2953953..6ae36d4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java @@ -22,13 +22,14 @@ import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.record.TimestampType; -import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; +import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer.Eviction; import org.apache.kafka.test.MockInternalProcessorContext; import org.apache.kafka.test.MockInternalProcessorContext.MockRecordCollector; import org.apache.kafka.test.TestUtils; @@ -55,7 +56,7 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.fail; @RunWith(Parameterized.class) -public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer> { +public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<String, String>> { private static final RecordHeaders V_1_CHANGELOG_HEADERS = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})}); @@ -69,9 +70,9 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer> return singletonList( new Object[] { "in-memory buffer", - (Function<String, InMemoryTimeOrderedKeyValueBuffer>) name -> - (InMemoryTimeOrderedKeyValueBuffer) new InMemoryTimeOrderedKeyValueBuffer - .Builder(name) + (Function<String, InMemoryTimeOrderedKeyValueBuffer<String, String>>) name -> + new InMemoryTimeOrderedKeyValueBuffer + .Builder<>(name, Serdes.String(), Serdes.String()) .build() } ); 
@@ -96,7 +97,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer> } - private static void cleanup(final MockInternalProcessorContext context, final TimeOrderedKeyValueBuffer buffer) { + private static void cleanup(final MockInternalProcessorContext context, final TimeOrderedKeyValueBuffer<String, String> buffer) { try { buffer.close(); Utils.delete(context.stateDir()); @@ -107,7 +108,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer> @Test public void shouldInit() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); cleanup(context, buffer); @@ -115,23 +116,23 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer> @Test public void shouldAcceptData() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); - putRecord(buffer, context, "2p93nf", 0, "asdf"); + putRecord(buffer, context, 0L, 0L, "asdf", "2p93nf"); cleanup(context, buffer); } @Test public void shouldRejectNullValues() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); try { - buffer.put(0, getBytes("asdf"), new ContextualRecord( - null, - new ProcessorRecordContext(0, 0, 0, "topic") - )); + buffer.put(0, "asdf", + null, + getContext(0) + ); fail("expected an exception"); } catch (final NullPointerException expected) { // expected @@ -139,27 +140,12 @@ public class TimeOrderedKeyValueBufferTest<B extends 
TimeOrderedKeyValueBuffer> cleanup(context, buffer); } - private static ContextualRecord getRecord(final String value) { - return getRecord(value, 0L); - } - - private static ContextualRecord getRecord(final String value, final long timestamp) { - return new ContextualRecord( - value.getBytes(UTF_8), - new ProcessorRecordContext(timestamp, 0, 0, "topic") - ); - } - - private static Bytes getBytes(final String key) { - return Bytes.wrap(key.getBytes(UTF_8)); - } - @Test public void shouldRemoveData() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); - putRecord(buffer, context, "qwer", 0, "asdf"); + putRecord(buffer, context, 0L, 0L, "asdf", "qwer"); assertThat(buffer.numRecords(), is(1)); buffer.evictWhile(() -> true, kv -> { }); assertThat(buffer.numRecords(), is(0)); @@ -168,90 +154,71 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer> @Test public void shouldRespectEvictionPredicate() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); - final Bytes firstKey = getBytes("asdf"); - final ContextualRecord firstRecord = getRecord("eyt"); - putRecord(0, buffer, context, firstRecord, firstKey); - putRecord(buffer, context, "rtg", 1, "zxcv"); + putRecord(buffer, context, 0L, 0L, "asdf", "eyt"); + putRecord(buffer, context, 1L, 0L, "zxcv", "rtg"); assertThat(buffer.numRecords(), is(2)); - final List<KeyValue<Bytes, ContextualRecord>> evicted = new LinkedList<>(); + final List<Eviction<String, String>> evicted = new LinkedList<>(); buffer.evictWhile(() -> buffer.numRecords() > 1, evicted::add); assertThat(buffer.numRecords(), is(1)); 
- assertThat(evicted, is(singletonList(new KeyValue<>(firstKey, firstRecord)))); + assertThat(evicted, is(singletonList(new Eviction<>("asdf", "eyt", getContext(0L))))); cleanup(context, buffer); } @Test public void shouldTrackCount() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); - putRecord(buffer, context, "oin", 0, "asdf"); + putRecord(buffer, context, 0L, 0L, "asdf", "oin"); assertThat(buffer.numRecords(), is(1)); - putRecord(buffer, context, "wekjn", 1, "asdf"); + putRecord(buffer, context, 1L, 0L, "asdf", "wekjn"); assertThat(buffer.numRecords(), is(1)); - putRecord(buffer, context, "24inf", 0, "zxcv"); + putRecord(buffer, context, 0L, 0L, "zxcv", "24inf"); assertThat(buffer.numRecords(), is(2)); cleanup(context, buffer); } @Test public void shouldTrackSize() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); - putRecord(buffer, context, "23roni", 0, "asdf"); + putRecord(buffer, context, 0L, 0L, "asdf", "23roni"); assertThat(buffer.bufferSize(), is(43L)); - putRecord(buffer, context, "3l", 1, "asdf"); + putRecord(buffer, context, 1L, 0L, "asdf", "3l"); assertThat(buffer.bufferSize(), is(39L)); - putRecord(buffer, context, "qfowin", 0, "zxcv"); + putRecord(buffer, context, 0L, 0L, "zxcv", "qfowin"); assertThat(buffer.bufferSize(), is(82L)); cleanup(context, buffer); } @Test public void shouldTrackMinTimestamp() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); 
buffer.init(context, buffer); - putRecord(buffer, context, "2093j", 1, "asdf"); + putRecord(buffer, context, 1L, 0L, "asdf", "2093j"); assertThat(buffer.minTimestamp(), is(1L)); - putRecord(buffer, context, "3gon4i", 0, "zxcv"); + putRecord(buffer, context, 0L, 0L, "zxcv", "3gon4i"); assertThat(buffer.minTimestamp(), is(0L)); cleanup(context, buffer); } - private static void putRecord(final TimeOrderedKeyValueBuffer buffer, - final MockInternalProcessorContext context, - final String value, - final int time, - final String key) { - putRecord(time, buffer, context, getRecord(value), getBytes(key)); - } - - private static void putRecord(final int time, - final TimeOrderedKeyValueBuffer buffer, - final MockInternalProcessorContext context, - final ContextualRecord firstRecord, - final Bytes firstKey) { - context.setRecordContext(firstRecord.recordContext()); - buffer.put(time, firstKey, firstRecord); - } - @Test public void shouldEvictOldestAndUpdateSizeAndCountAndMinTimestamp() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); - putRecord(buffer, context, "o23i4", 1, "zxcv"); + putRecord(buffer, context, 1L, 0L, "zxcv", "o23i4"); assertThat(buffer.numRecords(), is(1)); assertThat(buffer.bufferSize(), is(42L)); assertThat(buffer.minTimestamp(), is(1L)); - putRecord(buffer, context, "3ng", 0, "asdf"); + putRecord(buffer, context, 0L, 0L, "asdf", "3ng"); assertThat(buffer.numRecords(), is(2)); assertThat(buffer.bufferSize(), is(82L)); assertThat(buffer.minTimestamp(), is(0L)); @@ -260,14 +227,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer> buffer.evictWhile(() -> true, kv -> { switch (callbackCount.incrementAndGet()) { case 1: { - assertThat(new String(kv.key.get(), UTF_8), is("asdf")); + assertThat(kv.key(), is("asdf")); 
assertThat(buffer.numRecords(), is(2)); assertThat(buffer.bufferSize(), is(82L)); assertThat(buffer.minTimestamp(), is(0L)); break; } case 2: { - assertThat(new String(kv.key.get(), UTF_8), is("zxcv")); + assertThat(kv.key(), is("zxcv")); assertThat(buffer.numRecords(), is(1)); assertThat(buffer.bufferSize(), is(42L)); assertThat(buffer.minTimestamp(), is(1L)); @@ -288,12 +255,12 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer> @Test public void shouldFlush() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); - putRecord(2, buffer, context, getRecord("2093j", 0L), getBytes("asdf")); - putRecord(1, buffer, context, getRecord("3gon4i", 1L), getBytes("zxcv")); - putRecord(0, buffer, context, getRecord("deadbeef", 2L), getBytes("deleteme")); + putRecord(buffer, context, 2L, 0L, "asdf", "2093j"); + putRecord(buffer, context, 1L, 1L, "zxcv", "3gon4i"); + putRecord(buffer, context, 0L, 2L, "deleteme", "deadbeef"); // replace "deleteme" with a tombstone buffer.evictWhile(() -> buffer.minTimestamp() < 1, kv -> { }); @@ -357,17 +324,16 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer> cleanup(context, buffer); } - @Test public void shouldRestoreOldFormat() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); final RecordBatchingStateRestoreCallback stateRestoreCallback = (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName); - context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "")); + context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null)); 
stateRestoreCallback.restoreBatch(asList( new ConsumerRecord<>("changelog-topic", @@ -425,7 +391,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer> // flush the buffer into a list in buffer order so we can make assertions about the contents. - final List<KeyValue<Bytes, ContextualRecord>> evicted = new LinkedList<>(); + final List<Eviction<String, String>> evicted = new LinkedList<>(); buffer.evictWhile(() -> true, evicted::add); // Several things to note: @@ -437,22 +403,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer> // original format. assertThat(evicted, is(asList( - new KeyValue<>( - getBytes("zxcv"), - new ContextualRecord("3o4im".getBytes(UTF_8), - new ProcessorRecordContext(2, - 2, - 0, - "changelog-topic", - new RecordHeaders()))), - new KeyValue<>( - getBytes("asdf"), - new ContextualRecord("qwer".getBytes(UTF_8), - new ProcessorRecordContext(1, - 1, - 0, - "changelog-topic", - new RecordHeaders()))) + new Eviction<>( + "zxcv", + "3o4im", + new ProcessorRecordContext(2L, 2, 0, "changelog-topic", new RecordHeaders())), + new Eviction<>( + "asdf", + "qwer", + new ProcessorRecordContext(1L, 1, 0, "changelog-topic", new RecordHeaders())) ))); cleanup(context, buffer); @@ -460,14 +418,14 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer> @Test public void shouldRestoreNewFormat() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); final RecordBatchingStateRestoreCallback stateRestoreCallback = (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName); - context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "")); + context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null)); final RecordHeaders v1FlagHeaders = new 
RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})}); @@ -533,7 +491,7 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer> // flush the buffer into a list in buffer order so we can make assertions about the contents. - final List<KeyValue<Bytes, ContextualRecord>> evicted = new LinkedList<>(); + final List<Eviction<String, String>> evicted = new LinkedList<>(); buffer.evictWhile(() -> true, evicted::add); // Several things to note: @@ -541,41 +499,33 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer> // * The record timestamps are properly restored, and not conflated with the record's buffer time. // * The keys and values are properly restored // * The record topic is set to the original input topic, *not* the changelog topic - // * The record offset preserves the origininal input record's offset, *not* the offset of the changelog record + // * The record offset preserves the original input record's offset, *not* the offset of the changelog record assertThat(evicted, is(asList( - new KeyValue<>( - getBytes("zxcv"), - new ContextualRecord("3o4im".getBytes(UTF_8), - new ProcessorRecordContext(2, - 0, - 0, - "topic", - null))), - new KeyValue<>( - getBytes("asdf"), - new ContextualRecord("qwer".getBytes(UTF_8), - new ProcessorRecordContext(1, - 0, - 0, - "topic", - null))) - ))); + new Eviction<>( + "zxcv", + "3o4im", + getContext(2L)), + new Eviction<>( + "asdf", + "qwer", + getContext(1L) + )))); cleanup(context, buffer); } @Test public void shouldNotRestoreUnrecognizedVersionRecord() { - final TimeOrderedKeyValueBuffer buffer = bufferSupplier.apply(testName); + final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName); final MockInternalProcessorContext context = makeContext(); buffer.init(context, buffer); final RecordBatchingStateRestoreCallback stateRestoreCallback = (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName); - 
context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "")); + context.setRecordContext(new ProcessorRecordContext(0, 0, 0, "", null)); final RecordHeaders unknownFlagHeaders = new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) -1})}); @@ -601,4 +551,26 @@ public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer> cleanup(context, buffer); } } + + private static void putRecord(final TimeOrderedKeyValueBuffer<String, String> buffer, + final MockInternalProcessorContext context, + final long streamTime, + final long recordTimestamp, + final String key, + final String value) { + final ProcessorRecordContext recordContext = getContext(recordTimestamp); + context.setRecordContext(recordContext); + buffer.put(streamTime, key, value, recordContext); + } + + private static ContextualRecord getRecord(final String value, final long timestamp) { + return new ContextualRecord( + value.getBytes(UTF_8), + getContext(timestamp) + ); + } + + private static ProcessorRecordContext getContext(final long recordTimestamp) { + return new ProcessorRecordContext(recordTimestamp, 0, 0, "topic", null); + } }