This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e9e34c1bd9 [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
0e9e34c1bd9 is described below

commit 0e9e34c1bd9bd16ad5efca77ce2763eb950f3103
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Sat Apr 8 22:44:53 2023 +0900

    [SPARK-42931][SS] Introduce dropDuplicatesWithinWatermark
    
    ### What changes were proposed in this pull request?
    
    This PR introduces a new variant of the dropDuplicates API, dropDuplicatesWithinWatermark, which differs from the existing dropDuplicates in the following ways:
    
    * Weaker constraints on the subset (key)
      * Does not require an event time column in the subset.
    * Looser semantics on deduplication
      * Only guarantees deduplication of events within the watermark delay.
    
    Because the new API relies on event time, it has the following new requirements:
    
    * The watermark must be defined in the streaming DataFrame
    * The event time column must be defined in the streaming DataFrame.
    
    More specifically, once the operator processes the first arrived event, events arriving within the watermark delay of that first event will be deduplicated.
    (Technically, the expiration time is the “event time of the first arrived event + watermark delay threshold”, so that it can be matched against future events.)
    
    Users are encouraged to set the watermark delay threshold longer than the maximum timestamp difference among duplicated events. (If they are unsure, they can instead set a generously large delay threshold, e.g. 48 hours.)
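To make the expiration rule above concrete, here is a minimal pure-Python sketch of the semantics. It is not Spark's implementation: the function name `dedup_within_watermark`, the per-batch watermark update, and the exact comparison operators are assumptions made for illustration only.

```python
from typing import Dict, List, Optional, Tuple

Event = Tuple[str, int]  # (key, event time in seconds); a hypothetical event shape


def dedup_within_watermark(batches: List[List[Event]], delay: int) -> List[Event]:
    """Toy model: keep the first arrival per key until its state expires."""
    # key -> event time of the first arrived event + watermark delay threshold
    expires_at: Dict[str, int] = {}
    max_event_time: Optional[int] = None
    watermark: Optional[int] = None  # assumed to advance only between batches
    emitted: List[Event] = []

    for batch in batches:
        for key, ts in batch:
            # Simplification: data older than the watermark is dropped as late.
            if watermark is not None and ts < watermark:
                continue
            if key not in expires_at:
                # First arrival for this key: remember when its state expires.
                expires_at[key] = ts + delay
                emitted.append((key, ts))
            # Otherwise the key is still in state, so this event is a duplicate.
            max_event_time = ts if max_event_time is None else max(max_event_time, ts)

        # End of batch: advance the watermark, then evict expired state.
        if max_event_time is not None:
            watermark = max_event_time - delay
            expires_at = {k: v for k, v in expires_at.items() if v > watermark}

    return emitted
```

With a delay of 10 and batches `[[("a", 100), ("a", 105)], [("a", 109), ("b", 130)], [("a", 131)]]`, this sketch emits `("a", 100)`, `("b", 130)`, and `("a", 131)`. The last event is emitted again because the state for the first "a" (expiring at 110) was already evicted, which is why the delay threshold should exceed the largest expected gap between duplicates.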
    
    For batch DataFrame, this is equivalent to the dropDuplicates.
    
    This PR also updates the SS guide doc to introduce the new feature; 
screenshots below:
    
    <img width="747" alt="Screenshot 2023-04-06 AM 11 09 12" src="https://user-images.githubusercontent.com/1317309/230254868-7fe76175-5883-4700-b018-d85d851799cb.png">
    <img width="749" alt="Screenshot 2023-04-06 AM 11 09 18" src="https://user-images.githubusercontent.com/1317309/230254874-a754cdfd-2832-41dd-85b6-291f05eccb3d.png">
    <img width="752" alt="Screenshot 2023-04-06 AM 11 09 23" src="https://user-images.githubusercontent.com/1317309/230254876-7fd7b3b1-f59d-481f-8249-5a4ae556c7cf.png">
    <img width="751" alt="Screenshot 2023-04-06 AM 11 09 29" src="https://user-images.githubusercontent.com/1317309/230254880-79b158ca-3403-46a6-be4a-46618ec749db.png">
    
    ### Why are the changes needed?
    
    The existing dropDuplicates API does not address a valid use case in streaming queries.
    
    There are many cases where the event times are not exactly the same even though the events themselves are the same. One example is duplicated events produced by a non-idempotent writer, where the event time is issued on the producer/broker side. Another example is that the value of the event time is unstable and users want to use an alternative timestamp, e.g. ingestion time.
    
    In these cases, users have to exclude the event time column from the deduplication subset, but then the operator is unable to evict state, leading to indefinitely growing state.
    
    To allow state eviction without requiring the event time column to be part of the deduplication subset, we need to loosen the semantics of the API, which warrants a new API.
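The state-growth argument can be illustrated with a small pure-Python sketch. It is a toy model and not Spark's state store: the helper name `state_sizes`, the per-event watermark update, and the eviction comparison are assumptions chosen for brevity.

```python
from typing import Dict, List, Optional, Tuple


def state_sizes(events: List[Tuple[str, int]], delay: Optional[int] = None) -> List[int]:
    """Return the number of keys held in state after each event.

    delay=None models deduplication that can never evict state;
    an integer delay models expiry at "first event time + delay".
    """
    state: Dict[str, int] = {}
    max_ts: Optional[int] = None
    sizes: List[int] = []
    for key, ts in events:
        # Only the first arrival of a key writes to state.
        state.setdefault(key, ts if delay is None else ts + delay)
        max_ts = ts if max_ts is None else max(max_ts, ts)
        if delay is not None:
            # Assumption: the watermark trails the max event time by `delay`.
            watermark = max_ts - delay
            state = {k: exp for k, exp in state.items() if exp > watermark}
        sizes.append(len(state))
    return sizes


# Six distinct keys, one every 10 seconds (hypothetical data).
events = [(f"id-{i}", i * 10) for i in range(6)]
print(state_sizes(events))            # state grows with every new key
print(state_sizes(events, delay=20))  # state levels off once eviction starts
```

Without eviction, the state holds one entry per distinct key ever seen. With an expiry tied to the watermark delay, the state size is bounded by the number of distinct keys that arrive within the delay window.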
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this introduces a new public API, dropDuplicatesWithinWatermark.
    
    ### How was this patch tested?
    
    New test suite.
    
    Closes #40561 from HeartSaVioR/SPARK-42931.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  18 ++
 docs/structured-streaming-programming-guide.md     |  55 ++++++
 .../source/reference/pyspark.sql/dataframe.rst     |   1 +
 python/pyspark/sql/connect/dataframe.py            |   3 +
 python/pyspark/sql/dataframe.py                    |  56 ++++++
 .../analysis/UnsupportedOperationChecker.scala     |  18 ++
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |   5 +
 .../plans/logical/basicLogicalOperators.scala      |   8 +
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 123 +++++++++++--
 .../spark/sql/execution/SparkStrategies.scala      |   3 +
 .../execution/streaming/IncrementalExecution.scala |  14 ++
 .../execution/streaming/statefulOperators.scala    | 141 +++++++++++++--
 ...treamingDeduplicationWithinWatermarkSuite.scala | 200 +++++++++++++++++++++
 13 files changed, 614 insertions(+), 31 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index 54410521a4c..023379c00a5 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2318,6 +2318,24 @@ class Dataset[T] private[sql] (
     dropDuplicates(colNames)
   }
 
+  def dropDuplicatesWithinWatermark(): Dataset[T] = {
+    dropDuplicatesWithinWatermark(this.columns)
+  }
+
+  def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] = {
+    throw new UnsupportedOperationException("dropDuplicatesWithinWatermark is 
not implemented.")
+  }
+
+  def dropDuplicatesWithinWatermark(colNames: Array[String]): Dataset[T] = {
+    dropDuplicatesWithinWatermark(colNames.toSeq)
+  }
+
+  @scala.annotation.varargs
+  def dropDuplicatesWithinWatermark(col1: String, cols: String*): Dataset[T] = 
{
+    val colNames: Seq[String] = col1 +: cols
+    dropDuplicatesWithinWatermark(colNames)
+  }
+
   /**
    * Computes basic statistics for numeric and string columns, including 
count, mean, stddev, min,
    * and max. If no columns are given, this function computes statistics for 
all numerical or
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index f0f41e5cee1..53d18ab9aa3 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2132,6 +2132,61 @@ streamingDf <- withWatermark(streamingDf, "eventTime", 
"10 seconds")
 streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")
 {% endhighlight %}
 
+</div>
+
+</div>
+
+Specifically for streaming, you can deduplicate records in data streams using 
a unique identifier in the events, within the time range of watermark.
+For example, if you set the delay threshold of watermark as "1 hour", 
duplicated events which occurred within 1 hour can be correctly deduplicated.
+(For more details, please refer to the API doc of 
[dropDuplicatesWithinWatermark](/api/scala/org/apache/spark/sql/Dataset.html#dropDuplicatesWithinWatermark():org.apache.spark.sql.Dataset[T]).)
+
+This can be used to deal with use case where event time column cannot be a 
part of unique identifier, mostly due to the case
+where event times are somehow different for the same records. (E.g. 
non-idempotent writer where issuing event time happens at write)
+
+Users are encouraged to set the delay threshold of watermark longer than max 
timestamp differences among duplicated events.
+
+This feature requires watermark with delay threshold to be set in streaming 
DataFrame/Dataset.
+
+<div class="codetabs">
+
+<div data-lang="python"  markdown="1">
+
+{% highlight python %}
+streamingDf = spark.readStream. ...
+
+# deduplicate using guid column with watermark based on eventTime column
+streamingDf \
+  .withWatermark("eventTime", "10 hours") \
+  .dropDuplicatesWithinWatermark("guid")
+{% endhighlight %}
+
+</div>
+
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...
+
+// deduplicate using guid column with watermark based on eventTime column
+streamingDf
+  .withWatermark("eventTime", "10 hours")
+  .dropDuplicatesWithinWatermark("guid")
+{% endhighlight %}
+
+</div>
+
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+Dataset<Row> streamingDf = spark.readStream(). ...;  // columns: guid, 
eventTime, ...
+
+// deduplicate using guid column with watermark based on eventTime column
+streamingDf
+  .withWatermark("eventTime", "10 hours")
+  .dropDuplicatesWithinWatermark("guid");
+{% endhighlight %}
+
+
 </div>
 
 </div>
diff --git a/python/docs/source/reference/pyspark.sql/dataframe.rst b/python/docs/source/reference/pyspark.sql/dataframe.rst
index aa306ccc382..98bf9465f80 100644
--- a/python/docs/source/reference/pyspark.sql/dataframe.rst
+++ b/python/docs/source/reference/pyspark.sql/dataframe.rst
@@ -50,6 +50,7 @@ DataFrame
     DataFrame.distinct
     DataFrame.drop
     DataFrame.dropDuplicates
+    DataFrame.dropDuplicatesWithinWatermark
     DataFrame.drop_duplicates
     DataFrame.dropna
     DataFrame.dtypes
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 85143f0ee89..9106ddc2fc8 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -331,6 +331,9 @@ class DataFrame:
 
     drop_duplicates = dropDuplicates
 
+    def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = 
None) -> "DataFrame":
+        raise NotImplementedError("dropDuplicatesWithinWatermark() is not 
implemented.")
+
     def distinct(self) -> "DataFrame":
         return DataFrame.withPlan(
             plan.Deduplicate(child=self._plan, all_columns_as_keys=True), 
session=self._session
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index e7df25d20fc..2191fe88eff 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -3963,6 +3963,62 @@ class DataFrame(PandasMapOpsMixin, 
PandasConversionMixin):
             jdf = self._jdf.dropDuplicates(self._jseq(subset))
         return DataFrame(jdf, self.sparkSession)
 
+    def dropDuplicatesWithinWatermark(self, subset: Optional[List[str]] = 
None) -> "DataFrame":
+        """Return a new :class:`DataFrame` with duplicate rows removed,
+         optionally only considering certain columns, within watermark.
+
+         This only works with streaming :class:`DataFrame`, and watermark for 
the input
+         :class:`DataFrame` must be set via :func:`withWatermark`.
+
+        For a streaming :class:`DataFrame`, this will keep all data across 
triggers as intermediate
+        state to drop duplicated rows. The state will be kept to guarantee the 
semantic, "Events
+        are deduplicated as long as the time distance of earliest and latest 
events are smaller
+        than the delay threshold of watermark." Users are encouraged to set 
the delay threshold of
+        watermark longer than max timestamp differences among duplicated 
events.
+
+        Note: too late data older than watermark will be dropped.
+
+         .. versionadded:: 3.5.0
+
+         Parameters
+         ----------
+         subset : List of column names, optional
+             List of columns to use for duplicate comparison (default All 
columns).
+
+         Returns
+         -------
+         :class:`DataFrame`
+             DataFrame without duplicates.
+
+         Examples
+         --------
+         >>> from pyspark.sql import Row
+         >>> from pyspark.sql.functions import timestamp_seconds
+         >>> df = spark.readStream.format("rate").load().selectExpr(
+         ...     "value % 5 AS value", "timestamp")
+         >>> df.select("value", 
df.timestamp.alias("time")).withWatermark("time", '10 minutes')
+         DataFrame[value: bigint, time: timestamp]
+
+         Deduplicate the same rows.
+
+         >>> df.dropDuplicatesWithinWatermark() # doctest: +SKIP
+
+         Deduplicate values on 'value' columns.
+
+         >>> df.dropDuplicatesWithinWatermark(['value'])  # doctest: +SKIP
+        """
+        if subset is not None and (not isinstance(subset, Iterable) or 
isinstance(subset, str)):
+            raise PySparkTypeError(
+                error_class="NOT_LIST_OR_TUPLE",
+                message_parameters={"arg_name": "subset", "arg_type": 
type(subset).__name__},
+            )
+
+        if subset is None:
+            jdf = self._jdf.dropDuplicatesWithinWatermark()
+        else:
+            jdf = self._jdf.dropDuplicatesWithinWatermark(self._jseq(subset))
+        return DataFrame(jdf, self.sparkSession)
+
     def dropna(
         self,
         how: str = "any",
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 69ebe09667d..daa7c0d54b7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -37,6 +37,10 @@ object UnsupportedOperationChecker extends Logging {
       case p if p.isStreaming =>
         throwError("Queries with streaming sources must be executed with 
writeStream.start()")(p)
 
+      case d: DeduplicateWithinWatermark =>
+        throwError("dropDuplicatesWithinWatermark is not supported with batch 
" +
+          "DataFrames/DataSets")(d)
+
       case _ =>
     }
   }
@@ -114,6 +118,7 @@ object UnsupportedOperationChecker extends Logging {
     case f: FlatMapGroupsWithState if f.isStreaming => true
     case f: FlatMapGroupsInPandasWithState if f.isStreaming => true
     case d: Deduplicate if d.isStreaming && d.keys.exists(hasEventTimeCol) => 
true
+    case d: DeduplicateWithinWatermark if d.isStreaming => true
     case _ => false
   }
 
@@ -464,6 +469,19 @@ object UnsupportedOperationChecker extends Logging {
               throwError(s"Join type $joinType is not supported with streaming 
DataFrame/Dataset")
           }
 
+        case d: DeduplicateWithinWatermark if d.isStreaming =>
+          // Find any attributes that are associated with an eventTime 
watermark.
+          val watermarkAttributes = d.child.output.collect {
+            case a: Attribute if 
a.metadata.contains(EventTimeWatermark.delayKey) => a
+          }
+
+          // DeduplicateWithinWatermark requires event time column being set 
in the input DataFrame
+          if (watermarkAttributes.isEmpty) {
+            throwError(
+              "dropDuplicatesWithinWatermark is not supported on streaming 
DataFrames/DataSets " +
+                "without watermark")(plan)
+          }
+
         case c: CoGroup if c.children.exists(_.isStreaming) =>
           throwError("CoGrouping with a streaming DataFrame/Dataset is not 
supported")
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 7d1ef6c7fb3..568e3d30e34 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -679,6 +679,8 @@ object RemoveNoopUnion extends Rule[LogicalPlan] {
       d.withNewChildren(Seq(simplifyUnion(u)))
     case d @ Deduplicate(_, u: Union) =>
       d.withNewChildren(Seq(simplifyUnion(u)))
+    case d @ DeduplicateWithinWatermark(_, u: Union) =>
+      d.withNewChildren(Seq(simplifyUnion(u)))
   }
 }
 
@@ -1451,6 +1453,9 @@ object CombineUnions extends Rule[LogicalPlan] {
     // Only handle distinct-like 'Deduplicate', where the keys == output
     case Deduplicate(keys: Seq[Attribute], u: Union) if AttributeSet(keys) == 
u.outputSet =>
       Deduplicate(keys, flattenUnion(u, true))
+    case DeduplicateWithinWatermark(keys: Seq[Attribute], u: Union)
+      if AttributeSet(keys) == u.outputSet =>
+      DeduplicateWithinWatermark(keys, flattenUnion(u, true))
   }
 
   private def flattenUnion(union: Union, flattenDistinct: Boolean): Union = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 09807f53c6a..91726185090 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1912,6 +1912,14 @@ case class Deduplicate(
     copy(child = newChild)
 }
 
+case class DeduplicateWithinWatermark(keys: Seq[Attribute], child: 
LogicalPlan) extends UnaryNode {
+  override def maxRows: Option[Long] = child.maxRows
+  override def output: Seq[Attribute] = child.output
+  final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
+  override protected def withNewChildInternal(newChild: LogicalPlan): 
DeduplicateWithinWatermark =
+    copy(child = newChild)
+}
+
 /**
  * A trait to represent the commands that support subqueries.
  * This is used to allow such commands in the subquery-related checks.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index b2cd8f7460a..be393e46e54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2999,20 +2999,7 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
   def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
-    val resolver = sparkSession.sessionState.analyzer.resolver
-    val allColumns = queryExecution.analyzed.output
-    // SPARK-31990: We must keep `toSet.toSeq` here because of the backward 
compatibility issue
-    // (the Streaming's state store depends on the `groupCols` order).
-    val groupCols = colNames.toSet.toSeq.flatMap { (colName: String) =>
-      // It is possibly there are more than one columns with the same name,
-      // so we call filter instead of find.
-      val cols = allColumns.filter(col => resolver(col.name, colName))
-      if (cols.isEmpty) {
-        throw 
QueryCompilationErrors.cannotResolveColumnNameAmongAttributesError(
-          colName, schema.fieldNames.mkString(", "))
-      }
-      cols
-    }
+    val groupCols = groupColsFromDropDuplicates(colNames)
     Deduplicate(groupCols, logicalPlan)
   }
 
@@ -3050,6 +3037,114 @@ class Dataset[T] private[sql](
     dropDuplicates(colNames)
   }
 
+  /**
+   * Returns a new Dataset with duplicates rows removed, within watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input 
[[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * For a streaming [[Dataset]], this will keep all data across triggers as 
intermediate state
+   * to drop duplicated rows. The state will be kept to guarantee the 
semantic, "Events are
+   * deduplicated as long as the time distance of earliest and latest events 
are smaller than the
+   * delay threshold of watermark." Users are encouraged to set the delay 
threshold of watermark
+   * longer than max timestamp differences among duplicated events.
+   *
+   * Note: too late data older than watermark will be dropped.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(): Dataset[T] = {
+    dropDuplicatesWithinWatermark(this.columns)
+  }
+
+  /**
+   * Returns a new Dataset with duplicates rows removed, considering only the 
subset of columns,
+   * within watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input 
[[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * For a streaming [[Dataset]], this will keep all data across triggers as 
intermediate state
+   * to drop duplicated rows. The state will be kept to guarantee the 
semantic, "Events are
+   * deduplicated as long as the time distance of earliest and latest events 
are smaller than the
+   * delay threshold of watermark." Users are encouraged to set the delay 
threshold of watermark
+   * longer than max timestamp differences among duplicated events.
+   *
+   * Note: too late data older than watermark will be dropped.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] = 
withTypedPlan {
+    val groupCols = groupColsFromDropDuplicates(colNames)
+    // UnsupportedOperationChecker will fail the query if this is called with 
batch Dataset.
+    DeduplicateWithinWatermark(groupCols, logicalPlan)
+  }
+
+  /**
+   * Returns a new Dataset with duplicates rows removed, considering only the 
subset of columns,
+   * within watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input 
[[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * For a streaming [[Dataset]], this will keep all data across triggers as 
intermediate state
+   * to drop duplicated rows. The state will be kept to guarantee the 
semantic, "Events are
+   * deduplicated as long as the time distance of earliest and latest events 
are smaller than the
+   * delay threshold of watermark." Users are encouraged to set the delay 
threshold of watermark
+   * longer than max timestamp differences among duplicated events.
+   *
+   * Note: too late data older than watermark will be dropped.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  def dropDuplicatesWithinWatermark(colNames: Array[String]): Dataset[T] = {
+    dropDuplicatesWithinWatermark(colNames.toSeq)
+  }
+
+  /**
+   * Returns a new Dataset with duplicates rows removed, considering only the 
subset of columns,
+   * within watermark.
+   *
+   * This only works with streaming [[Dataset]], and watermark for the input 
[[Dataset]] must be
+   * set via [[withWatermark]].
+   *
+   * For a streaming [[Dataset]], this will keep all data across triggers as 
intermediate state
+   * to drop duplicated rows. The state will be kept to guarantee the 
semantic, "Events are
+   * deduplicated as long as the time distance of earliest and latest events 
are smaller than the
+   * delay threshold of watermark." Users are encouraged to set the delay 
threshold of watermark
+   * longer than max timestamp differences among duplicated events.
+   *
+   * Note: too late data older than watermark will be dropped.
+   *
+   * @group typedrel
+   * @since 3.5.0
+   */
+  @scala.annotation.varargs
+  def dropDuplicatesWithinWatermark(col1: String, cols: String*): Dataset[T] = 
{
+    val colNames: Seq[String] = col1 +: cols
+    dropDuplicatesWithinWatermark(colNames)
+  }
+
+  private def groupColsFromDropDuplicates(colNames: Seq[String]): 
Seq[Attribute] = {
+    val resolver = sparkSession.sessionState.analyzer.resolver
+    val allColumns = queryExecution.analyzed.output
+    // SPARK-31990: We must keep `toSet.toSeq` here because of the backward 
compatibility issue
+    // (the Streaming's state store depends on the `groupCols` order).
+    colNames.toSet.toSeq.flatMap { (colName: String) =>
+      // It is possibly there are more than one columns with the same name,
+      // so we call filter instead of find.
+      val cols = allColumns.filter(col => resolver(col.name, colName))
+      if (cols.isEmpty) {
+        throw 
QueryCompilationErrors.cannotResolveColumnNameAmongAttributesError(
+          colName, schema.fieldNames.mkString(", "))
+      }
+      cols
+    }
+  }
+
   /**
    * Computes basic statistics for numeric and string columns, including 
count, mean, stddev, min,
    * and max. If no columns are given, this function computes statistics for 
all numerical or
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 972376220f8..824d18043cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -454,6 +454,9 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
       case Deduplicate(keys, child) if child.isStreaming =>
         StreamingDeduplicateExec(keys, planLater(child)) :: Nil
 
+      case DeduplicateWithinWatermark(keys, child) if child.isStreaming =>
+        StreamingDeduplicateWithinWatermarkExec(keys, planLater(child)) :: Nil
+
       case _ => Nil
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 8bf3440c838..07bebb8e471 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -228,6 +228,14 @@ class IncrementalExecution(
           eventTimeWatermarkForLateEvents = None,
           eventTimeWatermarkForEviction = None)
 
+      case StreamingDeduplicateWithinWatermarkExec(keys, child, None, None, 
None) =>
+        StreamingDeduplicateWithinWatermarkExec(
+          keys,
+          child,
+          Some(nextStatefulOperationStateInfo),
+          eventTimeWatermarkForLateEvents = None,
+          eventTimeWatermarkForEviction = None)
+
       case m: FlatMapGroupsWithStateExec =>
         // We set this to true only for the first batch of the streaming query.
         val hasInitialState = (currentBatchId == 0L && m.hasInitialState)
@@ -296,6 +304,12 @@ class IncrementalExecution(
           eventTimeWatermarkForEviction = 
inputWatermarkForEviction(s.stateInfo.get)
         )
 
+      case s: StreamingDeduplicateWithinWatermarkExec if s.stateInfo.isDefined 
=>
+        s.copy(
+          eventTimeWatermarkForLateEvents = 
inputWatermarkForLateEvents(s.stateInfo.get),
+          eventTimeWatermarkForEviction = 
inputWatermarkForEviction(s.stateInfo.get)
+        )
+
       case m: FlatMapGroupsWithStateExec if m.stateInfo.isDefined =>
         m.copy(
           eventTimeWatermarkForLateEvents = 
inputWatermarkForLateEvents(m.stateInfo.get),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 49bb8607128..b31f6151fce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -32,6 +32,7 @@ import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, Distribution, 
Partitioning}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -885,23 +886,23 @@ case class SessionWindowStateStoreSaveExec(
   }
 }
 
-
-/** Physical operator for executing streaming Deduplicate. */
-case class StreamingDeduplicateExec(
-    keyExpressions: Seq[Attribute],
-    child: SparkPlan,
-    stateInfo: Option[StatefulOperatorStateInfo] = None,
-    eventTimeWatermarkForLateEvents: Option[Long] = None,
-    eventTimeWatermarkForEviction: Option[Long] = None)
+abstract class BaseStreamingDeduplicateExec
   extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
 
+  def keyExpressions: Seq[Attribute]
+  def child: SparkPlan
+  def stateInfo: Option[StatefulOperatorStateInfo]
+  def eventTimeWatermarkForLateEvents: Option[Long]
+  def eventTimeWatermarkForEviction: Option[Long]
+
   /** Distribute by grouping attributes */
   override def requiredChildDistribution: Seq[Distribution] = {
     StatefulOperatorPartitioning.getCompatibleDistribution(
       keyExpressions, getStateInfo, conf) :: Nil
   }
 
-  private val schemaForEmptyRow: StructType = 
StructType(Array(StructField("__dummy__", NullType)))
+  protected val schemaForValueRow: StructType
+  protected val extraOptionOnStateStore: Map[String, String]
 
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
@@ -909,13 +910,11 @@ case class StreamingDeduplicateExec(
     child.execute().mapPartitionsWithStateStore(
       getStateInfo,
       keyExpressions.toStructType,
-      schemaForEmptyRow,
+      schemaForValueRow,
       numColsPrefixKey = 0,
       session.sessionState,
       Some(session.streams.stateStoreCoordinator),
-      // We won't check value row in state store since the value 
StreamingDeduplicateExec.EMPTY_ROW
-      // is unrelated to the output schema.
-      Map(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG -> "false")) { 
(store, iter) =>
+      extraOptionOnStateStore) { (store, iter) =>
       val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
       val numOutputRows = longMetric("numOutputRows")
       val numUpdatedStateRows = longMetric("numUpdatedStateRows")
@@ -929,6 +928,8 @@ case class StreamingDeduplicateExec(
         case None => iter
       }
 
+      val reusedDupInfoRow = initializeReusedDupInfoRow()
+
       val updatesStartTimeNs = System.nanoTime
 
       val result = baseIterator.filter { r =>
@@ -936,7 +937,7 @@ case class StreamingDeduplicateExec(
         val key = getKey(row)
         val value = store.get(key)
         if (value == null) {
-          store.put(key, StreamingDeduplicateExec.EMPTY_ROW)
+          putDupInfoIntoState(store, row, key, reusedDupInfoRow)
           numUpdatedStateRows += 1
           numOutputRows += 1
           true
@@ -949,7 +950,7 @@ case class StreamingDeduplicateExec(
 
       CompletionIterator[InternalRow, Iterator[InternalRow]](result, {
         allUpdatesTimeMs += NANOSECONDS.toMillis(System.nanoTime - 
updatesStartTimeNs)
-        allRemovalsTimeMs += timeTakenMs { removeKeysOlderThanWatermark(store) 
}
+        allRemovalsTimeMs += timeTakenMs { evictDupInfoFromState(store) }
         commitTimeMs += timeTakenMs { store.commit() }
         setStoreMetrics(store)
         setOperatorMetrics()
@@ -957,6 +958,16 @@ case class StreamingDeduplicateExec(
     }
   }
 
+  protected def initializeReusedDupInfoRow(): Option[UnsafeRow]
+
+  protected def putDupInfoIntoState(
+      store: StateStore,
+      data: UnsafeRow,
+      key: UnsafeRow,
+      reusedDupInfoRow: Option[UnsafeRow]): Unit
+
+  protected def evictDupInfoFromState(store: StateStore): Unit
+
   override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
@@ -965,12 +976,44 @@ case class StreamingDeduplicateExec(
     Seq(StatefulOperatorCustomSumMetric("numDroppedDuplicateRows", "number of 
duplicates dropped"))
   }
 
-  override def shortName: String = "dedupe"
-
   override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = {
     eventTimeWatermarkForEviction.isDefined &&
       newInputWatermark > eventTimeWatermarkForEviction.get
   }
+}
+
+/** Physical operator for executing streaming Deduplicate. */
+case class StreamingDeduplicateExec(
+    keyExpressions: Seq[Attribute],
+    child: SparkPlan,
+    stateInfo: Option[StatefulOperatorStateInfo] = None,
+    eventTimeWatermarkForLateEvents: Option[Long] = None,
+    eventTimeWatermarkForEviction: Option[Long] = None)
+  extends BaseStreamingDeduplicateExec {
+
+  protected val schemaForValueRow: StructType =
+    StructType(Array(StructField("__dummy__", NullType)))
+
+  // We won't check value row in state store since the value StreamingDeduplicateExec.EMPTY_ROW
+  // is unrelated to the output schema.
+  protected val extraOptionOnStateStore: Map[String, String] =
+    Map(StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG -> "false")
+
+  protected def initializeReusedDupInfoRow(): Option[UnsafeRow] = None
+
+  protected def putDupInfoIntoState(
+      store: StateStore,
+      data: UnsafeRow,
+      key: UnsafeRow,
+      reusedDupInfoRow: Option[UnsafeRow]): Unit = {
+    store.put(key, StreamingDeduplicateExec.EMPTY_ROW)
+  }
+
+  protected def evictDupInfoFromState(store: StateStore): Unit = {
+    removeKeysOlderThanWatermark(store)
+  }
+
+  override def shortName: String = "dedupe"
 
   override protected def withNewChildInternal(newChild: SparkPlan): StreamingDeduplicateExec =
     copy(child = newChild)
@@ -980,3 +1023,67 @@ object StreamingDeduplicateExec {
   private val EMPTY_ROW =
     UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
 }
+
+case class StreamingDeduplicateWithinWatermarkExec(
+    keyExpressions: Seq[Attribute],
+    child: SparkPlan,
+    stateInfo: Option[StatefulOperatorStateInfo] = None,
+    eventTimeWatermarkForLateEvents: Option[Long] = None,
+    eventTimeWatermarkForEviction: Option[Long] = None)
+  extends BaseStreamingDeduplicateExec {
+
+  protected val schemaForValueRow: StructType = StructType(
+    Array(StructField("expiresAtMicros", LongType, nullable = false)))
+
+  protected val extraOptionOnStateStore: Map[String, String] = Map.empty
+
+  private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output,
+    allowMultipleEventTimeColumns = false).get
+  private val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
+  private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)
+
+  protected def initializeReusedDupInfoRow(): Option[UnsafeRow] = {
+    val timeoutToUnsafeRow = UnsafeProjection.create(schemaForValueRow)
+    val timeoutRow = timeoutToUnsafeRow(new SpecificInternalRow(schemaForValueRow))
+    Some(timeoutRow)
+  }
+
+  protected def putDupInfoIntoState(
+      store: StateStore,
+      data: UnsafeRow,
+      key: UnsafeRow,
+      reusedDupInfoRow: Option[UnsafeRow]): Unit = {
+    assert(reusedDupInfoRow.isDefined, "This should have reused row.")
+    val timeoutRow = reusedDupInfoRow.get
+
+    // We expect data type of event time column to be TimestampType or TimestampNTZType which both
+    // are internally represented as Long.
+    val timestamp = data.getLong(eventTimeColOrdinal)
+    // The unit of timestamp in Spark is microseconds, convert the delay threshold to micros.
+    val expiresAt = timestamp + DateTimeUtils.millisToMicros(delayThresholdMs)
+
+    timeoutRow.setLong(0, expiresAt)
+    store.put(key, timeoutRow)
+  }
+
+  protected def evictDupInfoFromState(store: StateStore): Unit = {
+    val numRemovedStateRows = longMetric("numRemovedStateRows")
+
+    // Convert watermark value to micros.
+    val watermarkForEviction = DateTimeUtils.millisToMicros(eventTimeWatermarkForEviction.get)
+    store.iterator().foreach { rowPair =>
+      val valueRow = rowPair.value
+
+      val expiresAt = valueRow.getLong(0)
+      if (watermarkForEviction >= expiresAt) {
+        store.remove(rowPair.key)
+        numRemovedStateRows += 1
+      }
+    }
+  }
+
+  override def shortName: String = "dedupeWithinWatermark"
+
+  override protected def withNewChildInternal(
+      newChild: SparkPlan): StreamingDeduplicateWithinWatermarkExec = copy(child = newChild)
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala
new file mode 100644
index 00000000000..c1435182c15
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.{AnalysisException, Dataset, SaveMode}
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Append
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions.timestamp_seconds
+
+class StreamingDeduplicationWithinWatermarkSuite extends StateStoreMetricsTest {
+
+  import testImplicits._
+
+  test("deduplicate in batch DataFrame") {
+    def testAndVerify(df: Dataset[_]): Unit = {
+      val exc = intercept[AnalysisException] {
+        df.write.format("noop").mode(SaveMode.Append).save()
+      }
+
+      assert(exc.getMessage.contains("dropDuplicatesWithinWatermark is not supported"))
+      assert(exc.getMessage.contains("batch DataFrames/DataSets"))
+    }
+
+    val result = spark.range(10).dropDuplicatesWithinWatermark()
+    testAndVerify(result)
+
+    val result2 = spark.range(10).withColumn("newcol", $"id")
+      .dropDuplicatesWithinWatermark("newcol")
+    testAndVerify(result2)
+  }
+
+  test("deduplicate without event time column should result in error") {
+    def testAndVerify(df: Dataset[_]): Unit = {
+      val exc = intercept[AnalysisException] {
+        df.writeStream.format("noop").start()
+      }
+
+      assert(exc.getMessage.contains("dropDuplicatesWithinWatermark is not supported"))
+      assert(exc.getMessage.contains("streaming DataFrames/DataSets without watermark"))
+    }
+
+    val inputData = MemoryStream[String]
+    val result = inputData.toDS().dropDuplicatesWithinWatermark()
+    testAndVerify(result)
+
+    val result2 = inputData.toDS().withColumn("newcol", $"value")
+      .dropDuplicatesWithinWatermark("newcol")
+    testAndVerify(result2)
+
+    val inputData2 = MemoryStream[(String, Int)]
+    val otherSideForJoin = inputData2.toDF()
+      .select($"_1" as "key", timestamp_seconds($"_2") as "time")
+      .withWatermark("Time", "10 seconds")
+
+    val result3 = inputData.toDS()
+      .select($"value".as("key"))
+      // There are two streams, but only one of them defines a watermark. The stream that
+      // contains dropDuplicatesWithinWatermark does not define a watermark, which is not
+      // supported.
+      .dropDuplicatesWithinWatermark()
+      .join(otherSideForJoin, "key")
+    testAndVerify(result3)
+  }
+
+  test("deduplicate with all columns with event time column in DataFrame") {
+    val inputData = MemoryStream[Int]
+    val result = inputData.toDS()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .withWatermark("eventTime", "10 seconds")
+      .dropDuplicatesWithinWatermark()
+      .select($"eventTime".cast("long").as[Long])
+
+    testStream(result, Append)(
+      // Advance watermark to 5 secs, no-data-batch does not drop state rows
+      AddData(inputData, (1 to 5).flatMap(_ => (10 to 15)): _*),
+      CheckAnswer(10 to 15: _*),
+      assertNumStateRows(total = 6, updated = 6),
+
+      // Advance watermark to 7 secs, no-data-batch does not drop state rows
+      AddData(inputData, (13 to 17): _*),
+      // 13 to 15 are duplicated
+      CheckNewAnswer(16, 17),
+      assertNumStateRows(total = 8, updated = 2),
+
+      AddData(inputData, 5), // Should not emit anything, as the data is older than the watermark
+      CheckNewAnswer(),
+      assertNumStateRows(total = 8, updated = 0, droppedByWatermark = 1),
+
+      // Advance watermark to 25 secs, no-data-batch drops state rows having expired time <= 25
+      AddData(inputData, 35),
+      CheckNewAnswer(35),
+      assertNumStateRows(total = 3, updated = 1),
+
+      // Advance watermark to 45 seconds, no-data-batch drops state rows having expired time <= 45
+      AddData(inputData, 55),
+      CheckNewAnswer(55),
+      assertNumStateRows(total = 1, updated = 1)
+    )
+  }
+
+  test("deduplicate with subset of columns which event time column is not in subset") {
+    val inputData = MemoryStream[(String, Int)]
+    val result = inputData.toDS()
+      .withColumn("eventTime", timestamp_seconds($"_2"))
+      .withWatermark("eventTime", "2 seconds")
+      .dropDuplicatesWithinWatermark("_1")
+      .select($"_1", $"eventTime".cast("long").as[Long])
+
+    testStream(result, Append)(
+      // Advances watermark to 15
+      AddData(inputData, "a" -> 17),
+      CheckNewAnswer("a" -> 17),
+      // expired time is set to 19
+      assertNumStateRows(total = 1, updated = 1),
+
+      // Watermark does not advance
+      AddData(inputData, "a" -> 16),
+      CheckNewAnswer(),
+      assertNumStateRows(total = 1, updated = 0),
+
+      // Watermark does not advance
+      // Should not emit anything, as the data is older than the watermark
+      AddData(inputData, "a" -> 13),
+      CheckNewAnswer(),
+      assertNumStateRows(total = 1, updated = 0, droppedByWatermark = 1),
+
+      // Advances watermark to 20. no-data batch drops state row ("a" -> 19)
+      AddData(inputData, "b" -> 22, "c" -> 21),
+      CheckNewAnswer("b" -> 22, "c" -> 21),
+      // expired time is set to 24 and 23
+      assertNumStateRows(total = 2, updated = 2),
+
+      // Watermark does not advance
+      AddData(inputData, "a" -> 21),
+      // "a" is identified as new event since previous batch dropped state row ("a" -> 19)
+      CheckNewAnswer("a" -> 21),
+      // expired time is set to 23
+      assertNumStateRows(total = 3, updated = 1),
+
+      // Advances watermark to 23. no-data batch drops state row ("a" -> 23), ("c" -> 23)
+      AddData(inputData, "d" -> 25),
+      CheckNewAnswer("d" -> 25),
+      assertNumStateRows(total = 2, updated = 1)
+    )
+  }
+
+  test("SPARK-39650: duplicate with specific keys should allow input to change schema") {
+    withTempDir { checkpoint =>
+      val dedupeInputData = MemoryStream[(String, Int)]
+      val dedupe = dedupeInputData.toDS()
+        .withColumn("eventTime", timestamp_seconds($"_2"))
+        .withWatermark("eventTime", "10 second")
+        .dropDuplicatesWithinWatermark("_1")
+        .select($"_1", $"eventTime".cast("long").as[Long])
+
+      testStream(dedupe, Append)(
+        StartStream(checkpointLocation = checkpoint.getCanonicalPath),
+
+        AddData(dedupeInputData, "a" -> 1),
+        CheckNewAnswer("a" -> 1),
+
+        AddData(dedupeInputData, "a" -> 2, "b" -> 3),
+        CheckNewAnswer("b" -> 3)
+      )
+
+      val dedupeInputData2 = MemoryStream[(String, Int, String)]
+      val dedupe2 = dedupeInputData2.toDS()
+        .withColumn("eventTime", timestamp_seconds($"_2"))
+        .withWatermark("eventTime", "10 second")
+        .dropDuplicatesWithinWatermark(Seq("_1"))
+        .select($"_1", $"eventTime".cast("long").as[Long], $"_3")
+
+      // initialize new memory stream with previously executed batches
+      dedupeInputData2.addData(("a", 1, "dummy"))
+      dedupeInputData2.addData(Seq(("a", 2, "dummy"), ("b", 3, "dummy")))
+
+      testStream(dedupe2, Append)(
+        StartStream(checkpointLocation = checkpoint.getCanonicalPath),
+
+        AddData(dedupeInputData2, ("a", 5, "a"), ("b", 2, "b"), ("c", 9, "c")),
+        CheckNewAnswer(("c", 9, "c"))
+      )
+    }
+  }
+}
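
The expiry and eviction rule exercised by the tests above (a state entry expires at "event time of the first arrived event + watermark delay threshold", and is evicted once the watermark reaches that value) can be sketched in plain Scala. This is only an illustrative model, not the operator's code: `DedupWithinWatermarkModel`, `process`, `evict`, `numStateRows` and `secs` are hypothetical names, keys are plain strings, the state store is an in-memory map, and the separate filtering of late events by the watermark is not modelled.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the state kept for dropDuplicatesWithinWatermark.
// It is NOT Spark's implementation: keys are plain strings and the state store is a Map.
final class DedupWithinWatermarkModel(delayMicros: Long) {
  // key -> expiresAtMicros (event time of the first arrived event + watermark delay)
  private val state = mutable.Map.empty[String, Long]

  /** Returns true if the event is emitted, false if it is dropped as a duplicate. */
  def process(key: String, eventTimeMicros: Long): Boolean =
    if (state.contains(key)) {
      false
    } else {
      state(key) = eventTimeMicros + delayMicros
      true
    }

  /** Removes every entry with expiresAt <= watermark and returns how many were removed. */
  def evict(watermarkMicros: Long): Int = {
    val expired = state.collect {
      case (key, expiresAt) if watermarkMicros >= expiresAt => key
    }.toList
    expired.foreach(key => state.remove(key))
    expired.size
  }

  /** Number of keys currently tracked. */
  def numStateRows: Int = state.size
}

object DedupWithinWatermarkModel {
  /** Converts seconds to microseconds, the unit used for timestamps in this sketch. */
  def secs(s: Long): Long = s * 1000000L
}
```

Replaying part of the "subset of columns" test with a 2-second delay: `process("a", secs(17))` emits and stores an expiry of 19; `process("a", secs(16))` is dropped as a duplicate; `evict(secs(20))` removes the entry; a later `process("a", secs(21))` is emitted again because the earlier state row is gone.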


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

