asfgit closed pull request #23262: [SPARK-26312][SQL] Replace RDDConversions.rowToRowRdd with RowEncoder to improve its conversion performance
URL: https://github.com/apache/spark/pull/23262
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index e214bfd050410..49fb288fdea6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -18,54 +18,14 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Encoder, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.{Encoder, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.DataType
-
-object RDDConversions {
-  def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
-    data.mapPartitions { iterator =>
-      val numColumns = outputTypes.length
-      val mutableRow = new GenericInternalRow(numColumns)
-      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
-      iterator.map { r =>
-        var i = 0
-        while (i < numColumns) {
-          mutableRow(i) = converters(i)(r.productElement(i))
-          i += 1
-        }
-
-        mutableRow
-      }
-    }
-  }
-
-  /**
-   * Convert the objects inside Row into the types Catalyst expected.
-   */
-  def rowToRowRdd(data: RDD[Row], outputTypes: Seq[DataType]): RDD[InternalRow] = {
-    data.mapPartitions { iterator =>
-      val numColumns = outputTypes.length
-      val mutableRow = new GenericInternalRow(numColumns)
-      val converters = outputTypes.map(CatalystTypeConverters.createToCatalystConverter)
-      iterator.map { r =>
-        var i = 0
-        while (i < numColumns) {
-          mutableRow(i) = converters(i)(r(i))
-          i += 1
-        }
-
-        mutableRow
-      }
-    }
-  }
-}
 
 object ExternalRDD {
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index c6000442fae76..b304e2da6e1cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -29,11 +29,11 @@ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, Quali
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command._
@@ -416,7 +416,10 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
       output: Seq[Attribute],
       rdd: RDD[Row]): RDD[InternalRow] = {
     if (relation.relation.needConversion) {
-      execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
+      val converters = RowEncoder(StructType.fromAttributes(output))
+      rdd.mapPartitions { iterator =>
+        iterator.map(converters.toRow)
+      }
     } else {
       rdd.asInstanceOf[RDD[InternalRow]]
     }
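For readers who want to try the new conversion path in isolation, here is a
minimal standalone sketch, assuming the Spark 2.4-era API in which
RowEncoder(schema) returns an ExpressionEncoder[Row] whose toRow serializes an
external Row into a Catalyst InternalRow. The schema, values, and variable
names below are illustrative only, not taken from the PR:

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Illustrative schema for rows of (id: Long, name: String).
val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("name", StringType)))

// One encoder per schema: its serializer is code-generated and reused across
// rows, instead of dispatching through a boxed CatalystTypeConverters
// converter for every field of every row as the removed rowToRowRdd loop did.
val encoder = RowEncoder(schema)
val internal: InternalRow = encoder.toRow(Row(1L, "a"))

// Per partition, the patched DataSourceStrategy method does the equivalent of:
//   rdd.mapPartitions(_.map(encoder.toRow))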


 
