Github user marmbrus commented on a diff in the pull request:
https://github.com/apache/spark/pull/1819#discussion_r15978931
--- Diff:
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -32,6 +38,113 @@ private[hive] trait HiveStrategies {
val hiveContext: HiveContext
+ /**
+ * :: Experimental ::
+ * Finds table scans that would use the Hive SerDe and replaces them
with our own native parquet
+ * table scan operator.
+ *
+ * TODO: Much of this logic is duplicated in HiveTableScan. Ideally we
would do some refactoring
+ * but since this is after the code freeze for 1.1 all logic is here to
minimize disruption.
+ */
+ @Experimental
+ object ParquetConversion extends Strategy {
+ implicit class LogicalPlanHacks(s: SchemaRDD) {
+ def lowerCase =
+ new SchemaRDD(s.sqlContext, LowerCaseSchema(s.logicalPlan))
+ }
+
+ implicit class PhysicalPlanHacks(s: SparkPlan) {
+ def fakeOutput(newOutput: Seq[Attribute]) = OutputFaker(newOutput, s)
+ }
+
+ def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case PhysicalOperation(projectList, predicates, relation:
MetastoreRelation)
+ if relation.tableDesc.getSerdeClassName.contains("Parquet") &&
+ hiveContext.convertMetastoreParquet =>
+
+ // Filter out all predicates that only deal with partition keys
+ val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet
+ val (pruningPredicates, otherPredicates) = predicates.partition {
+ _.references.map(_.exprId).subsetOf(partitionKeyIds)
+ }
+
+ // We are going to throw the predicates and projection back at the
whole optimization
+ // sequence so lets unresolve all the attributes, allowing them to
be rebound to the
+ // matching parquet attributes.
+ val unresolvedOtherPredicates = otherPredicates.map(_ transform {
+ case a: AttributeReference => UnresolvedAttribute(a.name)
+ }).reduceOption(And).getOrElse(Literal(true))
+
+ val unresolvedProjection = projectList.map(_ transform {
+ // Handle non-partitioning columns
+ case a: AttributeReference if
!partitionKeyIds.contains(a.exprId) => UnresolvedAttribute(a.name)
+ })
+
+ if (relation.hiveQlTable.isPartitioned) {
+ val rawPredicate =
pruningPredicates.reduceOption(And).getOrElse(Literal(true))
+ // Translate the predicate so that it automatically casts the
input values to the correct
+ // data types during evaluation
+ val castedPredicate = rawPredicate transform {
+ case a: AttributeReference =>
+ val idx = relation.partitionKeys.indexWhere(a.exprId ==
_.exprId)
+ val key = relation.partitionKeys(idx)
+ Cast(BoundReference(idx, StringType, nullable = true),
key.dataType)
+ }
+
+ val inputData = new
GenericMutableRow(relation.partitionKeys.size)
+ val pruningCondition =
+ if(codegenEnabled) {
+ GeneratePredicate(castedPredicate)
+ } else {
+ InterpretedPredicate(castedPredicate)
+ }
+
+ val partitions = relation.hiveQlPartitions.filter { part =>
+ val partitionValues = part.getValues
+ var i = 0
+ while (i < partitionValues.size()) {
+ inputData(i) = partitionValues(i)
+ i += 1
+ }
+ pruningCondition(inputData)
+ }
+
+ org.apache.spark.sql.execution.Union(
+ partitions.par.map { p =>
+ val partValues = p.getValues()
+ val internalProjection = unresolvedProjection.map(_
transform {
+ // Handle partitioning columns
+ case a: AttributeReference if
partitionKeyIds.contains(a.exprId) => {
+ val idx = relation.partitionKeys.indexWhere(a.exprId ==
_.exprId)
+ val key = relation.partitionKeys(idx)
+
+ Alias(Cast(Literal(partValues.get(idx), StringType),
key.dataType), a.name)()
+ }
+ })
+
+ hiveContext
--- End diff --
It did, because the hadoopConf was being broadcast over and over again.
Hence: c0d9b72
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]