Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1759#discussion_r15771735
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/TypedSql.scala ---
    @@ -0,0 +1,202 @@
    +package org.apache.spark.sql
    +
    +import org.apache.spark.sql.catalyst.analysis._
    +import org.apache.spark.sql.catalyst.expressions.{Expression, ScalaUdf, 
AttributeReference}
    +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
    +import org.apache.spark.sql.catalyst.types._
    +
    +import scala.language.experimental.macros
    +import scala.language.existentials
    +
    +import records._
    +import Macros.RecordMacros
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.{SqlParser, ScalaReflection}
    +
    +/**
    + * A collection of Scala macros for working with SQL in a type-safe way.
    + */
    +private[sql] object SQLMacros {
    +  import scala.reflect.macros._
    +
    +  def sqlImpl(c: Context)(args: c.Expr[Any]*) =
    +    new Macros[c.type](c).sql(args)
    +
    +  case class Schema(dataType: DataType, nullable: Boolean)
    +
    +  class Macros[C <: Context](val c: C) extends ScalaReflection {
    +    val universe: c.universe.type = c.universe
    +
    +    import c.universe._
    +
    +    val rowTpe = tq"_root_.org.apache.spark.sql.catalyst.expressions.Row"
    +
    +    val rMacros = new RecordMacros[c.type](c)
    +
    +    trait InterpolatedItem {
    +      def placeholderName: String
    +      def registerCode: Tree
    +      def localRegister(catalog: Catalog, registry: FunctionRegistry)
    +    }
    +
    +    case class InterpolatedUDF(index: Int, expr: c.Expr[Any], returnType: 
DataType)
    +      extends InterpolatedItem{
    +
    +      val placeholderName = s"func$index"
    +
    +      def registerCode = q"""registerFunction($placeholderName, $expr)"""
    +
    +      def localRegister(catalog: Catalog, registry: FunctionRegistry) = {
    +        registry.registerFunction(
    +          placeholderName, (_: Seq[Expression]) => ScalaUdf(null, 
returnType, Nil))
    +      }
    +    }
    +
    +    case class InterpolatedTable(index: Int, expr: c.Expr[Any], schema: 
StructType)
    +      extends InterpolatedItem{
    +
    +      val placeholderName = s"table$index"
    +
    +      def registerCode = q"""$expr.registerTempTable($placeholderName)"""
    +
    +      def localRegister(catalog: Catalog, registry: FunctionRegistry) = {
    +        catalog.registerTable(None, placeholderName, 
LocalRelation(schema.toAttributes :_*))
    +      }
    +    }
    +
    +    case class RecSchema(name: String, index: Int, cType: DataType, tpe: 
Type)
    +
    +    def sql(args: Seq[c.Expr[Any]]) = {
    +
    +      val q"""
    +        $interpName(
    +          scala.StringContext.apply(..$rawParts))""" = c.prefix.tree
    +
    +      //rawParts.map(_.toString).foreach(println)
    +
    +      val parts =
    +        rawParts.map(
    +          _.toString.stripPrefix("\"")
    +           .replaceAll("\\\\", "")
    +           .stripSuffix("\""))
    +
    +      val interpolatedArguments = args.zipWithIndex.map { case (arg, i) =>
    +        // println(arg + " " + arg.actualType)
    +        arg.actualType match {
    +          case TypeRef(_, _, Seq(schemaType)) =>
    +            InterpolatedTable(i, arg, 
schemaFor(schemaType).dataType.asInstanceOf[StructType])
    +          case TypeRef(_, _, Seq(inputType, outputType)) =>
    +            InterpolatedUDF(i, arg, schemaFor(outputType).dataType)
    +        }
    +      }
    +
    +      val query = parts(0) + args.indices.map { i =>
    +        interpolatedArguments(i).placeholderName + parts(i + 1)
    +      }.mkString("")
    +
    +      val parser = new SqlParser()
    +      val logicalPlan = parser(query)
    +      val catalog = new SimpleCatalog(true)
    +      val functionRegistry = new SimpleFunctionRegistry
    +      val analyzer = new Analyzer(catalog, functionRegistry, true)
    +
    +      interpolatedArguments.foreach(_.localRegister(catalog, 
functionRegistry))
    +      val analyzedPlan = analyzer(logicalPlan)
    +
    +      val fields = analyzedPlan.output.map(attr => (attr.name, 
attr.dataType))
    +      val record = genRecord(q"row", fields)
    +
    +      val tree = q"""
    +        ..${interpolatedArguments.map(_.registerCode)}
    +        val result = sql($query)
    +        result.map(row => $record)
    +      """
    +
    +      // println(tree)
    +      c.Expr(tree)
    +    }
    +
    +    // TODO: Handle nullable fields
    --- End diff --
    
    Yeah, this is a good question about our interfaces.  I see a few ways 
we could handle this:
     - Have a separate isNull method and make calling the primitive accessor 
invalid when that method returns true.  If a user fails to check, we throw an 
exception.
     - Box everything
     - Use nullability information as follows:  Return `Option[Type]` when the 
attribute is nullable, and return the primitive when it is not nullable.  Right now 
we don't do a great job in the optimizer of propagating nullability 
information, but over time this should get better.  That way we could avoid the 
cost of `Option` on any attribute that was involved in a predicate that would 
prevent it from being null.
    
    Personally I like the last option the best.  It makes it very explicit to 
users when things could be null, and still gives them a way to get 
high-performance access when a primitive value cannot be null.  It does, however, 
introduce some possible confusion (e.g. changing the query in subtle ways, such 
as adding a predicate, could change the return types). This approach also 
requires the most work to be done improving Catalyst's analysis.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to