asfgit closed pull request #23262: [SPARK-26312][SQL]Replace
RDDConversions.rowToRowRdd with RowEncoder to improve its conversion performance
URL: https://github.com/apache/spark/pull/23262
This is a PR merged from a forked repository.
Because GitHub does not display the original diff of a merged pull request
from a fork, the diff is reproduced below for the sake of provenance:
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index e214bfd050410..49fb288fdea6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -18,54 +18,14 @@
package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Encoder, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.{Encoder, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning,
UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.DataType
-
-object RDDConversions {
- def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]):
RDD[InternalRow] = {
- data.mapPartitions { iterator =>
- val numColumns = outputTypes.length
- val mutableRow = new GenericInternalRow(numColumns)
- val converters =
outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
- iterator.map { r =>
- var i = 0
- while (i < numColumns) {
- mutableRow(i) = converters(i)(r.productElement(i))
- i += 1
- }
-
- mutableRow
- }
- }
- }
-
- /**
- * Convert the objects inside Row into the types Catalyst expected.
- */
- def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]):
RDD[InternalRow] = {
- data.mapPartitions { iterator =>
- val numColumns = outputTypes.length
- val mutableRow = new GenericInternalRow(numColumns)
- val converters =
outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
- iterator.map { r =>
- var i = 0
- while (i < numColumns) {
- mutableRow(i) = converters(i)(r(i))
- i += 1
- }
-
- mutableRow
- }
- }
- }
-}
object ExternalRDD {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index c6000442fae76..b304e2da6e1cf 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -29,11 +29,11 @@ import
org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, Quali
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir,
InsertIntoTable, LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.command._
@@ -416,7 +416,10 @@ case class DataSourceStrategy(conf: SQLConf) extends
Strategy with Logging with
output: Seq[Attribute],
rdd: RDD[Row]): RDD[InternalRow] = {
if (relation.relation.needConversion) {
- execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
+ val converters = RowEncoder(StructType.fromAttributes(output))
+ rdd.mapPartitions { iterator =>
+ iterator.map(converters.toRow)
+ }
} else {
rdd.asInstanceOf[RDD[InternalRow]]
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]