This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 93af0848e46 [SPARK-44710][CONNECT] Add Dataset.dropDuplicatesWithinWatermark to Spark Connect Scala Client 93af0848e46 is described below commit 93af0848e467fe4d58c0fb1242b738931390d6f8 Author: Herman van Hovell <her...@databricks.com> AuthorDate: Tue Aug 8 15:05:18 2023 +0200 [SPARK-44710][CONNECT] Add Dataset.dropDuplicatesWithinWatermark to Spark Connect Scala Client ### What changes were proposed in this pull request? This PR adds `Dataset.dropDuplicatesWithinWatermark` to the Spark Connect Scala Client. ### Why are the changes needed? Increase compatibility with the current sql/core APIs. ### Does this PR introduce _any_ user-facing change? Yes. It adds a new method to the Scala client. ### How was this patch tested? Added a new (rudimentary) test to `ClientE2ETestSuite`. Closes #42384 from hvanhovell/SPARK-44710. Authored-by: Herman van Hovell <her...@databricks.com> Signed-off-by: Herman van Hovell <her...@databricks.com> --- .../main/scala/org/apache/spark/sql/Dataset.scala | 39 +++++++++++---------- .../org/apache/spark/sql/ClientE2ETestSuite.scala | 20 +++++++++++ .../resources/query-tests/queries/distinct.json | 3 +- .../query-tests/queries/distinct.proto.bin | Bin 50 -> 52 bytes .../query-tests/queries/dropDuplicates.json | 3 +- .../query-tests/queries/dropDuplicates.proto.bin | Bin 50 -> 52 bytes .../queries/dropDuplicates_names_array.json | 3 +- .../queries/dropDuplicates_names_array.proto.bin | Bin 55 -> 57 bytes .../queries/dropDuplicates_names_seq.json | 3 +- .../queries/dropDuplicates_names_seq.proto.bin | Bin 54 -> 56 bytes .../queries/dropDuplicates_varargs.json | 3 +- .../queries/dropDuplicates_varargs.proto.bin | Bin 58 -> 60 bytes 12 files changed, 51 insertions(+), 23 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala index 
8a7dce3987a..5f263903c8b 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2399,6 +2399,19 @@ class Dataset[T] private[sql] ( .addAllColumnNames(cols.asJava) } + private def buildDropDuplicates( + columns: Option[Seq[String]], + withinWaterMark: Boolean): Dataset[T] = sparkSession.newDataset(encoder) { builder => + val dropBuilder = builder.getDeduplicateBuilder + .setInput(plan.getRoot) + .setWithinWatermark(withinWaterMark) + if (columns.isDefined) { + dropBuilder.addAllColumnNames(columns.get.asJava) + } else { + dropBuilder.setAllColumnsAsKeys(true) + } + } + /** * Returns a new Dataset that contains only the unique rows from this Dataset. This is an alias * for `distinct`. @@ -2406,11 +2419,7 @@ class Dataset[T] private[sql] ( * @group typedrel * @since 3.4.0 */ - def dropDuplicates(): Dataset[T] = sparkSession.newDataset(encoder) { builder => - builder.getDeduplicateBuilder - .setInput(plan.getRoot) - .setAllColumnsAsKeys(true) - } + def dropDuplicates(): Dataset[T] = buildDropDuplicates(None, withinWaterMark = false) /** * (Scala-specific) Returns a new Dataset with duplicate rows removed, considering only the @@ -2419,11 +2428,8 @@ class Dataset[T] private[sql] ( * @group typedrel * @since 3.4.0 */ - def dropDuplicates(colNames: Seq[String]): Dataset[T] = sparkSession.newDataset(encoder) { - builder => - builder.getDeduplicateBuilder - .setInput(plan.getRoot) - .addAllColumnNames(colNames.asJava) + def dropDuplicates(colNames: Seq[String]): Dataset[T] = { + buildDropDuplicates(Option(colNames), withinWaterMark = false) } /** @@ -2443,16 +2449,14 @@ class Dataset[T] private[sql] ( */ @scala.annotation.varargs def dropDuplicates(col1: String, cols: String*): Dataset[T] = { - val colNames: Seq[String] = col1 +: cols - dropDuplicates(colNames) + dropDuplicates(col1 +: cols) } - def dropDuplicatesWithinWatermark(): Dataset[T] = { - 
dropDuplicatesWithinWatermark(this.columns) - } + def dropDuplicatesWithinWatermark(): Dataset[T] = + buildDropDuplicates(None, withinWaterMark = true) def dropDuplicatesWithinWatermark(colNames: Seq[String]): Dataset[T] = { - throw new UnsupportedOperationException("dropDuplicatesWithinWatermark is not implemented.") + buildDropDuplicates(Option(colNames), withinWaterMark = true) } def dropDuplicatesWithinWatermark(colNames: Array[String]): Dataset[T] = { @@ -2461,8 +2465,7 @@ class Dataset[T] private[sql] ( @scala.annotation.varargs def dropDuplicatesWithinWatermark(col1: String, cols: String*): Dataset[T] = { - val colNames: Seq[String] = col1 +: cols - dropDuplicatesWithinWatermark(colNames) + dropDuplicatesWithinWatermark(col1 +: cols) } /** diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala index ebd3d037bba..074cf170dd3 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala @@ -1183,6 +1183,26 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM val joined = ds1.joinWith(ds2, $"a.value._1" === $"b.value._2", "inner") checkSameResult(Seq((Some((2, 3)), Some((1, 2)))), joined) } + + test("dropDuplicatesWithinWatermark not supported in batch DataFrame") { + def testAndVerify(df: Dataset[_]): Unit = { + val exc = intercept[AnalysisException] { + df.write.format("noop").mode(SaveMode.Append).save() + } + + assert(exc.getMessage.contains("dropDuplicatesWithinWatermark is not supported")) + assert(exc.getMessage.contains("batch DataFrames/DataSets")) + } + + val result = spark.range(10).dropDuplicatesWithinWatermark() + testAndVerify(result) + + val result2 = spark + .range(10) + .withColumn("newcol", col("id")) + 
.dropDuplicatesWithinWatermark("newcol") + testAndVerify(result2) + } } private[sql] case class ClassData(a: String, b: Int) diff --git a/connector/connect/common/src/test/resources/query-tests/queries/distinct.json b/connector/connect/common/src/test/resources/query-tests/queries/distinct.json index ae796b52035..15c320d462b 100644 --- a/connector/connect/common/src/test/resources/query-tests/queries/distinct.json +++ b/connector/connect/common/src/test/resources/query-tests/queries/distinct.json @@ -11,6 +11,7 @@ "schema": "struct\u003cid:bigint,a:int,b:double\u003e" } }, - "allColumnsAsKeys": true + "allColumnsAsKeys": true, + "withinWatermark": false } } \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/distinct.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/distinct.proto.bin index 07430c43831..078223b1f3e 100644 Binary files a/connector/connect/common/src/test/resources/query-tests/queries/distinct.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/distinct.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.json b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.json index ae796b52035..15c320d462b 100644 --- a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.json +++ b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.json @@ -11,6 +11,7 @@ "schema": "struct\u003cid:bigint,a:int,b:double\u003e" } }, - "allColumnsAsKeys": true + "allColumnsAsKeys": true, + "withinWatermark": false } } \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.proto.bin index 07430c43831..078223b1f3e 100644 Binary files 
a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.json b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.json index e72e23c86ca..23df6972a51 100644 --- a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.json +++ b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.json @@ -11,6 +11,7 @@ "schema": "struct\u003cid:bigint,a:int,b:double\u003e" } }, - "columnNames": ["a", "id"] + "columnNames": ["a", "id"], + "withinWatermark": false } } \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.proto.bin index c8e3885fbf8..3bdbeb0d386 100644 Binary files a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_array.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.json b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.json index 754cecac4b2..6ef72770b9a 100644 --- a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.json +++ b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.json @@ -11,6 +11,7 @@ "schema": "struct\u003cid:bigint,a:int,b:double\u003e" } }, - "columnNames": ["a", "b"] + "columnNames": ["a", "b"], + "withinWatermark": false } } \ No newline at end of file diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.proto.bin index 1a2d635e58e..65b4942c568 100644 Binary files a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_names_seq.proto.bin differ diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.json b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.json index c4a8df30c58..2b6d46a3135 100644 --- a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.json +++ b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.json @@ -11,6 +11,7 @@ "schema": "struct\u003cid:bigint,a:int,b:double\u003e" } }, - "columnNames": ["a", "b", "id"] + "columnNames": ["a", "b", "id"], + "withinWatermark": false } } \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.proto.bin index 719a373c2e3..57f0d7e5afa 100644 Binary files a/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.proto.bin and b/connector/connect/common/src/test/resources/query-tests/queries/dropDuplicates_varargs.proto.bin differ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org