This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a65940c  MINOR: clarify why suppress can sometimes drop tombstones 
(#6195)
a65940c is described below

commit a65940cd820c68ee9613c7ff54676560cd01f88a
Author: John Roesler <vvcep...@users.noreply.github.com>
AuthorDate: Fri Jan 25 15:17:39 2019 -0600

    MINOR: clarify why suppress can sometimes drop tombstones (#6195)
    
    Reviewers: Jonathan Gordon, Matthias J. Sax <matth...@confluent.io>, 
Guozhang Wang <wangg...@gmail.com>
---
 .../suppress/KTableSuppressProcessor.java          |  6 ++--
 .../internals/suppress/SuppressedInternal.java     | 31 ++++++++++++++------
 .../suppress/KTableSuppressProcessorTest.java      | 33 ++++++++++++++++++----
 3 files changed, 53 insertions(+), 17 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
index 06d5004..813c558 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
@@ -42,7 +42,7 @@ public class KTableSuppressProcessor<K, V> implements 
Processor<K, Change<V>> {
     private final long suppressDurationMillis;
     private final TimeDefinition<K> bufferTimeDefinition;
     private final BufferFullStrategy bufferFullStrategy;
-    private final boolean shouldSuppressTombstones;
+    private final boolean safeToDropTombstones;
     private final String storeName;
 
     private TimeOrderedKeyValueBuffer buffer;
@@ -64,7 +64,7 @@ public class KTableSuppressProcessor<K, V> implements 
Processor<K, Change<V>> {
         suppressDurationMillis = suppress.timeToWaitForMoreEvents().toMillis();
         bufferTimeDefinition = suppress.timeDefinition();
         bufferFullStrategy = suppress.bufferConfig().bufferFullStrategy();
-        shouldSuppressTombstones = suppress.shouldSuppressTombstones();
+        safeToDropTombstones = suppress.safeToDropTombstones();
     }
 
     @SuppressWarnings("unchecked")
@@ -136,7 +136,7 @@ public class KTableSuppressProcessor<K, V> implements 
Processor<K, Change<V>> {
     }
 
     private boolean shouldForward(final Change<V> value) {
-        return !(value.newValue == null && shouldSuppressTombstones);
+        return value.newValue != null || !safeToDropTombstones;
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
index 042a81a..c387700 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
@@ -30,23 +30,36 @@ public class SuppressedInternal<K> implements Suppressed<K> 
{
     private final BufferConfigInternal bufferConfig;
     private final Duration timeToWaitForMoreEvents;
     private final TimeDefinition<K> timeDefinition;
-    private final boolean suppressTombstones;
+    private final boolean safeToDropTombstones;
 
+    /**
+     * @param safeToDropTombstones Note: it's *only* safe to drop tombstones 
for windowed KTables in "final results" mode.
+     *                             In that case, we have a priori knowledge 
that we have never before emitted any
+     *                             results for a given key, and therefore the 
tombstone is unnecessary (albeit
+     *                             idempotent and correct). We decided that 
the unnecessary tombstones would not be
+     *                             desirable in the output stream, though, 
hence the ability to drop them.
+     *
+     *                             An alternative is to remember whether a 
result has previously been emitted
+     *                             for a key and drop tombstones only when no result has been emitted for it, 
but it would be a little complicated to
+     *                             figure out when to forget the fact that we 
have emitted some result (currently, the
+     *                             buffer immediately forgets all about a key 
when we emit, which helps to keep it
+     *                             compact).
+     */
     public SuppressedInternal(final String name,
                               final Duration suppressionTime,
                               final BufferConfig bufferConfig,
                               final TimeDefinition<K> timeDefinition,
-                              final boolean suppressTombstones) {
+                              final boolean safeToDropTombstones) {
         this.name = name;
         this.timeToWaitForMoreEvents = suppressionTime == null ? 
DEFAULT_SUPPRESSION_TIME : suppressionTime;
         this.timeDefinition = timeDefinition == null ? 
TimeDefinitions.RecordTimeDefintion.instance() : timeDefinition;
         this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : 
(BufferConfigInternal) bufferConfig;
-        this.suppressTombstones = suppressTombstones;
+        this.safeToDropTombstones = safeToDropTombstones;
     }
 
     @Override
     public Suppressed<K> withName(final String name) {
-        return new SuppressedInternal<>(name, timeToWaitForMoreEvents, 
bufferConfig, timeDefinition, suppressTombstones);
+        return new SuppressedInternal<>(name, timeToWaitForMoreEvents, 
bufferConfig, timeDefinition, safeToDropTombstones);
     }
 
     public String name() {
@@ -65,8 +78,8 @@ public class SuppressedInternal<K> implements Suppressed<K> {
         return timeToWaitForMoreEvents == null ? Duration.ZERO : 
timeToWaitForMoreEvents;
     }
 
-    boolean shouldSuppressTombstones() {
-        return suppressTombstones;
+    boolean safeToDropTombstones() {
+        return safeToDropTombstones;
     }
 
     @Override
@@ -78,7 +91,7 @@ public class SuppressedInternal<K> implements Suppressed<K> {
             return false;
         }
         final SuppressedInternal<?> that = (SuppressedInternal<?>) o;
-        return suppressTombstones == that.suppressTombstones &&
+        return safeToDropTombstones == that.safeToDropTombstones &&
             Objects.equals(name, that.name) &&
             Objects.equals(bufferConfig, that.bufferConfig) &&
             Objects.equals(timeToWaitForMoreEvents, 
that.timeToWaitForMoreEvents) &&
@@ -87,7 +100,7 @@ public class SuppressedInternal<K> implements Suppressed<K> {
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, bufferConfig, timeToWaitForMoreEvents, 
timeDefinition, suppressTombstones);
+        return Objects.hash(name, bufferConfig, timeToWaitForMoreEvents, 
timeDefinition, safeToDropTombstones);
     }
 
     @Override
@@ -96,7 +109,7 @@ public class SuppressedInternal<K> implements Suppressed<K> {
             ", bufferConfig=" + bufferConfig +
             ", timeToWaitForMoreEvents=" + timeToWaitForMoreEvents +
             ", timeDefinition=" + timeDefinition +
-            ", suppressTombstones=" + suppressTombstones +
+            ", safeToDropTombstones=" + safeToDropTombstones +
             '}';
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index 335fae1..0f2b36b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -255,8 +255,12 @@ public class KTableSuppressProcessorTest {
         assertThat(capturedForward.timestamp(), is(timestamp));
     }
 
+    /**
+     * It's desirable to drop tombstones for final-results windowed streams, 
since, as described in the
+     * {@link SuppressedInternal} javadoc, they are unnecessary to emit.
+     */
     @Test
-    public void finalResultsShouldSuppressTombstonesForTimeWindows() {
+    public void finalResultsShouldDropTombstonesForTimeWindows() {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(finalResults(ofMillis(0L)), 
timeWindowedSerdeFrom(String.class, 100L), Long());
         final MockInternalProcessorContext context = harness.context;
@@ -272,8 +276,13 @@ public class KTableSuppressProcessorTest {
         assertThat(context.forwarded(), hasSize(0));
     }
 
+
+    /**
+     * It's desirable to drop tombstones for final-results windowed streams, 
since, as described in the
+     * {@link SuppressedInternal} javadoc, they are unnecessary to emit.
+     */
     @Test
-    public void finalResultsShouldSuppressTombstonesForSessionWindows() {
+    public void finalResultsShouldDropTombstonesForSessionWindows() {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(finalResults(ofMillis(0L)), 
sessionWindowedSerdeFrom(String.class), Long());
         final MockInternalProcessorContext context = harness.context;
@@ -289,8 +298,12 @@ public class KTableSuppressProcessorTest {
         assertThat(context.forwarded(), hasSize(0));
     }
 
+    /**
+     * It's NOT OK to drop tombstones for non-final-results windowed streams, 
since we may have emitted some results for
+     * the window before getting the tombstone (see the {@link 
SuppressedInternal} javadoc).
+     */
     @Test
-    public void suppressShouldNotSuppressTombstonesForTimeWindows() {
+    public void suppressShouldNotDropTombstonesForTimeWindows() {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), 
timeWindowedSerdeFrom(String.class, 100L), Long());
         final MockInternalProcessorContext context = harness.context;
@@ -309,8 +322,13 @@ public class KTableSuppressProcessorTest {
         assertThat(capturedForward.timestamp(), is(timestamp));
     }
 
+
+    /**
+     * It's NOT OK to drop tombstones for non-final-results windowed streams, 
since we may have emitted some results for
+     * the window before getting the tombstone (see the {@link 
SuppressedInternal} javadoc).
+     */
     @Test
-    public void suppressShouldNotSuppressTombstonesForSessionWindows() {
+    public void suppressShouldNotDropTombstonesForSessionWindows() {
         final Harness<Windowed<String>, Long> harness =
             new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), 
sessionWindowedSerdeFrom(String.class), Long());
         final MockInternalProcessorContext context = harness.context;
@@ -329,8 +347,13 @@ public class KTableSuppressProcessorTest {
         assertThat(capturedForward.timestamp(), is(timestamp));
     }
 
+
+    /**
+     * It's SUPER NOT OK to drop tombstones for non-windowed streams, since we 
may have emitted some results for
+     * the key before getting the tombstone (see the {@link 
SuppressedInternal} javadoc).
+     */
     @Test
-    public void suppressShouldNotSuppressTombstonesForKTable() {
+    public void suppressShouldNotDropTombstonesForKTable() {
         final Harness<String, Long> harness =
             new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), 
String(), Long());
         final MockInternalProcessorContext context = harness.context;

Reply via email to