This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new a65940c MINOR: clarify why suppress can sometimes drop tombstones (#6195) a65940c is described below commit a65940cd820c68ee9613c7ff54676560cd01f88a Author: John Roesler <vvcep...@users.noreply.github.com> AuthorDate: Fri Jan 25 15:17:39 2019 -0600 MINOR: clarify why suppress can sometimes drop tombstones (#6195) Reviewers: Jonathan Gordon, Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- .../suppress/KTableSuppressProcessor.java | 6 ++-- .../internals/suppress/SuppressedInternal.java | 31 ++++++++++++++------ .../suppress/KTableSuppressProcessorTest.java | 33 ++++++++++++++++++---- 3 files changed, 53 insertions(+), 17 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java index 06d5004..813c558 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java @@ -42,7 +42,7 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> { private final long suppressDurationMillis; private final TimeDefinition<K> bufferTimeDefinition; private final BufferFullStrategy bufferFullStrategy; - private final boolean shouldSuppressTombstones; + private final boolean safeToDropTombstones; private final String storeName; private TimeOrderedKeyValueBuffer buffer; @@ -64,7 +64,7 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> { suppressDurationMillis = suppress.timeToWaitForMoreEvents().toMillis(); bufferTimeDefinition = suppress.timeDefinition(); bufferFullStrategy = suppress.bufferConfig().bufferFullStrategy(); - shouldSuppressTombstones = suppress.shouldSuppressTombstones(); + 
safeToDropTombstones = suppress.safeToDropTombstones(); } @SuppressWarnings("unchecked") @@ -136,7 +136,7 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> { } private boolean shouldForward(final Change<V> value) { - return !(value.newValue == null && shouldSuppressTombstones); + return value.newValue != null || !safeToDropTombstones; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java index 042a81a..c387700 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java @@ -30,23 +30,36 @@ public class SuppressedInternal<K> implements Suppressed<K> { private final BufferConfigInternal bufferConfig; private final Duration timeToWaitForMoreEvents; private final TimeDefinition<K> timeDefinition; - private final boolean suppressTombstones; + private final boolean safeToDropTombstones; + /** + * @param safeToDropTombstones Note: it's *only* safe to drop tombstones for windowed KTables in "final results" mode. + * In that case, we have a priori knowledge that we have never before emitted any + * results for a given key, and therefore the tombstone is unnecessary (albeit + * idempotent and correct). We decided that the unnecessary tombstones would not be + * desirable in the output stream, though, hence the ability to drop them. + * + * An alternative is to remember whether a result has previously been emitted + * for a key and drop tombstones only if none has been, but it would be a little complicated to + * figure out when to forget the fact that we have emitted some result (currently, the + * buffer immediately forgets all about a key when we emit, which helps to keep it + * compact).
+ */ public SuppressedInternal(final String name, final Duration suppressionTime, final BufferConfig bufferConfig, final TimeDefinition<K> timeDefinition, - final boolean suppressTombstones) { + final boolean safeToDropTombstones) { this.name = name; this.timeToWaitForMoreEvents = suppressionTime == null ? DEFAULT_SUPPRESSION_TIME : suppressionTime; this.timeDefinition = timeDefinition == null ? TimeDefinitions.RecordTimeDefintion.instance() : timeDefinition; this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : (BufferConfigInternal) bufferConfig; - this.suppressTombstones = suppressTombstones; + this.safeToDropTombstones = safeToDropTombstones; } @Override public Suppressed<K> withName(final String name) { - return new SuppressedInternal<>(name, timeToWaitForMoreEvents, bufferConfig, timeDefinition, suppressTombstones); + return new SuppressedInternal<>(name, timeToWaitForMoreEvents, bufferConfig, timeDefinition, safeToDropTombstones); } public String name() { @@ -65,8 +78,8 @@ public class SuppressedInternal<K> implements Suppressed<K> { return timeToWaitForMoreEvents == null ? 
Duration.ZERO : timeToWaitForMoreEvents; } - boolean shouldSuppressTombstones() { - return suppressTombstones; + boolean safeToDropTombstones() { + return safeToDropTombstones; } @Override @@ -78,7 +91,7 @@ public class SuppressedInternal<K> implements Suppressed<K> { return false; } final SuppressedInternal<?> that = (SuppressedInternal<?>) o; - return suppressTombstones == that.suppressTombstones && + return safeToDropTombstones == that.safeToDropTombstones && Objects.equals(name, that.name) && Objects.equals(bufferConfig, that.bufferConfig) && Objects.equals(timeToWaitForMoreEvents, that.timeToWaitForMoreEvents) && @@ -87,7 +100,7 @@ public class SuppressedInternal<K> implements Suppressed<K> { @Override public int hashCode() { - return Objects.hash(name, bufferConfig, timeToWaitForMoreEvents, timeDefinition, suppressTombstones); + return Objects.hash(name, bufferConfig, timeToWaitForMoreEvents, timeDefinition, safeToDropTombstones); } @Override @@ -96,7 +109,7 @@ public class SuppressedInternal<K> implements Suppressed<K> { ", bufferConfig=" + bufferConfig + ", timeToWaitForMoreEvents=" + timeToWaitForMoreEvents + ", timeDefinition=" + timeDefinition + - ", suppressTombstones=" + suppressTombstones + + ", safeToDropTombstones=" + safeToDropTombstones + '}'; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java index 335fae1..0f2b36b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java @@ -255,8 +255,12 @@ public class KTableSuppressProcessorTest { assertThat(capturedForward.timestamp(), is(timestamp)); } + /** + * It's desirable to drop tombstones for final-results windowed streams, since (as described in the 
+ * {@link SuppressedInternal} javadoc), they are unnecessary to emit. + */ @Test - public void finalResultsShouldSuppressTombstonesForTimeWindows() { + public void finalResultsShouldDropTombstonesForTimeWindows() { final Harness<Windowed<String>, Long> harness = new Harness<>(finalResults(ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Long()); final MockInternalProcessorContext context = harness.context; @@ -272,8 +276,13 @@ public class KTableSuppressProcessorTest { assertThat(context.forwarded(), hasSize(0)); } + + /** + * It's desirable to drop tombstones for final-results windowed streams, since (as described in the + * {@link SuppressedInternal} javadoc), they are unnecessary to emit. + */ @Test - public void finalResultsShouldSuppressTombstonesForSessionWindows() { + public void finalResultsShouldDropTombstonesForSessionWindows() { final Harness<Windowed<String>, Long> harness = new Harness<>(finalResults(ofMillis(0L)), sessionWindowedSerdeFrom(String.class), Long()); final MockInternalProcessorContext context = harness.context; @@ -289,8 +298,12 @@ public class KTableSuppressProcessorTest { assertThat(context.forwarded(), hasSize(0)); } + /** + * It's NOT OK to drop tombstones for non-final-results windowed streams, since we may have emitted some results for + * the window before getting the tombstone (see the {@link SuppressedInternal} javadoc). 
+ */ @Test - public void suppressShouldNotSuppressTombstonesForTimeWindows() { + public void suppressShouldNotDropTombstonesForTimeWindows() { final Harness<Windowed<String>, Long> harness = new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), timeWindowedSerdeFrom(String.class, 100L), Long()); final MockInternalProcessorContext context = harness.context; @@ -309,8 +322,13 @@ public class KTableSuppressProcessorTest { assertThat(capturedForward.timestamp(), is(timestamp)); } + + /** + * It's NOT OK to drop tombstones for non-final-results windowed streams, since we may have emitted some results for + * the window before getting the tombstone (see the {@link SuppressedInternal} javadoc). + */ @Test - public void suppressShouldNotSuppressTombstonesForSessionWindows() { + public void suppressShouldNotDropTombstonesForSessionWindows() { final Harness<Windowed<String>, Long> harness = new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), sessionWindowedSerdeFrom(String.class), Long()); final MockInternalProcessorContext context = harness.context; @@ -329,8 +347,13 @@ public class KTableSuppressProcessorTest { assertThat(capturedForward.timestamp(), is(timestamp)); } + + /** + * It's SUPER NOT OK to drop tombstones for non-windowed streams, since we may have emitted some results for + * the key before getting the tombstone (see the {@link SuppressedInternal} javadoc). + */ @Test - public void suppressShouldNotSuppressTombstonesForKTable() { + public void suppressShouldNotDropTombstonesForKTable() { final Harness<String, Long> harness = new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), String(), Long()); final MockInternalProcessorContext context = harness.context;