Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1819#discussion_r15978931
  
    --- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
    @@ -32,6 +38,113 @@ private[hive] trait HiveStrategies {
     
       val hiveContext: HiveContext
     
    +  /**
    +   * :: Experimental ::
    +   * Finds table scans that would use the Hive SerDe and replaces them 
with our own native parquet
    +   * table scan operator.
    +   *
    +   * TODO: Much of this logic is duplicated in HiveTableScan.  Ideally we 
would do some refactoring
    +   * but since this is after the code freeze for 1.1 all logic is here to 
minimize disruption.
    +   */
    +  @Experimental
    +  object ParquetConversion extends Strategy {
    +    implicit class LogicalPlanHacks(s: SchemaRDD) {
    +      def lowerCase =
    +        new SchemaRDD(s.sqlContext, LowerCaseSchema(s.logicalPlan))
    +    }
    +
    +    implicit class PhysicalPlanHacks(s: SparkPlan) {
    +      def fakeOutput(newOutput: Seq[Attribute]) = OutputFaker(newOutput, s)
    +    }
    +
    +    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    +      case PhysicalOperation(projectList, predicates, relation: 
MetastoreRelation)
    +          if relation.tableDesc.getSerdeClassName.contains("Parquet") &&
    +             hiveContext.convertMetastoreParquet =>
    +
    +        // Filter out all predicates that only deal with partition keys
    +        val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet
    +        val (pruningPredicates, otherPredicates) = predicates.partition {
    +          _.references.map(_.exprId).subsetOf(partitionKeyIds)
    +        }
    +
    +        // We are going to throw the predicates and projection back at the 
whole optimization
    +        // sequence so lets unresolve all the attributes, allowing them to 
be rebound to the
    +        // matching parquet attributes.
    +        val unresolvedOtherPredicates = otherPredicates.map(_ transform {
    +          case a: AttributeReference => UnresolvedAttribute(a.name)
    +        }).reduceOption(And).getOrElse(Literal(true))
    +
    +        val unresolvedProjection = projectList.map(_ transform {
    +          // Handle non-partitioning columns
    +          case a: AttributeReference if 
!partitionKeyIds.contains(a.exprId) => UnresolvedAttribute(a.name)
    +        })
    +
    +        if (relation.hiveQlTable.isPartitioned) {
    +          val rawPredicate = 
pruningPredicates.reduceOption(And).getOrElse(Literal(true))
    +          // Translate the predicate so that it automatically casts the 
input values to the correct
    +          // data types during evaluation
    +          val castedPredicate = rawPredicate transform {
    +            case a: AttributeReference =>
    +              val idx = relation.partitionKeys.indexWhere(a.exprId == 
_.exprId)
    +              val key = relation.partitionKeys(idx)
    +              Cast(BoundReference(idx, StringType, nullable = true), 
key.dataType)
    +          }
    +
    +          val inputData = new 
GenericMutableRow(relation.partitionKeys.size)
    +          val pruningCondition =
    +            if(codegenEnabled) {
    +              GeneratePredicate(castedPredicate)
    +            } else {
    +              InterpretedPredicate(castedPredicate)
    +            }
    +
    +          val partitions = relation.hiveQlPartitions.filter { part =>
    +            val partitionValues = part.getValues
    +            var i = 0
    +            while (i < partitionValues.size()) {
    +              inputData(i) = partitionValues(i)
    +              i += 1
    +            }
    +            pruningCondition(inputData)
    +          }
    +
    +          org.apache.spark.sql.execution.Union(
    +            partitions.par.map { p =>
    +              val partValues = p.getValues()
    +              val internalProjection = unresolvedProjection.map(_ 
transform {
    +                // Handle partitioning columns
    +                case a: AttributeReference if 
partitionKeyIds.contains(a.exprId) => {
    +                  val idx = relation.partitionKeys.indexWhere(a.exprId == 
_.exprId)
    +                  val key = relation.partitionKeys(idx)
    +
    +                  Alias(Cast(Literal(partValues.get(idx), StringType), 
key.dataType), a.name)()
    +                }
    +              })
    +
    +              hiveContext
    --- End diff --
    
    It did due to the hadoopConf getting broadcasted over and over again. 
Hence: c0d9b72


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to