c21 commented on a change in pull request #34291:
URL: https://github.com/apache/spark/pull/34291#discussion_r730323473
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
##########
@@ -298,17 +299,22 @@ private[sql] case class JDBCRelation(
requiredColumns: Array[String],
finalSchema: StructType,
filters: Array[Filter],
- groupByColumns: Option[Array[String]]): RDD[Row] = {
+ groupByColumns: Option[Array[String]],
+ limit: Option[Limit]): RDD[Row] = {
+ // If limit is pushed down, only a limited number of rows will be
returned. PartitionInfo will
+ // be ignored and the query will be done in one task.
Review comment:
Just wondering: would running only one task be a problem when reading a big JDBC
data source? Is it possible to lift the restriction to allow more than one task
in the future?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -225,6 +226,31 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan]
with PredicateHelper {
withProjection
}
+ def applyLimit(plan: LogicalPlan): LogicalPlan = plan.transform {
+ case globalLimit @ GlobalLimit(_,
+ LocalLimit(limitExpr, DataSourceV2ScanRelation(_, scan, _))) =>
+ val supportsPushDownLimit = scan match {
+ case _: SupportsPushDownLimit => true
+ case v1: V1ScanWrapper =>
+ v1.v1Scan match {
+ case _: SupportsPushDownLimit => true
+ case _ => false
+ }
+ case _ => false
+ }
+ if (supportsPushDownLimit) {
+ assert(limitExpr.isInstanceOf[Literal] &&
+ limitExpr.asInstanceOf[Literal].value.isInstanceOf[Integer],
+ "Limit has to be an Integer")
+ val value = limitExpr.asInstanceOf[Literal].value.asInstanceOf[Integer]
+ val limit = LogicalExpressions.limit(LiteralValue(value, IntegerType))
+ PushDownUtils.pushLimit(scan, limit)
+ globalLimit
Review comment:
Don't we want to change the query plan? Why do we need to keep the limit
operator in Spark?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -149,11 +151,14 @@ case class RowDataSourceScanExec(
handledFilters
}
+ val limitStr = if (limit != null) s"LIMIT ${limit.number}" else "null"
Review comment:
If the limit is not pushed down, shall we set `limitStr` to `[]`, like the others?
Showing `PushedLimit: null` in the query plan does not look very clean.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScan.scala
##########
@@ -28,10 +29,19 @@ case class JDBCScan(
prunedSchema: StructType,
pushedFilters: Array[Filter],
pushedAggregateColumn: Array[String] = Array(),
- groupByColumns: Option[Array[String]]) extends V1Scan {
+ groupByColumns: Option[Array[String]]) extends V1Scan with
SupportsPushDownLimit {
override def readSchema(): StructType = prunedSchema
+ private var limit: Option[Limit] = None
+
+ override def pushLimit(limit: Limit): Unit = {
+ if (!relation.jdbcOptions.pushDownLimit) return
+ this.limit = Some(limit)
Review comment:
nit: this can just be
```
if (relation.jdbcOptions.pushDownLimit) {
this.limit = Some(limit)
}
```
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
##########
@@ -274,6 +279,17 @@ private[jdbc] class JDBCRDD(
}
}
+ /**
+ * A LIMIT clause representing pushed-down limit.
+ */
+ private def getLimitClause: String = {
+ if (limit.nonEmpty) {
Review comment:
nit: we can be more idiomatic Scala here:
```
limit.map(l => s"LIMIT ${l.number.value}").getOrElse("")
```
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCScan.scala
##########
@@ -28,10 +29,19 @@ case class JDBCScan(
prunedSchema: StructType,
pushedFilters: Array[Filter],
pushedAggregateColumn: Array[String] = Array(),
- groupByColumns: Option[Array[String]]) extends V1Scan {
+ groupByColumns: Option[Array[String]]) extends V1Scan with
SupportsPushDownLimit {
override def readSchema(): StructType = prunedSchema
+ private var limit: Option[Limit] = None
+
+ override def pushLimit(limit: Limit): Unit = {
+ if (!relation.jdbcOptions.pushDownLimit) return
+ this.limit = Some(limit)
+ }
+
+ override def pushedLimit: Limit = if (limit.nonEmpty) limit.get else null
Review comment:
Shall we define the method to return `Option[Limit]` instead of `Limit`? A
lot of Spark code assumes return values are non-null, and I am afraid this
might introduce unnecessary null pointer exceptions in the future.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##########
@@ -104,6 +105,7 @@ case class RowDataSourceScanExec(
filters: Set[Filter],
handledFilters: Set[Filter],
aggregation: Option[Aggregation],
+ limit: Limit,
Review comment:
Shall we define it as `Option[Limit]`, similar to other push-downs, e.g.
`Option[Aggregation]`?
##########
File path: docs/sql-data-sources-jdbc.md
##########
@@ -246,6 +246,15 @@ logging into the data sources.
<td>read</td>
</tr>
+ <tr>
+ <td><code>pushDownLimit</code></td>
+ <td><code>false</code></td>
+ <td>
+ The option to enable or disable LIMIT push-down into the JDBC data
source. The default value is false, in which case Spark does not push down
LIMIT to the JDBC data source. Otherwise, if sets to true, LIMIT is pushed down
to the JDBC data source. Please note that if LIMIT is pushed down, since only a
limited number of rows is returned, JDBC doesn’t honor partitions any more and
the query is done in one task. SPARK still applies LIMIT on the result from
data source even if LIMIT is pushed down.
Review comment:
nit: `if sets to true` -> `if value sets to true`
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
##########
@@ -92,6 +93,45 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession
with ExplainSuiteHel
checkAnswer(sql("SELECT name, id FROM h2.test.people"), Seq(Row("fred",
1), Row("mary", 2)))
}
+ test("simple scan with LIMIT") {
+ val df1 = spark.read.table("h2.test.employee").limit(4)
+ checkLimitResult(df1)
+
+ val df2 = spark.read
+ .option("partitionColumn", "dept")
+ .option("lowerBound", "1")
+ .option("upperBound", "2")
+ .option("numPartitions", "2")
+ .table("h2.test.employee")
+ .limit(4)
+ checkLimitResult(df2)
+ }
+
+ private def checkLimitResult(df: DataFrame): Unit = {
+ df.queryExecution.optimizedPlan.collect {
+ case _: DataSourceV2ScanRelation =>
+ val expected_plan_fragment =
+ "PushedLimit: LIMIT 4"
+ checkKeywordsExistsInExplain(df, expected_plan_fragment)
+ }
+
+ checkAnswer(df, Seq(Row(1, "amy", 10000.00, 1000.0),
+ Row(2, "alex", 12000.00, 1200.0),
+ Row(1, "cathy", 9000.00, 1200.0),
+ Row(2, "david", 10000.00, 1300.0)))
+ }
+
+ test("simple scan with LIMIT and partition") {
+ val df = spark.read
+ .option("partitionColumn", "dept")
+ .option("lowerBound", "1")
+ .option("upperBound", "2")
+ .option("numPartitions", "2")
+ .table("h2.test.employee")
+ .limit(4)
+
Review comment:
Missing a call to `checkLimitResult`?
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/expressions/expressions.scala
##########
@@ -357,3 +361,8 @@ private[sql] object SortValue {
None
}
}
+
+private[sql] final case class LimitValue(number: Literal[_]) extends Limit {
+ assert(number.value.isInstanceOf[Integer], "LimitValue has to be an Integer")
Review comment:
nit: normally we use `require()` rather than `assert()` for any
expectation of input parameters.
Also, we can print out `number.value` in the error message for ease of
debugging.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
##########
@@ -225,6 +226,31 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan]
with PredicateHelper {
withProjection
}
+ def applyLimit(plan: LogicalPlan): LogicalPlan = plan.transform {
+ case globalLimit @ GlobalLimit(_,
+ LocalLimit(limitExpr, DataSourceV2ScanRelation(_, scan, _))) =>
+ val supportsPushDownLimit = scan match {
+ case _: SupportsPushDownLimit => true
+ case v1: V1ScanWrapper =>
+ v1.v1Scan match {
+ case _: SupportsPushDownLimit => true
+ case _ => false
+ }
+ case _ => false
+ }
+ if (supportsPushDownLimit) {
+ assert(limitExpr.isInstanceOf[Literal] &&
+ limitExpr.asInstanceOf[Literal].value.isInstanceOf[Integer],
+ "Limit has to be an Integer")
+ val value = limitExpr.asInstanceOf[Literal].value.asInstanceOf[Integer]
+ val limit = LogicalExpressions.limit(LiteralValue(value, IntegerType))
+ PushDownUtils.pushLimit(scan, limit)
+ globalLimit
Review comment:
Don't we want to change the query plan for the JDBC data source? Why do we need
to keep the limit operator in Spark for the JDBC data source?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]