[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-04 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r238683833
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
 ---
@@ -535,4 +535,98 @@ class UnsafeRowConverterSuite extends SparkFunSuite 
with Matchers with PlanTestB
 assert(unsafeRow.getSizeInBytes ==
   8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + 
roundedSize(field2.getSizeInBytes))
   }
+
+  testBothCodegenAndInterpreted("SPARK-25374 converts back into safe 
representation") {
+def convertBackToInternalRow(inputRow: InternalRow, fields: 
Array[DataType]): InternalRow = {
+  val unsafeProj = UnsafeProjection.create(fields)
+  val unsafeRow = unsafeProj(inputRow)
+  val safeProj = SafeProjection.create(fields)
+  safeProj(unsafeRow)
+}
+
+// Simple tests
+val inputRow = InternalRow.fromSeq(Seq(
+  false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, 
UTF8String.fromString("test"),
+  Decimal(255), CalendarInterval.fromString("interval 1 day"), 
Array[Byte](1, 2)
+))
+val fields1 = Array(
+  BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType,
+  DoubleType, StringType, DecimalType.defaultConcreteType, 
CalendarIntervalType,
+  BinaryType)
+
+assert(convertBackToInternalRow(inputRow, fields1) === inputRow)
+
+// Array tests
+val arrayRow = InternalRow.fromSeq(Seq(
+  createArray(1, 2, 3),
+  createArray(
+createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*),
+createArray(Seq("d").map(UTF8String.fromString): _*))
+))
+val fields2 = Array[DataType](
+  ArrayType(IntegerType),
+  ArrayType(ArrayType(StringType)))
+
+assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow)
+
+// Struct tests
+val structRow = InternalRow.fromSeq(Seq(
+  InternalRow.fromSeq(Seq[Any](1, 4.0)),
+  InternalRow.fromSeq(Seq(
+UTF8String.fromString("test"),
+InternalRow.fromSeq(Seq(
+  1,
+  createArray(Seq("2", "3").map(UTF8String.fromString): _*)
+))
+  ))
+))
+val fields3 = Array[DataType](
+  StructType(
+StructField("c0", IntegerType) ::
+StructField("c1", DoubleType) ::
+Nil),
+  StructType(
+StructField("c2", StringType) ::
+StructField("c3", StructType(
+  StructField("c4", IntegerType) ::
+  StructField("c5", ArrayType(StringType)) ::
+  Nil)) ::
+Nil))
+
+assert(convertBackToInternalRow(structRow, fields3) === structRow)
+
+// Map tests
+val mapRow = InternalRow.fromSeq(Seq(
+  createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2),
+  createMap(
+createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*),
+createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*)
+  )(
+createMap(Seq("k3", "k4").map(UTF8String.fromString): 
_*)(3.toShort, 4.toShort),
+createMap(Seq("k5", "k6").map(UTF8String.fromString): 
_*)(5.toShort, 6.toShort)
+  )))
+val fields4 = Array[DataType](
+  MapType(StringType, IntegerType),
+  MapType(MapType(IntegerType, StringType), MapType(StringType, 
ShortType)))
+
+val mapResultRow = convertBackToInternalRow(mapRow, 
fields4).toSeq(fields4)
+val mapExpectedRow = mapRow.toSeq(fields4)
+// Since `ArrayBasedMapData` does not override `equals` and `hashCode`,
--- End diff --

`ArrayBasedMapData`/`UnsafeMapData` do not have `equals()` or `hashCode()` 
implemented because we do not have a good story around map equality. 
Implementing equals/hashCode for maps is only half of the solution; we would 
also need a comparable binary format.
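
For illustration, a minimal plain-Scala sketch (the `ArrayBackedMap` class 
below is made up, not Spark API) of why `==` cannot be relied on here:

```scala
// Hypothetical stand-in for an array-backed map wrapper without equals/hashCode.
class ArrayBackedMap(val keys: Array[Any], val values: Array[Any])

val a = new ArrayBackedMap(Array[Any]("k1", "k2"), Array[Any](1, 2))
val b = new ArrayBackedMap(Array[Any]("k1", "k2"), Array[Any](1, 2))

// Without equals/hashCode, == falls back to reference equality.
assert(a != b)
// So the test compares the extracted keys and values instead.
assert(a.keys.toSeq == b.keys.toSeq && a.values.toSeq == b.values.toSeq)
```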


---




[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and RDDScan...

2018-11-23 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/23127
  
Looks good. One more higher-level question that can also be addressed in a 
follow-up.


---




[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-23 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/23127#discussion_r236017398
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -406,14 +415,39 @@ trait BlockingOperatorWithCodegen extends 
CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
--- End diff --

Should we reconcile this with the code gen for `RowDataSourceScanExec`?


---




[GitHub] spark issue #23022: [SPARK-26038] Decimal toScalaBigInt/toJavaBigInteger for...

2018-11-23 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/23022
  
Merging to master. Thanks!


---




[GitHub] spark pull request #23096: [SPARK-26129][SQL] Instrumentation for per-query ...

2018-11-20 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/23096#discussion_r235159238
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
---
@@ -648,7 +648,11 @@ class SparkSession private(
* @since 2.0.0
*/
   def sql(sqlText: String): DataFrame = {
-Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
+val tracker = new QueryPlanningTracker
--- End diff --

@dongjoon-hyun just out of curiosity, what would you like to disable here? 


---




[GitHub] spark issue #23075: [SPARK-26084][SQL] Fixes unresolved AggregateExpression....

2018-11-20 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/23075
  
Also backported to 2.3/2.4.


---




[GitHub] spark issue #23075: [SPARK-26084][SQL] Fixes unresolved AggregateExpression....

2018-11-20 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/23075
  
Merging to master. Thanks!


---




[GitHub] spark issue #23075: [SPARK-26084][SQL] Fixes unresolved AggregateExpression....

2018-11-20 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/23075
  
Let's see if this works :)


---




[GitHub] spark issue #23075: [SPARK-26084][SQL] Fixes unresolved AggregateExpression....

2018-11-20 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/23075
  
ok to test


---




[GitHub] spark issue #23075: [SPARK-26084][SQL] Fixes unresolved AggregateExpression....

2018-11-19 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/23075
  
Ok to test


---




[GitHub] spark issue #23018: [SPARK-26023][SQL] Dumping truncated plans and generated...

2018-11-13 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/23018
  
Merging to master. Thanks!


---




[GitHub] spark issue #23018: [SPARK-26023][SQL] Dumping truncated plans and generated...

2018-11-13 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/23018
  
retest this please


---




[GitHub] spark pull request #22961: [SPARK-25947][SQL] Reduce memory usage in Shuffle...

2018-11-08 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22961#discussion_r232061457
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
 ---
@@ -214,13 +214,24 @@ object ShuffleExchangeExec {
   override def getPartition(key: Any): Int = key.asInstanceOf[Int]
 }
   case RangePartitioning(sortingExpressions, numPartitions) =>
-// Internally, RangePartitioner runs a job on the RDD that samples 
keys to compute
-// partition bounds. To get accurate samples, we need to copy the 
mutable keys.
+// Extract only fields used for sorting to avoid collecting large 
fields that does not
+// affect sorting result when deciding partition bounds in 
RangePartitioner
 val rddForSampling = rdd.mapPartitionsInternal { iter =>
+  val projection =
+UnsafeProjection.create(sortingExpressions.map(_.child), 
outputAttributes)
   val mutablePair = new MutablePair[InternalRow, Null]()
-  iter.map(row => mutablePair.update(row.copy(), null))
+  // Internally, RangePartitioner runs a job on the RDD that 
samples keys to compute
+  // partition bounds. To get accurate samples, we need to copy 
the mutable keys.
+  iter.map(row => mutablePair.update(projection(row).copy(), null))
 }
-implicit val ordering = new 
LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
+// Construct ordering on extracted sort key.
+val orderingAttributes =
--- End diff --

This is a bit clunky IMO. Can we do this instead:
```scala
val orderingAttributes = sortingExpressions.zipWithIndex.map { case (ord, i) =>
  ord.copy(child = BoundReference(i, ord.dataType, ord.nullable))
}
```


---




[GitHub] spark issue #22964: [SPARK-25963] Optimize generate followed by window

2018-11-07 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22964
  
@uzadude where is this relevant? You will end up with two shuffles if you 
do this.


---




[GitHub] spark pull request #22932: [SPARK-25102][SQL] Write Spark version to ORC/Par...

2018-11-04 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22932#discussion_r230604337
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/package.scala ---
@@ -44,4 +44,13 @@ package object sql {
   type Strategy = SparkStrategy
 
   type DataFrame = Dataset[Row]
+
+  /**
+   * Metadata key which is used to write Spark version in the followings:
+   * - Parquet file metadata
+   * - ORC file metadata
+   *
+   * Note that Hive table property `spark.sql.create.version` also has 
Spark version.
+   */
+  private[sql] val CREATE_VERSION = "org.apache.spark.sql.create.version"
--- End diff --

Is this a pre-existing key? Seems that `org.apache.spark.version` should be 
enough.


---




[GitHub] spark issue #22925: [SPARK-25913][SQL] Extend UnaryExecNode by unary SparkPl...

2018-11-01 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22925
  
LGTM


---




[GitHub] spark issue #22789: [SPARK-25767][SQL] Fix lazily evaluated stream of expres...

2018-10-29 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22789
  
Merged to master/2.4


---




[GitHub] spark issue #22789: [SPARK-25767][SQL] Fix lazily evaluated stream of expres...

2018-10-29 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22789
  
LGTM


---




[GitHub] spark pull request #22789: [SPARK-25767][SQL] Fix lazily evaluated stream of...

2018-10-28 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22789#discussion_r228760802
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 ---
@@ -319,4 +319,15 @@ class WholeStageCodegenSuite extends QueryTest with 
SharedSQLContext {
   assert(df.limit(1).collect() === Array(Row("bat", 8.0)))
 }
   }
+
+  test("SPARK-25767: Lazy evaluated stream of expressions handled 
correctly") {
--- End diff --

Oh wait, I executed those commands in the spark-shell, and that falls back to 
interpreted mode when compilation fails. That is what happened here. Sorry 
about the fuss.


---




[GitHub] spark pull request #22789: [SPARK-25767][SQL] Fix lazily evaluated stream of...

2018-10-28 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22789#discussion_r228749168
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -146,7 +146,10 @@ trait CodegenSupport extends SparkPlan {
   if (outputVars != null) {
 assert(outputVars.length == output.length)
 // outputVars will be used to generate the code for UnsafeRow, so 
we should copy them
-outputVars.map(_.copy())
+outputVars.map(_.copy()) match {
--- End diff --

What in `consume` is relying on a side effect of traversing the 
`outputVars`?


---




[GitHub] spark pull request #22789: [SPARK-25767][SQL] Fix lazily evaluated stream of...

2018-10-28 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22789#discussion_r228748979
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 ---
@@ -319,4 +319,15 @@ class WholeStageCodegenSuite extends QueryTest with 
SharedSQLContext {
   assert(df.limit(1).collect() === Array(Row("bat", 8.0)))
 }
   }
+
+  test("SPARK-25767: Lazy evaluated stream of expressions handled 
correctly") {
--- End diff --

Even without this patch this test would pass (I just tried it on master). A 
stream always evaluates its first element, so you probably need to add another 
key here.
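
A minimal sketch of that behavior, assuming Scala 2.12 `Stream` semantics:

```scala
// Stream.map evaluates the head eagerly; only the tail is deferred.
var touched = List.empty[Int]
val mapped = Stream(1, 2, 3).map { i => touched :+= i; i }

assert(touched == List(1))          // only the first element was evaluated
mapped.toList                       // forcing the stream ...
assert(touched == List(1, 2, 3))    // ... evaluates the remaining elements
```

So a test whose expressions only differ in the head of the `Stream` would not 
exercise the lazy path.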


---




[GitHub] spark issue #22789: [SPARK-25767][SQL] Fix lazily evaluated stream of expres...

2018-10-28 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22789
  
ok to test


---




[GitHub] spark issue #22822: [SPARK-25678] Requesting feedback regarding a prototype ...

2018-10-25 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22822
  
@UtkarshMe well, there is a signal in the lack of responsiveness. Adding and 
maintaining cluster managers has proven to be quite painful; case in point is 
the lack of love that Mesos is receiving. I don't really see a way forward here 
unless there is strong consensus in the community.



---




[GitHub] spark issue #22822: [SPARK-25678] Requesting feedback regarding a prototype ...

2018-10-25 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22822
  
@UtkarshMe you should reach out to the spark dev list about this.


---




[GitHub] spark issue #22817: [SPARK-25816][SQL] ResolveReferences should work bottom-...

2018-10-24 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22817
  
ok to test


---




[GitHub] spark issue #22817: [SPARK-25816][SQL] ResolveReferences should work bottom-...

2018-10-24 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22817
  
@peter-toth what are you trying to fix here? Could you add this to the PR 
description?


---




[GitHub] spark pull request #22576: [SPARK-25560][SQL] Allow FunctionInjection in Spa...

2018-10-19 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22576#discussion_r226623886
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala ---
@@ -168,4 +173,21 @@ class SparkSessionExtensions {
   def injectParser(builder: ParserBuilder): Unit = {
 parserBuilders += builder
   }
+
+  private[this] val injectedFunctions = 
mutable.Buffer.empty[FunctionDescription]
+
+  private[sql] def registerFunctions(functionRegistry: FunctionRegistry) = 
{
+for ((name, expressionInfo, function) <- injectedFunctions) {
--- End diff --

👍 


---




[GitHub] spark issue #22576: [SPARK-25560][SQL] Allow FunctionInjection in SparkExten...

2018-10-19 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22576
  
@RussellSpitzer I am merging this; can you address my comment in a follow-up? 
Thanks!


---




[GitHub] spark pull request #22576: [SPARK-25560][SQL] Allow FunctionInjection in Spa...

2018-10-19 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22576#discussion_r226571338
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala ---
@@ -168,4 +173,21 @@ class SparkSessionExtensions {
   def injectParser(builder: ParserBuilder): Unit = {
 parserBuilders += builder
   }
+
+  private[this] val injectedFunctions = 
mutable.Buffer.empty[FunctionDescription]
+
+  private[sql] def registerFunctions(functionRegistry: FunctionRegistry) = 
{
+for ((name, expressionInfo, function) <- injectedFunctions) {
--- End diff --

Can you move the stuff that changes the `FunctionRegistry` into the 
`BaseSessionStateBuilder` and just make this return the 
`Seq[FunctionDescription]`? The return type of this function being a 
`FunctionRegistry` sort of implies that you are getting back a new registry 
instead of a mutated one. If we are mutating, then I prefer to do that in the 
`BaseSessionStateBuilder` so it is obvious that this is safe to do, because we 
are mutating a clone. It also makes this code more in line with the rest of the 
extension class (not mutating). Sorry for the late change of heart.
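
A rough sketch of the suggested shape; `registeredFunctions` is a made-up name 
and the `FunctionDescription` tuple layout is taken from the diff above:

```scala
// SparkSessionExtensions only hands out what was injected ...
private[sql] def registeredFunctions: Seq[FunctionDescription] = injectedFunctions

// ... and BaseSessionStateBuilder mutates its own cloned registry:
protected lazy val functionRegistry: FunctionRegistry = {
  parentState.map(_.functionRegistry.clone()).getOrElse {
    val registry = FunctionRegistry.builtin.clone()
    extensions.registeredFunctions.foreach { case (name, info, builder) =>
      registry.registerFunction(name, info, builder)
    }
    registry
  }
}
```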


---




[GitHub] spark pull request #22712: [SPARK-25724] Add sorting functionality in MapTyp...

2018-10-13 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22712#discussion_r224957118
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala ---
@@ -73,6 +74,90 @@ case class MapType(
   override private[spark] def existsRecursively(f: (DataType) => Boolean): 
Boolean = {
 f(this) || keyType.existsRecursively(f) || 
valueType.existsRecursively(f)
   }
+
+  private[this] class OrderedWrapper {
+var isOrdered: Boolean = false
--- End diff --

I prefer not to make this mutable if we can avoid it. That can be a source of 
some pretty weird errors if we move from an unordered to an ordered map. Why do 
you need this?


---




[GitHub] spark pull request #22696: [SPARK-25708][SQL] HAVING without GROUP BY means ...

2018-10-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22696#discussion_r224590474
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1894,6 +1894,8 @@ working with timestamps in `pandas_udf`s to get the 
best performance, see
 
   - In PySpark, when creating a `SparkSession` with 
`SparkSession.builder.getOrCreate()`, if there is an existing `SparkContext`, 
the builder was trying to update the `SparkConf` of the existing `SparkContext` 
with configurations specified to the builder, but the `SparkContext` is shared 
by all `SparkSession`s, so we should not update them. Since 3.0, the builder 
come to not update the configurations. This is the same behavior as Java/Scala 
API in 2.3 and above. If you want to update them, you need to update them prior 
to creating a `SparkSession`.
 
+  - In Spark version 2.4 and earlier, HAVING without GROUP BY is treated 
as WHERE. This means, `SELECT 1 FROM range(10) HAVING true` is executed as 
`SELECT 1 FROM range(10) WHERE true`  and returns 10 rows. This violates SQL 
standard, and has been fixed in Spark 3.0. Since Spark 3.0, HAVING without 
GROUP BY is treated as a global aggregate, which means `SELECT 1 FROM range(10) 
HAVING true` will return only one row.
--- End diff --

You will need to feature-flag it if you port it to 2.4. People might rely 
on its current behavior.
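
For example, a 2.4 backport could guard the change behind a legacy SQL conf, 
roughly like this (the conf name is hypothetical; the `buildConf` builder 
inside `SQLConf` is assumed):

```scala
val LEGACY_HAVING_WITHOUT_GROUP_BY_AS_WHERE =
  buildConf("spark.sql.legacy.havingWithoutGroupByAsWhere")
    .internal()
    .doc("When true, HAVING without GROUP BY is treated as WHERE, i.e. the " +
      "current 2.4 behavior.")
    .booleanConf
    // Default to the existing 2.4 semantics so the backport does not change results.
    .createWithDefault(true)
```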


---




[GitHub] spark issue #22696: [SPARK-25708][SQL] HAVING without GROUP BY means global ...

2018-10-11 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22696
  
I added the release-notes label to the JIRA ticket. I am not sure if there 
is a migration-guide label.


---




[GitHub] spark pull request #22576: [SPARK-25560][SQL] Allow FunctionInjection in Spa...

2018-10-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22576#discussion_r224366907
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala ---
@@ -168,4 +173,22 @@ class SparkSessionExtensions {
   def injectParser(builder: ParserBuilder): Unit = {
 parserBuilders += builder
   }
+
+  private[this] val injectedFunctions =
--- End diff --

NIT: no new line?


---




[GitHub] spark pull request #22576: [SPARK-25560][SQL] Allow FunctionInjection in Spa...

2018-10-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22576#discussion_r224366774
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
 ---
@@ -95,7 +95,8 @@ abstract class BaseSessionStateBuilder(
* This either gets cloned from a pre-existing version or cloned from 
the built-in registry.
*/
   protected lazy val functionRegistry: FunctionRegistry = {
-
parentState.map(_.functionRegistry).getOrElse(FunctionRegistry.builtin).clone()
+parentState.map(_.functionRegistry.clone())
+  
.getOrElse{extensions.registerFunctions(FunctionRegistry.builtin.clone())}
--- End diff --

Use parentheses instead of curly braces?
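
i.e. something like:

```scala
parentState.map(_.functionRegistry.clone())
  .getOrElse(extensions.registerFunctions(FunctionRegistry.builtin.clone()))
```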


---




[GitHub] spark pull request #22576: [SPARK-25560][SQL] Allow FunctionInjection in Spa...

2018-10-11 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22576#discussion_r224364692
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala ---
@@ -168,4 +173,22 @@ class SparkSessionExtensions {
   def injectParser(builder: ParserBuilder): Unit = {
 parserBuilders += builder
   }
+
+  private[this] val injectedFunctions =
+mutable.Buffer.empty[FunctionDescription]
+
+private[sql] def registerFunctions(functionRegistry: FunctionRegistry) 
= {
--- End diff --

Can you fix the indentation for the next 14 lines?


---




[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...

2018-10-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22429#discussion_r223983702
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala ---
@@ -189,23 +192,34 @@ class QueryExecution(val sparkSession: SparkSession, 
val logical: LogicalPlan) {
   """.stripMargin.trim
   }
 
+  private def writeOrError(writer: Writer)(f: Writer => Unit): Unit =
+try f(writer) catch { case e: AnalysisException => 
writer.write(e.toString) }
--- End diff --

Please use multiple lines here.
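
For example:

```scala
private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = {
  try {
    f(writer)
  } catch {
    case e: AnalysisException => writer.write(e.toString)
  }
}
```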


---




[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...

2018-10-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22429#discussion_r223983537
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala ---
@@ -167,6 +172,58 @@ package object util {
 builder.toString()
   }
 
+  /**
+   * The performance overhead of creating and logging strings for wide 
schemas can be large. To
+   * limit the impact, we bound the number of fields to include by 
default. This can be overridden
+   * by setting the 'spark.debug.maxToStringFields' conf in SparkEnv or by 
settings the SQL config
+   * `spark.sql.debug.maxToStringFields`.
+   */
+  private[spark] def maxNumToStringFields: Int = {
+val legacyLimit = if (SparkEnv.get != null) {
+  SparkEnv.get.conf.get(config.MAX_TO_STRING_FIELDS)
+} else {
+  config.MAX_TO_STRING_FIELDS.defaultValue.get
+}
+val sqlConfLimit = SQLConf.get.maxToStringFields
+
+Math.max(sqlConfLimit, legacyLimit)
+  }
+
+  /** Whether we have warned about plan string truncation yet. */
+  private val truncationWarningPrinted = new AtomicBoolean(false)
+
+  /**
+   * Format a sequence with semantics similar to calling .mkString(). Any 
elements beyond
+   * maxNumToStringFields will be dropped and replaced by a "... N more 
fields" placeholder.
+   *
+   * @return the trimmed and formatted string.
+   */
+  def truncatedString[T](
+  seq: Seq[T],
+  start: String,
+  sep: String,
+  end: String,
+  maxFields: Option[Int]): String = {
+val maxNumFields = maxFields.getOrElse(maxNumToStringFields)
--- End diff --

You should document this behavior.
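
For instance, the fallback could be spelled out in the scaladoc (suggested 
wording only):

```scala
/**
 * Format a sequence with semantics similar to calling .mkString(). Any elements beyond
 * the limit will be dropped and replaced by a "... N more fields" placeholder.
 *
 * @param maxFields the maximum number of fields to keep; `None` means "fall back to
 *                  `maxNumToStringFields`", i.e. the SparkEnv / SQL-conf derived limit.
 * @return the trimmed and formatted string.
 */
```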


---




[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...

2018-10-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22429#discussion_r223982046
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala ---
@@ -167,6 +172,58 @@ package object util {
 builder.toString()
   }
 
+  /**
+   * The performance overhead of creating and logging strings for wide 
schemas can be large. To
+   * limit the impact, we bound the number of fields to include by 
default. This can be overridden
+   * by setting the 'spark.debug.maxToStringFields' conf in SparkEnv or by 
settings the SQL config
+   * `spark.sql.debug.maxToStringFields`.
+   */
+  private[spark] def maxNumToStringFields: Int = {
+val legacyLimit = if (SparkEnv.get != null) {
--- End diff --

Just for context, why do you want to retain the legacy behavior? It is 
probably fine to break it.


---




[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...

2018-10-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22429#discussion_r223980665
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala 
---
@@ -455,21 +457,37 @@ abstract class TreeNode[BaseType <: 
TreeNode[BaseType]] extends Product {
   }.mkString(", ")
 
   /** ONE line description of this node. */
-  def simpleString: String = s"$nodeName $argString".trim
+  def simpleString(maxFields: Option[Int]): String = {
--- End diff --

Please document the `maxFields` parameter. I am especially interested in 
what `None` represents here.


---




[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...

2018-10-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22429#discussion_r223979931
  
--- Diff: 
external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala 
---
@@ -52,7 +52,7 @@ case class CatalystDataToAvro(child: Expression) extends 
UnaryExpression {
 out.toByteArray
   }
 
-  override def simpleString: String = {
+  override def simpleString(maxFields: Option[Int]): String = {
 s"to_avro(${child.sql}, ${child.dataType.simpleString})"
--- End diff --

Should we also pass the maxFields to `child.dataType`? For example 
`StructType` fields are truncated.


---




[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...

2018-10-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22429#discussion_r223979392
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -633,4 +633,14 @@ package object config {
   .stringConf
   .toSequence
   .createWithDefault(Nil)
+
+  private[spark] val MAX_TO_STRING_FIELDS =
+ConfigBuilder("spark.debug.maxToStringFields")
+  .internal()
+  .doc("Maximum number of fields of sequence-like entries that can be 
converted to strings " +
--- End diff --

What is a sequence like entry?


---




[GitHub] spark issue #22429: [SPARK-25440][SQL] Dumping query execution info to a fil...

2018-10-10 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22429
  
@boy-uber the thing you are suggesting is a pretty big undertaking and 
beyond the scope of this PR.

If you are going to add structured plans to the explain output, you 
probably also want some guarantees about stability over multiple Spark 
versions, and you probably also want to be able to reconstruct the plan. 
Neither is easy. If you want to have this in Spark, then I suggest sending a 
proposal to the dev list.


---




[GitHub] spark pull request #22674: [SPARK-25680][SQL] SQL execution listener shouldn...

2018-10-09 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22674#discussion_r223886858
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala 
---
@@ -75,95 +76,74 @@ trait QueryExecutionListener {
  */
 @Experimental
 @InterfaceStability.Evolving
-class ExecutionListenerManager private extends Logging {
-
-  private[sql] def this(conf: SparkConf) = {
-this()
+// The `session` is used to indicate which session carries this listener 
manager, and we only
+// catch SQL executions which are launched by the same session.
+// The `loadExtensions` flag is used to indicate whether we should load 
the pre-defined,
+// user-specified listeners during construction. We should not do it when 
cloning this listener
+// manager, as we will copy all listeners to the cloned listener manager.
+class ExecutionListenerManager private[sql](session: SparkSession, 
loadExtensions: Boolean)
+  extends SparkListener with Logging {
+
+  private[this] val listeners = new 
CopyOnWriteArrayList[QueryExecutionListener]
+
+  if (loadExtensions) {
+val conf = session.sparkContext.conf
 conf.get(QUERY_EXECUTION_LISTENERS).foreach { classNames =>
   Utils.loadExtensions(classOf[QueryExecutionListener], classNames, 
conf).foreach(register)
 }
   }
 
+  session.sparkContext.listenerBus.addToSharedQueue(this)
+
   /**
* Registers the specified [[QueryExecutionListener]].
*/
   @DeveloperApi
-  def register(listener: QueryExecutionListener): Unit = writeLock {
-listeners += listener
+  def register(listener: QueryExecutionListener): Unit = {
+listeners.add(listener)
   }
 
   /**
* Unregisters the specified [[QueryExecutionListener]].
*/
   @DeveloperApi
-  def unregister(listener: QueryExecutionListener): Unit = writeLock {
-listeners -= listener
+  def unregister(listener: QueryExecutionListener): Unit = {
+listeners.remove(listener)
   }
 
   /**
* Removes all the registered [[QueryExecutionListener]].
*/
   @DeveloperApi
-  def clear(): Unit = writeLock {
+  def clear(): Unit = {
 listeners.clear()
   }
 
   /**
* Get an identical copy of this listener manager.
*/
-  @DeveloperApi
-  override def clone(): ExecutionListenerManager = writeLock {
-val newListenerManager = new ExecutionListenerManager
-listeners.foreach(newListenerManager.register)
+  private[sql] def clone(session: SparkSession): ExecutionListenerManager 
= {
+val newListenerManager = new ExecutionListenerManager(session, 
loadExtensions = false)
+listeners.iterator().asScala.foreach(newListenerManager.register)
 newListenerManager
   }
 
-  private[sql] def onSuccess(funcName: String, qe: QueryExecution, 
duration: Long): Unit = {
-readLock {
-  withErrorHandling { listener =>
-listener.onSuccess(funcName, qe, duration)
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match 
{
+case e: SparkListenerSQLExecutionEnd if shouldCatchEvent(e) =>
+  val funcName = e.executionName.get
+  e.executionFailure match {
+case Some(ex) =>
+  listeners.iterator().asScala.foreach(_.onFailure(funcName, e.qe, 
ex))
--- End diff --

This is a bit of a high-level thought: you could consider making the calling 
event queue responsible for the dispatch of these events. That way you can 
leverage any improvement to the underlying event bus.


---




[GitHub] spark pull request #22674: [SPARK-25680][SQL] SQL execution listener shouldn...

2018-10-09 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22674#discussion_r223885742
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala 
---
@@ -75,95 +76,74 @@ trait QueryExecutionListener {
  */
 @Experimental
 @InterfaceStability.Evolving
-class ExecutionListenerManager private extends Logging {
-
-  private[sql] def this(conf: SparkConf) = {
-this()
+// The `session` is used to indicate which session carries this listener 
manager, and we only
+// catch SQL executions which are launched by the same session.
+// The `loadExtensions` flag is used to indicate whether we should load 
the pre-defined,
+// user-specified listeners during construction. We should not do it when 
cloning this listener
+// manager, as we will copy all listeners to the cloned listener manager.
+class ExecutionListenerManager private[sql](session: SparkSession, 
loadExtensions: Boolean)
+  extends SparkListener with Logging {
+
+  private[this] val listeners = new 
CopyOnWriteArrayList[QueryExecutionListener]
+
+  if (loadExtensions) {
+val conf = session.sparkContext.conf
 conf.get(QUERY_EXECUTION_LISTENERS).foreach { classNames =>
   Utils.loadExtensions(classOf[QueryExecutionListener], classNames, 
conf).foreach(register)
 }
   }
 
+  session.sparkContext.listenerBus.addToSharedQueue(this)
+
   /**
* Registers the specified [[QueryExecutionListener]].
*/
   @DeveloperApi
-  def register(listener: QueryExecutionListener): Unit = writeLock {
-listeners += listener
+  def register(listener: QueryExecutionListener): Unit = {
+listeners.add(listener)
   }
 
   /**
* Unregisters the specified [[QueryExecutionListener]].
*/
   @DeveloperApi
-  def unregister(listener: QueryExecutionListener): Unit = writeLock {
-listeners -= listener
+  def unregister(listener: QueryExecutionListener): Unit = {
+listeners.remove(listener)
   }
 
   /**
* Removes all the registered [[QueryExecutionListener]].
*/
   @DeveloperApi
-  def clear(): Unit = writeLock {
+  def clear(): Unit = {
 listeners.clear()
   }
 
   /**
* Get an identical copy of this listener manager.
*/
-  @DeveloperApi
-  override def clone(): ExecutionListenerManager = writeLock {
-val newListenerManager = new ExecutionListenerManager
-listeners.foreach(newListenerManager.register)
+  private[sql] def clone(session: SparkSession): ExecutionListenerManager 
= {
+val newListenerManager = new ExecutionListenerManager(session, 
loadExtensions = false)
+listeners.iterator().asScala.foreach(newListenerManager.register)
 newListenerManager
   }
 
-  private[sql] def onSuccess(funcName: String, qe: QueryExecution, 
duration: Long): Unit = {
-readLock {
-  withErrorHandling { listener =>
-listener.onSuccess(funcName, qe, duration)
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match 
{
+case e: SparkListenerSQLExecutionEnd if shouldCatchEvent(e) =>
+  val funcName = e.executionName.get
+  e.executionFailure match {
+case Some(ex) =>
+  listeners.iterator().asScala.foreach(_.onFailure(funcName, e.qe, 
ex))
+case _ =>
+  listeners.iterator().asScala.foreach(_.onSuccess(funcName, e.qe, 
e.duration))
   }
-}
-  }
 
-  private[sql] def onFailure(funcName: String, qe: QueryExecution, 
exception: Exception): Unit = {
-readLock {
-  withErrorHandling { listener =>
-listener.onFailure(funcName, qe, exception)
-  }
-}
+case _ => // Ignore
   }
 
-  private[this] val listeners = ListBuffer.empty[QueryExecutionListener]
-
-  /** A lock to prevent updating the list of listeners while we are 
traversing through them. */
-  private[this] val lock = new ReentrantReadWriteLock()
-
-  private def withErrorHandling(f: QueryExecutionListener => Unit): Unit = 
{
-for (listener <- listeners) {
-  try {
-f(listener)
-  } catch {
-case NonFatal(e) => logWarning("Error executing query execution 
listener", e)
-  }
-}
-  }
-
-  /** Acquires a read lock on the cache for the duration of `f`. */
-  private def readLock[A](f: => A): A = {
-val rl = lock.readLock()
-rl.lock()
-try f finally {
-  rl.unlock()
-}
-  }
-
-  /** Acquires a write lock on the cache for the duration of `f`. */
-  private def writeLock[A](f: => A): A = {
-val wl = lo

[GitHub] spark pull request #22674: [SPARK-25680][SQL] SQL execution listener shouldn...

2018-10-09 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22674#discussion_r223873662
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala 
---
@@ -75,95 +76,74 @@ trait QueryExecutionListener {
  */
 @Experimental
 @InterfaceStability.Evolving
-class ExecutionListenerManager private extends Logging {
-
-  private[sql] def this(conf: SparkConf) = {
-this()
+// The `session` is used to indicate which session carries this listener 
manager, and we only
--- End diff --

Why is this not a class doc?


---




[GitHub] spark pull request #22674: [SPARK-25680][SQL] SQL execution listener shouldn...

2018-10-09 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22674#discussion_r223873406
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala ---
@@ -39,7 +39,14 @@ case class SparkListenerSQLExecutionStart(
 
 @DeveloperApi
 case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
-  extends SparkListenerEvent
+  extends SparkListenerEvent {
+
+  @JsonIgnore private[sql] var executionName: Option[String] = None
--- End diff --

Why do we want to be backwards compatible here? SHS?


---




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

2018-09-18 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/16677
  
1. `numOutputs` is the number of records.
2. 8 bytes per `MapStatus`.


---




[GitHub] spark issue #22429: [SPARK-25440][SQL] Dumping query execution info to a fil...

2018-09-18 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22429
  
@MaxGekk please just modify `simpleString`; it is internal API for this reason.

@rednaxelafx the rope approach has the benefit that it does not create a ton of 
intermediate buffers. We could do that as well.


---




[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...

2018-09-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22429#discussion_r217928428
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala ---
@@ -250,5 +254,36 @@ class QueryExecution(val sparkSession: SparkSession, 
val logical: LogicalPlan) {
 def codegenToSeq(): Seq[(String, String)] = {
   org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)
 }
+
+/**
+ * Dumps debug information about query execution into the specified 
file.
+ */
+def toFile(path: String): Unit = {
+  val maxFields = SparkEnv.get.conf.getInt(Utils.MAX_TO_STRING_FIELDS,
+Utils.DEFAULT_MAX_TO_STRING_FIELDS)
+  val filePath = new Path(path)
+  val fs = FileSystem.get(filePath.toUri, 
sparkSession.sessionState.newHadoopConf())
+  val writer = new BufferedWriter(new 
OutputStreamWriter(fs.create(filePath)))
+
+  try {
+SparkEnv.get.conf.set(Utils.MAX_TO_STRING_FIELDS, 
Int.MaxValue.toString)
+writer.write("== Parsed Logical Plan ==\n")
--- End diff --

Can we combine this entire block with what is done in the `toString()` 
method?
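
A hedged sketch of how the two code paths could share one writer-based helper 
(the name `writePlans` is made up; imports and members as in the diff above):

```scala
private def writePlans(writer: java.io.Writer): Unit = {
  writer.write("== Parsed Logical Plan ==\n")
  writer.write(logical.toString)
  writer.write("\n== Analyzed Logical Plan ==\n")
  writer.write(analyzed.toString)
  writer.write("\n== Optimized Logical Plan ==\n")
  writer.write(optimizedPlan.toString)
  writer.write("\n== Physical Plan ==\n")
  writer.write(executedPlan.toString)
}

override def toString: String = {
  val out = new java.io.StringWriter()
  writePlans(out)
  out.toString
}

def toFile(path: String): Unit = {
  val filePath = new Path(path)
  val fs = FileSystem.get(filePath.toUri, sparkSession.sessionState.newHadoopConf())
  val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
  try writePlans(writer) finally writer.close()
}
```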


---




[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...

2018-09-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22429#discussion_r217928334
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala ---
@@ -250,5 +254,36 @@ class QueryExecution(val sparkSession: SparkSession, 
val logical: LogicalPlan) {
 def codegenToSeq(): Seq[(String, String)] = {
   org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)
 }
+
+/**
+ * Dumps debug information about query execution into the specified 
file.
+ */
+def toFile(path: String): Unit = {
+  val maxFields = SparkEnv.get.conf.getInt(Utils.MAX_TO_STRING_FIELDS,
+Utils.DEFAULT_MAX_TO_STRING_FIELDS)
+  val filePath = new Path(path)
+  val fs = FileSystem.get(filePath.toUri, 
sparkSession.sessionState.newHadoopConf())
+  val writer = new BufferedWriter(new 
OutputStreamWriter(fs.create(filePath)))
+
+  try {
+SparkEnv.get.conf.set(Utils.MAX_TO_STRING_FIELDS, 
Int.MaxValue.toString)
--- End diff --

It is generally a bad idea to change this conf, as people expect it to be 
immutable. Also, this change has some far-reaching consequences: others will 
now also be exposed to a different `Utils.MAX_TO_STRING_FIELDS` value when 
calling `explain()`. Can you please just pass the parameter down the tree?


---




[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...

2018-09-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22429#discussion_r217928262
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala 
---
@@ -469,7 +470,17 @@ abstract class TreeNode[BaseType <: 
TreeNode[BaseType]] extends Product {
   def treeString: String = treeString(verbose = true)
 
   def treeString(verbose: Boolean, addSuffix: Boolean = false): String = {
-generateTreeString(0, Nil, new StringBuilder, verbose = verbose, 
addSuffix = addSuffix).toString
+val baos = new ByteArrayOutputStream()
--- End diff --

What is the benefit of using this instead of using a `java.io.StringWriter` 
or `org.apache.commons.io.output.StringBuilderWriter`?


---




[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...

2018-09-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22429#discussion_r217915071
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala ---
@@ -250,5 +253,35 @@ class QueryExecution(val sparkSession: SparkSession, 
val logical: LogicalPlan) {
 def codegenToSeq(): Seq[(String, String)] = {
   org.apache.spark.sql.execution.debug.codegenStringSeq(executedPlan)
 }
+
+/**
+ * Dumps debug information about query execution into the specified 
file.
+ */
+def toFile(path: String): Unit = {
+  val filePath = new Path(path)
+  val fs = FileSystem.get(filePath.toUri, 
sparkSession.sparkContext.hadoopConfiguration)
--- End diff --

Why use the hadoop configuration of the `SparkContext`? It is probably 
better to use the one that `sparkSession.sessionState.newHadoopConf()` provides.


---




[GitHub] spark pull request #22429: [SPARK-25440][SQL] Dumping query execution info t...

2018-09-16 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22429#discussion_r217913739
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala 
---
@@ -469,7 +470,13 @@ abstract class TreeNode[BaseType <: 
TreeNode[BaseType]] extends Product {
   def treeString: String = treeString(verbose = true)
 
   def treeString(verbose: Boolean, addSuffix: Boolean = false): String = {
-generateTreeString(0, Nil, new StringBuilder, verbose = verbose, 
addSuffix = addSuffix).toString
+val bos = new ByteArrayOutputStream()
+treeString(bos, verbose, addSuffix)
+bos.toString
+  }
+
+  def treeString(os: OutputStream, verbose: Boolean, addSuffix: Boolean): 
Unit = {
--- End diff --

Can you please use a `java.io.Writer` or something else you can directly 
write a string to? You are now using `getBytes()` everywhere and that is far 
from cheap because it needs to encode the chars and allocate a byte array for 
each string.
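
Roughly, assuming a `Writer`-based overload (and a matching change to 
`generateTreeString` so it appends to the writer):

```scala
def treeString(verbose: Boolean, addSuffix: Boolean = false): String = {
  val writer = new java.io.StringWriter()
  treeString(writer, verbose, addSuffix)
  writer.toString
}

def treeString(writer: java.io.Writer, verbose: Boolean, addSuffix: Boolean): Unit = {
  // No per-string getBytes()/byte-array allocation on this path.
  generateTreeString(0, Nil, writer, verbose = verbose, addSuffix = addSuffix)
}
```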


---




[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...

2018-09-14 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22355#discussion_r217841164
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
+
+
+/**
+ * A [[MutableProjection]] that is calculated by calling `eval` on each of 
the specified
+ * expressions.
+ *
+ * @param expressions a sequence of expressions that determine the value 
of each column of the
+ *output row.
+ */
+class InterpretedMutableProjection(expressions: Seq[Expression]) extends 
MutableProjection {
+  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
+this(toBoundExprs(expressions, inputSchema))
+
+  private[this] val buffer = new Array[Any](expressions.size)
+
+  override def initialize(partitionIndex: Int): Unit = {
+expressions.foreach(_.foreach {
+  case n: Nondeterministic => n.initialize(partitionIndex)
+  case _ =>
+})
+  }
+
+  private[this] val validExprs = expressions.zipWithIndex.filter {
+case (NoOp, _) => false
+case _ => true
+  }
+  private[this] var mutableRow: InternalRow = new 
GenericInternalRow(expressions.size)
+  def currentValue: InternalRow = mutableRow
+
+  override def target(row: InternalRow): MutableProjection = {
+mutableRow = row
+this
+  }
+
+  override def apply(input: InternalRow): InternalRow = {
+validExprs.foreach { case (expr, i) =>
--- End diff --

Can you please use the old code? That should be much more performant than 
this.
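
For reference, a hedged reconstruction of the more imperative shape the old 
code had (array-backed, while-loop based); details may differ from the 
original:

```scala
private[this] val exprArray = validExprs.map(_._1).toArray
private[this] val indexArray = validExprs.map(_._2).toArray

override def apply(input: InternalRow): InternalRow = {
  var i = 0
  // Evaluate everything into the buffer first, so expressions that read from
  // the target row still observe its previous values.
  while (i < exprArray.length) {
    buffer(indexArray(i)) = exprArray(i).eval(input)
    i += 1
  }
  i = 0
  while (i < exprArray.length) {
    mutableRow(indexArray(i)) = buffer(indexArray(i))
    i += 1
  }
  mutableRow
}
```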


---




[GitHub] spark issue #22417: [SPARK-25426][SQL] Remove the duplicate fallback logic i...

2018-09-14 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22417
  
LGTM


---




[GitHub] spark pull request #22344: [SPARK-25352][SQL] Perform ordered global limit w...

2018-09-12 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22344#discussion_r217070658
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -68,22 +68,42 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   object SpecialLimits extends Strategy {
 override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
   case ReturnAnswer(rootPlan) => rootPlan match {
-case Limit(IntegerLiteral(limit), Sort(order, true, child))
-if limit < conf.topKSortFallbackThreshold =>
-  TakeOrderedAndProjectExec(limit, order, child.output, 
planLater(child)) :: Nil
-case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, 
true, child)))
-if limit < conf.topKSortFallbackThreshold =>
-  TakeOrderedAndProjectExec(limit, order, projectList, 
planLater(child)) :: Nil
+case Limit(IntegerLiteral(limit), s@Sort(order, true, child)) =>
+  if (limit < conf.topKSortFallbackThreshold) {
--- End diff --

@viirya sorry to be a little late to the party. This pattern is repeated 4x; 
can you just move it into a helper function?
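
Something along these lines, for instance (the helper name is made up):

```scala
private def takeOrderedOrNone(
    limit: Int,
    order: Seq[SortOrder],
    projectList: Seq[NamedExpression],
    child: LogicalPlan): Option[SparkPlan] = {
  if (limit < conf.topKSortFallbackThreshold) {
    Some(TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)))
  } else {
    None
  }
}
```

Each of the four cases then reduces to a one-line call plus its fallback.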


---




[GitHub] spark pull request #22205: [SPARK-25212][SQL] Support Filter in ConvertToLoc...

2018-08-27 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22205#discussion_r213124828
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1349,6 +1353,12 @@ object ConvertToLocalRelation extends 
Rule[LogicalPlan] {
 
 case Limit(IntegerLiteral(limit), LocalRelation(output, data, 
isStreaming)) =>
   LocalRelation(output, data.take(limit), isStreaming)
+
+case Filter(condition, LocalRelation(output, data, isStreaming))
+if !hasUnevaluableExpr(condition) =>
--- End diff --

I suppose it is fine in this case. The only thing is that it violates the 
contract of the optimizer: it should not change the results of a query.


---




[GitHub] spark issue #22205: [SPARK-25212][SQL] Support Filter in ConvertToLocalRelat...

2018-08-27 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22205
  
@gatorsmile what are you afraid of exactly? We could check which tests are 
affected. Also do you want to disable this for testing only?


---




[GitHub] spark issue #22239: [SPARK-19355][SQL][Followup] Remove the child.outputPart...

2018-08-26 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22239
  
Shall we rename it to: **[SPARK-19355][SQL][Followup] Remove the 
child.outputOrdering check in global limit**?


---




[GitHub] spark issue #22239: [SPARK-19355][SQL][Followup] Remove the child.outputPart...

2018-08-26 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22239
  
LGTM - Let's wait a little bit with merging to allow others to comment.


---




[GitHub] spark issue #22239: [SPARK-19355][SQL][Followup] Remove the child.outputPart...

2018-08-26 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22239
  
Setting `spark.sql.limit.flatGlobalLimit` to `false` works for me.


---




[GitHub] spark issue #22239: [SPARK-19355][SQL][Followup] Remove the child.outputPart...

2018-08-26 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22239
  
cc @cloud-fan for a sanity check.


---




[GitHub] spark issue #22239: [SPARK-19355][SQL][Followup] Remove the child.outputPart...

2018-08-26 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22239
  
@viirya did you try to run `TakeOrderedAndProjectSuite`? I am pretty sure 
that will fail now ;)...


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-08-26 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r212830045
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with 
CodegenSupport {
 }
 
 /**
- * Take the first `limit` elements of each child partition, but do not 
collect or shuffle them.
+ * Take the `limit` elements of the child output.
  */
-case class LocalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
UnaryExecNode {
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
-}
 
-/**
- * Take the first `limit` elements of the child's single output partition.
- */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  override def requiredChildDistribution: List[Distribution] = AllTuples 
:: Nil
+  private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
 
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+  protected override def doExecute(): RDD[InternalRow] = {
+val childRDD = child.execute()
+val partitioner = LocalPartitioning(childRDD)
+val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
+  childRDD, child.output, partitioner, serializer)
+val numberOfOutput: Seq[Long] = if 
(shuffleDependency.rdd.getNumPartitions != 0) {
+  // submitMapStage does not accept RDD with 0 partition.
+  // So, we will not submit this dependency.
+  val submittedStageFuture = 
sparkContext.submitMapStage(shuffleDependency)
+  submittedStageFuture.get().recordsByPartitionId.toSeq
+} else {
+  Nil
+}
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+// During global limit, try to evenly distribute limited rows across 
data
+// partitions. If disabled, scanning data partitions sequentially 
until reaching limit number.
+// Besides, if child output has certain ordering, we can't evenly pick 
up rows from
+// each parititon.
+val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && 
child.outputOrdering == Nil
--- End diff --

`select * from table order by a limit 10` gets planned differently right? 
It should use `TakeOrderedAndProjectExec`.

There is nothing in the SQL standard that mandates that a nested order by 
followed by a limit has to respect that ordering clause. In fact, AFAIR, the 
standard does not even support nested limits (they make stuff 
non-deterministic).

If we end up supporting this, then I'd rather have an explicit flag in 
`GlobalLimitExec` (`orderedLimit` or something like that) and set that during 
planning by matching on `Limit(limit, Sort(order, true, child))`. I want the 
explicit flag because then we can figure out what limit is doing by looking at 
the physical plan. I want to explicitly check for an underlying sort to match 
the current `TakeOrderedAndProjectExec` semantics and to avoid weird behavior 
because something way down the plan has set some arbitrary ordering.
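
For illustration, a rough sketch of the planning-time match I have in mind; note that `orderedLimit` is a hypothetical flag, it does not exist on `GlobalLimitExec` today:

```scala
// Hypothetical planner cases; `orderedLimit` is purely illustrative.
case Limit(IntegerLiteral(limit), s @ Sort(_, true, _)) =>
  // The limit sits directly on top of a global sort, so mark it as ordered.
  GlobalLimitExec(limit, planLater(s), orderedLimit = true) :: Nil
case Limit(IntegerLiteral(limit), child) =>
  // Any other limit is treated as unordered, regardless of incidental ordering
  // that happens to be present further down the plan.
  GlobalLimitExec(limit, planLater(child), orderedLimit = false) :: Nil
```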


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-08-25 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r212805707
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with 
CodegenSupport {
 }
 
 /**
- * Take the first `limit` elements of each child partition, but do not 
collect or shuffle them.
+ * Take the `limit` elements of the child output.
  */
-case class LocalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
UnaryExecNode {
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
-}
 
-/**
- * Take the first `limit` elements of the child's single output partition.
- */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  override def requiredChildDistribution: List[Distribution] = AllTuples 
:: Nil
+  private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
 
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+  protected override def doExecute(): RDD[InternalRow] = {
+val childRDD = child.execute()
+val partitioner = LocalPartitioning(childRDD)
+val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
+  childRDD, child.output, partitioner, serializer)
+val numberOfOutput: Seq[Long] = if 
(shuffleDependency.rdd.getNumPartitions != 0) {
+  // submitMapStage does not accept RDD with 0 partition.
+  // So, we will not submit this dependency.
+  val submittedStageFuture = 
sparkContext.submitMapStage(shuffleDependency)
+  submittedStageFuture.get().recordsByPartitionId.toSeq
+} else {
+  Nil
+}
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+// During global limit, try to evenly distribute limited rows across 
data
+// partitions. If disabled, scanning data partitions sequentially 
until reaching limit number.
+// Besides, if child output has certain ordering, we can't evenly pick 
up rows from
+// each parititon.
+val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && 
child.outputOrdering == Nil
--- End diff --

If we remove it, we may need to feature flag it first since people may rely 
on the old behavior. Anyway all of this is up for debate.


---




[GitHub] spark pull request #16677: [SPARK-19355][SQL] Use map output statistics to i...

2018-08-25 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16677#discussion_r212805327
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -93,25 +96,93 @@ trait BaseLimitExec extends UnaryExecNode with 
CodegenSupport {
 }
 
 /**
- * Take the first `limit` elements of each child partition, but do not 
collect or shuffle them.
+ * Take the `limit` elements of the child output.
  */
-case class LocalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
UnaryExecNode {
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+  override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
-}
 
-/**
- * Take the first `limit` elements of the child's single output partition.
- */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends 
BaseLimitExec {
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  override def requiredChildDistribution: List[Distribution] = AllTuples 
:: Nil
+  private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
 
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+  protected override def doExecute(): RDD[InternalRow] = {
+val childRDD = child.execute()
+val partitioner = LocalPartitioning(childRDD)
+val shuffleDependency = ShuffleExchangeExec.prepareShuffleDependency(
+  childRDD, child.output, partitioner, serializer)
+val numberOfOutput: Seq[Long] = if 
(shuffleDependency.rdd.getNumPartitions != 0) {
+  // submitMapStage does not accept RDD with 0 partition.
+  // So, we will not submit this dependency.
+  val submittedStageFuture = 
sparkContext.submitMapStage(shuffleDependency)
+  submittedStageFuture.get().recordsByPartitionId.toSeq
+} else {
+  Nil
+}
 
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+// During global limit, try to evenly distribute limited rows across 
data
+// partitions. If disabled, scanning data partitions sequentially 
until reaching limit number.
+// Besides, if child output has certain ordering, we can't evenly pick 
up rows from
+// each parititon.
+val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && 
child.outputOrdering == Nil
--- End diff --

@viirya dumb question, what is `child.outputOrdering` doing here? I am not entirely sure that we should guarantee that you get the lowest elements of a dataset if you perform a limit in the middle of a query (a top-level sort-limit does have this guarantee). I also don't think the SQL standard supports/mandates this.

Moreover, checking `child.outputOrdering` only checks the order within each partition and not the order of the frame as a whole. You would also have to take `child.outputPartitioning` into account.

I would be slightly in favor of removing the `child.outputOrdering` check.
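
For illustration only, a stricter check could look roughly like this (reusing the names from the diff above; the exact condition is up for debate):

```scala
// Hypothetical sketch: only treat the child as globally ordered when each
// partition is sorted and the partitioning itself lines up with that ordering.
val globallyOrdered = child.outputOrdering.nonEmpty && (child.outputPartitioning match {
  case SinglePartition => true
  case r: RangePartitioning => r.ordering == child.outputOrdering
  case _ => false
})
val flatGlobalLimit = sqlContext.conf.limitFlatGlobalLimit && !globallyOrdered
```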


---




[GitHub] spark issue #22216: [SPARK-25223][SQL] Use a map to store values for NamedLa...

2018-08-24 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22216
  
I think the use of global state and a thread local is far more hacky and probably slower.

The only clean solution I see here is to pass the lambda values around using the input row. I am not saying that this is easy.


---




[GitHub] spark pull request #21899: [SPARK-24912][SQL] Don't obscure source of OOM du...

2018-08-17 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21899#discussion_r211044133
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 ---
@@ -118,12 +119,20 @@ case class BroadcastExchangeExec(
   // SparkFatalException, which is a subclass of Exception. 
ThreadUtils.awaitResult
   // will catch this exception and re-throw the wrapped fatal 
throwable.
   case oe: OutOfMemoryError =>
-throw new SparkFatalException(
+val sizeMessage = if (dataSize != -1) {
+  s"${SparkLauncher.DRIVER_MEMORY} by at least the estimated 
size of the " +
--- End diff --

Forgive me for asking a dumb question, but where will this exception come 
from? The block manager?


---




[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

2018-08-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22062#discussion_r209372979
  
--- Diff: 
core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.lang.{Long => JLong}
+
+import org.mockito.Mockito.when
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.memory._
+import org.apache.spark.unsafe.Platform
+
+class ShuffleExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext with MockitoSugar {
+
+  test("nested spill should be no-op") {
+val conf = new SparkConf()
+  .setMaster("local[1]")
+  .setAppName("ShuffleExternalSorterSuite")
+  .set("spark.testing", "true")
+  .set("spark.testing.memory", "1600")
+  .set("spark.memory.fraction", "1")
+sc = new SparkContext(conf)
+
+val memoryManager = UnifiedMemoryManager(conf, 1)
+
+var shouldAllocate = false
+
+// Mock `TaskMemoryManager` to allocate free memory when 
`shouldAllocate` is true.
+// This will trigger a nested spill and expose issues if we don't 
handle this case properly.
+val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
+  override def acquireExecutionMemory(required: Long, consumer: 
MemoryConsumer): Long = {
+// ExecutionMemoryPool.acquireMemory will wait until there are 400 
bytes for a task to use.
+// So we leave 400 bytes for the task.
+if (shouldAllocate &&
+  memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed 
> 400) {
+  val acquireExecutionMemoryMethod =
+memoryManager.getClass.getMethods.filter(_.getName == 
"acquireExecutionMemory").head
+  acquireExecutionMemoryMethod.invoke(
+memoryManager,
+JLong.valueOf(
+  memoryManager.maxHeapMemory - 
memoryManager.executionMemoryUsed - 400),
+JLong.valueOf(1L), // taskAttemptId
+MemoryMode.ON_HEAP
+  ).asInstanceOf[java.lang.Long]
+}
+super.acquireExecutionMemory(required, consumer)
+  }
+}
+val taskContext = mock[TaskContext]
--- End diff --

lol


---




[GitHub] spark pull request #14083: [SPARK-16406][SQL] Improve performance of Logical...

2018-08-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/14083#discussion_r209336169
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
 ---
@@ -138,6 +140,88 @@ package object expressions  {
 def indexOf(exprId: ExprId): Int = {
   Option(exprIdToOrdinal.get(exprId)).getOrElse(-1)
 }
+
+private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, 
Seq[Attribute]] = {
+  m.mapValues(_.distinct).map(identity)
+}
+
+/** Map to use for direct case insensitive attribute lookups. */
+@transient private lazy val direct: Map[String, Seq[Attribute]] = {
+  unique(attrs.groupBy(_.name.toLowerCase))
+}
+
+/** Map to use for qualified case insensitive attribute lookups. */
+@transient private val qualified: Map[(String, String), 
Seq[Attribute]] = {
+  val grouped = attrs.filter(_.qualifier.isDefined).groupBy { a =>
+(a.qualifier.get.toLowerCase, a.name.toLowerCase)
+  }
+  unique(grouped)
+}
+
+/** Perform attribute resolution given a name and a resolver. */
+def resolve(nameParts: Seq[String], resolver: Resolver): 
Option[NamedExpression] = {
+  // Collect matching attributes given a name and a lookup.
+  def collectMatches(name: String, candidates: 
Option[Seq[Attribute]]): Seq[Attribute] = {
+candidates.toSeq.flatMap(_.collect {
+  case a if resolver(a.name, name) => a.withName(name)
+})
+  }
+
+  // Find matches for the given name assuming that the 1st part is a 
qualifier (i.e. table name,
+  // alias, or subquery alias) and the 2nd part is the actual name. 
This returns a tuple of
+  // matched attributes and a list of parts that are to be resolved.
+  //
+  // For example, consider an example where "a" is the table name, "b" 
is the column name,
+  // and "c" is the struct field name, i.e. "a.b.c". In this case, 
Attribute will be "a.b",
+  // and the second element will be List("c").
+  val matches = nameParts match {
+case qualifier +: name +: nestedFields =>
+  val key = (qualifier.toLowerCase, name.toLowerCase)
+  val attributes = collectMatches(name, qualified.get(key)).filter 
{ a =>
+resolver(qualifier, a.qualifier.get)
+  }
+  (attributes, nestedFields)
+case all =>
+  (Nil, all)
+  }
+
+  // If none of attributes match `table.column` pattern, we try to 
resolve it as a column.
+  val (candidates, nestedFields) = matches match {
+case (Seq(), _) =>
+  val name = nameParts.head
+  val attributes = collectMatches(name, 
direct.get(name.toLowerCase))
+  (attributes, nameParts.tail)
+case _ => matches
+  }
+
+  def name = UnresolvedAttribute(nameParts).name
+  candidates match {
+case Seq(a) if nestedFields.nonEmpty =>
+  // One match, but we also need to extract the requested nested 
field.
+  // The foldLeft adds ExtractValues for every remaining parts of 
the identifier,
+  // and aliased it with the last part of the name.
+  // For example, consider "a.b.c", where "a" is resolved to an 
existing attribute.
+  // Then this will add ExtractValue("c", ExtractValue("b", a)), 
and alias the final
+  // expression as "c".
+  val fieldExprs = nestedFields.foldLeft(a: Expression) { (e, 
name) =>
+ExtractValue(e, Literal(name), resolver)
--- End diff --

@heuermh I have not filed the issue for this. Do you want to work on this?


---




[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

2018-08-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22062#discussion_r209292284
  
--- Diff: 
core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.lang.{Long => JLong}
+
+import org.mockito.Mockito.when
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.memory._
+import org.apache.spark.unsafe.Platform
+
+class ShuffleExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext with MockitoSugar {
+
+  test("nested spill should be no-op") {
+val conf = new SparkConf()
+  .setMaster("local[1]")
+  .setAppName("ShuffleExternalSorterSuite")
+  .set("spark.testing", "true")
+  .set("spark.testing.memory", "1600")
+  .set("spark.memory.fraction", "1")
+sc = new SparkContext(conf)
+
+val memoryManager = UnifiedMemoryManager(conf, 1)
+
+var shouldAllocate = false
+
+// Mock `TaskMemoryManager` to allocate free memory when 
`shouldAllocate` is true.
+// This will trigger a nested spill and expose issues if we don't 
handle this case properly.
+val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
+  override def acquireExecutionMemory(required: Long, consumer: 
MemoryConsumer): Long = {
+// ExecutionMemoryPool.acquireMemory will wait until there are 400 
bytes for a task to use.
+// So we leave 400 bytes for the task.
+if (shouldAllocate &&
+  memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed 
> 400) {
+  val acquireExecutionMemoryMethod =
+memoryManager.getClass.getMethods.filter(_.getName == 
"acquireExecutionMemory").head
+  acquireExecutionMemoryMethod.invoke(
+memoryManager,
+JLong.valueOf(
+  memoryManager.maxHeapMemory - 
memoryManager.executionMemoryUsed - 400),
+JLong.valueOf(1L), // taskAttemptId
+MemoryMode.ON_HEAP
+  ).asInstanceOf[java.lang.Long]
+}
+super.acquireExecutionMemory(required, consumer)
+  }
+}
+val taskContext = mock[TaskContext]
--- End diff --

Do we need Mockito here? We can also create a `TaskContextImpl` by hand, right?


---




[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

2018-08-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22062#discussion_r209291439
  
--- Diff: 
core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort
+
+import java.lang.{Long => JLong}
+
+import org.mockito.Mockito.when
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark._
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.memory._
+import org.apache.spark.unsafe.Platform
+
+class ShuffleExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext with MockitoSugar {
+
+  test("nested spill should be no-op") {
+val conf = new SparkConf()
+  .setMaster("local[1]")
+  .setAppName("ShuffleExternalSorterSuite")
+  .set("spark.testing", "true")
+  .set("spark.testing.memory", "1600")
+  .set("spark.memory.fraction", "1")
+sc = new SparkContext(conf)
+
+val memoryManager = UnifiedMemoryManager(conf, 1)
+
+var shouldAllocate = false
+
+// Mock `TaskMemoryManager` to allocate free memory when 
`shouldAllocate` is true.
+// This will trigger a nested spill and expose issues if we don't 
handle this case properly.
+val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
+  override def acquireExecutionMemory(required: Long, consumer: 
MemoryConsumer): Long = {
+// ExecutionMemoryPool.acquireMemory will wait until there are 400 
bytes for a task to use.
+// So we leave 400 bytes for the task.
+if (shouldAllocate &&
+  memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed 
> 400) {
+  val acquireExecutionMemoryMethod =
+memoryManager.getClass.getMethods.filter(_.getName == 
"acquireExecutionMemory").head
+  acquireExecutionMemoryMethod.invoke(
+memoryManager,
+JLong.valueOf(
+  memoryManager.maxHeapMemory - 
memoryManager.executionMemoryUsed - 400),
+JLong.valueOf(1L), // taskAttemptId
+MemoryMode.ON_HEAP
+  ).asInstanceOf[java.lang.Long]
+}
+super.acquireExecutionMemory(required, consumer)
+  }
+}
+val taskContext = mock[TaskContext]
+val taskMetrics = new TaskMetrics
+when(taskContext.taskMetrics()).thenReturn(taskMetrics)
+val sorter = new ShuffleExternalSorter(
+  taskMemoryManager,
+  sc.env.blockManager,
+  taskContext,
+  100, // initialSize - This will require ShuffleInMemorySorter to 
acquire at least 800 bytes
+  1, // numPartitions
+  conf,
+  new ShuffleWriteMetrics)
+val inMemSorter = {
+  val field = sorter.getClass.getDeclaredField("inMemSorter")
+  field.setAccessible(true)
+  field.get(sorter).asInstanceOf[ShuffleInMemorySorter]
+}
+// Allocate memory to make the next "insertRecord" call triggers a 
spill.
+val bytes = new Array[Byte](1)
+while (inMemSorter.hasSpaceForAnotherRecord) {
--- End diff --

Access to `hasSpaceForAnotherRecord` is the only reason why we need reflection, right?


---




[GitHub] spark pull request #22062: [SPARK-25081][Core]Nested spill in ShuffleExterna...

2018-08-10 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22062#discussion_r209262151
  
--- Diff: 
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java ---
@@ -94,12 +94,20 @@ public int numRecords() {
   }
 
   public void reset() {
+// Reset `pos` here so that `spill` triggered by the below 
`allocateArray` will be no-op.
+pos = 0;
--- End diff --

For my understanding: this is enough to fix the actual issue here, right?


---




[GitHub] spark issue #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spi...

2018-08-10 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/21369
  
retest this please


---




[GitHub] spark issue #22064: [MINOR][BUILD] Add ECCN notice required by http://www.ap...

2018-08-10 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22064
  
LGTM FWIW


---




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

2018-08-10 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/16677
  
Merging to master. Thanks!


---




[GitHub] spark issue #22057: [SPARK-25077][SQL] Delete unused variable in WindowExec

2018-08-09 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22057
  
So I guess LGTM. I am generally not a fan of these aesthetic changes.


---




[GitHub] spark issue #16677: [SPARK-19355][SQL] Use map output statistics to improve ...

2018-08-08 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/16677
  
retest this please


---




[GitHub] spark issue #22035: [SPARK-23911][SQL][FOLLOW-UP] Fix examples of aggregate ...

2018-08-08 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22035
  
LGTM


---




[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-07 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r208273712
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.DataType
+
+/**
+ * Resolve a higher order functions from the catalog. This is different 
from regular function
+ * resolution because lambda functions can only be resolved after the 
function has been resolved;
+ * so we need to resolve higher order function when all children are 
either resolved or a lambda
+ * function.
+ */
+case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends 
Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperators {
+case q: LogicalPlan =>
+  q.transformExpressions {
+case u @ UnresolvedFunction(fn, children, false)
+if hasLambdaAndResolvedArguments(children) =>
+  withPosition(u) {
+catalog.lookupFunction(fn, children) match {
+  case func: HigherOrderFunction => func
+  case other => other.failAnalysis(
+"A lambda function should only be used in a higher order 
function. However, " +
+  s"its class is ${other.getClass.getCanonicalName}, which 
is not a " +
+  s"higher order function.")
+}
+  }
+  }
+  }
+
+  /**
+   * Check if the arguments of a function are either resolved or a lambda 
function.
+   */
+  private def hasLambdaAndResolvedArguments(expressions: Seq[Expression]): 
Boolean = {
+val (lambdas, others) = 
expressions.partition(_.isInstanceOf[LambdaFunction])
+lambdas.nonEmpty && others.forall(_.resolved)
+  }
+}
+
+/**
+ * Resolve the lambda variables exposed by a higher order functions.
+ *
+ * This rule works in two steps:
+ * [1]. Bind the anonymous variables exposed by the higher order function 
to the lambda function's
+ *  arguments; this creates named and typed lambda variables. The 
argument names are checked
+ *  for duplicates and the number of arguments are checked during this 
step.
+ * [2]. Resolve the used lambda variables used in the lambda function's 
function expression tree.
+ *  Note that we allow the use of variables from outside the current 
lambda, this can either
+ *  be a lambda function defined in an outer scope, or a attribute in 
produced by the plan's
+ *  child. If names are duplicate, the name defined in the most inner 
scope is used.
+ */
+case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] 
{
+
+  type LambdaVariableMap = Map[String, NamedExpression]
+
+  private val canonicalizer = {
+if (!conf.caseSensitiveAnalysis) {
+  s: String => s.toLowerCase
+} else {
+  s: String => s
+}
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+plan.resolveOperators {
+  case q: LogicalPlan =>
+q.mapExpressions(resolve(_, Map.empty))
+}
+  }
+
+  /**
+   * Create a bound lambda function by binding the arguments of a lambda 
function to the given
+   * partial arguments (dataType and nullability only). If the expression 
happens to be an already
+   * bound lambda function then we assume it has been bound to the correct 
arguments and do
+   * nothing. This

[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-08-07 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r208199133
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 ---
@@ -235,6 +235,69 @@ case class CreateMap(children: Seq[Expression]) 
extends Expression {
   override def prettyName: String = "map"
 }
 
+/**
+ * Returns a catalyst Map containing the two arrays in children 
expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(keys, values) - Creates a map with a pair of the given 
key/value arrays. All elements
+  in keys should not be null""",
+  examples = """
+Examples:
+  > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+   {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class CreateMapFromArray(left: Expression, right: Expression)
+extends BinaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (ArrayType(_, cn), ArrayType(_, _)) =>
+if (!cn) {
+  TypeCheckResult.TypeCheckSuccess
+} else {
+  TypeCheckResult.TypeCheckFailure("All of the given keys should 
be non-null")
+}
+  case _ =>
+TypeCheckResult.TypeCheckFailure("The given two arguments should 
be an array")
+}
+  }
+
+  override def dataType: DataType = {
+MapType(
+  keyType = left.dataType.asInstanceOf[ArrayType].elementType,
+  valueType = right.dataType.asInstanceOf[ArrayType].elementType,
+  valueContainsNull = 
left.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  override def nullable: Boolean = false
+
+  override def nullSafeEval(keyArray: Any, valueArray: Any): Any = {
+val keyArrayData = keyArray.asInstanceOf[ArrayData]
--- End diff --

I would like to err on the safe side here. `CreateMap` should be fixed IMO.


---




[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-07 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r208136330
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -365,3 +365,69 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Keys in a map using the transform_keys function.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, func) - Transforms elements in a map using the 
function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1);
+   map(array(2, 3, 4), array(1, 2, 3))
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v);
+   map(array(2, 4, 6), array(1, 2, 3))
+  """,
+  since = "2.4.0")
+case class TransformKeys(
+input: Expression,
+function: Expression)
+  extends ArrayBasedHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = input.nullable
+
+  override def dataType: DataType = {
+val valueType = input.dataType.asInstanceOf[MapType].valueType
+MapType(function.dataType, valueType, input.nullable)
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(MapType, 
expectingFunctionType)
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction):
+  TransformKeys = {
+val (keyElementType, valueElementType, containsNull) = input.dataType 
match {
+  case MapType(keyType, valueType, containsNullValue) =>
+(keyType, valueType, containsNullValue)
+  case _ =>
+val MapType(keyType, valueType, containsNullValue) = 
MapType.defaultConcreteType
+(keyType, valueType, containsNullValue)
+}
+copy(function = f(function, (keyElementType, false) :: 
(valueElementType, containsNull) :: Nil))
+  }
+
+  @transient lazy val (keyVar, valueVar) = {
+val LambdaFunction(
+_, (keyVar: NamedLambdaVariable) :: (valueVar: NamedLambdaVariable) :: 
Nil, _) = function
+(keyVar, valueVar)
+  }
+
+  override def eval(input: InternalRow): Any = {
+val arr = this.input.eval(input).asInstanceOf[MapData]
+if (arr == null) {
+  null
+} else {
+  val f = functionForEval
+  val resultKeys = new GenericArrayData(new 
Array[Any](arr.numElements))
+  var i = 0
+  while (i < arr.numElements) {
+keyVar.value.set(arr.keyArray().get(i, keyVar.dataType))
+valueVar.value.set(arr.valueArray().get(i, valueVar.dataType))
+resultKeys.update(i, f.eval(input))
--- End diff --

This assumes that the transformation will return a unique key, right? If it doesn't, you'll break the map semantics. For example: `map_key(some_map, (k, v) -> 0)`
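
A tiny illustration of the concern in plain Scala (not the PR's code): building the entries positionally keeps the duplicates, while real map semantics would collapse them:

```scala
// Every key is transformed to the same value, as in (k, v) -> 0.
val keys = Seq(1, 2, 3).map(_ => 0)
val values = Seq("a", "b", "c")
val naiveEntries = keys.zip(values)  // List((0,"a"), (0,"b"), (0,"c")) -- duplicate keys
val asMap = naiveEntries.toMap       // Map(0 -> "c") -- the last value silently wins
```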


---




[GitHub] spark issue #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-06 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22013
  
ok to test


---




[GitHub] spark issue #22012: [SPARK-25036][SQL] Should compare ExprValue.isNull with ...

2018-08-06 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/22012
  
LGTM


---




[GitHub] spark pull request #21986: [SPARK-23937][SQL] Add map_filter SQL function

2018-08-06 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21986#discussion_r207954320
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -123,7 +125,10 @@ trait HigherOrderFunction extends Expression {
   }
 }
 
-trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with 
ExpectsInputTypes {
+/**
+ * Trait for functions having as input one argument and one function.
+ */
+trait UnaryHigherOrderFunction extends HigherOrderFunction with 
ExpectsInputTypes {
--- End diff --

We use the term `Unary` a lot, and this usage is different from the others. The name should convey a `HigherOrderFunction` that only uses a single (lambda) function, right? The only thing I can come up with is `SingleHigherOrderFunction`. `Simple` would probably also be fine.




---




[GitHub] spark issue #21982: [SPARK-23911][SQL] Add aggregate function.

2018-08-03 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/21982
  
retest this please


---




[GitHub] spark pull request #21965: [SPARK-23909][SQL] Add filter function.

2018-08-03 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21965#discussion_r207480086
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -210,3 +219,54 @@ case class ArrayTransform(
 
   override def prettyName: String = "transform"
 }
+
+/**
+ * Filters the input array using the given lambda function.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, func) - Filters the input array using the given 
predicate.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), x -> x % 2 == 1);
+   array(1, 3)
+  """,
+  since = "2.4.0")
+case class ArrayFilter(
+input: Expression,
+function: Expression)
+  extends ArrayBasedHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = input.nullable
+
+  override def dataType: DataType = input.dataType
+
+  override def expectingFunctionType: AbstractDataType = BooleanType
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): ArrayFilter = {
+val elem = 
ArrayBasedHigherOrderFunction.elementArgumentType(input.dataType)
+copy(function = f(function, elem :: Nil))
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(elementVar: 
NamedLambdaVariable), _) = function
+
+  override def eval(input: InternalRow): Any = {
+val arr = this.input.eval(input).asInstanceOf[ArrayData]
+if (arr == null) {
+  null
+} else {
+  val f = functionForEval
+  val buffer = new mutable.ArrayBuffer[Any]
+  var i = 0
+  while (i < arr.numElements) {
+elementVar.value.set(arr.get(i, elementVar.dataType))
+if (f.eval(input).asInstanceOf[Boolean]) {
+  buffer += elementVar.value.get
+}
+i += 1
+  }
+  new GenericArrayData(buffer)
+}
+  }
+
+  override def prettyName: String = "filter"
--- End diff --

Is `filter` too generic? Wdyt?


---




[GitHub] spark pull request #21965: [SPARK-23909][SQL] Add filter function.

2018-08-03 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21965#discussion_r207479758
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -210,3 +219,54 @@ case class ArrayTransform(
 
   override def prettyName: String = "transform"
 }
+
+/**
+ * Filters the input array using the given lambda function.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, func) - Filters the input array using the given 
predicate.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), x -> x % 2 == 1);
+   array(1, 3)
+  """,
+  since = "2.4.0")
+case class ArrayFilter(
+input: Expression,
+function: Expression)
+  extends ArrayBasedHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = input.nullable
+
+  override def dataType: DataType = input.dataType
+
+  override def expectingFunctionType: AbstractDataType = BooleanType
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): ArrayFilter = {
+val elem = 
ArrayBasedHigherOrderFunction.elementArgumentType(input.dataType)
+copy(function = f(function, elem :: Nil))
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(elementVar: 
NamedLambdaVariable), _) = function
+
+  override def eval(input: InternalRow): Any = {
+val arr = this.input.eval(input).asInstanceOf[ArrayData]
+if (arr == null) {
+  null
+} else {
+  val f = functionForEval
+  val buffer = new mutable.ArrayBuffer[Any]
--- End diff --

I am wondering if we should use a buffer builder with a size hint here? Or, alternatively, manage the array ourselves.
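
Something along these lines is what I had in mind (just a sketch, reusing the names from the diff above):

```scala
// Pre-size the builder with the upper bound; at most arr.numElements rows pass.
val builder = mutable.ArrayBuffer.newBuilder[Any]
builder.sizeHint(arr.numElements)
var i = 0
while (i < arr.numElements) {
  elementVar.value.set(arr.get(i, elementVar.dataType))
  if (f.eval(input).asInstanceOf[Boolean]) {
    builder += elementVar.value.get
  }
  i += 1
}
new GenericArrayData(builder.result())
```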


---




[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-02 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r207171941
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+name: String,
+dataType: DataType,
+nullable: Boolean,
+value: AtomicReference[Any] = new AtomicReference(),
--- End diff --

Yeah, that makes sense. Let's leave it for now.


---




[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-02 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r207158636
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+name: String,
+dataType: DataType,
+nullable: Boolean,
+value: AtomicReference[Any] = new AtomicReference(),
--- End diff --

You did? Could you elaborate? There shouldn't be any concurrent access here.


---




[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-02 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r207145478
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+name: String,
+dataType: DataType,
+nullable: Boolean,
+value: AtomicReference[Any] = new AtomicReference(),
--- End diff --

You are only using the `AtomicReference` as a container, right?


---




[GitHub] spark pull request #21954: [SPARK-23908][SQL] Add transform function.

2018-08-02 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21954#discussion_r207141916
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import java.util.concurrent.atomic.AtomicReference
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types._
+
+/**
+ * A named lambda variable.
+ */
+case class NamedLambdaVariable(
+name: String,
+dataType: DataType,
+nullable: Boolean,
+value: AtomicReference[Any] = new AtomicReference(),
+exprId: ExprId = NamedExpression.newExprId)
+  extends LeafExpression
+  with NamedExpression
+  with CodegenFallback {
+
+  override def qualifier: Option[String] = None
+
+  override def newInstance(): NamedExpression =
+copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
+
+  override def toAttribute: Attribute = {
+AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, 
None)
+  }
+
+  override def eval(input: InternalRow): Any = value.get
+
+  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
+
+  override def simpleString: String = s"lambda $name#${exprId.id}: 
${dataType.simpleString}"
+}
+
+/**
+ * A lambda function and its arguments. A lambda function can be hidden 
when a user wants to
+ * process an completely independent expression in a 
[[HigherOrderFunction]], the lambda function
+ * and its variables are then only used for internal bookkeeping within 
the higher order function.
+ */
+case class LambdaFunction(
+function: Expression,
+arguments: Seq[NamedExpression],
+hidden: Boolean = false)
+  extends Expression with CodegenFallback {
+
+  override def children: Seq[Expression] = function +: arguments
+  override def dataType: DataType = function.dataType
+  override def nullable: Boolean = function.nullable
+
+  lazy val bound: Boolean = arguments.forall(_.resolved)
+
+  override def eval(input: InternalRow): Any = function.eval(input)
+}
+
+/**
+ * A higher order function takes one or more (lambda) functions and 
applies these to some objects.
+ * The function produces a number of variables which can be consumed by 
some lambda function.
+ */
+trait HigherOrderFunction extends Expression {
+
+  override def children: Seq[Expression] = inputs ++ functions
+
+  /**
+   * Inputs to the higher ordered function.
+   */
+  def inputs: Seq[Expression]
+
+  /**
+   * All inputs have been resolved. This means that the types and 
nullabilty of (most of) the
+   * lambda function arguments is known, and that we can start binding the 
lambda functions.
+   */
+  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
+
+  /**
+   * Functions applied by the higher order function.
+   */
+  def functions: Seq[Expression]
+
+  /**
+   * All inputs must be resolved and all functions must be resolved lambda 
functions.
+   */
+  override lazy val resolved: Boolean = inputResolved && functions.forall {
+case l: LambdaFunction => l.resolved
+case _ => false
+  }
+
+  /**
+   * Bind the lambda functions to the [[HigherOrderFunction]] using the 
given bind function. The
+   * bind function takes the potential lambda and it's (partial) arguments 
and converts this into
+   * a bound lambda function.
+   */
+  def bind(f: (Expression, Seq[(Da

[GitHub] spark issue #21930: [SPARK-14540][Core] Fix remaining major issues for Scala...

2018-07-31 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/21930
  
Yeah I would not worry about it


---




[GitHub] spark pull request #21930: [SPARK-14540][Core] Fix remaining major issues fo...

2018-07-31 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21930#discussion_r206540368
  
--- Diff: core/src/main/scala/org/apache/spark/TaskContext.scala ---
@@ -123,7 +123,7 @@ abstract class TaskContext extends Serializable {
*
* Exceptions thrown by the listener will result in failure of the task.
*/
-  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = {
+  def addTaskCompletionListener[U](f: (TaskContext) => U): TaskContext = {
--- End diff --

Do we need to change this? I don't think it is a problem binary-compatibility-wise, but it seems a bit weird since we don't use the result of the function.


---




[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-29 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205967625
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3968,3 +3964,242 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in array1 but not in array2, without duplicates.
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2,
+    without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike
+  with ComplexTypeMergingExpression {
+  override def dataType: DataType = {
+    dataTypeCheck
+    left.dataType
+  }
+
+  @transient lazy val evalExcept: (ArrayData, ArrayData) => ArrayData = {
+    if (elementTypeSupportEquals) {
+      (array1, array2) =>
+        val hs = new OpenHashSet[Any]
--- End diff --

I would be strongly in favor of just using `MemoryBlock` for binary types, 
or something similar.
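
To make the intended semantics concrete, a rough sketch using plain Scala collections (not Catalyst's `OpenHashSet` or `ArrayData`; `arrayExcept` is just an illustrative name): keep the elements of array1 that do not occur in array2, drop duplicates, and preserve first-occurrence order. Note it relies on value equality, which is exactly what is missing for binary types and why `MemoryBlock` or something similar comes up.

import scala.collection.mutable

object ArrayExceptSketch {
  def arrayExcept[T](array1: Seq[T], array2: Seq[T]): Seq[T] = {
    val exclude = array2.toSet                // elements to filter out
    val seen = mutable.HashSet.empty[T]       // elements already emitted
    array1.filter(e => !exclude.contains(e) && seen.add(e))
  }

  def main(args: Array[String]): Unit = {
    println(arrayExcept(Seq(1, 2, 3, 2), Seq(1, 3, 5)))  // List(2)
  }
}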


---




[GitHub] spark issue #21897: [minor] Improve documentation for HiveStringType's

2018-07-27 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/21897
  
@rxin no, they should not have been public. IMO we should just hide them 
for 3.0.


---




[GitHub] spark issue #21897: [minor] Improve documentation for HiveStringType's

2018-07-27 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/21897
  
LGTM


---




[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...

2018-07-25 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/21821
  
LGTM


---




[GitHub] spark pull request #21840: [WIP] New copy() method for Column of StructType

2018-07-23 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/21840#discussion_r204476440
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3858,3 +3858,29 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+case class StructCopy(
+    struct: Expression,
+    fieldName: String,
+    fieldValue: Expression) extends Expression with CodegenFallback {
+
+  override def children: Seq[Expression] = Seq(struct, fieldValue)
+  override def nullable: Boolean = struct.nullable
+
+  lazy val fieldIndex = struct.dataType.asInstanceOf[StructType].fieldIndex(fieldName)
+
+  override def dataType: DataType = {
+    val structType = struct.dataType.asInstanceOf[StructType]
+    val field = structType.fields(fieldIndex).copy(dataType = fieldValue.dataType)
+
+    structType.copy(fields = structType.fields.updated(fieldIndex, field))
+  }
+
+  override def eval(input: InternalRow): Any = {
+    val newFieldValue = fieldValue.eval(input)
+    val structValue = struct.eval(input).asInstanceOf[GenericInternalRow]
--- End diff --

You cannot assume this and you also cannot update the row in-place. You 
will need to copy the row, I am afraid.
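
For reference, a minimal sketch of the copy-based approach (assuming spark-catalyst on the classpath; `copyWithField` is an illustrative helper, not part of the patch): read each field through the generic `InternalRow` interface and build a fresh `GenericInternalRow` with the replaced value, instead of casting the input to `GenericInternalRow` and mutating it.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types.StructType

object StructCopySketch {
  def copyWithField(row: InternalRow, schema: StructType, fieldIndex: Int, newValue: Any): InternalRow = {
    val values = new Array[Any](schema.length)
    var i = 0
    while (i < schema.length) {
      // Read through the generic InternalRow interface so this works for any row
      // implementation (GenericInternalRow, UnsafeRow, ...), then build a fresh row.
      values(i) = if (i == fieldIndex) newValue else row.get(i, schema(i).dataType)
      i += 1
    }
    new GenericInternalRow(values)
  }
}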


---




[GitHub] spark issue #21821: [SPARK-24867] [SQL] Add AnalysisBarrier to DataFrameWrit...

2018-07-23 Thread hvanhovell
Github user hvanhovell commented on the issue:

https://github.com/apache/spark/pull/21821
  
@gatorsmile do we still need this patch if maryann fixes this?


---



