[GitHub] spark pull request #20797: [SPARK-23583][SQL] Invoke should support interpre...

2018-03-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20797#discussion_r177419935
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -266,8 +266,31 @@ case class Invoke(
   override def nullable: Boolean = targetObject.nullable || needNullCheck 
|| returnNullable
   override def children: Seq[Expression] = targetObject +: arguments
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported.")
+  override def eval(input: InternalRow): Any = {
--- End diff --

I basically have the same comments here as for `StaticInvoke`: Why do we 
need to resolve the method at runtime?  Why are you using 
`CodeGenerator.defaultValue(dataType)` to determine if something is a primitive?


---




[GitHub] spark pull request #20753: [SPARK-23582][SQL] StaticInvoke should support in...

2018-03-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20753#discussion_r177413074
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -188,8 +189,30 @@ case class StaticInvoke(
   override def nullable: Boolean = needNullCheck || returnNullable
   override def children: Seq[Expression] = arguments
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported.")
+  override def eval(input: InternalRow): Any = {
+val args = arguments.map(e => e.eval(input).asInstanceOf[Object])
+val argClasses = ScalaReflection.expressionJavaClasses(arguments)
+val cls = if (staticObject.getName == objectName) {
+  staticObject
+} else {
+  Utils.classForName(objectName)
+}
+val method = cls.getDeclaredMethod(functionName, argClasses : _*)
+if (needNullCheck && args.exists(_ == null)) {
+  // return null if one of arguments is null
+  null
+} else {
+  val ret = method.invoke(null, args: _*)
+
+  if (CodeGenerator.defaultValue(dataType) == "null") {
--- End diff --

This is a bit scary. Can you just use 
`ScalaReflection.typeBoxedJavaMapping`?
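Roughly what I mean, as a sketch (assuming a DataType-to-boxed-Java-class map is
available in `ScalaReflection`):

    // A result type maps to a Java primitive iff it has an entry in the mapping,
    // so the check does not have to compare generated default-value strings.
    val returnsPrimitive = ScalaReflection.typeBoxedJavaMapping.contains(dataType)
    // The `CodeGenerator.defaultValue(dataType) == "null"` branch above then
    // simply becomes `if (!returnsPrimitive) { ... }`.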


---




[GitHub] spark pull request #20753: [SPARK-23582][SQL] StaticInvoke should support in...

2018-03-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20753#discussion_r177411749
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -188,8 +189,30 @@ case class StaticInvoke(
   override def nullable: Boolean = needNullCheck || returnNullable
   override def children: Seq[Expression] = arguments
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported.")
+  override def eval(input: InternalRow): Any = {
+val args = arguments.map(e => e.eval(input).asInstanceOf[Object])
+val argClasses = ScalaReflection.expressionJavaClasses(arguments)
+val cls = if (staticObject.getName == objectName) {
+  staticObject
+} else {
+  Utils.classForName(objectName)
+}
+val method = cls.getDeclaredMethod(functionName, argClasses : _*)
--- End diff --

Why are we resolving this during evaluation? Shouldn't we be able to 
determine this statically?
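For example (sketch, all names taken from the diff above):

    // Resolve the target class and method once, when the expression is created,
    // instead of on every call to eval().
    @transient private lazy val method: java.lang.reflect.Method = {
      val cls = if (staticObject.getName == objectName) {
        staticObject
      } else {
        Utils.classForName(objectName)
      }
      cls.getDeclaredMethod(functionName, ScalaReflection.expressionJavaClasses(arguments): _*)
    }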


---




[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...

2018-03-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20756#discussion_r177411427
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1261,8 +1261,42 @@ case class InitializeJavaBean(beanInstance: 
Expression, setters: Map[String, Exp
   override def children: Seq[Expression] = beanInstance +: 
setters.values.toSeq
   override def dataType: DataType = beanInstance.dataType
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported.")
+  private lazy val resolvedSetters = {
+assert(beanInstance.dataType.isInstanceOf[ObjectType])
+
+val ObjectType(beanClass) = beanInstance.dataType
+setters.map {
+  case (name, expr) =>
+// Looking for known type mapping first, then using Class attached 
in `ObjectType`.
+// Finally also looking for general `Object`-type parameter for 
generic methods.
+val paramTypes = 
CallMethodViaReflection.typeMapping.getOrElse(expr.dataType,
+Seq(expr.dataType.asInstanceOf[ObjectType].cls)) ++ 
Seq(classOf[Object])
+val methods = paramTypes.flatMap { fieldClass =>
+  try {
+Some(beanClass.getDeclaredMethod(name, fieldClass))
+  } catch {
+case e: NoSuchMethodException => None
+  }
+}
+if (methods.isEmpty) {
+  throw new NoSuchMethodException(s"""A method named "$name" is 
not declared """ +
+"in any enclosing class nor any supertype")
+}
+methods.head -> expr
+}
+  }
+
+  override def eval(input: InternalRow): Any = {
+val instance = beanInstance.eval(input)
+if (instance != null) {
+  val bean = instance.asInstanceOf[Object]
+  resolvedSetters.foreach {
+case (setter, expr) =>
+  setter.invoke(bean, expr.eval(input).asInstanceOf[AnyRef])
--- End diff --

@viirya can you add a null check to both the interpreted and code generated 
version? Thanks!
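Something along these lines for the interpreted side, as a rough sketch (what
exactly should happen for a null argument is a separate question, see my other
comment):

    resolvedSetters.foreach { case (setter, expr) =>
      val value = expr.eval(input)
      if (value == null && setter.getParameterTypes.head.isPrimitive) {
        // A null argument to a primitive-typed setter would otherwise fail with an
        // obscure reflection error, so fail (or skip) with a clear message instead.
        throw new NullPointerException(
          s"Cannot assign null to primitive setter ${setter.getName}")
      }
      setter.invoke(bean, value.asInstanceOf[AnyRef])
    }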


---




[GitHub] spark issue #20912: [SPARK-23794][SQL] Make UUID as stateful expression

2018-03-27 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20912
  
Merging to master. Thanks for the quick turnaround!


---




[GitHub] spark issue #20903: [SPARK-23599][SQL][Backport-2.3] Use RandomUUIDGenerator...

2018-03-26 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20903
  
Merging to 2.3. Thanks! Can you close?


---




[GitHub] spark pull request #20771: [SPARK-23587][SQL] Add interpreted execution for ...

2018-03-26 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20771#discussion_r177022135
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -599,8 +610,86 @@ case class MapObjects private(
 
   override def children: Seq[Expression] = lambdaFunction :: inputData :: 
Nil
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+  // The data with UserDefinedType are actually stored with the data type 
of its sqlType.
+  // When we want to apply MapObjects on it, we have to use it.
+  lazy private val inputDataType = inputData.dataType match {
+case u: UserDefinedType[_] => u.sqlType
+case _ => inputData.dataType
+  }
+
+  private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
+inputCollection.map { element =>
+  val row = InternalRow.fromSeq(Seq(element))
+  lambdaFunction.eval(row)
+}
+  }
+
+  // Executes lambda function on input collection.
+  private lazy val executeFunc: Any => Seq[_] = inputDataType match {
+case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+  x => executeFuncOnCollection(x.asInstanceOf[Seq[_]])
+case ObjectType(cls) if cls.isArray =>
+  x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq)
+case ObjectType(cls) if 
classOf[java.util.List[_]].isAssignableFrom(cls) =>
+  x => 
executeFuncOnCollection(x.asInstanceOf[java.util.List[_]].asScala)
+case ObjectType(cls) if cls == classOf[Object] =>
+  if (cls.isArray) {
+x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq)
+  } else {
+x => executeFuncOnCollection(x.asInstanceOf[Seq[_]])
+  }
+case ArrayType(et, _) =>
+  x => executeFuncOnCollection(x.asInstanceOf[ArrayData].array)
+  }
+
+  // Converts the processed collection to custom collection class if any.
+  private lazy val getResults: Seq[_] => Any = customCollectionCls match {
--- End diff --

Can you add a catch-all clause that throws a nice exception to this match 
statement?
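E.g. a final case roughly like this (the message wording is just a placeholder):

    case Some(cls) =>
      // Catch-all: fail fast with a readable error instead of a MatchError.
      throw new UnsupportedOperationException(
        s"Unsupported collection class ${cls.getName} for MapObjects in interpreted mode")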


---




[GitHub] spark pull request #20771: [SPARK-23587][SQL] Add interpreted execution for ...

2018-03-26 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20771#discussion_r177021865
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -599,8 +610,86 @@ case class MapObjects private(
 
   override def children: Seq[Expression] = lambdaFunction :: inputData :: 
Nil
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+  // The data with UserDefinedType are actually stored with the data type 
of its sqlType.
+  // When we want to apply MapObjects on it, we have to use it.
+  lazy private val inputDataType = inputData.dataType match {
+case u: UserDefinedType[_] => u.sqlType
+case _ => inputData.dataType
+  }
+
+  private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
+inputCollection.map { element =>
+  val row = InternalRow.fromSeq(Seq(element))
+  lambdaFunction.eval(row)
+}
+  }
+
+  // Executes lambda function on input collection.
+  private lazy val executeFunc: Any => Seq[_] = inputDataType match {
--- End diff --

I am wondering if we shouldn't just return an `Iterator` instead of a 
`Seq`? This seems a bit more flexible and allows us to avoid materializing an 
intermediate sequence. WDYT?
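A sketch of that shape (callers would pass an iterator over the input
collection):

    // Apply the lambda lazily; the final conversion step decides how (and whether)
    // to materialize a collection.
    private def executeFuncOnCollection(input: Iterator[_]): Iterator[_] = {
      input.map { element =>
        lambdaFunction.eval(InternalRow.fromSeq(Seq(element)))
      }
    }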


---




[GitHub] spark pull request #20771: [SPARK-23587][SQL] Add interpreted execution for ...

2018-03-26 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20771#discussion_r177021309
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -599,8 +610,86 @@ case class MapObjects private(
 
   override def children: Seq[Expression] = lambdaFunction :: inputData :: 
Nil
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+  // The data with UserDefinedType are actually stored with the data type 
of its sqlType.
+  // When we want to apply MapObjects on it, we have to use it.
+  lazy private val inputDataType = inputData.dataType match {
+case u: UserDefinedType[_] => u.sqlType
+case _ => inputData.dataType
+  }
+
+  private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
+inputCollection.map { element =>
+  val row = InternalRow.fromSeq(Seq(element))
--- End diff --

NIT: reuse the row object.
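I.e. something like (sketch):

    private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
      // Reuse a single mutable row instead of allocating one per element.
      val row = new GenericInternalRow(1)
      inputCollection.map { element =>
        row.update(0, element)
        lambdaFunction.eval(row)
      }
    }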


---




[GitHub] spark pull request #20771: [SPARK-23587][SQL] Add interpreted execution for ...

2018-03-26 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20771#discussion_r177020985
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -599,8 +610,86 @@ case class MapObjects private(
 
   override def children: Seq[Expression] = lambdaFunction :: inputData :: 
Nil
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+  // The data with UserDefinedType are actually stored with the data type 
of its sqlType.
+  // When we want to apply MapObjects on it, we have to use it.
+  lazy private val inputDataType = inputData.dataType match {
+case u: UserDefinedType[_] => u.sqlType
+case _ => inputData.dataType
+  }
+
+  private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
+inputCollection.map { element =>
+  val row = InternalRow.fromSeq(Seq(element))
+  lambdaFunction.eval(row)
+}
+  }
+
+  // Executes lambda function on input collection.
+  private lazy val executeFunc: Any => Seq[_] = inputDataType match {
+case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+  x => executeFuncOnCollection(x.asInstanceOf[Seq[_]])
+case ObjectType(cls) if cls.isArray =>
+  x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq)
+case ObjectType(cls) if 
classOf[java.util.List[_]].isAssignableFrom(cls) =>
+  x => 
executeFuncOnCollection(x.asInstanceOf[java.util.List[_]].asScala)
+case ObjectType(cls) if cls == classOf[Object] =>
+  if (cls.isArray) {
+x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq)
+  } else {
+x => executeFuncOnCollection(x.asInstanceOf[Seq[_]])
+  }
+case ArrayType(et, _) =>
+  x => executeFuncOnCollection(x.asInstanceOf[ArrayData].array)
--- End diff --

This will blow up with `UnsafeArrayData` :(... It would be nice if we could 
avoid copying the entire array. We could implement an `ArrayData` wrapper that 
implements `Seq` or `Iterable` (I slightly prefer the latter).
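A rough sketch of such a wrapper (the element type would come from the
`MapObjects` input schema; null elements and specialization are glossed over):

    import org.apache.spark.sql.catalyst.util.ArrayData
    import org.apache.spark.sql.types.DataType

    // Reads elements straight out of the (possibly unsafe) ArrayData instead of
    // copying the whole array up front.
    class ArrayDataSeq(data: ArrayData, elementType: DataType) extends IndexedSeq[Any] {
      override def length: Int = data.numElements()
      override def apply(i: Int): Any = data.get(i, elementType)
    }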


---




[GitHub] spark pull request #20771: [SPARK-23587][SQL] Add interpreted execution for ...

2018-03-26 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20771#discussion_r177016698
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -599,8 +610,86 @@ case class MapObjects private(
 
   override def children: Seq[Expression] = lambdaFunction :: inputData :: 
Nil
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+  // The data with UserDefinedType are actually stored with the data type 
of its sqlType.
+  // When we want to apply MapObjects on it, we have to use it.
+  lazy private val inputDataType = inputData.dataType match {
+case u: UserDefinedType[_] => u.sqlType
+case _ => inputData.dataType
+  }
+
+  private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
+inputCollection.map { element =>
+  val row = InternalRow.fromSeq(Seq(element))
+  lambdaFunction.eval(row)
+}
+  }
+
+  // Executes lambda function on input collection.
+  private lazy val executeFunc: Any => Seq[_] = inputDataType match {
+case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+  x => executeFuncOnCollection(x.asInstanceOf[Seq[_]])
+case ObjectType(cls) if cls.isArray =>
+  x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq)
+case ObjectType(cls) if 
classOf[java.util.List[_]].isAssignableFrom(cls) =>
+  x => 
executeFuncOnCollection(x.asInstanceOf[java.util.List[_]].asScala)
+case ObjectType(cls) if cls == classOf[Object] =>
+  if (cls.isArray) {
+x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq)
+  } else {
+x => executeFuncOnCollection(x.asInstanceOf[Seq[_]])
+  }
+case ArrayType(et, _) =>
+  x => executeFuncOnCollection(x.asInstanceOf[ArrayData].array)
+  }
+
+  // Converts the processed collection to custom collection class if any.
+  private lazy val getResults: Seq[_] => Any = customCollectionCls match {
+case Some(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+  // Scala sequence
+  identity _
+case Some(cls) if 
classOf[scala.collection.Set[_]].isAssignableFrom(cls) =>
+  // Scala set
+  _.toSet
+case Some(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
+  // Java list
+  if (cls == classOf[java.util.List[_]] || cls == 
classOf[java.util.AbstractList[_]] ||
+  cls == classOf[java.util.AbstractSequentialList[_]]) {
+// Specifying non concrete implementations of `java.util.List`
+_.asJava
+  } else {
+// Specifying concrete implementations of `java.util.List`
+(results) => {
+  val constructors = cls.getConstructors()
--- End diff --

Is there a way we can move the constructor resolution out of the closure? I 
am fine with some code duplication here :)...
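For example (a sketch; I just grab the no-argument constructor here, the real
code would pick the proper one once and then reuse it):

    // Specifying concrete implementations of `java.util.List`: resolve the
    // constructor once, outside the per-row closure.
    val constructor = cls.getConstructor()
    (results: Seq[_]) => {
      val list = constructor.newInstance().asInstanceOf[java.util.List[Any]]
      results.foreach(list.add)
      list
    }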


---




[GitHub] spark pull request #20771: [SPARK-23587][SQL] Add interpreted execution for ...

2018-03-26 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20771#discussion_r177015279
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -599,8 +610,86 @@ case class MapObjects private(
 
   override def children: Seq[Expression] = lambdaFunction :: inputData :: 
Nil
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+  // The data with UserDefinedType are actually stored with the data type 
of its sqlType.
+  // When we want to apply MapObjects on it, we have to use it.
+  lazy private val inputDataType = inputData.dataType match {
+case u: UserDefinedType[_] => u.sqlType
+case _ => inputData.dataType
+  }
+
+  private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
+inputCollection.map { element =>
+  val row = InternalRow.fromSeq(Seq(element))
+  lambdaFunction.eval(row)
+}
+  }
+
+  // Executes lambda function on input collection.
+  private lazy val executeFunc: Any => Seq[_] = inputDataType match {
+case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+  x => executeFuncOnCollection(x.asInstanceOf[Seq[_]])
+case ObjectType(cls) if cls.isArray =>
+  x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq)
+case ObjectType(cls) if 
classOf[java.util.List[_]].isAssignableFrom(cls) =>
+  x => 
executeFuncOnCollection(x.asInstanceOf[java.util.List[_]].asScala)
+case ObjectType(cls) if cls == classOf[Object] =>
--- End diff --

Ugghh... I now understand why this is needed. `RowEncoder` does not pass the 
needed type information down: 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala#L146

This obviously needs to be done during evaluation. You got it right in the 
previous commit. I am sorry for misunderstanding this and making you move it. 
Next time please call me out on this!


---




[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...

2018-03-25 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20756#discussion_r176956857
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
 ---
@@ -68,6 +68,23 @@ class ObjectExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   mapEncoder.serializer.head, mapExpected, mapInputRow)
   }
 
+  test("SPARK-23593: InitializeJavaBean should support interpreted 
execution") {
+val list = new java.util.LinkedList[Int]()
+list.add(1)
+
+val initializeBean = InitializeJavaBean(Literal.fromObject(new 
java.util.LinkedList[Int]),
+  Map("add" -> Literal(1)))
+checkEvaluation(initializeBean, list, InternalRow.fromSeq(Seq()))
+
+val initializeWithNonexistingMethod = InitializeJavaBean(
+  Literal.fromObject(new java.util.LinkedList[Int]),
--- End diff --

Can you also add a test for when the parameter types do not match up?
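For example, roughly (the bean class here is hypothetical, just to get a
primitive-typed setter that cannot accept the argument):

    class IntBean {
      private var value: Int = 0
      def setValue(v: Int): Unit = { value = v }
    }

    val initializeWithWrongParamType = InitializeJavaBean(
      Literal.fromObject(new IntBean),
      Map("setValue" -> Literal("not an int")))
    intercept[Exception] {
      initializeWithWrongParamType.eval(InternalRow.fromSeq(Seq()))
    }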


---




[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...

2018-03-25 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20756#discussion_r176956802
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1261,8 +1261,42 @@ case class InitializeJavaBean(beanInstance: 
Expression, setters: Map[String, Exp
   override def children: Seq[Expression] = beanInstance +: 
setters.values.toSeq
   override def dataType: DataType = beanInstance.dataType
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported.")
+  private lazy val resolvedSetters = {
+assert(beanInstance.dataType.isInstanceOf[ObjectType])
+
+val ObjectType(beanClass) = beanInstance.dataType
+setters.map {
+  case (name, expr) =>
+// Looking for known type mapping first, then using Class attached 
in `ObjectType`.
+// Finally also looking for general `Object`-type parameter for 
generic methods.
+val paramTypes = 
CallMethodViaReflection.typeMapping.getOrElse(expr.dataType,
+Seq(expr.dataType.asInstanceOf[ObjectType].cls)) ++ 
Seq(classOf[Object])
+val methods = paramTypes.flatMap { fieldClass =>
+  try {
+Some(beanClass.getDeclaredMethod(name, fieldClass))
+  } catch {
+case e: NoSuchMethodException => None
+  }
+}
+if (methods.isEmpty) {
+  throw new NoSuchMethodException(s"""A method named "$name" is 
not declared """ +
+"in any enclosing class nor any supertype")
+}
+methods.head -> expr
+}
+  }
+
+  override def eval(input: InternalRow): Any = {
+val instance = beanInstance.eval(input)
+if (instance != null) {
+  val bean = instance.asInstanceOf[Object]
+  resolvedSetters.foreach {
+case (setter, expr) =>
+  setter.invoke(bean, expr.eval(input).asInstanceOf[AnyRef])
--- End diff --

There is a subtle difference between code generation and interpreted mode 
here. A null value for an expression that maps to a Java primitive will become some 
default value (e.g. -1) under code generation and `null` in interpreted mode, 
which can lead to different results.

I am not sure we should address this, because I am not 100% sure this can 
ever happen. @cloud-fan could you shed some light on this?


---




[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...

2018-03-25 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20756#discussion_r176956651
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1261,8 +1261,39 @@ case class InitializeJavaBean(beanInstance: 
Expression, setters: Map[String, Exp
   override def children: Seq[Expression] = beanInstance +: 
setters.values.toSeq
   override def dataType: DataType = beanInstance.dataType
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported.")
+  private lazy val resolvedSetters = {
+val ObjectType(beanClass) = beanInstance.dataType
+setters.map {
+  case (name, expr) =>
+// Looking for known type mapping first, then using Class attached 
in `ObjectType`.
+// Finally also looking for general `Object`-type parameter for 
generic methods.
+val paramTypes = 
CallMethodViaReflection.typeMapping.getOrElse(expr.dataType,
--- End diff --

Sorry for not coming back to this sooner. AFAIK the `CallMethodViaReflection` 
expression was only designed to work with a couple of primitives. I think we 
are looking for something a little bit more complete here, i.e. support for all 
types in Spark SQL's type system. I also don't think we should put the 
mappings in `CallMethodViaReflection`, because the mapping is now used by more 
expressions; `ScalaReflection` is IMO a better place for this logic.

And finally, which PR will implement this? cc @maropu for visibility.
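To be concrete, the kind of mapping I mean (a sketch; the entries are
illustrative and far from complete, the real thing should cover Spark SQL's
whole type system):

    // In ScalaReflection (sketch):
    val typeJavaMapping: Map[DataType, Class[_]] = Map(
      BooleanType -> classOf[Boolean],
      ByteType -> classOf[Byte],
      ShortType -> classOf[Short],
      IntegerType -> classOf[Int],
      LongType -> classOf[Long],
      FloatType -> classOf[Float],
      DoubleType -> classOf[Double],
      StringType -> classOf[UTF8String],
      BinaryType -> classOf[Array[Byte]])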


---




[GitHub] spark issue #20835: [HOT-FIX] Fix SparkOutOfMemoryError: Unable to acquire 2...

2018-03-25 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20835
  
I have cherry-picked this into branch-2.3


---




[GitHub] spark issue #20861: [SPARK-23599][SQL] Use RandomUUIDGenerator in Uuid expre...

2018-03-25 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20861
  
@viirya I have backported #20817 to 2.3


---




[GitHub] spark issue #20861: [SPARK-23599][SQL] Use RandomUUIDGenerator in Uuid expre...

2018-03-22 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20861
  
@viirya can you create a backport for 2.3?


---




[GitHub] spark pull request #20861: [SPARK-23599][SQL] Use RandomUUIDGenerator in Uui...

2018-03-20 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20861#discussion_r175727781
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolvedUuidExpressionsSuite.scala
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, 
LogicalPlan}
+
+/**
+ * Test suite for resolving Uuid expressions.
+ */
+class ResolvedUuidExpressionsSuite extends AnalysisTest {
+
+  private lazy val a = 'a.int
+  private lazy val r = LocalRelation(a)
+  private lazy val uuid1 = Uuid().as('_uuid1)
+  private lazy val uuid2 = Uuid().as('_uuid2)
+  private lazy val uuid3 = Uuid().as('_uuid3)
+  private lazy val uuid1Ref = uuid1.toAttribute
+
+  private val analyzer = getAnalyzer(caseSensitive = true)
+
+  private def getUuidExpressions(plan: LogicalPlan): Seq[Uuid] = {
+val uuids = new ArrayBuffer[Uuid]()
+plan.transformUp {
--- End diff --

Nit: use `flatMap`?
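Together with the `collect` nit on the inner loop, this could be as small as
(sketch):

    private def getUuidExpressions(plan: LogicalPlan): Seq[Uuid] = {
      plan.flatMap(_.expressions.flatMap(_.collect { case u: Uuid => u }))
    }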


---




[GitHub] spark pull request #20861: [SPARK-23599][SQL] Use RandomUUIDGenerator in Uui...

2018-03-20 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20861#discussion_r175727739
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolvedUuidExpressionsSuite.scala
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, 
LogicalPlan}
+
+/**
+ * Test suite for resolving Uuid expressions.
+ */
+class ResolvedUuidExpressionsSuite extends AnalysisTest {
+
+  private lazy val a = 'a.int
+  private lazy val r = LocalRelation(a)
+  private lazy val uuid1 = Uuid().as('_uuid1)
+  private lazy val uuid2 = Uuid().as('_uuid2)
+  private lazy val uuid3 = Uuid().as('_uuid3)
+  private lazy val uuid1Ref = uuid1.toAttribute
+
+  private val analyzer = getAnalyzer(caseSensitive = true)
+
+  private def getUuidExpressions(plan: LogicalPlan): Seq[Uuid] = {
+val uuids = new ArrayBuffer[Uuid]()
+plan.transformUp {
+  case p =>
+p.transformExpressionsUp {
--- End diff --

NIT: use `collect`?


---




[GitHub] spark pull request #20861: [SPARK-23599][SQL] Use RandomUUIDGenerator in Uui...

2018-03-20 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20861#discussion_r175725276
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -98,6 +99,8 @@ class Analyzer(
 this(catalog, conf, conf.optimizerMaxIterations)
   }
 
+  private lazy val random = new Random()
--- End diff --

Shall we put `random` in the `ResolvedUuidExpressions`? That makes it a 
little bit easier to follow.


---




[GitHub] spark pull request #20850: [SPARK-23713][SQL] Cleanup UnsafeWriter and Buffe...

2018-03-19 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20850#discussion_r175608496
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
 ---
@@ -86,11 +88,39 @@ public void grow(int neededSize) {
 }
   }
 
-  public void reset() {
+  byte[] buffer() {
+return buffer;
+  }
+
+  int getCursor() {
+return cursor;
+  }
+
+  void incrementCursor(int val) {
+cursor += val;
+  }
+
+  int pushCursor() {
--- End diff --

Can we make this a little bit less complex? I think just storing the cursor 
in the UnsafeWriter is enough.


---




[GitHub] spark pull request #20850: [SPARK-23713][SQL] Cleanup UnsafeWriter and Buffe...

2018-03-19 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20850#discussion_r175407382
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java
 ---
@@ -36,8 +90,92 @@
   public abstract void write(int ordinal, float value);
   public abstract void write(int ordinal, double value);
   public abstract void write(int ordinal, Decimal input, int precision, 
int scale);
-  public abstract void write(int ordinal, UTF8String input);
-  public abstract void write(int ordinal, byte[] input);
-  public abstract void write(int ordinal, CalendarInterval input);
-  public abstract void setOffsetAndSize(int ordinal, int currentCursor, 
int size);
+
+  public final void write(int ordinal, UTF8String input) {
+final int numBytes = input.numBytes();
+final int roundedSize = 
ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes);
+
+// grow the global buffer before writing data.
+grow(roundedSize);
+
+zeroOutPaddingBytes(numBytes);
+
+// Write the bytes to the variable length portion.
+input.writeToMemory(buffer(), cursor());
+
+setOffsetAndSize(ordinal, numBytes);
+
+// move the cursor forward.
+addCursor(roundedSize);
+  }
+
+  public final void write(int ordinal, byte[] input) {
+write(ordinal, input, 0, input.length);
+  }
+
+  public final void write(int ordinal, byte[] input, int offset, int 
numBytes) {
+final int roundedSize = 
ByteArrayMethods.roundNumberOfBytesToNearestWord(input.length);
+
+// grow the global buffer before writing data.
+grow(roundedSize);
+
+zeroOutPaddingBytes(numBytes);
+
+// Write the bytes to the variable length portion.
+Platform.copyMemory(
+  input, Platform.BYTE_ARRAY_OFFSET + offset, buffer(), cursor(), 
numBytes);
+
+setOffsetAndSize(ordinal, numBytes);
+
+// move the cursor forward.
+addCursor(roundedSize);
+  }
+
+  public final void write(int ordinal, CalendarInterval input) {
+// grow the global buffer before writing data.
+grow(16);
+
+// Write the months and microseconds fields of Interval to the 
variable length portion.
+Platform.putLong(buffer(), cursor(), input.months);
+Platform.putLong(buffer(), cursor() + 8, input.microseconds);
+
+setOffsetAndSize(ordinal, 16);
+
+// move the cursor forward.
+addCursor(16);
+  }
+
+  protected final void _write(long offset, boolean value) {
--- End diff --

Why the `_write` names? Just call them `writeBoolean` etc...


---




[GitHub] spark pull request #20850: [SPARK-23713][SQL] Cleanup UnsafeWriter and Buffe...

2018-03-19 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20850#discussion_r175407178
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
 ---
@@ -40,29 +37,45 @@
  */
 public final class UnsafeRowWriter extends UnsafeWriter {
 
-  private final BufferHolder holder;
-  // The offset of the global buffer where we start to write this row.
-  private int startingOffset;
+  private final UnsafeRow row;
+
   private final int nullBitsSize;
   private final int fixedSize;
 
-  public UnsafeRowWriter(BufferHolder holder, int numFields) {
-this.holder = holder;
+  public UnsafeRowWriter(UnsafeRow row, int initialBufferSize) {
--- End diff --

Do we really need two `UnsafeRow` constructors?

For the top-level row writer I also think it might be nice to create the row 
internally, and just have a constructor that takes a numFields and 
(optionally) a size argument.
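I.e. roughly this shape (a sketch; where the `BufferHolder` ends up living is
part of the refactor):

    // Create the row internally instead of taking it as a constructor argument.
    public UnsafeRowWriter(int numFields) {
      this(numFields, numFields * 32);  // default initial buffer size
    }

    public UnsafeRowWriter(int numFields, int initialBufferSize) {
      this.row = new UnsafeRow(numFields);
      this.nullBitsSize = UnsafeRow.calculateBitSetWidthInBytes(numFields);
      this.fixedSize = nullBitsSize + 8 * numFields;
      // ... hand `new BufferHolder(row, initialBufferSize)` to wherever the holder lives.
    }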


---




[GitHub] spark pull request #20850: [SPARK-23713][SQL] Cleanup UnsafeWriter and Buffe...

2018-03-19 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20850#discussion_r175393443
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
 ---
@@ -31,24 +31,24 @@
  * for each incoming record, we should call `reset` of BufferHolder 
instance before write the record
  * and reuse the data buffer.
  *
- * Generally we should call `UnsafeRow.setTotalSize` and pass in 
`BufferHolder.totalSize` to update
+ * Generally we should call `UnsafeRowWriter.setTotalSize` using 
`BufferHolder.totalSize` to update
  * the size of the result row, after writing a record to the buffer. 
However, we can skip this step
  * if the fields of row are all fixed-length, as the size of result row is 
also fixed.
  */
-public class BufferHolder {
+public final class BufferHolder {
--- End diff --

Why is this still public since you are making everything package private?


---




[GitHub] spark pull request #20850: [SPARK-23713][SQL] Cleanup UnsafeWriter and Buffe...

2018-03-19 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20850#discussion_r175393169
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
 ---
@@ -47,126 +40,119 @@ private void assertIndexIsValid(int index) {
 assert index < numElements : "index (" + index + ") should < " + 
numElements;
   }
 
-  public void initialize(BufferHolder holder, int numElements, int 
elementSize) {
+  public UnsafeArrayWriter(UnsafeWriter writer) {
+super(writer.getBufferHolder());
+  }
+
+  public void initialize(int numElements, int elementSize) {
--- End diff --

Should we move `elementSize` into the constructor? I don't think there are 
cases where we are reusing `UnsafeArrayWriter`s.


---




[GitHub] spark pull request #20850: [SPARK-23713][SQL] Cleanup UnsafeWriter and Buffe...

2018-03-19 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20850#discussion_r175392909
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
 ---
@@ -86,11 +86,17 @@ public void grow(int neededSize) {
 }
   }
 
-  public void reset() {
+  byte[] buffer() { return buffer; }
+
+  int getCursor() { return cursor; }
+
+  void addCursor(int val) { cursor += val; }
--- End diff --

`incrementCursor`?


---




[GitHub] spark issue #20850: [SPARK-23713][SQL] Cleanup UnsafeWriter and BufferHolder...

2018-03-19 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20850
  
@kiszk this is a good start! This is very performance-critical code, so can 
you please extend/update/run the existing `UnsafeProjectionBenchmark`?


---




[GitHub] spark pull request #20850: [SPARK-23713][SQL] Cleanup UnsafeWriter and Buffe...

2018-03-19 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20850#discussion_r175391993
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
 ---
@@ -111,27 +111,27 @@ object GenerateUnsafeProjection extends 
CodeGenerator[Seq[Expression], UnsafePro
 s"""
   // Remember the current cursor so that we can calculate how 
many bytes are
   // written later.
-  final int $tmpCursor = $bufferHolder.cursor;
-  ${writeStructToBuffer(ctx, input.value, t.map(_.dataType), 
bufferHolder)}
-  $rowWriter.setOffsetAndSize($index, $tmpCursor, 
$bufferHolder.cursor - $tmpCursor);
+  final int $tmpCursor = $rowWriter.cursor();
--- End diff --

It seems a bit weird that we are storing state internal to the 
`UnsafeWriter`/`BufferHolder` here. It would be very nice if we could internalize 
this code into the `UnsafeWriter`.


---




[GitHub] spark pull request #20854: [SPARK-23712][SQL] Interpreted UnsafeRowJoiner [W...

2018-03-18 Thread hvanhovell
GitHub user hvanhovell opened a pull request:

https://github.com/apache/spark/pull/20854

[SPARK-23712][SQL] Interpreted UnsafeRowJoiner [WIP]

## What changes were proposed in this pull request?
This PR adds an interpreted version of `UnsafeRowJoiner` to Spark SQL.

Its performance is almost on par with the code-generated `UnsafeRowJoiner`. 
There seems to be an overhead of 10ns per call. It might be an idea to not use 
code generation at all for an `UnsafeRowJoiner`.

## How was this patch tested?
Modified existing row joiner tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hvanhovell/spark SPARK-23712

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20854.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20854


commit b637ded5ddd38f58e2c0d1b5172ebed5cb9014e2
Author: Herman van Hovell <hvanhovell@...>
Date:   2018-03-17T13:42:13Z

Add interpreted unsafe row joiner

commit d0b40a9ff6368051d737224dd9931a7ef1b428cb
Author: Herman van Hovell <hvanhovell@...>
Date:   2018-03-18T12:16:30Z

Add benchmark




---




[GitHub] spark issue #20750: [SPARK-23581][SQL] Add interpreted unsafe projection

2018-03-16 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20750
  
@cloud-fan has some issue with his mac, so I will be merging :)...

Thanks for the reviews!


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r175083026
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
 ---
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter, UnsafeRowWriter, UnsafeWriter}
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{UserDefinedType, _}
+import org.apache.spark.unsafe.Platform
+
+/**
+ * An interpreted unsafe projection. This class reuses the [[UnsafeRow]] 
it produces, a consumer
+ * should copy the row if it is being buffered. This class is not thread 
safe.
+ *
+ * @param expressions that produces the resulting fields. These 
expressions must be bound
+ *to a schema.
+ */
+class InterpretedUnsafeProjection(expressions: Array[Expression]) extends 
UnsafeProjection {
+  import InterpretedUnsafeProjection._
+
+  /** Number of (top level) fields in the resulting row. */
+  private[this] val numFields = expressions.length
+
+  /** Array that expression results. */
+  private[this] val values = new Array[Any](numFields)
+
+  /** The row representing the expression results. */
+  private[this] val intermediate = new GenericInternalRow(values)
+
+  /** The row returned by the projection. */
+  private[this] val result = new UnsafeRow(numFields)
+
+  /** The buffer which holds the resulting row's backing data. */
+  private[this] val holder = new BufferHolder(result, numFields * 32)
+
+  /** The writer that writes the intermediate result to the result row. */
+  private[this] val writer: InternalRow => Unit = {
+val rowWriter = new UnsafeRowWriter(holder, numFields)
+val baseWriter = generateStructWriter(
+  holder,
+  rowWriter,
+  expressions.map(e => StructField("", e.dataType, e.nullable)))
+if (!expressions.exists(_.nullable)) {
+  // No nullable fields. The top-level null bit mask will always be 
zeroed out.
+  baseWriter
+} else {
+  // Zero out the null bit mask before we write the row.
+  row => {
+rowWriter.zeroOutNullBytes()
+baseWriter(row)
+  }
+}
+  }
+
+  override def initialize(partitionIndex: Int): Unit = {
+expressions.foreach(_.foreach {
+  case n: Nondeterministic => n.initialize(partitionIndex)
+  case _ =>
+})
+  }
+
+  override def apply(row: InternalRow): UnsafeRow = {
+// Put the expression results in the intermediate row.
+var i = 0
+while (i < numFields) {
+  values(i) = expressions(i).eval(row)
+  i += 1
+}
+
+// Write the intermediate row to an unsafe row.
+holder.reset()
+writer(intermediate)
+result.setTotalSize(holder.totalSize())
+result
+  }
+}
+
+/**
+ * Helper functions for creating an [[InterpretedUnsafeProjection]].
+ */
+object InterpretedUnsafeProjection extends UnsafeProjectionCreator {
+
+  /**
+   * Returns an [[UnsafeProjection]] for given sequence of bound 
Expressions.
+   */
+  override protected def createProjection(exprs: Seq[Expression]): 
UnsafeProjection = {
+// We need to make sure that we do not reuse stateful expressions.
+val cleanedExpressions = exprs.map(_.transform {
+  case s: Stateful => s.freshCopy()
+})
+new InterpretedUnsafeProjection(cleanedExpressions.toArray)
+  }
+
+  /**
+   * Generate a struct writer function. The generated function writes an 
[[Internal

[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r175065574
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
 ---
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter, UnsafeRowWriter, UnsafeWriter}
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{UserDefinedType, _}
+import org.apache.spark.unsafe.Platform
+
+/**
+ * An interpreted unsafe projection. This class reuses the [[UnsafeRow]] 
it produces, a consumer
+ * should copy the row if it is being buffered. This class is not thread 
safe.
+ *
+ * @param expressions that produces the resulting fields. These 
expressions must be bound
+ *to a schema.
+ */
+class InterpretedUnsafeProjection(expressions: Array[Expression]) extends 
UnsafeProjection {
+  import InterpretedUnsafeProjection._
+
+  /** Number of (top level) fields in the resulting row. */
+  private[this] val numFields = expressions.length
+
+  /** Array that expression results. */
+  private[this] val values = new Array[Any](numFields)
+
+  /** The row representing the expression results. */
+  private[this] val intermediate = new GenericInternalRow(values)
+
+  /** The row returned by the projection. */
+  private[this] val result = new UnsafeRow(numFields)
+
+  /** The buffer which holds the resulting row's backing data. */
+  private[this] val holder = new BufferHolder(result, numFields * 32)
+
+  /** The writer that writes the intermediate result to the result row. */
+  private[this] val writer: InternalRow => Unit = {
+val rowWriter = new UnsafeRowWriter(holder, numFields)
+val baseWriter = generateStructWriter(
+  holder,
+  rowWriter,
+  expressions.map(e => StructField("", e.dataType, e.nullable)))
+if (!expressions.exists(_.nullable)) {
+  // No nullable fields. The top-level null bit mask will always be 
zeroed out.
+  baseWriter
+} else {
+  // Zero out the null bit mask before we write the row.
+  row => {
+rowWriter.zeroOutNullBytes()
+baseWriter(row)
+  }
+}
+  }
+
+  override def initialize(partitionIndex: Int): Unit = {
+expressions.foreach(_.foreach {
+  case n: Nondeterministic => n.initialize(partitionIndex)
+  case _ =>
+})
+  }
+
+  override def apply(row: InternalRow): UnsafeRow = {
+// Put the expression results in the intermediate row.
+var i = 0
+while (i < numFields) {
+  values(i) = expressions(i).eval(row)
+  i += 1
+}
+
+// Write the intermediate row to an unsafe row.
+holder.reset()
+writer(intermediate)
+result.setTotalSize(holder.totalSize())
+result
+  }
+}
+
+/**
+ * Helper functions for creating an [[InterpretedUnsafeProjection]].
+ */
+object InterpretedUnsafeProjection extends UnsafeProjectionCreator {
+
+  /**
+   * Returns an [[UnsafeProjection]] for given sequence of bound 
Expressions.
+   */
+  override protected def createProjection(exprs: Seq[Expression]): 
UnsafeProjection = {
+// We need to make sure that we do not reuse stateful expressions.
+val cleanedExpressions = exprs.map(_.transform {
+  case s: Stateful => s.freshCopy()
+})
+new InterpretedUnsafeProjection(cleanedExpressions.toArray)
+  }
+
+  /**
+   * Generate a struct writer function. The generated function writes an 
[[Internal

[GitHub] spark issue #20835: [HOT-FIX] Fix SparkOutOfMemoryError: Unable to acquire 2...

2018-03-15 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20835
  
Merging to master. Thanks!


---




[GitHub] spark issue #20835: [HOT-FIX] Fix SparkOutOfMemoryError: Unable to acquire 2...

2018-03-15 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20835
  
@kiszk do we have a more reasonable way to test this?


---




[GitHub] spark issue #20835: [HOT-FIX] Fix SparkOutOfMemoryError: Unable to acquire 2...

2018-03-15 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20835
  
LGTM


---




[GitHub] spark pull request #20727: [SPARK-23577][SQL] Supports custom line separator...

2018-03-15 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20727#discussion_r174821385
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextOptions.scala
 ---
@@ -39,9 +39,12 @@ private[text] class TextOptions(@transient private val 
parameters: CaseInsensiti
*/
   val wholeText = parameters.getOrElse(WHOLETEXT, "false").toBoolean
 
+  val lineSeparator: String = parameters.getOrElse(LINE_SEPARATOR, "\n")
+  require(lineSeparator.nonEmpty, s"'$LINE_SEPARATOR' cannot be an empty 
string.")
 }
 
 private[text] object TextOptions {
   val COMPRESSION = "compression"
   val WHOLETEXT = "wholetext"
+  val LINE_SEPARATOR = "lineSep"
--- End diff --

I don't really care what we are going to call this. I think it is 
important that we are consistent across datasources. IMO, since we can define 
anything to be a separator, not only newlines, `records` seems to fit a bit 
better.


---




[GitHub] spark pull request #20817: [SPARK-23599][SQL] Add a UUID generator from Pseu...

2018-03-15 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20817#discussion_r174811371
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RandomUUIDGenerator.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.util.UUID
+
+import org.apache.commons.math3.random.MersenneTwister
+
+/**
+ * This class is used to generate a UUID from Pseudo-Random Numbers.
+ *
+ * For the algorithm, see RFC 4122: A Universally Unique IDentifier (UUID) 
URN Namespace,
+ * section 4.4 "Algorithms for Creating a UUID from Truly Random or 
Pseudo-Random Numbers".
+ */
+case class RandomUUIDGenerator(randomSeed: Long) {
+  private val random = new MersenneTwister(randomSeed)
+
+  def getNextUUID(): UUID = {
--- End diff --

Perhaps we should also create a version that creates a UTF8String directly.
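E.g. (sketch; the method name is just a suggestion):

    def getNextUUIDUTF8String(): UTF8String =
      UTF8String.fromString(getNextUUID().toString)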


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-14 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r174594054
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
 ---
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter, UnsafeRowWriter, UnsafeWriter}
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types.{UserDefinedType, _}
+import org.apache.spark.unsafe.Platform
+
+/**
+ * An interpreted unsafe projection. This class reuses the [[UnsafeRow]] 
it produces, a consumer
+ * should copy the row if it is being buffered. This class is not thread 
safe.
+ *
+ * @param expressions that produces the resulting fields. These 
expressions must be bound
+ *to a schema.
+ */
+class InterpretedUnsafeProjection(expressions: Array[Expression]) extends 
UnsafeProjection {
+  import InterpretedUnsafeProjection._
+
+  /** Number of (top level) fields in the resulting row. */
+  private[this] val numFields = expressions.length
+
+  /** Array that expression results. */
+  private[this] val values = new Array[Any](numFields)
+
+  /** The row representing the expression results. */
+  private[this] val intermediate = new GenericInternalRow(values)
+
+  /** The row returned by the projection. */
+  private[this] val result = new UnsafeRow(numFields)
+
+  /** The buffer which holds the resulting row's backing data. */
+  private[this] val holder = new BufferHolder(result, numFields * 32)
+
+  /** The writer that writes the intermediate result to the result row. */
+  private[this] val writer: InternalRow => Unit = {
+val rowWriter = new UnsafeRowWriter(holder, numFields)
+val baseWriter = generateStructWriter(
+  holder,
+  rowWriter,
+  expressions.map(e => StructField("", e.dataType, e.nullable)))
+if (!expressions.exists(_.nullable)) {
+  // No nullable fields. The top-level null bit mask will always be 
zeroed out.
+  baseWriter
+} else {
+  // Zero out the null bit mask before we write the row.
+  row => {
+rowWriter.zeroOutNullBytes()
+baseWriter(row)
+  }
+}
+  }
+
+  override def initialize(partitionIndex: Int): Unit = {
+expressions.foreach(_.foreach {
+  case n: Nondeterministic => n.initialize(partitionIndex)
+  case _ =>
+})
+  }
+
+  override def apply(row: InternalRow): UnsafeRow = {
+// Put the expression results in the intermediate row.
+var i = 0
+while (i < numFields) {
+  values(i) = expressions(i).eval(row)
+  i += 1
+}
+
+// Write the intermediate row to an unsafe row.
+holder.reset()
+writer(intermediate)
+result.setTotalSize(holder.totalSize())
+result
+  }
+}
+
+/**
+ * Helper functions for creating an [[InterpretedUnsafeProjection]].
+ */
+object InterpretedUnsafeProjection extends UnsafeProjectionCreator {
+
+  /**
+   * Returns an [[UnsafeProjection]] for given sequence of bound 
Expressions.
+   */
+  override protected def createProjection(exprs: Seq[Expression]): 
UnsafeProjection = {
+// We need to make sure that we do not reuse stateful non 
deterministic expressions.
+val cleanedExpressions = exprs.map(_.transform {
+  case s: StatefulNondeterministic => s.freshCopy()
--- End diff --

In codegen the state is put in the generated class; if you happen to visit 
the same expression twice, the state is added twice and is not shared during 
evaluation.

[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-14 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r174587783
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeWriter.java
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions.codegen;
+
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Base class for writing Unsafe* structures.
+ */
+public abstract class UnsafeWriter {
+  public abstract void setNullByte(int ordinal);
--- End diff --

See my previous discussion with @mgaido91. I am fine either way; I can also 
add the missing methods and be done with it, but that will just make the 
interpreted code path a bit messier.


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-14 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r174587229
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
 ---
@@ -83,10 +83,10 @@ private long getElementOffset(int ordinal, int 
elementSize) {
 return startingOffset + headerInBytes + ordinal * elementSize;
   }
 
-  public void setOffsetAndSize(int ordinal, long currentCursor, int size) {
+  public void setOffsetAndSize(int ordinal, long currentCursor, long size) 
{
--- End diff --

I think we can safely change the signature to take two ints.
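
For illustration, a quick sketch of why two `int` parameters are enough, assuming the 
writer keeps packing offset and size into a single 8-byte slot (the object and method 
below are made up for the example; they are not the actual writer code):

```scala
object OffsetAndSize {
  // Both values fit comfortably in 31 bits, so they can be taken as Int and widened
  // only when packed into the single long slot.
  def pack(relativeOffset: Int, size: Int): Long =
    (relativeOffset.toLong << 32) | (size.toLong & 0xFFFFFFFFL)

  def main(args: Array[String]): Unit = {
    val packed = pack(relativeOffset = 64, size = 12)
    println(s"offset=${(packed >>> 32).toInt}, size=${packed.toInt}")  // offset=64, size=12
  }
}
```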


---




[GitHub] spark issue #20817: [SPARK-23599][SQL] Add a UUID generator from Pseudo-Rand...

2018-03-14 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20817
  
@kiszk is it taking more memory because of the test? If it does, can we make 
the test case smaller?


---




[GitHub] spark issue #20699: [SPARK-23544][SQL]Remove redundancy ShuffleExchange in t...

2018-03-14 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20699
  
ok to test


---




[GitHub] spark pull request #20817: [SPARK-23599][SQL] Add a UUID generator from Pseu...

2018-03-14 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20817#discussion_r174416499
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RandomUUIDGenerator.scala
 ---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import java.util.UUID
+
+import scala.util.Random
+
+/**
+ * This class is used to generate a UUID from Pseudo-Random Numbers 
produced by
+ * Scala Random.
+ *
+ * For the algorithm, see RFC 4122: A Universally Unique IDentifier (UUID) 
URN Namespace,
+ * section 4.4 "Algorithms for Creating a UUID from Truly Random or 
Pseudo-Random Numbers".
+ */
+case class RandomUUIDGenerator(random: Random) {
+  def getNextUUID(): UUID = {
+val mostSigBits = (random.nextLong() & 0x0FFFL) | 
0x4000L
+val leastSigBits = (random.nextLong() | 0x8000L) & 
0xBFFFL
+
+new UUID(mostSigBits, leastSigBits)
--- End diff --

I think we need to use a different RNG. `java.util.Random` only has 48 bits 
of state, which is less than the 122 bits we need for random number generation. 
Something like [PCG](http://pcg-random.org/) or a Mersenne twister would work.
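
For reference, the arithmetic behind the 122-bit figure as a tiny sketch: a UUID has 
128 bits, and a version-4 UUID fixes 4 version bits and 2 variant bits, so everything 
else has to come from the RNG.

```scala
object UuidRandomBits {
  val totalBits   = 128
  val versionBits = 4  // fixed to 0100 for version 4
  val variantBits = 2  // fixed to 10 for the RFC 4122 variant
  val randomBits  = totalBits - versionBits - variantBits  // = 122

  def main(args: Array[String]): Unit =
    println(s"random bits needed: $randomBits; java.util.Random state: 48 bits")
}
```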


---




[GitHub] spark issue #20804: [SPARK-23656][Test] Perform assertions in XXH64Suite.tes...

2018-03-13 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20804
  
LGTM - merging to master. Thanks!


---




[GitHub] spark issue #20795: [SPARK-23486]cache the function name from the catalog fo...

2018-03-11 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20795
  
Ok to test


---




[GitHub] spark pull request #20771: [SPARK-23587][SQL] Add interpreted execution for ...

2018-03-09 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20771#discussion_r173436038
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -599,8 +610,79 @@ case class MapObjects private(
 
   override def children: Seq[Expression] = lambdaFunction :: inputData :: 
Nil
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+  // The data with UserDefinedType are actually stored with the data type 
of its sqlType.
+  // When we want to apply MapObjects on it, we have to use it.
+  lazy private val inputDataType = inputData.dataType match {
+case u: UserDefinedType[_] => u.sqlType
+case _ => inputData.dataType
+  }
+
+  private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
+inputCollection.map { element =>
+  val row = InternalRow.fromSeq(Seq(element))
+  lambdaFunction.eval(row)
+}
+  }
+
+  // Executes lambda function on input collection.
+  private lazy val executeFunc: Any => Seq[_] = inputDataType match {
+case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+  x => executeFuncOnCollection(x.asInstanceOf[Seq[_]])
+case ObjectType(cls) if cls.isArray =>
+  x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq)
+case ObjectType(cls) if 
classOf[java.util.List[_]].isAssignableFrom(cls) =>
+  x => 
executeFuncOnCollection(x.asInstanceOf[java.util.List[_]].asScala)
+case ObjectType(cls) if cls == classOf[Object] =>
+  (inputCollection) => {
+if (inputCollection.getClass.isArray) {
--- End diff --

(I am sorry for sounding like a broken record.) But can we move this check 
out of the function closure?


---




[GitHub] spark pull request #20771: [SPARK-23587][SQL] Add interpreted execution for ...

2018-03-09 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20771#discussion_r173431326
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -599,8 +610,79 @@ case class MapObjects private(
 
   override def children: Seq[Expression] = lambdaFunction :: inputData :: 
Nil
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+  // The data with UserDefinedType are actually stored with the data type 
of its sqlType.
+  // When we want to apply MapObjects on it, we have to use it.
+  lazy private val inputDataType = inputData.dataType match {
+case u: UserDefinedType[_] => u.sqlType
+case _ => inputData.dataType
+  }
+
+  private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
+inputCollection.map { element =>
+  val row = InternalRow.fromSeq(Seq(element))
+  lambdaFunction.eval(row)
+}
+  }
+
+  // Executes lambda function on input collection.
+  private lazy val executeFunc: Any => Seq[_] = inputDataType match {
+case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+  x => executeFuncOnCollection(x.asInstanceOf[Seq[_]])
+case ObjectType(cls) if cls.isArray =>
+  x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq)
+case ObjectType(cls) if 
classOf[java.util.List[_]].isAssignableFrom(cls) =>
+  x => 
executeFuncOnCollection(x.asInstanceOf[java.util.List[_]].asScala)
+case ObjectType(cls) if cls == classOf[Object] =>
+  (inputCollection) => {
+if (inputCollection.getClass.isArray) {
+  
executeFuncOnCollection(inputCollection.asInstanceOf[Array[_]].toSeq)
+} else {
+  executeFuncOnCollection(inputCollection.asInstanceOf[Seq[_]])
+}
+  }
+case ArrayType(et, _) =>
+  x => executeFuncOnCollection(x.asInstanceOf[ArrayData].array)
+  }
+
+  // Converts the processed collection to custom collection class if any.
+  private lazy val getResults: Seq[_] => Any = customCollectionCls match {
+case Some(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+  // Scala sequence
+  _.toSeq
+case Some(cls) if 
classOf[scala.collection.Set[_]].isAssignableFrom(cls) =>
+  // Scala set
+  _.toSet
+case Some(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
+  // Java list
+  if (cls == classOf[java.util.List[_]] || cls == 
classOf[java.util.AbstractList[_]] ||
+  cls == classOf[java.util.AbstractSequentialList[_]]) {
+_.asJava
+  } else {
+(results) => {
+  val builder = Try(cls.getConstructor(Integer.TYPE)).map { 
constructor =>
--- End diff --

Can you try to do the constructor lookup only once? The duplication that 
this will cause is ok.
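
For illustration, a self-contained sketch of the "resolve once, use per row" pattern 
being asked for. The `Integer.TYPE` constructor lookup mirrors the diff above; the 
object and method names are made up for the example:

```scala
import scala.util.Try

object ConstructorLookupOnce {
  // Build the per-row function once; the reflective constructor lookup happens here,
  // not inside the returned closure.
  def listBuilder(cls: Class[_]): Seq[_] => java.util.List[Any] = {
    val sizeCtor = Try(cls.getConstructor(Integer.TYPE)).toOption
    results => {
      // Per row: only instantiation, no reflection lookup.
      val list = sizeCtor
        .map(_.newInstance(Int.box(results.size)))
        .getOrElse(cls.getConstructor().newInstance())
        .asInstanceOf[java.util.List[Any]]
      results.foreach(r => list.add(r))
      list
    }
  }

  def main(args: Array[String]): Unit = {
    val build = listBuilder(classOf[java.util.ArrayList[_]])
    println(build(Seq(1, 2, 3)))  // [1, 2, 3]
  }
}
```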


---




[GitHub] spark pull request #20771: [SPARK-23587][SQL] Add interpreted execution for ...

2018-03-09 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20771#discussion_r173431013
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -599,8 +610,79 @@ case class MapObjects private(
 
   override def children: Seq[Expression] = lambdaFunction :: inputData :: 
Nil
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+  // The data with UserDefinedType are actually stored with the data type 
of its sqlType.
+  // When we want to apply MapObjects on it, we have to use it.
+  lazy private val inputDataType = inputData.dataType match {
+case u: UserDefinedType[_] => u.sqlType
+case _ => inputData.dataType
+  }
+
+  private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
+inputCollection.map { element =>
+  val row = InternalRow.fromSeq(Seq(element))
+  lambdaFunction.eval(row)
+}
+  }
+
+  // Executes lambda function on input collection.
+  private lazy val executeFunc: Any => Seq[_] = inputDataType match {
+case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+  x => executeFuncOnCollection(x.asInstanceOf[Seq[_]])
+case ObjectType(cls) if cls.isArray =>
+  x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq)
+case ObjectType(cls) if 
classOf[java.util.List[_]].isAssignableFrom(cls) =>
+  x => 
executeFuncOnCollection(x.asInstanceOf[java.util.List[_]].asScala)
+case ObjectType(cls) if cls == classOf[Object] =>
+  (inputCollection) => {
+if (inputCollection.getClass.isArray) {
+  
executeFuncOnCollection(inputCollection.asInstanceOf[Array[_]].toSeq)
+} else {
+  executeFuncOnCollection(inputCollection.asInstanceOf[Seq[_]])
+}
+  }
+case ArrayType(et, _) =>
+  x => executeFuncOnCollection(x.asInstanceOf[ArrayData].array)
+  }
+
+  // Converts the processed collection to custom collection class if any.
+  private lazy val getResults: Seq[_] => Any = customCollectionCls match {
+case Some(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+  // Scala sequence
+  _.toSeq
+case Some(cls) if 
classOf[scala.collection.Set[_]].isAssignableFrom(cls) =>
+  // Scala set
+  _.toSet
+case Some(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
+  // Java list
+  if (cls == classOf[java.util.List[_]] || cls == 
classOf[java.util.AbstractList[_]] ||
--- End diff --

IIUC you are matching against non-concrete implementations of 
`java.util.List`?


---




[GitHub] spark pull request #20771: [SPARK-23587][SQL] Add interpreted execution for ...

2018-03-09 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20771#discussion_r173430815
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -599,8 +610,79 @@ case class MapObjects private(
 
   override def children: Seq[Expression] = lambdaFunction :: inputData :: 
Nil
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+  // The data with UserDefinedType are actually stored with the data type 
of its sqlType.
+  // When we want to apply MapObjects on it, we have to use it.
+  lazy private val inputDataType = inputData.dataType match {
+case u: UserDefinedType[_] => u.sqlType
+case _ => inputData.dataType
+  }
+
+  private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
+inputCollection.map { element =>
+  val row = InternalRow.fromSeq(Seq(element))
+  lambdaFunction.eval(row)
+}
+  }
+
+  // Executes lambda function on input collection.
+  private lazy val executeFunc: Any => Seq[_] = inputDataType match {
+case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+  x => executeFuncOnCollection(x.asInstanceOf[Seq[_]])
+case ObjectType(cls) if cls.isArray =>
+  x => executeFuncOnCollection(x.asInstanceOf[Array[_]].toSeq)
+case ObjectType(cls) if 
classOf[java.util.List[_]].isAssignableFrom(cls) =>
+  x => 
executeFuncOnCollection(x.asInstanceOf[java.util.List[_]].asScala)
+case ObjectType(cls) if cls == classOf[Object] =>
+  (inputCollection) => {
+if (inputCollection.getClass.isArray) {
+  
executeFuncOnCollection(inputCollection.asInstanceOf[Array[_]].toSeq)
+} else {
+  executeFuncOnCollection(inputCollection.asInstanceOf[Seq[_]])
+}
+  }
+case ArrayType(et, _) =>
+  x => executeFuncOnCollection(x.asInstanceOf[ArrayData].array)
+  }
+
+  // Converts the processed collection to custom collection class if any.
+  private lazy val getResults: Seq[_] => Any = customCollectionCls match {
+case Some(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+  // Scala sequence
+  _.toSeq
--- End diff --

This is the identity function, right?


---




[GitHub] spark pull request #20757: [SPARK-23595][SQL] ValidateExternalType should su...

2018-03-09 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20757#discussion_r173429606
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1421,13 +1421,36 @@ case class ValidateExternalType(child: Expression, 
expected: DataType)
 
   override def nullable: Boolean = child.nullable
 
-  override def dataType: DataType = 
RowEncoder.externalDataTypeForInput(expected)
-
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+  override val dataType: DataType = 
RowEncoder.externalDataTypeForInput(expected)
 
   private val errMsg = s" is not a valid external type for schema of 
${expected.simpleString}"
 
+  private lazy val checkType: (Any) => Boolean = expected match {
+case _: DecimalType =>
+  (value: Any) => {
+value.isInstanceOf[java.math.BigDecimal] || 
value.isInstanceOf[scala.math.BigDecimal] ||
+  value.isInstanceOf[Decimal]
+  }
+case _: ArrayType =>
+  (value: Any) => {
+value.getClass.isArray || value.isInstanceOf[Seq[_]]
+  }
+case _ =>
+  val dataTypeClazz = RowEncoder.getClassFromExternalType(dataType)
--- End diff --

Does this always return the same result as 
`CodeGenerator.boxedType(dataType)` in terms of functionality? I don't think it 
does, since it misses support for `DateType`, `TimestampType`, `DecimalType`, 
`StringType`, `StructType`, `MapType`, `ArrayType` and `UserDefinedType`. The 
thing is that it does not really matter how this expression is currently used 
(for datasets); what matters is how the code-generated version is implemented.


---




[GitHub] spark pull request #20771: [SPARK-23587][SQL] Add interpreted execution for ...

2018-03-09 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20771#discussion_r173416634
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -501,12 +502,22 @@ case class LambdaVariable(
 value: String,
 isNull: String,
 dataType: DataType,
-nullable: Boolean = true) extends LeafExpression
-  with Unevaluable with NonSQLExpression {
+nullable: Boolean = true) extends LeafExpression with NonSQLExpression 
{
+
+  // Interpreted execution of `LambdaVariable` always get the 0-index 
element from input row.
+  override def eval(input: InternalRow): Any = {
+assert(input.numFields == 1,
+  "The input row of interpreted LambdaVariable should have only 1 
field.")
+input.get(0, dataType)
--- End diff --

Let's spin that off into a different ticket if we want to work on it.


---




[GitHub] spark pull request #20771: [SPARK-23587][SQL] Add interpreted execution for ...

2018-03-09 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20771#discussion_r173416320
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -501,12 +502,22 @@ case class LambdaVariable(
 value: String,
 isNull: String,
 dataType: DataType,
-nullable: Boolean = true) extends LeafExpression
-  with Unevaluable with NonSQLExpression {
+nullable: Boolean = true) extends LeafExpression with NonSQLExpression 
{
+
+  // Interpreted execution of `LambdaVariable` always get the 0-index 
element from input row.
+  override def eval(input: InternalRow): Any = {
+assert(input.numFields == 1,
+  "The input row of interpreted LambdaVariable should have only 1 
field.")
+input.get(0, dataType)
--- End diff --

Yeah I do.


---




[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20756#discussion_r173291374
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1261,8 +1261,42 @@ case class InitializeJavaBean(beanInstance: 
Expression, setters: Map[String, Exp
   override def children: Seq[Expression] = beanInstance +: 
setters.values.toSeq
   override def dataType: DataType = beanInstance.dataType
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported.")
+  private lazy val resolvedSetters = {
+assert(beanInstance.dataType.isInstanceOf[ObjectType])
+
+val ObjectType(beanClass) = beanInstance.dataType
+setters.map {
+  case (name, expr) =>
+// Looking for known type mapping first, then using Class attached 
in `ObjectType`.
+// Finally also looking for general `Object`-type parameter for 
generic methods.
+val paramTypes = 
CallMethodViaReflection.typeMapping.getOrElse(expr.dataType,
+Seq(expr.dataType.asInstanceOf[ObjectType].cls)) ++ 
Seq(classOf[Object])
+val methods = paramTypes.flatMap { fieldClass =>
+  try {
+Some(beanClass.getDeclaredMethod(name, fieldClass))
+  } catch {
+case e: NoSuchMethodException => None
+  }
+}
+if (methods.isEmpty) {
+  throw new NoSuchMethodException(s"""A method named "$name" is 
not declared """ +
+"in any enclosing class nor any supertype, nor through a 
static import")
+}
+methods.head -> expr
+}
+  }
+
+  override def eval(input: InternalRow): Any = {
+val instance = beanInstance.eval(input)
+if (instance != null) {
+  val bean = instance.asInstanceOf[Object]
+  resolvedSetters.foreach {
+case (setter, expr) =>
+  setter.invoke(bean, expr.eval(input).asInstanceOf[Object])
--- End diff --

`AnyRef` is a bit more Scala-like, that is all.


---




[GitHub] spark issue #20773: [SPARK-23602][SQL] PrintToStderr prints value also in in...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20773
  
Merging to master. Thanks!


---




[GitHub] spark issue #20772: [SPARK-23628][SQL] calculateParamLength should not retur...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20772
  
@cloud-fan you beat me to it :)


---




[GitHub] spark issue #20772: [SPARK-23628][SQL] calculateParamLength should not retur...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20772
  
@mgaido91 this is a good catch!

LGTM. Merging to master/2.3. Thanks!


---




[GitHub] spark issue #20773: [SPARK-23602][SQL] PrintToStderr prints value also in in...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20773
  
retest this please


---




[GitHub] spark pull request #20771: [SPARK-23587][SQL] Add interpreted execution for ...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20771#discussion_r173231736
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -599,8 +610,71 @@ case class MapObjects private(
 
   override def children: Seq[Expression] = lambdaFunction :: inputData :: 
Nil
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+  // The data with PythonUserDefinedType are actually stored with the data 
type of its sqlType.
+  // When we want to apply MapObjects on it, we have to use it.
+  lazy private val inputDataType = inputData.dataType match {
+case p: PythonUserDefinedType => p.sqlType
+case _ => inputData.dataType
+  }
+
+  private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
+inputCollection.map { element =>
+  val row = InternalRow.fromSeq(Seq(element))
+  lambdaFunction.eval(row)
+}
+  }
+
+  override def eval(input: InternalRow): Any = {
+val inputCollection = inputData.eval(input)
+
+if (inputCollection == null) {
+  return inputCollection
--- End diff --

NIT: It is slightly clearer to return null here.


---




[GitHub] spark pull request #20771: [SPARK-23587][SQL] Add interpreted execution for ...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20771#discussion_r173231610
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -599,8 +610,71 @@ case class MapObjects private(
 
   override def children: Seq[Expression] = lambdaFunction :: inputData :: 
Nil
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+  // The data with PythonUserDefinedType are actually stored with the data 
type of its sqlType.
+  // When we want to apply MapObjects on it, we have to use it.
+  lazy private val inputDataType = inputData.dataType match {
+case p: PythonUserDefinedType => p.sqlType
+case _ => inputData.dataType
+  }
+
+  private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
+inputCollection.map { element =>
+  val row = InternalRow.fromSeq(Seq(element))
+  lambdaFunction.eval(row)
+}
+  }
+
+  override def eval(input: InternalRow): Any = {
+val inputCollection = inputData.eval(input)
+
+if (inputCollection == null) {
+  return inputCollection
+}
+
+val results = inputDataType match {
--- End diff --

We shouldn't be doing this during eval. Please move this into a function 
val.


---




[GitHub] spark pull request #20771: [SPARK-23587][SQL] Add interpreted execution for ...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20771#discussion_r173231642
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -599,8 +610,71 @@ case class MapObjects private(
 
   override def children: Seq[Expression] = lambdaFunction :: inputData :: 
Nil
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+  // The data with PythonUserDefinedType are actually stored with the data 
type of its sqlType.
+  // When we want to apply MapObjects on it, we have to use it.
+  lazy private val inputDataType = inputData.dataType match {
+case p: PythonUserDefinedType => p.sqlType
+case _ => inputData.dataType
+  }
+
+  private def executeFuncOnCollection(inputCollection: Seq[_]): Seq[_] = {
+inputCollection.map { element =>
+  val row = InternalRow.fromSeq(Seq(element))
+  lambdaFunction.eval(row)
+}
+  }
+
+  override def eval(input: InternalRow): Any = {
+val inputCollection = inputData.eval(input)
+
+if (inputCollection == null) {
+  return inputCollection
+}
+
+val results = inputDataType match {
+  case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+executeFuncOnCollection(inputCollection.asInstanceOf[Seq[_]])
+  case ObjectType(cls) if cls.isArray =>
+
executeFuncOnCollection(inputCollection.asInstanceOf[Array[_]].toSeq)
+  case ObjectType(cls) if 
classOf[java.util.List[_]].isAssignableFrom(cls) =>
+
executeFuncOnCollection(inputCollection.asInstanceOf[java.util.List[_]].asScala)
+  case ObjectType(cls) if cls == classOf[Object] =>
+if (inputCollection.getClass.isArray) {
+  
executeFuncOnCollection(inputCollection.asInstanceOf[Array[_]].toSeq)
+} else {
+  executeFuncOnCollection(inputCollection.asInstanceOf[Seq[_]])
+}
+  case ArrayType(et, _) =>
+
executeFuncOnCollection(inputCollection.asInstanceOf[ArrayData].array)
+}
+
+customCollectionCls match {
--- End diff --

We shouldn't be doing this during eval. Please move this into a function 
val.


---




[GitHub] spark pull request #20773: [SPARK-23602][SQL] PrintToStderr prints value als...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20773#discussion_r173216739
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala
 ---
@@ -47,18 +47,24 @@ class MiscExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   }
 
   test("PrintToStderr") {
-val errorStream = new java.io.ByteArrayOutputStream()
-val systemErr = System.err
-System.setErr(new PrintStream(errorStream))
 val inputExpr = Literal(1)
-// check without codegen
-checkEvaluationWithoutCodegen(PrintToStderr(inputExpr), 1)
-val outputEval = errorStream.toString
-errorStream.reset()
-// check with codegen
-checkEvaluationWithoutCodegen(PrintToStderr(inputExpr), 1)
-val outputCodegen = errorStream.toString
-System.setErr(systemErr)
+val systemErr = System.err
+
+val (outputEval, outputCodegen) = try {
+  val errorStream = new java.io.ByteArrayOutputStream()
+  System.setErr(new PrintStream(errorStream))
+  // check without codegen
+  checkEvaluationWithoutCodegen(PrintToStderr(inputExpr), 1)
+  val outputEval = errorStream.toString
+  errorStream.reset()
+  // check with codegen
+  checkEvaluationWithoutCodegen(PrintToStderr(inputExpr), 1)
--- End diff --

`checkEvaluationWithoutCodegen`?  Shouldn't we use 
`checkEvaluationWithUnsafeProjection` or 
`checkEvaluationWithGeneratedMutableProjection`?
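
Sketched against the test body quoted above, the second check could then use one of 
those helpers so the code-generated path really gets exercised. This is only a sketch 
of the suggested change, assuming `checkEvaluationWithUnsafeProjection` is available on 
the suite like the other helpers:

```scala
val (outputEval, outputCodegen) = try {
  val errorStream = new java.io.ByteArrayOutputStream()
  System.setErr(new PrintStream(errorStream))
  // check without codegen
  checkEvaluationWithoutCodegen(PrintToStderr(inputExpr), 1)
  val outputEval = errorStream.toString
  errorStream.reset()
  // check with codegen: use a codegen-backed helper instead of the interpreted one
  checkEvaluationWithUnsafeProjection(PrintToStderr(inputExpr), 1)
  val outputCodegen = errorStream.toString
  (outputEval, outputCodegen)
} finally {
  System.setErr(systemErr)
}
```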


---




[GitHub] spark pull request #20773: [SPARK-23602][SQL] PrintToStderr prints value als...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20773#discussion_r173199589
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala
 ---
@@ -43,4 +45,21 @@ class MiscExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper {
 checkEvaluation(Length(Uuid()), 36)
 assert(evaluateWithoutCodegen(Uuid()) !== 
evaluateWithoutCodegen(Uuid()))
   }
+
+  test("PrintToStderr") {
+val errorStream = new java.io.ByteArrayOutputStream()
+val systemErr = System.err
--- End diff --

Can you wrap setting the `stderr` in a `try { ... } finally { ... }`?


---




[GitHub] spark pull request #20757: [SPARK-23595][SQL] ValidateExternalType should su...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20757#discussion_r173190616
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala 
---
@@ -121,6 +121,19 @@ object ScalaReflection extends ScalaReflection {
 case _ => false
   }
 
+  def classForNativeTypeOf(dt: DataType): Class[_] = dt match {
--- End diff --

There is a difference between how we implement an expression and how we use 
an expression. In this case the interpreted and code-generated implementations 
should behave the same, and not only in the context in which the expression is 
currently being used.


---




[GitHub] spark pull request #20771: [SPARK-23587][SQL] Add interpreted execution for ...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20771#discussion_r173156834
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -501,12 +502,22 @@ case class LambdaVariable(
 value: String,
 isNull: String,
 dataType: DataType,
-nullable: Boolean = true) extends LeafExpression
-  with Unevaluable with NonSQLExpression {
+nullable: Boolean = true) extends LeafExpression with NonSQLExpression 
{
+
+  // Interpreted execution of `LambdaVariable` always get the 0-index 
element from input row.
+  override def eval(input: InternalRow): Any = {
+assert(input.numFields == 1,
+  "The input row of interpreted LambdaVariable should have only 1 
field.")
+input.get(0, dataType)
--- End diff --

Not a change for this PR. Maybe we should use accessors here? This uses 
pattern matching under the hood and is slower than virtual function dispatch. 
Implementing this would also be useful for `BoundReference`, for example.
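
For illustration, a minimal sketch of the accessor idea: resolve the getter once from 
the data type, so each `eval` call becomes a plain function call instead of a type 
match. Names are illustrative, and a real version would also have to deal with nulls 
via `isNullAt`:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._

object RowAccessors {
  // Resolved once per expression instead of pattern matching on every eval() call.
  def accessorFor(dataType: DataType, ordinal: Int): InternalRow => Any = dataType match {
    case BooleanType => row => row.getBoolean(ordinal)
    case IntegerType => row => row.getInt(ordinal)
    case LongType    => row => row.getLong(ordinal)
    case DoubleType  => row => row.getDouble(ordinal)
    case StringType  => row => row.getUTF8String(ordinal)
    case _           => row => row.get(ordinal, dataType)  // generic fallback
  }
}
```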


---




[GitHub] spark pull request #20771: [SPARK-23587][SQL] Add interpreted execution for ...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20771#discussion_r173156217
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -599,8 +610,71 @@ case class MapObjects private(
 
   override def children: Seq[Expression] = lambdaFunction :: inputData :: 
Nil
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+  // The data with PythonUserDefinedType are actually stored with the data 
type of its sqlType.
+  // When we want to apply MapObjects on it, we have to use it.
+  lazy private val inputDataType = inputData.dataType match {
+case p: PythonUserDefinedType => p.sqlType
--- End diff --

(I just noticed that this wasn't introduced by you, but please change it 
anyway)


---




[GitHub] spark pull request #20771: [SPARK-23587][SQL] Add interpreted execution for ...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20771#discussion_r173155528
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -599,8 +610,71 @@ case class MapObjects private(
 
   override def children: Seq[Expression] = lambdaFunction :: inputData :: 
Nil
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+  // The data with PythonUserDefinedType are actually stored with the data 
type of its sqlType.
+  // When we want to apply MapObjects on it, we have to use it.
+  lazy private val inputDataType = inputData.dataType match {
+case p: PythonUserDefinedType => p.sqlType
--- End diff --

Please use the `UserDefinedType` super class here.


---




[GitHub] spark issue #20760: [SPARK-23592][SQL] Add interpreted execution to DecodeUs...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20760
  
Merging to master. Thanks!


---




[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20756#discussion_r173152685
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1261,8 +1261,42 @@ case class InitializeJavaBean(beanInstance: 
Expression, setters: Map[String, Exp
   override def children: Seq[Expression] = beanInstance +: 
setters.values.toSeq
   override def dataType: DataType = beanInstance.dataType
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported.")
+  private lazy val resolvedSetters = {
+assert(beanInstance.dataType.isInstanceOf[ObjectType])
+
+val ObjectType(beanClass) = beanInstance.dataType
+setters.map {
+  case (name, expr) =>
+// Looking for known type mapping first, then using Class attached 
in `ObjectType`.
+// Finally also looking for general `Object`-type parameter for 
generic methods.
+val paramTypes = 
CallMethodViaReflection.typeMapping.getOrElse(expr.dataType,
+Seq(expr.dataType.asInstanceOf[ObjectType].cls)) ++ 
Seq(classOf[Object])
+val methods = paramTypes.flatMap { fieldClass =>
+  try {
+Some(beanClass.getDeclaredMethod(name, fieldClass))
+  } catch {
+case e: NoSuchMethodException => None
+  }
+}
+if (methods.isEmpty) {
+  throw new NoSuchMethodException(s"""A method named "$name" is 
not declared """ +
+"in any enclosing class nor any supertype, nor through a 
static import")
+}
+methods.head -> expr
+}
+  }
+
+  override def eval(input: InternalRow): Any = {
+val instance = beanInstance.eval(input)
+if (instance != null) {
+  val bean = instance.asInstanceOf[Object]
+  resolvedSetters.foreach {
+case (setter, expr) =>
+  setter.invoke(bean, expr.eval(input).asInstanceOf[Object])
--- End diff --

Do we need the cast to `.asInstanceOf[Object]`? And why not use `AnyRef`?


---




[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20756#discussion_r173152492
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1261,8 +1261,39 @@ case class InitializeJavaBean(beanInstance: 
Expression, setters: Map[String, Exp
   override def children: Seq[Expression] = beanInstance +: 
setters.values.toSeq
   override def dataType: DataType = beanInstance.dataType
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported.")
+  private lazy val resolvedSetters = {
+val ObjectType(beanClass) = beanInstance.dataType
+setters.map {
+  case (name, expr) =>
+// Looking for known type mapping first, then using Class attached 
in `ObjectType`.
+// Finally also looking for general `Object`-type parameter for 
generic methods.
+val paramTypes = 
CallMethodViaReflection.typeMapping.getOrElse(expr.dataType,
+Seq(expr.dataType.asInstanceOf[ObjectType].cls)) ++ 
Seq(classOf[Object])
+val methods = paramTypes.flatMap { fieldClass =>
+  try {
+Some(beanClass.getDeclaredMethod(name, fieldClass))
+  } catch {
+case e: NoSuchMethodException => None
+  }
+}
+if (methods.isEmpty) {
+  throw new NoSuchMethodException(s"""A method named "$name" is 
not declared """ +
+"in any enclosing class nor any supertype, nor through a 
static import")
--- End diff --

Can you remove the last bit `, nor through a static import`?


---




[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20756#discussion_r173152324
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1261,8 +1261,39 @@ case class InitializeJavaBean(beanInstance: 
Expression, setters: Map[String, Exp
   override def children: Seq[Expression] = beanInstance +: 
setters.values.toSeq
   override def dataType: DataType = beanInstance.dataType
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported.")
+  private lazy val resolvedSetters = {
+val ObjectType(beanClass) = beanInstance.dataType
+setters.map {
+  case (name, expr) =>
+// Looking for known type mapping first, then using Class attached 
in `ObjectType`.
+// Finally also looking for general `Object`-type parameter for 
generic methods.
+val paramTypes = 
CallMethodViaReflection.typeMapping.getOrElse(expr.dataType,
--- End diff --

Ok, who is making those changes? You or @kiszk?


---




[GitHub] spark pull request #20757: [SPARK-23595][SQL] ValidateExternalType should su...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20757#discussion_r173150013
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala 
---
@@ -121,6 +121,19 @@ object ScalaReflection extends ScalaReflection {
 case _ => false
   }
 
+  def classForNativeTypeOf(dt: DataType): Class[_] = dt match {
--- End diff --

Shouldn't this match `CodeGenerator.boxedType` in terms of functionality? 
See my previous comments, but I am also missing complex types (struct, map & 
array).
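
For reference, a sketch of a mapping that also covers the types listed above. It is 
meant to mirror what the code-generated path treats as the boxed class for each data 
type, but it is illustrative rather than a drop-in replacement:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

object ExternalTypeClasses {
  def classForDataType(dt: DataType): Class[_] = dt match {
    case BooleanType              => classOf[java.lang.Boolean]
    case ByteType                 => classOf[java.lang.Byte]
    case ShortType                => classOf[java.lang.Short]
    case IntegerType | DateType   => classOf[java.lang.Integer]
    case LongType | TimestampType => classOf[java.lang.Long]
    case FloatType                => classOf[java.lang.Float]
    case DoubleType               => classOf[java.lang.Double]
    case _: DecimalType           => classOf[Decimal]
    case StringType               => classOf[UTF8String]
    case BinaryType               => classOf[Array[Byte]]
    case CalendarIntervalType     => classOf[CalendarInterval]
    case _: StructType            => classOf[InternalRow]
    case _: ArrayType             => classOf[ArrayData]
    case _: MapType               => classOf[MapData]
    case udt: UserDefinedType[_]  => classForDataType(udt.sqlType)
    case ObjectType(cls)          => cls
    case _                        => classOf[java.lang.Object]  // e.g. NullType
  }
}
```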


---




[GitHub] spark pull request #20757: [SPARK-23595][SQL] ValidateExternalType should su...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20757#discussion_r173149690
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1416,13 +1417,43 @@ case class ValidateExternalType(child: Expression, 
expected: DataType)
 
   override def nullable: Boolean = child.nullable
 
-  override def dataType: DataType = 
RowEncoder.externalDataTypeForInput(expected)
-
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+  override val dataType: DataType = 
RowEncoder.externalDataTypeForInput(expected)
 
   private val errMsg = s" is not a valid external type for schema of 
${expected.simpleString}"
 
+
+  private lazy val checkType: (Any) => Boolean = expected match {
+case _: DecimalType =>
+  (value: Any) => {
+value.isInstanceOf[java.math.BigDecimal] || 
value.isInstanceOf[scala.math.BigDecimal] ||
+  value.isInstanceOf[Decimal]
+  }
+case _: ArrayType =>
+  (value: Any) => {
+value.getClass.isArray || value.isInstanceOf[Seq[_]]
+  }
+case _ =>
+  val dataTypeClazz = if (dataType.isInstanceOf[ObjectType]) {
--- End diff --

Ok, two more things.
1. Let's just move this logic into the `classForNativeTypeOf` function.
2. Where are we dealing with `PythonUserDefinedType` or `UserDefinedType`? 
The `classForNativeTypeOf` function does not appear to be handling those.


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r173146007
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
 ---
@@ -93,6 +93,26 @@ public void setNullAt(int ordinal) {
 Platform.putLong(holder.buffer, getFieldOffset(ordinal), 0L);
   }
 
+  @Override
+  public void setNullByte(int ordinal) {
--- End diff --

This is pretty low-level stuff, so you should know how many bytes things 
contain at this point. I'd rather leave it as it is. Doing a type match on such 
a hot code path doesn't seem like a good idea.


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r173141050
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
 ---
@@ -93,6 +93,26 @@ public void setNullAt(int ordinal) {
 Platform.putLong(holder.buffer, getFieldOffset(ordinal), 0L);
   }
 
+  @Override
+  public void setNullByte(int ordinal) {
--- End diff --

We could also name them differently e.g.: `setNull1Byte`, `setNull2Bytes`, 
`setNull4Bytes` & `setNull8Bytes`
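
Rendered as a quick sketch (in Scala for brevity, even though the real base class is 
Java), the suggested names would read:

```scala
// Sketch of the suggested naming on the writer base class; the comments indicate which
// element slot widths each method would zero out.
trait NullSlotWriter {
  def setNull1Byte(ordinal: Int): Unit   // boolean / byte slots
  def setNull2Bytes(ordinal: Int): Unit  // short slots
  def setNull4Bytes(ordinal: Int): Unit  // int / float / date slots
  def setNull8Bytes(ordinal: Int): Unit  // long / double / timestamp slots
}
```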



---




[GitHub] spark pull request #20757: [SPARK-23595][SQL] ValidateExternalType should su...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20757#discussion_r173132924
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1416,13 +1417,42 @@ case class ValidateExternalType(child: Expression, 
expected: DataType)
 
   override def nullable: Boolean = child.nullable
 
-  override def dataType: DataType = 
RowEncoder.externalDataTypeForInput(expected)
-
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+  override val dataType: DataType = 
RowEncoder.externalDataTypeForInput(expected)
 
   private val errMsg = s" is not a valid external type for schema of 
${expected.simpleString}"
 
+  private lazy val dataTypeClazz = if (dataType.isInstanceOf[ObjectType]) {
+dataType.asInstanceOf[ObjectType].cls
+  } else {
+// Some external types (e.g., native types and 
`PythonUserDefinedType`) might not be ObjectType
+ScalaReflection.classForNativeTypeOf(dataType)
+  }
+
+  private lazy val checkType = expected match {
--- End diff --

Just for readability add the type here.


---




[GitHub] spark pull request #20757: [SPARK-23595][SQL] ValidateExternalType should su...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20757#discussion_r173132852
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1416,13 +1417,42 @@ case class ValidateExternalType(child: Expression, 
expected: DataType)
 
   override def nullable: Boolean = child.nullable
 
-  override def dataType: DataType = 
RowEncoder.externalDataTypeForInput(expected)
-
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported")
+  override val dataType: DataType = 
RowEncoder.externalDataTypeForInput(expected)
 
   private val errMsg = s" is not a valid external type for schema of 
${expected.simpleString}"
 
+  private lazy val dataTypeClazz = if (dataType.isInstanceOf[ObjectType]) {
--- End diff --

Can we fold this into the `checkType` functions? I'd rather not dereference a 
lazy val every time we check a type.


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r173130586
  
--- Diff: 
sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
 ---
@@ -93,6 +93,26 @@ public void setNullAt(int ordinal) {
 Platform.putLong(holder.buffer, getFieldOffset(ordinal), 0L);
   }
 
+  @Override
+  public void setNullByte(int ordinal) {
--- End diff --

These methods are needed for writing `UnsafeArrayData`: we fill the slot 
with 0s when we set it to null, and the slot size in `UnsafeArrayData` depends 
on the data type we are storing in it.

I wanted to avoid writing a lot of duplicate code in 
`InterpretedUnsafeProjection`, which is why I added these methods to the 
`UnsafeWriter` parent class, and also why they are in the `UnsafeRowWriter`.


---




[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r173110061
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InterpretedUnsafeProjection.scala
 ---
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.analysis.CleanupAliases
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, 
GenericInternalRow, Nondeterministic, SpecializedGetters, 
StatefulNondeterministic, UnsafeArrayData, UnsafeMapData, UnsafeProjection, 
UnsafeProjectionCreator, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeArrayWriter, UnsafeRowWriter, UnsafeWriter}
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types.{UserDefinedType, _}
+import org.apache.spark.unsafe.Platform
+
+/**
+ * An interpreted unsafe projection. This class reuses the [[UnsafeRow]] 
it produces, a consumer
+ * should copy the row if it is being buffered. This class is not thread 
safe.
+ *
+ * @param expressions that produces the resulting fields. These 
expressions must be bound
+ *to a schema.
+ */
+class InterpretedUnsafeProjection(expressions: Array[Expression]) extends 
UnsafeProjection {
+  import InterpretedUnsafeProjection._
+
+  /** Number of (top level) fields in the resulting row. */
+  private[this] val numFields = expressions.length
+
+  /** Array that expression results. */
+  private[this] val values = new Array[Any](numFields)
+
+  /** The row representing the expression results. */
+  private[this] val intermediate = new GenericInternalRow(values)
+
+  /** The row returned by the projection. */
+  private[this] val result = new UnsafeRow(numFields)
+
+  /** The buffer which holds the resulting row's backing data. */
+  private[this] val holder = new BufferHolder(result, numFields * 32)
+
+  /** The writer that writes the intermediate result to the result row. */
+  private[this] val writer: InternalRow => Unit = {
+val rowWriter = new UnsafeRowWriter(holder, numFields)
+val baseWriter = generateStructWriter(
+  holder,
+  rowWriter,
+  expressions.map(e => StructField("", e.dataType, e.nullable)))
+if (!expressions.exists(_.nullable)) {
+  // No nullable fields. The top-level null bit mask will always be 
zeroed out.
+  baseWriter
+} else {
+  // Zero out the null bit mask before we write the row.
+  row => {
+rowWriter.zeroOutNullBytes()
+baseWriter(row)
+  }
+}
+  }
+
+  override def initialize(partitionIndex: Int): Unit = {
+expressions.foreach(_.foreach {
+  case n: Nondeterministic => n.initialize(partitionIndex)
+  case _ =>
+})
+  }
+
+  override def apply(row: InternalRow): UnsafeRow = {
+// Put the expression results in the intermediate row.
+var i = 0
+while (i < numFields) {
+  values(i) = expressions(i).eval(row)
+  i += 1
+}
+
+// Write the intermediate row to an unsafe row.
+holder.reset()
+writer(intermediate)
+result.setTotalSize(holder.totalSize())
+result
+  }
+}
+
+/**
+ * Helper functions for creating an [[InterpretedUnsafeProjection]].
+ */
+object InterpretedUnsafeProjection extends UnsafeProjectionCreator {
+
+  /**
+   * Returns an [[UnsafeProjection]] for given sequence of bound 
Expressions.
+   */
+  override protected def createProjection(exprs: Seq[Expression]): 
UnsafeProjection = {
+// We need to make sure that we do not reuse stateful 
non-deterministic expressions.
+val cleanedExpressions = exprs.map(_.transform {
+ 

[GitHub] spark issue #20762: [SPARK-23620] Splitting thread dump lines by using the b...

2018-03-08 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20762
  
Merging to master. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20750: [SPARK-23581][SQL] Add interpreted unsafe project...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20750#discussion_r173025320
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
 ---
@@ -146,6 +145,18 @@ object UnsafeProjection {
 create(exprs.map(BindReferences.bindReference(_, inputSchema)))
   }
 
+  /**
+   * Returns an [[UnsafeProjection]] for given sequence of bound 
Expressions.
+   */
+  protected def createProjection(exprs: Seq[Expression]): UnsafeProjection
--- End diff --

Yeah, that was pretty stupid. It is fixed now :)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20762: [SPARK-23620] Splitting thread dump lines by using the b...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20762
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20751: [SPARK-23591][SQL] Add interpreted execution to EncodeUs...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20751
  
LGTM - merging to master. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20756#discussion_r172901366
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1261,8 +1261,24 @@ case class InitializeJavaBean(beanInstance: 
Expression, setters: Map[String, Exp
   override def children: Seq[Expression] = beanInstance +: 
setters.values.toSeq
   override def dataType: DataType = beanInstance.dataType
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported.")
+  override def eval(input: InternalRow): Any = {
+val instance = beanInstance.eval(input).asInstanceOf[Object]
+if (instance != null) {
+  setters.foreach { case (setterMethod, fieldExpr) =>
--- End diff --

See https://github.com/apache/spark/pull/20753#issuecomment-371195614 :)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20753: [SPARK-23582][SQL] StaticInvoke should support interpret...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20753
  
and cc @viirya 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20753: [SPARK-23582][SQL] StaticInvoke should support interpret...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20753
  
Ok, let's go with reflection then.

cc @rednaxelafx


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20753: [SPARK-23582][SQL] StaticInvoke should support interpret...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20753
  
@kiszk I think we should benchmark it. My rationale for considering method 
handles is that they seem to be made for this purpose, and they should become 
more performant with newer versions of Java.
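
To make the trade-off concrete, here is a toy sketch of the two invocation styles being compared (illustrative only, not the PR's code or its benchmark; a real comparison would go through a proper benchmark harness):

```scala
import java.lang.invoke.{MethodHandles, MethodType}

object InvocationStyles {
  // 1. Classic reflection: resolve the Method once, then Method.invoke per row.
  private val parseIntReflective =
    classOf[java.lang.Integer].getMethod("parseInt", classOf[String])

  def viaReflection(s: String): Int =
    parseIntReflective.invoke(null, s).asInstanceOf[Int]

  // 2. MethodHandle: resolve a handle once, then invoke it per row. Newer JVMs
  //    tend to optimize handle invocation more aggressively.
  private val parseIntHandle = MethodHandles.lookup().findStatic(
    classOf[java.lang.Integer],
    "parseInt",
    MethodType.methodType(classOf[Int], classOf[String]))

  def viaHandle(s: String): Int =
    parseIntHandle.invokeWithArguments(s).asInstanceOf[Int]
}
```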


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20756#discussion_r172852988
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1261,8 +1261,24 @@ case class InitializeJavaBean(beanInstance: 
Expression, setters: Map[String, Exp
   override def children: Seq[Expression] = beanInstance +: 
setters.values.toSeq
   override def dataType: DataType = beanInstance.dataType
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported.")
+  override def eval(input: InternalRow): Any = {
+val instance = beanInstance.eval(input).asInstanceOf[Object]
+if (instance != null) {
+  setters.foreach { case (setterMethod, fieldExpr) =>
--- End diff --

Hmmm it should be lenient. Can you try 
`MethodType.methodType(classOf[Object], fieldClass))`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20756#discussion_r172842972
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
 ---
@@ -49,7 +49,8 @@ trait ExpressionEvalHelper extends 
GeneratorDrivenPropertyChecks {
   expression: => Expression, expected: Any, inputRow: InternalRow = 
EmptyRow): Unit = {
 val serializer = new JavaSerializer(new SparkConf()).newInstance
 val resolver = ResolveTimeZone(new SQLConf)
-val expr = 
resolver.resolveTimeZones(serializer.deserialize(serializer.serialize(expression)))
+// Make it a method to obtain a fresh expression every time.
--- End diff --

Are we using a literal? Ok, makes sense.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20756#discussion_r172836786
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1261,8 +1261,31 @@ case class InitializeJavaBean(beanInstance: 
Expression, setters: Map[String, Exp
   override def children: Seq[Expression] = beanInstance +: 
setters.values.toSeq
   override def dataType: DataType = beanInstance.dataType
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported.")
+  private lazy val resolvedSetters = {
+val ObjectType(beanClass) = beanInstance.dataType
+
+setters.map { case (setterMethod, fieldExpr) =>
+  val foundMethods = beanClass.getMethods.filter { method =>
--- End diff --

(Picking up our earlier conversation) You are not checking the argument?
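
For illustration, a filter along these lines (a hypothetical helper, not the PR's code) would also pin down the parameter type instead of matching on name and arity alone; primitive versus boxed parameter types would still need extra care:

```scala
import java.lang.reflect.{Method, Modifier}

object SetterResolution {
  // Hypothetical helper: resolve a single-argument public setter whose
  // parameter type can accept the value we are going to pass in.
  def findSetter(beanClass: Class[_], name: String, argClass: Class[_]): Method = {
    val candidates = beanClass.getMethods.filter { m =>
      m.getName == name &&
        Modifier.isPublic(m.getModifiers) &&
        m.getParameterTypes.length == 1 &&
        m.getParameterTypes.head.isAssignableFrom(argClass)
    }
    candidates match {
      case Array(single) => single
      case Array() =>
        throw new NoSuchMethodException(s"$name($argClass) on $beanClass")
      case _ =>
        throw new IllegalStateException(s"Ambiguous setter $name on $beanClass")
    }
  }
}
```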


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20756#discussion_r172836291
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
 ---
@@ -49,7 +49,8 @@ trait ExpressionEvalHelper extends 
GeneratorDrivenPropertyChecks {
   expression: => Expression, expected: Any, inputRow: InternalRow = 
EmptyRow): Unit = {
 val serializer = new JavaSerializer(new SparkConf()).newInstance
 val resolver = ResolveTimeZone(new SQLConf)
-val expr = 
resolver.resolveTimeZones(serializer.deserialize(serializer.serialize(expression)))
+// Make it a method to obtain a fresh expression every time.
--- End diff --

Why this change?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20751: [SPARK-23591][SQL] Add interpreted execution to E...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20751#discussion_r172835939
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -105,6 +105,66 @@ trait InvokeLike extends Expression with 
NonSQLExpression {
   }
 }
 
+/**
+ * Common trait for [[DecodeUsingSerializer]] and [[EncodeUsingSerializer]]
+ */
+trait BaseSerializer {
+  /**
+   * If true, Kryo serialization is used, otherwise the Java one is used
+   */
+  val kryo: Boolean
+
+  /**
+   * The serializer instance to be used for serialization/deserialization
+   */
+  lazy val serializerInstance = {
--- End diff --

I was talking about the code below (the comment was somewhat terse). We 
should definitely put the `serializerInstance` into the generated class's 
immutable state. All I am saying is that we can put the factory code 
below in a companion, and use that in both the generated and interpreted code 
paths. This reduces the amount of code generated, and also makes things easier 
to test.
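
A rough sketch of that shape (the object and method names here are illustrative, not the PR's final API; the body mirrors the factory code quoted above):

```scala
import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerInstance}

// Illustrative companion-style factory: one place that builds the serializer
// instance, callable from both the interpreted path and the generated code.
object SerializerSupport {
  def newSerializer(useKryo: Boolean): SerializerInstance = {
    val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
    val serializer = if (useKryo) new KryoSerializer(conf) else new JavaSerializer(conf)
    serializer.newInstance()
  }
}
```

The interpreted `eval` path can then keep the result in a `lazy val`, while the generated code initializes its immutable field by calling the same factory.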


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20751: [SPARK-23591][SQL] Add interpreted execution to E...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20751#discussion_r172834496
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -105,6 +105,66 @@ trait InvokeLike extends Expression with 
NonSQLExpression {
   }
 }
 
+/**
+ * Common trait for [[DecodeUsingSerializer]] and [[EncodeUsingSerializer]]
+ */
+trait BaseSerializer {
--- End diff --

`BaseSerializer` is a bit generic. Can we come up with a better name? We 
could name it `UsingSerializer` or `SerializerSupport` (I am not super excited 
about either, so we could also leave it as it is).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20756#discussion_r172834120
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1261,8 +1261,24 @@ case class InitializeJavaBean(beanInstance: 
Expression, setters: Map[String, Exp
   override def children: Seq[Expression] = beanInstance +: 
setters.values.toSeq
   override def dataType: DataType = beanInstance.dataType
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported.")
+  override def eval(input: InternalRow): Any = {
+val instance = beanInstance.eval(input).asInstanceOf[Object]
+if (instance != null) {
+  setters.foreach { case (setterMethod, fieldExpr) =>
--- End diff --

Yes, we do. The compiler will check the generated code, right? It is just a 
matter of where you decide to fail. In a perfect world this would happen on the 
driver.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20756#discussion_r172833058
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1261,8 +1261,24 @@ case class InitializeJavaBean(beanInstance: 
Expression, setters: Map[String, Exp
   override def children: Seq[Expression] = beanInstance +: 
setters.values.toSeq
   override def dataType: DataType = beanInstance.dataType
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported.")
+  override def eval(input: InternalRow): Any = {
+val instance = beanInstance.eval(input).asInstanceOf[Object]
+if (instance != null) {
+  setters.foreach { case (setterMethod, fieldExpr) =>
--- End diff --

`lookup.findSetter` means direct field access in the `MethodHandle` world.  
So you need to use `lookup.findVirtual`.
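
A small, self-contained example of the difference (toy class, not Spark code):

```scala
import java.lang.invoke.{MethodHandles, MethodType}

object FindVirtualDemo {
  // Toy bean, purely for illustration.
  class Person {
    private var name: String = _
    def setName(n: String): Unit = { name = n }
  }

  def main(args: Array[String]): Unit = {
    val lookup = MethodHandles.lookup()

    // findSetter would bind a *field* write (and would fail here, since `name`
    // is private); findVirtual binds the setName *method*, which is what a
    // Java-bean style setter needs.
    val setName = lookup.findVirtual(
      classOf[Person],
      "setName",
      MethodType.methodType(classOf[Unit], classOf[String]))

    val p = new Person
    setName.invokeWithArguments(p, "someone")
  }
}
```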


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20751: [SPARK-23591][SQL] Add interpreted execution to E...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20751#discussion_r172828460
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -105,6 +105,66 @@ trait InvokeLike extends Expression with 
NonSQLExpression {
   }
 }
 
+/**
+ * Common trait for [[DecodeUsingSerializer]] and [[EncodeUsingSerializer]]
+ */
+trait BaseSerializer {
+  /**
+   * If true, Kryo serialization is used, otherwise the Java one is used
+   */
+  val kryo: Boolean
+
+  /**
+   * The serializer instance to be used for serialization/deserialization
+   */
+  lazy val serializerInstance = {
+val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
+val s = if (kryo) {
+new KryoSerializer(conf)
+  } else {
+new JavaSerializer(conf)
+  }
+s.newInstance()
+  }
+
+  /**
+   * The name of the variable referencing the serializer, which is added 
with
+   * `addImmutableSerializerIfNeeded`
+   */
+  lazy val serializerVarName = if (kryo) {
--- End diff --

Let's make `addImmutableSerializerIfNeeded` return the serializer name. No 
need for a separate method.
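
Something along these lines (a purely illustrative stand-in, not Spark's actual `CodegenContext` API): registering the immutable serializer field hands back its variable name, so no separate accessor is needed.

```scala
import scala.collection.mutable

// Illustrative stand-in for the codegen context. Registering the immutable
// serializer field once returns its variable name to the caller.
class FakeCodegenContext {
  private val immutableFields = mutable.LinkedHashMap.empty[String, String]

  def addImmutableSerializerIfNeeded(useKryo: Boolean): String = {
    val name = if (useKryo) "serializerForKryo" else "serializerForJava"
    immutableFields.getOrElseUpdate(
      name,
      s"SerializerInstance $name = SerializerSupport.newSerializer($useKryo);")
    name
  }
}
```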


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20751: [SPARK-23591][SQL] Add interpreted execution to E...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20751#discussion_r172828302
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -105,6 +105,66 @@ trait InvokeLike extends Expression with 
NonSQLExpression {
   }
 }
 
+/**
+ * Common trait for [[DecodeUsingSerializer]] and [[EncodeUsingSerializer]]
+ */
+trait BaseSerializer {
+  /**
+   * If true, Kryo serialization is used, otherwise the Java one is used
+   */
+  val kryo: Boolean
+
+  /**
+   * The serializer instance to be used for serialization/deserialization
+   */
+  lazy val serializerInstance = {
--- End diff --

We can also put this in a companion and call it from whole-stage codegen.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20748: [SPARK-23611][SQL] Add a helper function to check except...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/20748
  
LGTM - merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20756#discussion_r172819668
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1254,8 +1254,24 @@ case class InitializeJavaBean(beanInstance: 
Expression, setters: Map[String, Exp
   override def children: Seq[Expression] = beanInstance +: 
setters.values.toSeq
   override def dataType: DataType = beanInstance.dataType
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported.")
+  override def eval(input: InternalRow): Any = {
+val instance = beanInstance.eval(input).asInstanceOf[Object]
+if (instance != null) {
+  setters.foreach { case (setterMethod, fieldExpr) =>
+val fieldValue = fieldExpr.eval(input).asInstanceOf[Object]
+
+val foundMethods = instance.getClass.getMethods.filter { method =>
+  method.getName == setterMethod && 
Modifier.isPublic(method.getModifiers) &&
+method.getParameterTypes.length == 1
+}
+assert(foundMethods.length == 1,
+  throw new RuntimeException("The Java Bean instance should have 
only one " +
--- End diff --

Well, the compiler will check if the method exists. We could throw a 
`NoSuchMethodError` if we plan to keep doing resolution in the `eval` method 
(see my other comment).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20756: [SPARK-23593][SQL] Add interpreted execution for ...

2018-03-07 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/20756#discussion_r172801767
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1261,8 +1261,24 @@ case class InitializeJavaBean(beanInstance: 
Expression, setters: Map[String, Exp
   override def children: Seq[Expression] = beanInstance +: 
setters.values.toSeq
   override def dataType: DataType = beanInstance.dataType
 
-  override def eval(input: InternalRow): Any =
-throw new UnsupportedOperationException("Only code-generated 
evaluation is supported.")
+  override def eval(input: InternalRow): Any = {
+val instance = beanInstance.eval(input).asInstanceOf[Object]
+if (instance != null) {
+  setters.foreach { case (setterMethod, fieldExpr) =>
--- End diff --

Why are we resolving setters during `eval`? That seems a bit expensive. How 
about we create the setters before we execute eval? For example (I got a bit 
carried away):
```scala
  private lazy val resolvedSetters = {
val ObjectType(beanClass) = beanInstance.dataType
val lookup = MethodHandles.lookup()
setters.map {
  case (name, expr) =>
// Resolve expression type (should be better!)
val fieldClass = 
CallMethodViaReflection.typeMapping(expr.dataType).head
val handle = lookup.findVirtual(
  beanClass,
  name,
  MethodType.methodType(classOf[Unit], fieldClass))
handle -> expr
}
  }

  override def eval(input: InternalRow): Any = {
val bean = beanInstance.eval(input)
resolvedSetters.foreach {
  case (setter, expr) =>
setter.invoke(bean, expr.eval(input))
}
bean
  }
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


