[ https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627888#comment-16627888 ]
ASF GitHub Bot commented on KAFKA-7223: --------------------------------------- vvcephei closed pull request #5688: KAFKA-7223: Part 3 preview URL: https://github.com/apache/kafka/pull/5688 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java index 7488ef6ff37..49fe96ba20c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java @@ -155,6 +155,6 @@ static StrictBufferConfig unbounded() { * @return a suppression configuration */ static <K> Suppressed<K> untilTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig bufferConfig) { - return new SuppressedImpl<>(timeToWaitForMoreEvents, bufferConfig, null); + return new SuppressedImpl<>(timeToWaitForMoreEvents, bufferConfig, null, false); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java new file mode 100644 index 00000000000..8a2e619b7b5 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.ByteBufferDeserializer; +import org.apache.kafka.common.serialization.ByteBufferSerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; + +import java.nio.ByteBuffer; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +public class FullChangeSerde<T> implements Serde<Change<T>> { + private final Serde<T> inner; + + public FullChangeSerde(final Serde<T> inner) { + this.inner = requireNonNull(inner); + } + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + inner.configure(configs, isKey); + } + + @Override + public void close() { + inner.close(); + } + + @Override + public Serializer<Change<T>> serializer() { + final Serializer<T> innerSerializer = inner.serializer(); + final ByteBufferSerializer byteBufferSerializer = new ByteBufferSerializer(); + + return new Serializer<Change<T>>() { + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + innerSerializer.configure(configs, isKey); + } + + @Override + public byte[] serialize(final String topic, final Change<T> data) { + if (data == null) { + return null; + } + final byte[] oldBytes = data.oldValue == null ? null : innerSerializer.serialize(topic, data.oldValue); + final int oldSize = oldBytes == null ? -1 : oldBytes.length; + final byte[] newBytes = data.newValue == null ? null : innerSerializer.serialize(topic, data.newValue); + final int newSize = newBytes == null ? -1 : newBytes.length; + + final ByteBuffer buffer = ByteBuffer.allocate( + 4 + (oldSize == -1 ? 0 : oldSize) + 4 + (newSize == -1 ? 0 : newSize) + ); + buffer.putInt(oldSize); + if (oldBytes != null) { + buffer.put(oldBytes); + } + buffer.putInt(newSize); + if (newBytes != null) { + buffer.put(newBytes); + } + return byteBufferSerializer.serialize(null, buffer); + } + + @Override + public void close() { + innerSerializer.close(); + } + }; + } + + @Override + public Deserializer<Change<T>> deserializer() { + final Deserializer<T> innerDeserializer = inner.deserializer(); + final ByteBufferDeserializer byteBufferDeserializer = new ByteBufferDeserializer(); + return new Deserializer<Change<T>>() { + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + innerDeserializer.configure(configs, isKey); + } + + @Override + public Change<T> deserialize(final String topic, final byte[] data) { + if (data == null) { + return null; + } + final ByteBuffer buffer = byteBufferDeserializer.deserialize(null, data); + + final int oldSize = buffer.getInt(); + final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize]; + if (oldBytes != null) { + buffer.get(oldBytes); + } + final T oldValue = oldBytes == null ? null : innerDeserializer.deserialize(topic, oldBytes); + + final int newSize = buffer.getInt(); + final byte[] newBytes = newSize == -1 ? null : new byte[newSize]; + if (newBytes != null) { + buffer.get(newBytes); + } + final T newValue = newBytes == null ? 
null : innerDeserializer.deserialize(topic, newBytes); + return new Change<>(newValue, oldValue); + } + + @Override + public void close() { + innerDeserializer.close(); + } + }; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java index a191c5ad19c..be60642a75f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java @@ -67,7 +67,9 @@ <KR, T> KTable<KR, T> build(final KStreamAggProcessorSupplier<K, KR, V, T> aggregateSupplier, final String functionName, final StoreBuilder<? extends StateStore> storeBuilder, - final boolean isQueryable) { + final boolean isQueryable, + final Serde<KR> keySerde, + final Serde<T> valueSerde) { final String aggFunctionName = builder.newProcessorName(functionName); @@ -98,6 +100,8 @@ return new KTableImpl<>(builder, aggFunctionName, aggregateSupplier, + keySerde, + valueSerde, sourceName.equals(this.name) ? sourceNodes : Collections.singleton(sourceName), storeBuilder.name(), isQueryable, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index 5d4f9f3dd3e..339e723dd68 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -92,7 +92,8 @@ return doAggregate( new KStreamReduce<>(materializedInternal.storeName(), reducer), REDUCE_NAME, - materializedInternal + materializedInternal, + materializedInternal.keySerde() ); } @@ -114,7 +115,8 @@ return doAggregate( new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator), AGGREGATE_NAME, - materializedInternal + materializedInternal, + materializedInternal.keySerde() ); } @@ -156,7 +158,9 @@ return doAggregate( new KStreamAggregate<>(materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), AGGREGATE_NAME, - materializedInternal); + materializedInternal, + materializedInternal.keySerde() + ); } @Override @@ -191,12 +195,15 @@ private <KR, T> KTable<KR, T> doAggregate(final KStreamAggProcessorSupplier<K, KR, V, T> aggregateSupplier, final String functionName, - final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) { + final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal, + final Serde<KR> resultKeySerde) { return aggregateBuilder.build( aggregateSupplier, functionName, new KeyValueStoreMaterializer<>(materializedInternal).materialize(), - materializedInternal.isQueryable() + materializedInternal.isQueryable(), + resultKeySerde, + materializedInternal.valueSerde() ); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index 08fb605ef99..911704a0120 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -93,6 +93,8 @@ return new KTableImpl<>(builder, funcName, aggregateSupplier, + materialized.keySerde(), 
+ materialized.valueSerde(), Collections.singleton(sourceName), materialized.storeName(), materialized.isQueryable(), diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 2330fad1b16..9e35c3ae3e8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -365,7 +365,11 @@ public String queryableStoreName() { final String name = builder.newProcessorName(SUPPRESS_NAME); final ProcessorSupplier<K, Change<V>> suppressionSupplier = - () -> new KTableSuppressProcessor<>(buildSuppress(suppressed)); + () -> new KTableSuppressProcessor<>( + buildSuppress(suppressed), + keySerde, + valSerde == null ? null : new FullChangeSerde<>(valSerde) + ); final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>( suppressionSupplier, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java index 98076e068ac..f9c64edf98b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.kstream.SessionWindowedKStream; import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.WindowedSerdes; import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode; import org.apache.kafka.streams.state.SessionBytesStoreSupplier; import org.apache.kafka.streams.state.SessionStore; @@ -100,7 +101,9 @@ ), AGGREGATE_NAME, materialize(materializedInternal), - materializedInternal.isQueryable() + materializedInternal.isQueryable(), + getWindowedSerde(materializedInternal.keySerde()), + materializedInternal.valueSerde() ); } @@ -134,7 +137,9 @@ ), REDUCE_NAME, materialize(materializedInternal), - materializedInternal.isQueryable() + materializedInternal.isQueryable(), + getWindowedSerde(materializedInternal.keySerde()), + materializedInternal.valueSerde() ); } @@ -170,7 +175,9 @@ ), AGGREGATE_NAME, materialize(materializedInternal), - materializedInternal.isQueryable() + materializedInternal.isQueryable(), + getWindowedSerde(materializedInternal.keySerde()), + materializedInternal.valueSerde() ); } @@ -221,4 +228,8 @@ private Aggregator<K, V, V> aggregatorForReducer(final Reducer<V> reducer) { return (aggKey, value, aggregate) -> aggregate == null ? value : reducer.apply(aggregate, value); } + + private static <T> Serde<Windowed<T>> getWindowedSerde(final Serde<T> rawSerde) { + return rawSerde == null ? 
null : new WindowedSerdes.TimeWindowedSerde<>(rawSerde); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java index 5c5cfb2bed7..cf240008cf1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.TimeWindowedKStream; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.WindowedSerdes; import org.apache.kafka.streams.kstream.Windows; import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode; import org.apache.kafka.streams.state.StoreBuilder; @@ -100,11 +101,14 @@ ), AGGREGATE_NAME, materialize(materializedInternal), - materializedInternal.isQueryable() + materializedInternal.isQueryable(), + getWindowedSerde(materializedInternal.keySerde()), + materializedInternal.valueSerde() ); } + @Override public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator) { @@ -132,7 +136,10 @@ ), AGGREGATE_NAME, materialize(materializedInternal), - materializedInternal.isQueryable()); + materializedInternal.isQueryable(), + getWindowedSerde(materializedInternal.keySerde()), + materializedInternal.valueSerde() + ); } @Override @@ -159,7 +166,9 @@ new KStreamWindowReduce<>(windows, materializedInternal.storeName(), reducer), REDUCE_NAME, materialize(materializedInternal), - materializedInternal.isQueryable() + materializedInternal.isQueryable(), + getWindowedSerde(materializedInternal.keySerde()), + materializedInternal.valueSerde() ); } @@ -226,4 +235,8 @@ } return builder; } + + private static <T> Serde<Windowed<T>> getWindowedSerde(final Serde<T> rawSerde) { + return rawSerde == null ? 
null : new WindowedSerdes.TimeWindowedSerde<>(rawSerde); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java index e731dc6f5e1..027e6bc6e13 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java @@ -21,7 +21,7 @@ import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN; abstract class BufferConfigImpl<BC extends Suppressed.BufferConfig<BC>> implements Suppressed.BufferConfig<BC> { - public abstract long maxKeys(); + public abstract long maxRecords(); public abstract long maxBytes(); @@ -39,12 +39,12 @@ @Override public Suppressed.StrictBufferConfig shutDownWhenFull() { - return new StrictBufferConfigImpl(maxKeys(), maxBytes(), SHUT_DOWN); + return new StrictBufferConfigImpl(maxRecords(), maxBytes(), SHUT_DOWN); } @Override public Suppressed.BufferConfig emitEarlyWhenFull() { - return new EagerBufferConfigImpl(maxKeys(), maxBytes()); + return new EagerBufferConfigImpl(maxRecords(), maxBytes()); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/ContextualRecord.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/ContextualRecord.java new file mode 100644 index 00000000000..f50f45c723e --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/ContextualRecord.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals.suppress; + +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; + +class ContextualRecord<V> { + private final long time; + private final V value; + private final ProcessorRecordContext recordContext; + + ContextualRecord(final long time, final V value, final ProcessorRecordContext recordContext) { + this.time = time; + this.value = value; + this.recordContext = recordContext; + } + + long time() { + return time; + } + + ProcessorRecordContext recordContext() { + return recordContext; + } + + V value() { + return value; + } + + @Override + public String toString() { + return "ContextualRecord{value=" + value + ", time=" + time + '}'; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java index 0c2c883e18a..59256fec349 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java @@ -22,11 +22,11 @@ public class EagerBufferConfigImpl extends BufferConfigImpl { - private final long maxKeys; + private final long maxRecords; private final long maxBytes; - public EagerBufferConfigImpl(final long maxKeys, final long maxBytes) { - this.maxKeys = maxKeys; + public EagerBufferConfigImpl(final long maxRecords, final long maxBytes) { + this.maxRecords = maxRecords; this.maxBytes = maxBytes; } @@ -37,12 +37,12 @@ public EagerBufferConfigImpl(final long maxKeys, final long maxBytes) { @Override public Suppressed.BufferConfig withMaxBytes(final long byteLimit) { - return new EagerBufferConfigImpl(maxKeys, byteLimit); + return new EagerBufferConfigImpl(maxRecords, byteLimit); } @Override - public long maxKeys() { - return maxKeys; + public long maxRecords() { + return maxRecords; } @Override @@ -60,17 +60,17 @@ public boolean equals(final Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; final EagerBufferConfigImpl that = (EagerBufferConfigImpl) o; - return maxKeys == that.maxKeys && + return maxRecords == that.maxRecords && maxBytes == that.maxBytes; } @Override public int hashCode() { - return Objects.hash(maxKeys, maxBytes); + return Objects.hash(maxRecords, maxBytes); } @Override public String toString() { - return "EagerBufferConfigImpl{maxKeys=" + maxKeys + ", maxBytes=" + maxBytes + '}'; + return "EagerBufferConfigImpl{maxKeys=" + maxRecords + ", maxBytes=" + maxBytes + '}'; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java index 548f5991dbb..db09307d48c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java @@ -34,7 +34,8 @@ public FinalResultsSuppressionBuilder(final Suppressed.StrictBufferConfig buffer return new SuppressedImpl<>( gracePeriod, bufferConfig, - (ProcessorContext context, K key) -> key.window().end() + (ProcessorContext context, K key) -> key.window().end(), + true ); } diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java new file mode 100644 index 00000000000..4d1880b6b5e --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals.suppress; + +import org.apache.kafka.streams.KeyValue; + +import java.util.ConcurrentModificationException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; + +class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyValueBuffer<K, V> { + private long memBufferSize = 0; + private final Map<K, TimeKey<K>> index = new HashMap<>(); + private final TreeMap<TimeKey<K>, SizedValue<V>> sortedMap = new TreeMap<>(); + + private static class SizedValue<V> { + private final V v; + private final long size; + + private SizedValue(final V v, final long size) { + this.v = v; + this.size = size; + } + + public V value() { + return v; + } + + public long size() { + return size; + } + + @Override + public String toString() { + return "SizedValue{v=" + v + ", size=" + size + '}'; + } + } + + private class InnerIterator implements Iterator<KeyValue<TimeKey<K>, V>> { + private final Iterator<Map.Entry<TimeKey<K>, SizedValue<V>>> delegate = sortedMap.entrySet().iterator(); + private Map.Entry<TimeKey<K>, SizedValue<V>> last; + + @Override + public boolean hasNext() { + return delegate.hasNext(); + } + + @Override + public KeyValue<TimeKey<K>, V> next() { + final Map.Entry<TimeKey<K>, SizedValue<V>> next = delegate.next(); + this.last = next; + return new KeyValue<>(next.getKey(), next.getValue().value()); + } + + @Override + public void remove() { + delegate.remove(); + index.remove(last.getKey().key()); + memBufferSize = memBufferSize - last.getValue().size(); + } + } + + @Override + public Iterator<KeyValue<TimeKey<K>, V>> iterator() { + return new InnerIterator(); + } + + @Override + public void put(final long time, + final K key, + final V value, + final long recordSize) { + final TimeKey<K> previousKey = index.get(key); + // non-resetting semantics: + // if there was a previous version of the same record, insert the new record in the same place in the priority queue + final TimeKey<K> timeKey = previousKey != null ? 
previousKey : new TimeKey<>(time, key); + + final TimeKey<K> replaced = index.put(key, timeKey); + + if (!Objects.equals(previousKey, replaced)) { + throw new ConcurrentModificationException( + String.format("expected to replace [%s], but got [%s]", previousKey, replaced) + ); + } + + final SizedValue<V> removedValue = replaced == null ? null : sortedMap.remove(replaced); + final SizedValue<V> previousValue = sortedMap.put(timeKey, new SizedValue<>(value, recordSize)); + memBufferSize = + memBufferSize + + recordSize + - (removedValue == null ? 0 : removedValue.size()) + - (previousValue == null ? 0 : previousValue.size()); + } + + @Override + public int numRecords() { + return index.size(); + } + + @Override + public long bufferSize() { + return memBufferSize; + } + + @Override + public String toString() { + return "InMemoryTimeOrderedKeyValueBuffer{" + + ", \n\tmemBufferSize=" + memBufferSize + + ", \n\tindex=" + index + + ", \n\tsortedMap=" + sortedMap + + '\n' + '}'; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java index f65f2b4af20..16901d778bc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java @@ -16,32 +16,91 @@ */ package org.apache.kafka.streams.kstream.internals.suppress; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import java.time.Duration; +import java.util.Iterator; + +import static java.util.Objects.requireNonNull; public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> { private final SuppressedImpl<K> suppress; + private final long maxRecords; + private final long maxBytes; + private final Duration suppressDuration; private InternalProcessorContext internalProcessorContext; + private TimeOrderedKeyValueBuffer<K, ContextualRecord<Change<V>>> buffer; + + private ProcessorNode myNode; + private Serde<K> keySerde; + private Serde<Change<V>> valueSerde; + + public KTableSuppressProcessor(final SuppressedImpl<K> suppress, + final Serde<K> keySerde, + final Serde<Change<V>> valueSerde) { + this.suppress = requireNonNull(suppress); + this.keySerde = keySerde; + this.valueSerde = valueSerde; + maxRecords = suppress.getBufferConfig().maxRecords(); + maxBytes = suppress.getBufferConfig().maxBytes(); + suppressDuration = suppress.getTimeToWaitForMoreEvents(); + } - public KTableSuppressProcessor(final SuppressedImpl<K> suppress) { - this.suppress = suppress; + private void ensureBuffer() { + if (buffer == null) { + buffer = new InMemoryTimeOrderedKeyValueBuffer<>(); + + internalProcessorContext.schedule( + 1L, + PunctuationType.STREAM_TIME, + 
this::evictOldEnoughRecords + ); + } } @Override public void init(final ProcessorContext context) { internalProcessorContext = (InternalProcessorContext) context; + myNode = internalProcessorContext.currentNode(); + this.keySerde = keySerde == null ? castSerde(context.keySerde(), "key") : keySerde; + this.valueSerde = + valueSerde == null ? new FullChangeSerde<>(castSerde(context.valueSerde(), "value")) : valueSerde; } @Override public void process(final K key, final Change<V> value) { if (suppress.getTimeToWaitForMoreEvents() == Duration.ZERO && definedRecordTime(key) <= internalProcessorContext.streamTime()) { - internalProcessorContext.forward(key, value); + if (shouldForward(value)) { + internalProcessorContext.forward(key, value); + } // else skip } else { - throw new NotImplementedException(); + ensureBuffer(); + buffer(key, value); + enforceSizeBound(); + } + } + + @SuppressWarnings("unchecked") + private <T> Serde<T> castSerde(final Serde<?> untyped, final String keyOrValue) { + try { + return (Serde<T>) untyped; + } catch (final ClassCastException e) { + throw new TopologyException( + "Unable to use the configured default " + keyOrValue + + " serde. Ensure that the correct " + keyOrValue + " serde is set upstream of " + myNode + ".", + e + ); } } @@ -49,18 +108,98 @@ private long definedRecordTime(final K key) { return suppress.getTimeDefinition().time(internalProcessorContext, key); } + private void buffer(final K key, final Change<V> value) { + final long time = suppress.getTimeDefinition().time(internalProcessorContext, key); + final ProcessorRecordContext recordContext = internalProcessorContext.recordContext(); + + buffer.put(time, key, new ContextualRecord<>(time, value, recordContext), computeRecordSize(key, value, recordContext)); + } + + private void evictOldEnoughRecords(final long streamTime) { + System.out.println("st: " + streamTime); + final Iterator<KeyValue<TimeKey<K>, ContextualRecord<Change<V>>>> iterator = buffer.iterator(); + while (iterator.hasNext()) { + final KeyValue<TimeKey<K>, ContextualRecord<Change<V>>> next = iterator.next(); + final long bufferTime = next.key.time(); + if (bufferTime <= streamTime - suppressDuration.toMillis()) { + final K key = next.key.key(); + final ContextualRecord<Change<V>> contextualRecord = next.value; + + setNodeAndForward(key, contextualRecord); + iterator.remove(); + } else { + break; + } + } + System.out.println(buffer); + } + + private void enforceSizeBound() { + if (overCapacity()) { + switch (suppress.getBufferConfig().bufferFullStrategy()) { + case EMIT: + final Iterator<KeyValue<TimeKey<K>, ContextualRecord<Change<V>>>> iterator = buffer.iterator(); + while (overCapacity() && iterator.hasNext()) { + final KeyValue<TimeKey<K>, ContextualRecord<Change<V>>> next = iterator.next(); + final K key = next.key.key(); + final ContextualRecord<Change<V>> contextualRecord = next.value; + // to be safe, forward before we delete + setNodeAndForward(key, contextualRecord); + iterator.remove(); + } + return; + case SHUT_DOWN: + throw new RuntimeException("TODO: request graceful shutdown"); // TODO: request graceful shutdown + } + } + } + + private boolean overCapacity() { + return buffer.numRecords() > maxRecords || buffer.bufferSize() > maxBytes; + } + + private void setNodeAndForward(final K key, final ContextualRecord<Change<V>> contextualRecord) { + if (shouldForward(contextualRecord.value())) { + final ProcessorNode prevNode = internalProcessorContext.currentNode(); + final ProcessorRecordContext prevRecordContext = 
internalProcessorContext.recordContext(); + internalProcessorContext.setRecordContext(contextualRecord.recordContext()); + internalProcessorContext.setCurrentNode(myNode); + try { + internalProcessorContext.forward(key, contextualRecord.value()); + } finally { + internalProcessorContext.setCurrentNode(prevNode); + internalProcessorContext.setRecordContext(prevRecordContext); + } + } + } + + private boolean shouldForward(final Change<V> value) { + return !(value.newValue == null && suppress.shouldSuppressTombstones()); + } + + private long computeRecordSize(final K key, final Change<V> value, final ProcessorRecordContext recordContext) { + long size = 0L; + size += keySerde.serializer().serialize(null, key).length; + size += valueSerde.serializer().serialize(null, value).length; + size += 8; // timestamp + size += 8; // offset + size += 4; // partition + size += recordContext.topic().toCharArray().length; + if (recordContext.headers() != null) { + for (final Header header : recordContext.headers()) { + size += header.key().toCharArray().length; + size += header.value().length; + } + } + return size; + } + @Override public void close() { } @Override public String toString() { - return "KTableSuppressProcessor{suppress=" + suppress + '}'; - } - - static class NotImplementedException extends RuntimeException { - NotImplementedException() { - super(); - } + return "KTableSuppressProcessor{suppress=" + suppress + ", keySerde=" + keySerde + ", valueSerde=" + valueSerde + '}'; } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java index 0634a748a5b..76aa6acf71a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java @@ -24,20 +24,20 @@ public class StrictBufferConfigImpl extends BufferConfigImpl<Suppressed.StrictBufferConfig> implements Suppressed.StrictBufferConfig { - private final long maxKeys; + private final long maxRecords; private final long maxBytes; private final BufferFullStrategy bufferFullStrategy; - public StrictBufferConfigImpl(final long maxKeys, + public StrictBufferConfigImpl(final long maxRecords, final long maxBytes, final BufferFullStrategy bufferFullStrategy) { - this.maxKeys = maxKeys; + this.maxRecords = maxRecords; this.maxBytes = maxBytes; this.bufferFullStrategy = bufferFullStrategy; } public StrictBufferConfigImpl() { - this.maxKeys = Long.MAX_VALUE; + this.maxRecords = Long.MAX_VALUE; this.maxBytes = Long.MAX_VALUE; this.bufferFullStrategy = SHUT_DOWN; } @@ -49,12 +49,12 @@ public StrictBufferConfigImpl() { @Override public Suppressed.StrictBufferConfig withMaxBytes(final long byteLimit) { - return new StrictBufferConfigImpl(maxKeys, byteLimit, bufferFullStrategy); + return new StrictBufferConfigImpl(maxRecords, byteLimit, bufferFullStrategy); } @Override - public long maxKeys() { - return maxKeys; + public long maxRecords() { + return maxRecords; } @Override @@ -72,19 +72,19 @@ public boolean equals(final Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; final StrictBufferConfigImpl that = (StrictBufferConfigImpl) o; - return maxKeys == that.maxKeys && + return maxRecords == that.maxRecords && maxBytes == that.maxBytes && bufferFullStrategy == 
that.bufferFullStrategy; } @Override public int hashCode() { - return Objects.hash(maxKeys, maxBytes, bufferFullStrategy); + return Objects.hash(maxRecords, maxBytes, bufferFullStrategy); } @Override public String toString() { - return "StrictBufferConfigImpl{maxKeys=" + maxKeys + + return "StrictBufferConfigImpl{maxKeys=" + maxRecords + ", maxBytes=" + maxBytes + ", bufferFullStrategy=" + bufferFullStrategy + '}'; } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java index cffc42b66d5..f4241847b17 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java @@ -17,29 +17,31 @@ package org.apache.kafka.streams.kstream.internals.suppress; import org.apache.kafka.streams.kstream.Suppressed; -import org.apache.kafka.streams.processor.ProcessorContext; import java.time.Duration; import java.util.Objects; public class SuppressedImpl<K> implements Suppressed<K> { private static final Duration DEFAULT_SUPPRESSION_TIME = Duration.ofMillis(Long.MAX_VALUE); - private static final StrictBufferConfig DEFAULT_BUFFER_CONFIG = BufferConfig.unbounded(); + private static final StrictBufferConfigImpl DEFAULT_BUFFER_CONFIG = (StrictBufferConfigImpl) BufferConfig.unbounded(); - private final BufferConfig bufferConfig; + private final BufferConfigImpl bufferConfig; private final Duration timeToWaitForMoreEvents; private final TimeDefinition<K> timeDefinition; + private final boolean suppressTombstones; public SuppressedImpl(final Duration suppressionTime, final BufferConfig bufferConfig, - final TimeDefinition<K> timeDefinition) { + final TimeDefinition<K> timeDefinition, + final boolean suppressTombstones) { this.timeToWaitForMoreEvents = suppressionTime == null ? DEFAULT_SUPPRESSION_TIME : suppressionTime; this.timeDefinition = timeDefinition == null ? (context, anyKey) -> context.timestamp() : timeDefinition; - this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : bufferConfig; + this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : (BufferConfigImpl) bufferConfig; + this.suppressTombstones = suppressTombstones; } - interface TimeDefinition<K> { - long time(final ProcessorContext context, final K key); + BufferConfigImpl getBufferConfig() { + return bufferConfig; } TimeDefinition<K> getTimeDefinition() { @@ -50,6 +52,10 @@ Duration getTimeToWaitForMoreEvents() { return timeToWaitForMoreEvents == null ? Duration.ZERO : timeToWaitForMoreEvents; } + boolean shouldSuppressTombstones() { + return suppressTombstones; + } + @Override public boolean equals(final Object o) { if (this == o) return true; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinition.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinition.java new file mode 100644 index 00000000000..47d35b234d9 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinition.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals.suppress; + +import org.apache.kafka.streams.processor.ProcessorContext; + +interface TimeDefinition<K> { + long time(final ProcessorContext context, final K key); +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java new file mode 100644 index 00000000000..73ac5da5046 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals.suppress; + +import java.util.Objects; + +class TimeKey<K> implements Comparable<TimeKey<K>> { + private final long time; + private final K key; + + TimeKey(final long time, final K key) { + this.time = time; + this.key = key; + } + + K key() { + return key; + } + + long time() { + return time; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final TimeKey<?> timeKey = (TimeKey<?>) o; + return time == timeKey.time && + Objects.equals(key, timeKey.key); + } + + @Override + public int hashCode() { + return Objects.hash(time, key); + } + + @Override + public int compareTo(final TimeKey<K> o) { + // ordering of keys within a time uses hashCode. + final int timeComparison = Long.compare(time, o.time); + return timeComparison == 0 ? 
Integer.compare(key.hashCode(), o.key.hashCode()) : timeComparison; + } + + @Override + public String toString() { + return "TimeKey{time=" + time + ", key=" + key + '}'; + } + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java new file mode 100644 index 00000000000..18f80966f6a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals.suppress; + +import org.apache.kafka.streams.KeyValue; + +import java.util.Iterator; + +interface TimeOrderedKeyValueBuffer<K, V> { + Iterator<KeyValue<TimeKey<K>, V>> iterator(); + + void put(final long time, final K key, final V value, final long recordSize); + + int numRecords(); + + long bufferSize(); +} ---------------------------------------------------------------- > KIP-328: Add in-memory Suppression > ---------------------------------- > > Key: KAFKA-7223 > URL: https://issues.apache.org/jira/browse/KAFKA-7223 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: John Roesler > Assignee: John Roesler > Priority: Major > > As described in > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.] > > This ticket is to implement Suppress, but only for in-memory buffers. > (depends on KAFKA-7222)
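
A few illustrative sketches follow for readers working through the diff above; none of this code is part of the PR, and any invented names are called out per sketch.

The new FullChangeSerde wraps an existing value serde and frames each Change<T> as a length-prefixed pair of byte arrays, with a length of -1 marking a null old or new value. The self-contained sketch below shows only that framing (class and method names are invented, and raw byte[] payloads stand in for the inner serializer/deserializer):

import java.nio.ByteBuffer;

// Framing used by FullChangeSerde: [oldSize][oldBytes][newSize][newBytes], size -1 == null.
final class ChangeFramingSketch {

    static byte[] encode(final byte[] oldBytes, final byte[] newBytes) {
        final int oldSize = oldBytes == null ? -1 : oldBytes.length;
        final int newSize = newBytes == null ? -1 : newBytes.length;
        // -1 sizes contribute no payload bytes, only the 4-byte length marker.
        final ByteBuffer buffer = ByteBuffer.allocate(4 + Math.max(oldSize, 0) + 4 + Math.max(newSize, 0));
        buffer.putInt(oldSize);
        if (oldBytes != null) {
            buffer.put(oldBytes);
        }
        buffer.putInt(newSize);
        if (newBytes != null) {
            buffer.put(newBytes);
        }
        return buffer.array();
    }

    static byte[][] decode(final byte[] data) {
        final ByteBuffer buffer = ByteBuffer.wrap(data);
        final int oldSize = buffer.getInt();
        final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize];
        if (oldBytes != null) {
            buffer.get(oldBytes);
        }
        final int newSize = buffer.getInt();
        final byte[] newBytes = newSize == -1 ? null : new byte[newSize];
        if (newBytes != null) {
            buffer.get(newBytes);
        }
        return new byte[][] {oldBytes, newBytes};
    }
}

The actual serde delegates the two inner byte arrays to the wrapped serializer/deserializer and rebuilds a Change(newValue, oldValue) on the way out.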
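
InMemoryTimeOrderedKeyValueBuffer keeps at most one entry per key while iterating in timestamp order, so the punctuator can walk the buffer from oldest to newest and stop at the first record that is not yet old enough to emit. Below is a simplified standalone sketch of that structure (names are invented; it drops the size accounting and, unlike the PR, orders same-timestamp keys by Comparable rather than by hashCode):

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// A TreeMap sorted by (time, key) provides eviction order; a HashMap index lets an update
// for an already-buffered key keep its original position ("non-resetting" semantics).
final class TimeOrderedBufferSketch<K extends Comparable<K>, V> {

    private static final class TimeKey<K extends Comparable<K>> implements Comparable<TimeKey<K>> {
        final long time;
        final K key;

        TimeKey(final long time, final K key) {
            this.time = time;
            this.key = key;
        }

        @Override
        public int compareTo(final TimeKey<K> other) {
            final int byTime = Long.compare(time, other.time);
            return byTime != 0 ? byTime : key.compareTo(other.key);
        }
    }

    private final Map<K, TimeKey<K>> index = new HashMap<>();
    private final TreeMap<TimeKey<K>, V> sortedMap = new TreeMap<>();

    void put(final long time, final K key, final V value) {
        // Re-use the existing TimeKey if the key is already buffered, so a later update
        // does not push the record further back in the eviction order.
        final TimeKey<K> slot = index.computeIfAbsent(key, k -> new TimeKey<>(time, k));
        sortedMap.put(slot, value);
    }

    // Forward and drop every record whose buffered time is at or before the expiry time
    // (in the PR, stream time minus the configured suppression duration).
    void evictUpTo(final long expiryTime, final BiConsumer<K, V> forward) {
        final Iterator<Map.Entry<TimeKey<K>, V>> iterator = sortedMap.entrySet().iterator();
        while (iterator.hasNext()) {
            final Map.Entry<TimeKey<K>, V> entry = iterator.next();
            if (entry.getKey().time > expiryTime) {
                break; // everything further on is newer, thanks to the sort order
            }
            forward.accept(entry.getKey().key, entry.getValue());
            iterator.remove();
            index.remove(entry.getKey().key);
        }
    }

    int numRecords() {
        return index.size();
    }
}

The real implementation additionally tracks an estimated size per record so that the maxRecords/maxBytes bounds from the renamed BufferConfig can trigger either early emission (EMIT) or shutdown (SHUT_DOWN) when the buffer is over capacity.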
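
Finally, for the bigger picture: once wired into KTableImpl as above, the operator is driven through the public Suppressed API. A hedged usage sketch follows; the topic names, serdes, and surrounding topology are made up, only Suppressed.untilTimeLimit and BufferConfig.unbounded are taken directly from the diff, and KTable#suppress is assumed to be available from the earlier parts of this KIP-328 series:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;

public class SuppressionUsageSketch {

    public static Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();

        // Without suppression, each input record can produce an updated count downstream.
        final KTable<String, Long> counts = builder
            .stream("events", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .count();

        // Wait up to five minutes for more updates to each key before emitting the latest
        // value, buffering in memory with the unbounded strict buffer config from the diff.
        counts
            .suppress(Suppressed.untilTimeLimit(Duration.ofMinutes(5), Suppressed.BufferConfig.unbounded()))
            .toStream()
            .to("rate-limited-counts", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}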