rangadi commented on code in PR #40561:
URL: https://github.com/apache/spark/pull/40561#discussion_r1151374343
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##########
@@ -464,6 +469,19 @@ object UnsupportedOperationChecker extends Logging {
throwError(s"Join type $joinType is not supported with streaming
DataFrame/Dataset")
}
+ case d: DeduplicateWithinWatermark if d.isStreaming =>
+ // Find any attributes that are associated with an eventTime
watermark.
+ val watermarkAttributes = d.child.output.collect {
+ case a: Attribute if
a.metadata.contains(EventTimeWatermark.delayKey) => a
+ }
+
+ // DeduplicateWithinWatermark requires event time column being set
in the input DataFrame
+ if (watermarkAttributes.isEmpty) {
+ throwError(
+ "dropDuplicatesWithinWatermark is not supported on streaming
DataFrames/DataSets " +
Review Comment:
[optional]
"dropDuplicatesWithinWatermark() requires watermark to be set on
DataFrame, but there is no watermark set."
##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
dropDuplicates(colNames)
}
+ /**
+ * Returns a new Dataset with duplicates rows removed, as long as event
times of duplicated rows
+ * are within delay threshold of watermark.
+ *
+ * This only works with streaming [[Dataset]], and watermark for the input
[[Dataset]] must be
+ * set via [[withWatermark]].
+ *
+ * This will keep all data across triggers as intermediate state to drop
duplicated rows. The
+ * state will be kept to guarantee the following, "If event time of the
first arrived event is
+ * 'ts', this guarantees all duplicated rows will be dropped where these
rows are within the time
+ * range of (ts - delay threshold, ts + delay threshold)". In practice,
users are encouraged to
+ * set the delay threshold of watermark longer than max timestamp
differences among duplicated
+ * events.
+ *
+ * In addition, too late data older than watermark will be dropped to avoid
any possibility
+ * of duplicates.
+ *
+ * @group typedrel
+ * @since 3.5.0
+ */
+ def dropDuplicatesWithinWatermark(): Dataset[T] = {
+ dropDuplicatesWithinWatermark(this.columns)
+ }
+
+ /**
+ * Returns a new Dataset with duplicates rows removed, considering only the
subset of columns,
+ * as long as event times of duplicated rows are within delay threshold of
watermark.
Review Comment:
I know it is a tricky thing, but it might be better to rephrase.
##########
docs/structured-streaming-programming-guide.md:
##########
@@ -2132,6 +2132,48 @@ streamingDf <- withWatermark(streamingDf, "eventTime",
"10 seconds")
streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")
{% endhighlight %}
+</div>
Review Comment:
[From PR description]
> Only guarantee to deduplicate events within the watermark.
Suggest 'within watermark delay' here.
##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
dropDuplicates(colNames)
}
+ /**
+ * Returns a new Dataset with duplicates rows removed, as long as event
times of duplicated rows
+ * are within delay threshold of watermark.
+ *
+ * This only works with streaming [[Dataset]], and watermark for the input
[[Dataset]] must be
+ * set via [[withWatermark]].
+ *
+ * This will keep all data across triggers as intermediate state to drop
duplicated rows. The
+ * state will be kept to guarantee the following, "If event time of the
first arrived event is
+ * 'ts', this guarantees all duplicated rows will be dropped where these
rows are within the time
+ * range of (ts - delay threshold, ts + delay threshold)". In practice,
users are encouraged to
+ * set the delay threshold of watermark longer than max timestamp
differences among duplicated
+ * events.
+ *
+ * In addition, too late data older than watermark will be dropped to avoid
any possibility
+ * of duplicates.
+ *
+ * @group typedrel
+ * @since 3.5.0
+ */
+ def dropDuplicatesWithinWatermark(): Dataset[T] = {
+ dropDuplicatesWithinWatermark(this.columns)
+ }
+
+ /**
+ * Returns a new Dataset with duplicates rows removed, considering only the
subset of columns,
+ * as long as event times of duplicated rows are within delay threshold of
watermark.
+ *
+ * This only works with streaming [[Dataset]], and watermark for the input
[[Dataset]] must be
+ * set via [[withWatermark]].
+ *
+ * This will keep all data across triggers as intermediate state to drop
duplicated rows. The
+ * state will be kept to guarantee the following, "If event time of the
first arrived event is
+ * 'ts', this guarantees all duplicated rows will be dropped where these
rows are within the time
+ * range of (ts - delay threshold, ts + delay threshold)". In practice,
users are encouraged to
Review Comment:
I don't think we guarantee this condition:
`(ts - delay threshold, ts + delay threshold)`.
We likely need to rephrase it. We can look at the scaladoc towards the end
before merging this.
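To make the discussion of this guarantee concrete, here is a toy model of the state. It is not the PR's code. It assumes that state for a key is created by the first-arrived event and expires at that event's time plus the delay threshold (the `expiresAt` field in the diff suggests this, but the filter body is not shown here). `DedupSketch`, `offer`, and `advanceWatermark` are hypothetical names.

```scala
// Toy model only, not Spark code. All names here are hypothetical.
final class DedupSketch(delayThresholdMs: Long) {
  // key -> timestamp at which the state for that key expires (assumed semantics)
  private var expiresAt = Map.empty[String, Long]

  /** Returns true if the event is emitted, false if it is dropped as a duplicate. */
  def offer(key: String, eventTimeMs: Long): Boolean =
    if (expiresAt.contains(key)) {
      false
    } else {
      // Assumption: expiry is derived from the first-arrived event's timestamp.
      expiresAt = expiresAt + (key -> (eventTimeMs + delayThresholdMs))
      true
    }

  /** Evicts every key whose expiry is at or before the given watermark. */
  def advanceWatermark(watermarkMs: Long): Unit =
    expiresAt = expiresAt.filter { case (_, expiry) => expiry > watermarkMs }
}
```

Under these assumptions, a duplicate is dropped only while the key's state is still present. Once the watermark reaches the expiry, the same key is emitted again, so the scaladoc wording needs to describe that bound rather than a symmetric range.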
##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
dropDuplicates(colNames)
}
+ /**
+ * Returns a new Dataset with duplicates rows removed, as long as event
times of duplicated rows
+ * are within delay threshold of watermark.
+ *
+ * This only works with streaming [[Dataset]], and watermark for the input
[[Dataset]] must be
+ * set via [[withWatermark]].
+ *
+ * This will keep all data across triggers as intermediate state to drop
duplicated rows. The
+ * state will be kept to guarantee the following, "If event time of the
first arrived event is
+ * 'ts', this guarantees all duplicated rows will be dropped where these
rows are within the time
+ * range of (ts - delay threshold, ts + delay threshold)". In practice,
users are encouraged to
+ * set the delay threshold of watermark longer than max timestamp
differences among duplicated
+ * events.
+ *
+ * In addition, too late data older than watermark will be dropped to avoid
any possibility
+ * of duplicates.
+ *
+ * @group typedrel
+ * @since 3.5.0
+ */
+ def dropDuplicatesWithinWatermark(): Dataset[T] = {
+ dropDuplicatesWithinWatermark(this.columns)
+ }
+
+ /**
+ * Returns a new Dataset with duplicates rows removed, considering only the
subset of columns,
+ * as long as event times of duplicated rows are within delay threshold of
watermark.
+ *
+ * This only works with streaming [[Dataset]], and watermark for the input
[[Dataset]] must be
+ * set via [[withWatermark]].
+ *
+ * This will keep all data across triggers as intermediate state to drop
duplicated rows. The
+ * state will be kept to guarantee the following, "If event time of the
first arrived event is
+ * 'ts', this guarantees all duplicated rows will be dropped where these
rows are within the time
+ * range of (ts - delay threshold, ts + delay threshold)". In practice,
users are encouraged to
+ * set the delay threshold of watermark longer than max timestamp
differences among duplicated
+ * events.
+ *
+ * In addition, too late data older than watermark will be dropped to avoid
any possibility
+ * of duplicates.
+ *
+ * @group typedrel
+ * @since 3.5.0
+ */
+ def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] =
withTypedPlan {
+ val resolver = sparkSession.sessionState.analyzer.resolver
Review Comment:
Can we share this with dropDuplicates()? Or, even better, we can reuse the
'Deduplicate()' node(s).
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -980,3 +980,117 @@ object StreamingDeduplicateExec {
private val EMPTY_ROW =
UnsafeProjection.create(Array[DataType](NullType)).apply(InternalRow.apply(null))
}
+
+case class StreamingDeduplicateWithinWatermarkExec(
+ keyExpressions: Seq[Attribute],
+ child: SparkPlan,
+ stateInfo: Option[StatefulOperatorStateInfo] = None,
+ eventTimeWatermarkForLateEvents: Option[Long] = None,
+ eventTimeWatermarkForEviction: Option[Long] = None)
+ extends UnaryExecNode with StateStoreWriter with WatermarkSupport {
+
+ /** Distribute by grouping attributes */
+ override def requiredChildDistribution: Seq[Distribution] = {
+ StatefulOperatorPartitioning.getCompatibleDistribution(
+ keyExpressions, getStateInfo, conf) :: Nil
+ }
+
+ private val schemaForTimeoutRow: StructType = StructType(
+ Array(StructField("expiresAt", LongType, nullable = false)))
+ private val eventTimeCol: Attribute =
WatermarkSupport.findEventTimeColumn(child.output,
+ allowMultipleEventTimeColumns = false).get
+ private val delayThresholdMillis =
eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
+ private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ metrics // force lazy init at driver
+
+ child.execute().mapPartitionsWithStateStore(
+ getStateInfo,
+ keyExpressions.toStructType,
+ schemaForTimeoutRow,
+ numColsPrefixKey = 0,
+ session.sessionState,
+ Some(session.streams.stateStoreCoordinator)) { (store, iter) =>
+ val getKey = GenerateUnsafeProjection.generate(keyExpressions,
child.output)
+
+ val timeoutToUnsafeRow = UnsafeProjection.create(schemaForTimeoutRow)
+ val timeoutRow = timeoutToUnsafeRow(new
SpecificInternalRow(schemaForTimeoutRow))
+
+ val numOutputRows = longMetric("numOutputRows")
+ val numUpdatedStateRows = longMetric("numUpdatedStateRows")
+ val numRemovedStateRows = longMetric("numRemovedStateRows")
+ val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
+ val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
+ val commitTimeMs = longMetric("commitTimeMs")
+ val numDroppedDuplicateRows = longMetric("numDroppedDuplicateRows")
+
+ val baseIterator = watermarkPredicateForDataForLateEvents match {
+ case Some(predicate) => applyRemovingRowsOlderThanWatermark(iter,
predicate)
+ case None => iter
+ }
+
+ val updatesStartTimeNs = System.nanoTime
+
+ val result = baseIterator.filter { r =>
Review Comment:
When we reuse the Deduplicate() logical node, mainly this part and the removal
would differ based on a 'dropWithinWatermark' flag. That way, most of the
remaining code remains unchanged.
##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
dropDuplicates(colNames)
}
+ /**
+ * Returns a new Dataset with duplicates rows removed, as long as event
times of duplicated rows
+ * are within delay threshold of watermark.
+ *
+ * This only works with streaming [[Dataset]], and watermark for the input
[[Dataset]] must be
+ * set via [[withWatermark]].
+ *
+ * This will keep all data across triggers as intermediate state to drop
duplicated rows. The
+ * state will be kept to guarantee the following, "If event time of the
first arrived event is
+ * 'ts', this guarantees all duplicated rows will be dropped where these
rows are within the time
+ * range of (ts - delay threshold, ts + delay threshold)". In practice,
users are encouraged to
+ * set the delay threshold of watermark longer than max timestamp
differences among duplicated
+ * events.
+ *
+ * In addition, too late data older than watermark will be dropped to avoid
any possibility
+ * of duplicates.
+ *
+ * @group typedrel
+ * @since 3.5.0
+ */
+ def dropDuplicatesWithinWatermark(): Dataset[T] = {
+ dropDuplicatesWithinWatermark(this.columns)
+ }
+
+ /**
+ * Returns a new Dataset with duplicates rows removed, considering only the
subset of columns,
+ * as long as event times of duplicated rows are within delay threshold of
watermark.
+ *
+ * This only works with streaming [[Dataset]], and watermark for the input
[[Dataset]] must be
+ * set via [[withWatermark]].
+ *
+ * This will keep all data across triggers as intermediate state to drop
duplicated rows. The
+ * state will be kept to guarantee the following, "If event time of the
first arrived event is
+ * 'ts', this guarantees all duplicated rows will be dropped where these
rows are within the time
+ * range of (ts - delay threshold, ts + delay threshold)". In practice,
users are encouraged to
+ * set the delay threshold of watermark longer than max timestamp
differences among duplicated
+ * events.
+ *
+ * In addition, too late data older than watermark will be dropped to avoid
any possibility
+ * of duplicates.
Review Comment:
Same here.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -679,6 +679,8 @@ object RemoveNoopUnion extends Rule[LogicalPlan] {
d.withNewChildren(Seq(simplifyUnion(u)))
case d @ Deduplicate(_, u: Union) =>
d.withNewChildren(Seq(simplifyUnion(u)))
+ case d @ DeduplicateWithinWatermark(_, u: Union) =>
Review Comment:
Rather than making this a separate logical node, can we make the new
behavior an option in the Deduplicate node? That way we don't need to
distinguish them in the implementation except in a couple of places.
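A rough sketch of what this could look like, using toy stand-ins rather than the real Catalyst classes. The `withinWatermark` field, the simplified `simplifyUnion`, and `RemoveNoopUnionSketch` are hypothetical and only illustrate that a single pattern can cover both behaviors.

```scala
// Toy stand-ins for Catalyst plan nodes. These are NOT the real Spark classes.
sealed trait Plan
case class Relation(name: String) extends Plan
case class Union(children: Seq[Plan]) extends Plan

// Hypothetical: one Deduplicate node carrying a flag instead of a second node type.
case class Deduplicate(
    keys: Seq[String],
    child: Plan,
    withinWatermark: Boolean = false) extends Plan

object RemoveNoopUnionSketch {
  // Simplified placeholder: collapse a union of identical children into one child.
  def simplifyUnion(u: Union): Plan = u.children.distinct match {
    case Seq(single) => single
    case other => Union(other)
  }

  // A single case handles both flavors; the flag is carried along by copy().
  def apply(plan: Plan): Plan = plan match {
    case d @ Deduplicate(_, u: Union, _) => d.copy(child = simplifyUnion(u))
    case other => other
  }
}
```

With this shape, rules that only care about the child would not need a second `case`, and only the places that depend on the flag (such as the physical operator's filtering and state removal) would branch on it.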
##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -3038,6 +3038,118 @@ class Dataset[T] private[sql](
dropDuplicates(colNames)
}
+ /**
+ * Returns a new Dataset with duplicates rows removed, as long as event
times of duplicated rows
+ * are within delay threshold of watermark.
+ *
+ * This only works with streaming [[Dataset]], and watermark for the input
[[Dataset]] must be
Review Comment:
Also mentioned above: `withWatermark()` is allowed in batch; in the same way,
this could be allowed too. In batch, this is essentially the same as normal
dropDuplicates().