[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22758#discussion_r226151421
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
@@ -193,6 +193,16 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
   None)
 val logicalRelation = cached.getOrElse {
   val updatedTable = inferIfNeeded(relation, options, fileFormat)
+  // Intialize the catalogTable stats if its not defined.An intial value has to be defined
--- End diff --

I don't quite understand why the table must have stats. For both file sources
and Hive tables, we estimate the data size from the files if the table
doesn't have stats.
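
The fallback described here can be sketched roughly as follows. This is a minimal illustration, not Spark's implementation: `TableMeta`, `statsSizeInBytes`, and `fileSizes` are hypothetical names introduced only for the example.

```scala
object SizeEstimateSketch {
  // Hypothetical table metadata: optional catalog stats plus the sizes
  // (in bytes) of the files that back the table.
  final case class TableMeta(statsSizeInBytes: Option[BigInt], fileSizes: Seq[Long])

  // Prefer the recorded stats; otherwise fall back to summing the file sizes.
  def sizeInBytes(table: TableMeta): BigInt =
    table.statsSizeInBytes.getOrElse(table.fileSizes.map(BigInt(_)).sum)
}
```

Under this model, a table without stats still gets a usable size estimate, which is why an explicit initial value may not be needed.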


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22758: [SPARK-25332][SQL] Instead of broadcast hash join...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22758#discussion_r226150341
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala ---
@@ -1051,7 +1051,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
 
   test("test statistics of LogicalRelation converted from Hive serde tables") {
 Seq("orc", "parquet").foreach { format =>
-  Seq(true, false).foreach { isConverted =>
+  // Botth parquet and orc will have Hivestatistics, both are convertable to Logical Relation.
+  Seq(true, true).foreach { isConverted =>
--- End diff --

This is to test when the conversion is on and off. We shouldn't change it.


---




[GitHub] spark pull request #22745: [SPARK-21402][SQL][FOLLOW-UP] Fix java map of str...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22745#discussion_r226149644
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala ---
@@ -509,3 +509,24 @@ case class UnresolvedOrdinal(ordinal: Int)
   override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
   override lazy val resolved = false
 }
+
+/**
+ * When constructing `Invoke`, the data type must be given, which may be not possible to define
+ * before analysis. This class acts like a placeholder for `Invoke`, and will be replaced by
+ * `Invoke` during analysis after the input data is resolved. Data type passed to `Invoke``
+ * will be defined by applying `dataTypeFunction` to the data type of the input data.
+ */
+case class UnresolvedInvoke(
--- End diff --

I feel this is too general. Maybe we should just create a new expression 
`GetArrayFromMap` and resolve it to `Invoke` later.


---




[GitHub] spark issue #22309: [SPARK-20384][SQL] Support value class in schema of Data...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22309
  
what's still missing to support top-level value classes?


---




[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22309#discussion_r226149168
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -632,13 +667,17 @@ object ScalaReflection extends ScalaReflection {
   "cannot be used as field name\n" + walkedTypePath.mkString("\n"))
   }
 
+  // as a field, value class is represented by its underlying type
+  val trueFieldType =
+if (isValueClass(fieldType)) getUnderlyingTypeOf(fieldType) else fieldType
--- End diff --

shall we update `dataTypeFor` to handle value classes, instead of
special-casing them here?


---




[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22309#discussion_r226148985
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -376,6 +386,23 @@ object ScalaReflection extends ScalaReflection {
   dataType = ObjectType(udt.getClass))
 Invoke(obj, "deserialize", ObjectType(udt.userClass), getPath :: Nil)
 
+  case t if isValueClass(t) =>
+// nested value class is treated as its underlying type
+// top level value class must be treated as a product
+val underlyingType = getUnderlyingTypeOf(t)
+val underlyingClsName = getClassNameFromType(underlyingType)
+val clsName = getUnerasedClassNameFromType(t)
+val newTypePath = s"""- Scala value class: $clsName($underlyingClsName)""" +:
--- End diff --

do you have an example of this message?


---




[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22309#discussion_r226148892
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---
@@ -376,6 +386,23 @@ object ScalaReflection extends ScalaReflection {
   dataType = ObjectType(udt.getClass))
 Invoke(obj, "deserialize", ObjectType(udt.userClass), getPath :: Nil)
 
+  case t if isValueClass(t) =>
+// nested value class is treated as its underlying type
+// top level value class must be treated as a product
--- End diff --

This is not true: a top-level value class can be a single primitive-type
column.
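
For readers less familiar with the two shapes being discussed, here is a small plain-Scala sketch. It involves no Spark encoders, and `UserId` and `Account` are hypothetical names used only for illustration.

```scala
object ValueClassShapes {
  // A value class wraps exactly one field and extends AnyVal.
  final case class UserId(value: Int) extends AnyVal

  // Nested use: the value class appears as a field of a product type.
  final case class Account(id: UserId, name: String)

  // Top-level use: the element type of the collection is the value class itself.
  val topLevel: Seq[UserId] = Seq(UserId(1), UserId(2))
  val nested: Seq[Account] = Seq(Account(UserId(1), "alice"))
}
```

In the top-level case each element carries a single underlying `Int`, which is the situation where a single primitive-type column would be a natural representation.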


---




[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r226143221
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala ---
@@ -393,4 +393,30 @@ class UDFSuite extends QueryTest with SharedSQLContext {
   checkAnswer(df, Seq(Row("12"), Row("24"), Row("3null"), Row(null)))
 }
   }
+
+  test("SPARK-25044 Verify null input handling for primitive types - with udf()") {
+val udf1 = udf({(x: Long, y: Any) => x * 2 + (if (y == null) 1 else 0)})
+val df = spark.range(0, 3).toDF("a")
+  .withColumn("b", udf1($"a", lit(null)))
+  .withColumn("c", udf1(lit(null), $"a"))
+
+checkAnswer(
+  df,
+  Seq(
+Row(0, 1, null),
+Row(1, 3, null),
+Row(2, 5, null)))
+  }
+
+  test("SPARK-25044 Verify null input handling for primitive types - with udf.register") {
+withTable("t") {
--- End diff --

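we can use a temp view
```
withTempView("v") {
  // Boxed integers are used so that null is a valid value for column "b";
  // mixing Int literals with null would not give a usable element type.
  Seq((null, Integer.valueOf(1), "x"), ("N", null, "y"), ("N", Integer.valueOf(3), null))
    .toDF("a", "b", "c").createTempView("v")
  ...
}
```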


---




[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r226142651
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala ---
@@ -73,19 +73,21 @@ case class UserDefinedFunction protected[sql] (
*/
   @scala.annotation.varargs
   def apply(exprs: Column*): Column = {
-if (inputTypes.isDefined && nullableTypes.isDefined) {
-  require(inputTypes.get.length == nullableTypes.get.length)
+val numOfArgs = ScalaReflection.getParameterCount(f)
--- End diff --

since we are here, shall we also check `exprs.length == numOfArgs`?
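
A minimal sketch of such an arity check, written without any Spark dependency. `ArityCheck.call` is a hypothetical helper and the error message is made up for the example; only the `exprs.length == numOfArgs` condition comes from the comment above.

```scala
object ArityCheck {
  // Hypothetical helper, not part of Spark: renders a call only when the
  // number of supplied expressions matches the function's arity.
  def call(numOfArgs: Int, exprs: Seq[String]): String = {
    require(
      exprs.length == numOfArgs,
      s"Invalid number of arguments: expected $numOfArgs but got ${exprs.length}")
    exprs.mkString("udf(", ", ", ")")
  }
}
```

Failing fast in `apply` would surface a wrong argument count when the column expression is built, rather than later during analysis or execution.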


---




[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r226141983
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala ---
@@ -39,29 +42,29 @@ import org.apache.spark.sql.types.DataType
  * @param nullable  True if the UDF can return null value.
  * @param udfDeterministic  True if the UDF is deterministic. Deterministic UDF returns same result
  *  each time it is invoked with a particular input.
- * @param nullableTypes which of the inputTypes are nullable (i.e. not primitive)
  */
 case class ScalaUDF(
 function: AnyRef,
 dataType: DataType,
 children: Seq[Expression],
+inputsNullSafe: Seq[Boolean],
 inputTypes: Seq[DataType] = Nil,
 udfName: Option[String] = None,
 nullable: Boolean = true,
-udfDeterministic: Boolean = true,
-nullableTypes: Seq[Boolean] = Nil)
+udfDeterministic: Boolean = true)
   extends Expression with ImplicitCastInputTypes with NonSQLExpression with UserDefinedExpression {
 
   // The constructor for SPARK 2.1 and 2.2
   def this(
   function: AnyRef,
   dataType: DataType,
   children: Seq[Expression],
+  inputsNullSafe: Seq[Boolean],
--- End diff --

this constructor is here for backward compatibility. If we have to change 
it, I don't think we need to keep it anymore. cc @gatorsmile 


---




[GitHub] spark pull request #22756: [SPARK-25758][ML] Deprecate computeCost on Bisect...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22756#discussion_r226141315
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala ---
@@ -125,8 +125,13 @@ class BisectingKMeansModel private[ml] (
   /**
* Computes the sum of squared distances between the input points and their corresponding cluster
* centers.
+   *
+   * @deprecated This method is deprecated and will be removed in 3.0.0. Use ClusteringEvaluator
+   * instead. You can also get the cost on the training dataset in the summary.
*/
   @Since("2.0.0")
+  @deprecated("This method is deprecated and will be removed in 3.0.0. Use ClusteringEvaluator " +
--- End diff --

It looks reasonable to me to deprecate it in 2.4 so that we can remove it 
in 3.0, if this is the last one. Then we can have a consistent ML API in 3.0 
after removing these deprecated APIs.


---




[GitHub] spark issue #22721: [SPARK-25403][SQL] Refreshes the table after inserting t...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22721
  
what's the impact on end users? wrong statistics?


---




[GitHub] spark issue #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Support par...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/20999
  
It seems like Hive can drop partitions directly with a partition 
expression: 
https://github.com/apache/hive/blob/master/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java#L1016

can you double check this part?




---




[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r225952267
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala ---
@@ -39,29 +40,29 @@ import org.apache.spark.sql.types.DataType
  * @param nullable  True if the UDF can return null value.
  * @param udfDeterministic  True if the UDF is deterministic. Deterministic UDF returns same result
  *  each time it is invoked with a particular input.
- * @param nullableTypes which of the inputTypes are nullable (i.e. not primitive)
  */
 case class ScalaUDF(
 function: AnyRef,
 dataType: DataType,
 children: Seq[Expression],
+handleNullForInputs: Seq[Boolean],
 inputTypes: Seq[DataType] = Nil,
 udfName: Option[String] = None,
 nullable: Boolean = true,
-udfDeterministic: Boolean = true,
-nullableTypes: Seq[Boolean] = Nil)
+udfDeterministic: Boolean = true)
   extends Expression with ImplicitCastInputTypes with NonSQLExpression with UserDefinedExpression {
 
   // The constructor for SPARK 2.1 and 2.2
   def this(
   function: AnyRef,
   dataType: DataType,
   children: Seq[Expression],
+  handleNullForInputs: Seq[Boolean],
--- End diff --

I think we should just remove this constructor. It's weird to keep backward 
compatibility for a private class, and I don't think it can work anymore. It's 
not OK to omit the nullable info.


---




[GitHub] spark issue #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF constructor sig...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22732
  
After more thought, there is one case we don't handle well. For UDFs like
`(a: Any, i: Int) => xxx`, previously we could still get the int type info at
runtime and add a null check for it. Now we can't get the type info at compile
time, because we do something like `Try(ScalaReflection.schemaFor[A1] ::
ScalaReflection.schemaFor[A2] :: Nil).toOption`. When we fail to get the type
info of `Any`, we just give up and don't try the next input.
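
The all-or-nothing effect of wrapping the whole list in one `Try` can be reproduced without Spark. In the sketch below, `schemaFor` is a hypothetical stand-in that takes a type name and throws for anything it does not know, such as `Any`. The per-argument variant is shown only for contrast; it is not something this comment proposes.

```scala
import scala.util.Try

object SchemaLookupSketch {
  // Hypothetical stand-in for a reflection-based lookup: it knows a few
  // concrete types and throws for everything else.
  def schemaFor(typeName: String): String = typeName match {
    case "Int"    => "IntegerType"
    case "Long"   => "LongType"
    case "String" => "StringType"
    case other =>
      throw new UnsupportedOperationException(s"Schema for type $other is not supported")
  }

  // One Try around the whole list: a single failure discards every result.
  def allOrNothing(types: Seq[String]): Option[Seq[String]] =
    Try(types.map(schemaFor)).toOption

  // One Try per argument: each lookup succeeds or fails on its own.
  def perArgument(types: Seq[String]): Seq[Option[String]] =
    types.map(t => Try(schemaFor(t)).toOption)
}
```

With inputs `Seq("Any", "Int")`, the first form loses the information about `Int` as well, which mirrors the `(a: Any, i: Int) => xxx` case.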


---




[GitHub] spark issue #22708: [SPARK-21402][SQL] Fix java array of structs deserializa...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22708
  
thanks, merging to master/2.4!


---




[GitHub] spark issue #22674: [SPARK-25680][SQL] SQL execution listener shouldn't happ...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22674
  
since there is no objection, I'm merging it to master, thanks!


---




[GitHub] spark issue #22750: [SPARK-25747][SQL] remove ColumnarBatchScan.needsUnsafeR...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22750
  
retest this please


---




[GitHub] spark issue #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF constructor sig...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22732
  
There is an argument about whether #22259 introduced behavior changes. Here
is my analysis.

Before #22259, the type and null checks were done as follows:
1. The user registers UDFs, like `(a: Int) => xxx`, `(b: Any) => xxx`.
2. At compile time, we get the type info and set the input types, so that the
analyzer can add a cast or fail the query if the real input data type doesn't
match. Note that UDFs like `(b: Any) => xxx` have no type info, so we won't do
the type check.
3. At runtime, we use reflection to get the type info again, and add a null
check if an input is of a primitive type.

After #22259, the type and null checks are done as follows:
1. The user registers UDFs, like `(a: Int) => xxx`, `(b: Any) => xxx`.
2. At compile time, we get the type info and set the input types and input
nullability, so that the analyzer can add a cast or fail the query if the real
input data type doesn't match, and add a null check if necessary. Note that
UDFs like `(b: Any) => xxx` have no type info, so we won't do the type and
null checks.

So we may have a behavior change if users register UDFs in a weird way,
e.g. they define `(a: Int) => xxx` but cast it to `Any => xxx` during
registration. Then we can't get the real type info at compile time.

I'd say this is an invalid use case, because the data type check is also
lost. I think we don't need to treat it as a behavior change.
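
To make the "null check" in the steps above concrete, here is a minimal sketch of a guard that returns null whenever a null argument lands on a parameter flagged as not null-safe. `NullGuardSketch.guarded` and the `Seq[Any] => Any` function shape are assumptions made for the example, not Spark's `ScalaUDF` code.

```scala
object NullGuardSketch {
  // inputsNullSafe(i) == true means argument i may be null without harm
  // (e.g. a boxed or reference type); false marks a primitive parameter.
  def guarded(f: Seq[Any] => Any, inputsNullSafe: Seq[Boolean]): Seq[Any] => Any = { args =>
    require(args.length == inputsNullSafe.length, "one flag is required per argument")
    val nullHitsPrimitive = args.zip(inputsNullSafe).exists {
      case (arg, nullSafe) => arg == null && !nullSafe
    }
    // Short-circuit to null instead of letting a null be unboxed.
    if (nullHitsPrimitive) null else f(args)
  }
}
```

If no flags can be derived, as in the `Any` case, no guard is added and the function body has to deal with nulls itself.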


---




[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r225807388
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala ---
@@ -31,6 +31,9 @@ import org.apache.spark.sql.types.DataType
  *  null. Use boxed type or [[Option]] if you wanna do the null-handling yourself.
  * @param dataType  Return type of function.
  * @param children  The input expressions of this UDF.
+ * @param handleNullForInputs Whether the inputs need null-value handling, which preserves the null
--- End diff --

`inputsNullSafe` SGTM, let's also rename `UserDefinedFunction.nullableTypes`


---




[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r225804995
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala ---
@@ -81,11 +81,11 @@ case class UserDefinedFunction protected[sql] (
   f,
   dataType,
   exprs.map(_.expr),
+  nullableTypes.map(_.map(!_)).getOrElse(exprs.map(_ => false)),
--- End diff --

If we can't get the type information at compile time(e.g. `(a: Any) => 
xxx`), I don't think `ScalaReflection.getParameterTypes` can get something 
useful at runtime. For this case we won't add null check anyway, so I think 
there is no behavior change.
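
The expression in the diff line above can be tried in isolation. In this sketch, `NullFlags.handleNullForInputs` is a hypothetical wrapper and `numArgs` stands in for `exprs.length`; the body mirrors the `nullableTypes.map(_.map(!_)).getOrElse(...)` expression from the diff.

```scala
object NullFlags {
  // When nullability info is known, an input needs null handling exactly
  // when its type is NOT nullable (a primitive). When nothing is known,
  // no input gets null handling.
  def handleNullForInputs(nullableTypes: Option[Seq[Boolean]], numArgs: Int): Seq[Boolean] =
    nullableTypes.map(_.map(!_)).getOrElse(Seq.fill(numArgs)(false))
}
```

The `None` branch corresponds to the `(a: Any) => xxx` case: every flag is false, so no null check is added.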


---




[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/21322
  
I'd like to recommend 
http://apache-spark-developers-list.1001551.n3.nabble.com/SPIP-SPARK-25728-Structured-Intermediate-Representation-Tungsten-IR-for-generating-Java-code-td25370.html
 


---




[GitHub] spark issue #22746: [SPARK-24499][SQL][DOC] Split the page of sql-programmin...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22746
  
This is very cool! thanks!


---




[GitHub] spark pull request #22746: [SPARK-24499][SQL][DOC] Split the page of sql-pro...

2018-10-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22746#discussion_r225797461
  
--- Diff: docs/_data/menu-sql.yaml ---
@@ -0,0 +1,79 @@
+- text: Getting Started
+  url: sql-getting-started.html
+  subitems:
+- text: "Starting Point: SparkSession"
+  url: sql-getting-started.html#starting-point-sparksession
+- text: Creating DataFrames
+  url: sql-getting-started.html#creating-dataframes
+- text: Untyped Dataset Operations
--- End diff --

how about `Untyped Dataset Operations (DataFrame operations)`


---




[GitHub] spark issue #22724: [SPARK-25734][SQL] Literal should have a value correspon...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22724
  
thanks, merging to master!


---




[GitHub] spark pull request #22708: [SPARK-21402][SQL] Fix java array of structs dese...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22708#discussion_r225767103
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/JavaBeanWithArraySuite.java ---
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.test.TestSparkSession;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+public class JavaBeanWithArraySuite {
+
+private static final List<Record> RECORDS = new ArrayList<>();
+
+static {
+RECORDS.add(new Record(1,
+Arrays.asList(new Interval(111, 211), new Interval(121, 221)),
+Arrays.asList(11, 21, 31, 41)
+));
+RECORDS.add(new Record(2,
+Arrays.asList(new Interval(112, 212), new Interval(122, 222)),
+Arrays.asList(12, 22, 32, 42)
+));
+RECORDS.add(new Record(3,
+Arrays.asList(new Interval(113, 213), new Interval(123, 223)),
+Arrays.asList(13, 23, 33, 43)
+));
+}
+
+private TestSparkSession spark;
+
+@Before
+public void setUp() {
+spark = new TestSparkSession();
+}
+
+@After
+public void tearDown() {
+spark.stop();
+spark = null;
+}
+
+@Test
+public void testBeanWithArrayFieldsDeserialization() {
+
+StructType schema = createSchema();
+Encoder<Record> encoder = Encoders.bean(Record.class);
+
+Dataset<Record> dataset = spark
+.read()
+.format("json")
+.schema(schema)
+.load("src/test/resources/test-data/with-array-fields")
+.as(encoder);
+
+List<Record> records = dataset.collectAsList();
+
+Assert.assertTrue(Util.equals(records, RECORDS));
+}
+
+private static StructType createSchema() {
+StructField[] intervalFields = {
+new StructField("startTime", DataTypes.LongType, true, Metadata.empty()),
+new StructField("endTime", DataTypes.LongType, true, Metadata.empty())
+};
+DataType intervalType = new StructType(intervalFields);
+
+DataType intervalsType = new ArrayType(intervalType, true);
+
+DataType valuesType = new ArrayType(DataTypes.IntegerType, true);
+
+StructField[] fields = {
+new StructField("id", DataTypes.IntegerType, true, Metadata.empty()),
+new StructField("intervals", intervalsType, true, Metadata.empty()),
+new StructField("values", valuesType, true, Metadata.empty())
+};
+return new StructType(fields);
+}
+
+public static class Record {
+
+private int id;
+private List<Interval> intervals;
+private List<Integer> values;
+
+public Record() { }
+
+Record(int id, List<Interval> intervals, List<Integer> values) {
+this.id = id;
+this.intervals = intervals;
+this.values = values;
+}
+
+public int 

[GitHub] spark issue #22745: [SPARK-21402][SQL][FOLLOW-UP] Fix java map of structs de...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22745
  
It's a different issue, I think it's worth a new ticket


---




[GitHub] spark pull request #22732: [SPARK-25044][FOLLOW-UP] Change ScalaUDF construc...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22732#discussion_r225614430
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala ---
@@ -31,6 +31,7 @@ import org.apache.spark.sql.types.DataType
  *  null. Use boxed type or [[Option]] if you wanna do the null-handling yourself.
  * @param dataType  Return type of function.
  * @param children  The input expressions of this UDF.
+ * @param handleNullForInputs Whether the inputs need null-value handling respectively.
--- End diff --

how about `acceptNullInputs`?


---




[GitHub] spark issue #22750: [SPARK-25747][SQL] remove ColumnarBatchScan.needsUnsafeR...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22750
  
cc @gatorsmile @hvanhovell 


---




[GitHub] spark pull request #22750: [SPARK-25747][SQL] remove ColumnarBatchScan.needs...

2018-10-16 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/22750

[SPARK-25747][SQL] remove ColumnarBatchScan.needsUnsafeRowConversion

## What changes were proposed in this pull request?

`needsUnsafeRowConversion` is used in 2 places:
1. `ColumnarBatchScan.produceRows`
2. `FileSourceScanExec.doExecute`

When we go to `ColumnarBatchScan.produceRows`, it means whole-stage codegen
is on but the vectorized reader is off. The vectorized reader can be off for
several reasons:
1. the file format doesn't have a vectorized reader (json, csv, etc.)
2. the vectorized reader config is off
3. the schema is not supported

In any case, when the vectorized reader is off, the file format reader always
returns unsafe rows, so `ColumnarBatchScan.needsUnsafeRowConversion` is not
needed.

When we go to `FileSourceScanExec.doExecute`, it means whole-stage codegen
is off. In this case, we need `needsUnsafeRowConversion` to convert
`ColumnarRow` to `UnsafeRow` if the file format reader returns batches.

This PR removes `ColumnarBatchScan.needsUnsafeRowConversion` and keeps this
flag only in `FileSourceScanExec`.
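
The reasoning above can be condensed into a small decision function. This is a simplified model rather than the actual Spark code: `ScanConversionSketch` and its members are hypothetical, and it assumes the reader returns batches exactly when the vectorized reader is on.

```scala
object ScanConversionSketch {
  // Simplified model of what the file format reader hands back.
  sealed trait ReaderOutput
  case object UnsafeRows extends ReaderOutput
  case object ColumnarBatches extends ReaderOutput

  // Assumption of this model: batches come back exactly when the
  // vectorized reader is on; otherwise the reader returns unsafe rows.
  def readerOutput(vectorizedReaderOn: Boolean): ReaderOutput =
    if (vectorizedReaderOn) ColumnarBatches else UnsafeRows

  // Conversion is only needed on the non-codegen path, and only when the
  // reader returned batches.
  def needsUnsafeRowConversion(wholeStageCodegenOn: Boolean, vectorizedReaderOn: Boolean): Boolean =
    !wholeStageCodegenOn && readerOutput(vectorizedReaderOn) == ColumnarBatches
}
```

In this model the flag is never true when whole-stage codegen is on, which matches the argument for keeping it only on the `doExecute` side.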

## How was this patch tested?

existing tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark minor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22750.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22750


commit 27e6b974192596baa86ac8b38b28c56e65e3c184
Author: Wenchen Fan 
Date:   2018-10-16T15:55:06Z

remove ColumnarBatchScan.needsUnsafeRowConversion




---




[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/21322
  
I think a design doc is better, to make sure we are on the same page before 
the actual coding, which saves time.


---




[GitHub] spark issue #22708: [SPARK-21402][SQL] Fix java array of structs deserializa...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22708
  
retest this please


---




[GitHub] spark issue #22708: [SPARK-21402][SQL] Fix java array of structs deserializa...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22708
  
cc @dongjoon-hyun here is another instance of the FileBasedDataSourceSuite 
flaky test.


---




[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/21322
  
Ah I see. It's kind of a broadcast but has a much smaller scope and its
lifecycle is bound to a stage. I'm looking forward to a full design of it,
thanks!


---




[GitHub] spark issue #22724: [SPARK-25734][SQL] Literal should have a value correspon...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22724
  
yea +1 on 3.0 only, this is kind of a developer API, advanced users may use 
it.


---




[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r225522164
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala ---
@@ -31,48 +30,7 @@ import org.apache.spark.sql.types._
 @BeanInfo
 private[sql] case class MyLabeledPoint(
   @BeanProperty label: Double,
-  @BeanProperty features: UDT.MyDenseVector)
-
-// Wrapped in an object to check Scala compatibility. See SPARK-13929
-object UDT {
--- End diff --

why do we change it?


---




[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r225521786
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
GenericArrayData, MapData}
+import org.apache.spark.sql.types._
+
+
+/**
+ * An interpreted version of a safe projection.
+ *
+ * @param expressions that produces the resulting fields. These 
expressions must be bound
+ *to a schema.
+ */
+class InterpretedSafeProjection(expressions: Seq[Expression]) extends 
Projection {
+
+  private[this] val mutableRow = new 
SpecificInternalRow(expressions.map(_.dataType))
+
+  private[this] val exprsWithWriters = expressions.zipWithIndex.filter {
+case (NoOp, _) => false
+case _ => true
+  }.map { case (e, i) =>
+val converter = generateSafeValueConverter(e.dataType)
+val writer = generateRowWriter(i, e.dataType)
+val f = if (!e.nullable) {
+  (v: Any) => writer(converter(v))
+} else {
+  (v: Any) => {
+if (v == null) {
+  mutableRow.setNullAt(i)
+} else {
+  writer(converter(v))
+}
+  }
+}
+(e, f)
+  }
+
+  private def isPrimitive(dataType: DataType): Boolean = dataType match {
+case BooleanType => true
+case ByteType => true
+case ShortType => true
+case IntegerType => true
+case LongType => true
+case FloatType => true
+case DoubleType => true
+case _ => false
+  }
+
+  private def generateSafeValueConverter(dt: DataType): Any => Any = dt 
match {
+case ArrayType(elemType, _) =>
+  if (isPrimitive(elemType)) {
+v => {
+  val arrayValue = v.asInstanceOf[ArrayData]
+  new GenericArrayData(arrayValue.toArray[Any](elemType))
+}
+  } else {
+val elementConverter = generateSafeValueConverter(elemType)
+v => {
+  val arrayValue = v.asInstanceOf[ArrayData]
+  val result = new Array[Any](arrayValue.numElements())
+  arrayValue.foreach(elemType, (i, e) => {
+result(i) = elementConverter(e)
+  })
+  new GenericArrayData(result)
+}
+  }
+
+case st: StructType =>
+  val fieldTypes = st.fields.map(_.dataType)
+  val fieldConverters = fieldTypes.map(generateSafeValueConverter)
+  v => {
+val row = v.asInstanceOf[InternalRow]
+val ar = new Array[Any](row.numFields)
+var idx = 0
+while (idx < row.numFields) {
+  ar(idx) = fieldConverters(idx)(row.get(idx, fieldTypes(idx)))
+  idx += 1
+}
+new GenericInternalRow(ar)
+  }
+
+case MapType(keyType, valueType, _) =>
+  lazy val keyConverter = generateSafeValueConverter(keyType)
+  lazy val valueConverter = generateSafeValueConverter(valueType)
+  v => {
+val mapValue = v.asInstanceOf[MapData]
+val keys = mapValue.keyArray().toArray[Any](keyType)
+val values = mapValue.valueArray().toArray[Any](valueType)
+val convertedKeys =
+  if (isPrimitive(keyType)) keys else keys.map(keyConverter)
+val convertedValues =
+  if (isPrimitive(valueType)) values else 
values.map(valueConverter)
+
+ArrayBasedMapData(convertedKeys, convertedValues)
+   

[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r225521397
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
GenericArrayData, MapData}
+import org.apache.spark.sql.types._
+
+
+/**
+ * An interpreted version of a safe projection.
+ *
+ * @param expressions that produces the resulting fields. These 
expressions must be bound
+ *to a schema.
+ */
+class InterpretedSafeProjection(expressions: Seq[Expression]) extends 
Projection {
+
+  private[this] val mutableRow = new 
SpecificInternalRow(expressions.map(_.dataType))
+
+  private[this] val exprsWithWriters = expressions.zipWithIndex.filter {
+case (NoOp, _) => false
+case _ => true
+  }.map { case (e, i) =>
+val converter = generateSafeValueConverter(e.dataType)
+val writer = generateRowWriter(i, e.dataType)
+val f = if (!e.nullable) {
+  (v: Any) => writer(converter(v))
+} else {
+  (v: Any) => {
+if (v == null) {
+  mutableRow.setNullAt(i)
+} else {
+  writer(converter(v))
+}
+  }
+}
+(e, f)
+  }
+
+  private def isPrimitive(dataType: DataType): Boolean = dataType match {
+case BooleanType => true
+case ByteType => true
+case ShortType => true
+case IntegerType => true
+case LongType => true
+case FloatType => true
+case DoubleType => true
+case _ => false
+  }
+
+  private def generateSafeValueConverter(dt: DataType): Any => Any = dt 
match {
+case ArrayType(elemType, _) =>
+  if (isPrimitive(elemType)) {
+v => {
+  val arrayValue = v.asInstanceOf[ArrayData]
+  new GenericArrayData(arrayValue.toArray[Any](elemType))
+}
+  } else {
+val elementConverter = generateSafeValueConverter(elemType)
+v => {
+  val arrayValue = v.asInstanceOf[ArrayData]
+  val result = new Array[Any](arrayValue.numElements())
+  arrayValue.foreach(elemType, (i, e) => {
+result(i) = elementConverter(e)
+  })
+  new GenericArrayData(result)
+}
+  }
+
+case st: StructType =>
+  val fieldTypes = st.fields.map(_.dataType)
+  val fieldConverters = fieldTypes.map(generateSafeValueConverter)
+  v => {
+val row = v.asInstanceOf[InternalRow]
+val ar = new Array[Any](row.numFields)
+var idx = 0
+while (idx < row.numFields) {
+  ar(idx) = fieldConverters(idx)(row.get(idx, fieldTypes(idx)))
+  idx += 1
+}
+new GenericInternalRow(ar)
+  }
+
+case MapType(keyType, valueType, _) =>
+  lazy val keyConverter = generateSafeValueConverter(keyType)
+  lazy val valueConverter = generateSafeValueConverter(valueType)
+  v => {
+val mapValue = v.asInstanceOf[MapData]
+val keys = mapValue.keyArray().toArray[Any](keyType)
+val values = mapValue.valueArray().toArray[Any](valueType)
+val convertedKeys =
+  if (isPrimitive(keyType)) keys else keys.map(keyConverter)
--- End diff --

ditto


---




[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r225521206
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
GenericArrayData, MapData}
+import org.apache.spark.sql.types._
+
+
+/**
+ * An interpreted version of a safe projection.
+ *
+ * @param expressions that produces the resulting fields. These 
expressions must be bound
+ *to a schema.
+ */
+class InterpretedSafeProjection(expressions: Seq[Expression]) extends 
Projection {
+
+  private[this] val mutableRow = new 
SpecificInternalRow(expressions.map(_.dataType))
+
+  private[this] val exprsWithWriters = expressions.zipWithIndex.filter {
+case (NoOp, _) => false
+case _ => true
+  }.map { case (e, i) =>
+val converter = generateSafeValueConverter(e.dataType)
+val writer = generateRowWriter(i, e.dataType)
+val f = if (!e.nullable) {
+  (v: Any) => writer(converter(v))
+} else {
+  (v: Any) => {
+if (v == null) {
+  mutableRow.setNullAt(i)
+} else {
+  writer(converter(v))
+}
+  }
+}
+(e, f)
+  }
+
+  private def isPrimitive(dataType: DataType): Boolean = dataType match {
+case BooleanType => true
+case ByteType => true
+case ShortType => true
+case IntegerType => true
+case LongType => true
+case FloatType => true
+case DoubleType => true
+case _ => false
+  }
+
+  private def generateSafeValueConverter(dt: DataType): Any => Any = dt 
match {
+case ArrayType(elemType, _) =>
+  if (isPrimitive(elemType)) {
--- End diff --

Let's not add this optimization at the beginning. We can add it later with 
a benchmark.


---




[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r225520812
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
GenericArrayData, MapData}
+import org.apache.spark.sql.types._
+
+
+/**
+ * An interpreted version of a safe projection.
+ *
+ * @param expressions that produces the resulting fields. These 
expressions must be bound
+ *to a schema.
+ */
+class InterpretedSafeProjection(expressions: Seq[Expression]) extends 
Projection {
+
+  private[this] val mutableRow = new 
SpecificInternalRow(expressions.map(_.dataType))
+
+  private[this] val exprsWithWriters = expressions.zipWithIndex.filter {
+case (NoOp, _) => false
+case _ => true
+  }.map { case (e, i) =>
+val converter = generateSafeValueConverter(e.dataType)
+val writer = generateRowWriter(i, e.dataType)
+val f = if (!e.nullable) {
+  (v: Any) => writer(converter(v))
+} else {
+  (v: Any) => {
+if (v == null) {
+  mutableRow.setNullAt(i)
+} else {
+  writer(converter(v))
+}
+  }
+}
+(e, f)
+  }
+
+  private def isPrimitive(dataType: DataType): Boolean = dataType match {
--- End diff --

`CodeGenerator.isPrimitiveType`


---




[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r225520290
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
GenericArrayData, MapData}
+import org.apache.spark.sql.types._
+
+
+/**
+ * An interpreted version of a safe projection.
+ *
+ * @param expressions that produces the resulting fields. These 
expressions must be bound
+ *to a schema.
+ */
+class InterpretedSafeProjection(expressions: Seq[Expression]) extends 
Projection {
+
+  private[this] val mutableRow = new 
SpecificInternalRow(expressions.map(_.dataType))
+
+  private[this] val exprsWithWriters = expressions.zipWithIndex.filter {
+case (NoOp, _) => false
--- End diff --

Does `SafeProjection` need to handle `NoOp`? `NoOp` is only used with 
`MutableProjection` in aggregate.


---




[GitHub] spark issue #22724: [SPARK-25734][SQL] Literal should have a value correspon...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22724
  
`Literal` is an internal API, and AFAIK end-users can't construct an 
invalid `Literal` with public APIs. If they can, then it's a bug and we have a 
problem...


---




[GitHub] spark issue #22708: [SPARK-21402][SQL] Fix java array of structs deserializa...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22708
  
lgtm


---




[GitHub] spark issue #22708: [SPARK-21402][SQL] Fix java array of structs deserializa...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22708
  
ok to test


---




[GitHub] spark issue #22728: [SPARK-25736][SQL][TEST] add tests to verify the behavio...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22728
  
FYI, I tried both Hive and Presto; neither of them supports multi-column 
count.


---




[GitHub] spark issue #22597: [SPARK-25579][SQL] Use quoted attribute names if needed ...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22597
  
LGTM


---




[GitHub] spark issue #22724: [SPARK-25734][SQL] Literal should have a value correspon...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22724
  
LGTM pending jenkins


---




[GitHub] spark pull request #22724: [SPARK-25734][SQL] Literal should have a value co...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22724#discussion_r225396907
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
 ---
@@ -196,6 +197,48 @@ object Literal {
 case other =>
   throw new RuntimeException(s"no default for type $dataType")
   }
+
+  private[expressions] def validateLiteralValue(value: Any, dataType: 
DataType): Unit = {
+def doValidate(v: Any, dataType: DataType): Boolean = dataType match {
+  case BooleanType => v.isInstanceOf[Boolean]
+  case ByteType => v.isInstanceOf[Byte]
+  case ShortType => v.isInstanceOf[Short]
+  case IntegerType | DateType => v.isInstanceOf[Int]
+  case LongType | TimestampType => v.isInstanceOf[Long]
+  case FloatType => v.isInstanceOf[Float]
+  case DoubleType => v.isInstanceOf[Double]
+  case _: DecimalType => v.isInstanceOf[Decimal]
+  case CalendarIntervalType => v.isInstanceOf[CalendarInterval]
+  case BinaryType => v.isInstanceOf[Array[Byte]]
+  case StringType => v.isInstanceOf[UTF8String]
+  case st: StructType =>
+v.isInstanceOf[InternalRow] && {
+  val row = v.asInstanceOf[InternalRow]
+  st.fields.map(_.dataType).zipWithIndex.forall {
+case (dt, i) => doValidate(row.get(i, dt), dt)
+  }
+}
+  case at: ArrayType =>
+v.isInstanceOf[GenericArrayData] && {
+  val ar = v.asInstanceOf[GenericArrayData].array
+  ar.isEmpty || doValidate(ar.head, at.elementType)
+}
+  case mt: MapType =>
+v.isInstanceOf[ArrayBasedMapData] && {
+  val map = v.asInstanceOf[ArrayBasedMapData]
+  map.numElements() == 0 || {
--- End diff --

yup


---




[GitHub] spark issue #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22379
  
LGTM


---




[GitHub] spark pull request #22724: [SPARK-25734][SQL] Literal should have a value co...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22724#discussion_r225392951
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
 ---
@@ -196,6 +197,48 @@ object Literal {
 case other =>
   throw new RuntimeException(s"no default for type $dataType")
   }
+
+  private[expressions] def validateLiteralValue(value: Any, dataType: 
DataType): Unit = {
+def doValidate(v: Any, dataType: DataType): Boolean = dataType match {
+  case BooleanType => v.isInstanceOf[Boolean]
+  case ByteType => v.isInstanceOf[Byte]
+  case ShortType => v.isInstanceOf[Short]
+  case IntegerType | DateType => v.isInstanceOf[Int]
+  case LongType | TimestampType => v.isInstanceOf[Long]
+  case FloatType => v.isInstanceOf[Float]
+  case DoubleType => v.isInstanceOf[Double]
+  case _: DecimalType => v.isInstanceOf[Decimal]
+  case CalendarIntervalType => v.isInstanceOf[CalendarInterval]
+  case BinaryType => v.isInstanceOf[Array[Byte]]
+  case StringType => v.isInstanceOf[UTF8String]
+  case st: StructType =>
+v.isInstanceOf[InternalRow] && {
+  val row = v.asInstanceOf[InternalRow]
+  st.fields.map(_.dataType).zipWithIndex.forall {
+case (dt, i) => doValidate(row.get(i, dt), dt)
+  }
+}
+  case at: ArrayType =>
+v.isInstanceOf[GenericArrayData] && {
+  val ar = v.asInstanceOf[GenericArrayData].array
+  ar.isEmpty || doValidate(ar.head, at.elementType)
+}
+  case mt: MapType =>
+v.isInstanceOf[ArrayBasedMapData] && {
+  val map = v.asInstanceOf[ArrayBasedMapData]
+  map.numElements() == 0 || {
--- End diff --

We don't need this. The array validation already considers `numElements`.
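
To illustrate the point with a sketch (plain Scala collections and a hypothetical `ToyMap`, not Spark's `ArrayBasedMapData`): if the map check delegates to per-array checks built on `forall`, an empty map passes without an explicit `numElements() == 0` guard, because `forall` is vacuously true on empty input.

```scala
object MapValidationSketch {
  // Hypothetical stand-in for a map stored as parallel key and value arrays.
  final case class ToyMap(keys: Seq[Any], values: Seq[Any])

  // Per-array checks; forall returns true for an empty sequence.
  def validateInts(xs: Seq[Any]): Boolean = xs.forall(_.isInstanceOf[Int])
  def validateStrings(xs: Seq[Any]): Boolean = xs.forall(_.isInstanceOf[String])

  // No separate emptiness guard: the array checks already cover that case.
  def validateMap(m: ToyMap): Boolean =
    validateInts(m.keys) && validateStrings(m.values)
}
```

The key and value types (Int to String) are fixed here only to keep the sketch short.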


---




[GitHub] spark pull request #22724: [SPARK-25734][SQL] Literal should have a value co...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22724#discussion_r225392843
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
 ---
@@ -196,6 +197,48 @@ object Literal {
 case other =>
   throw new RuntimeException(s"no default for type $dataType")
   }
+
+  private[expressions] def validateLiteralValue(value: Any, dataType: 
DataType): Unit = {
+def doValidate(v: Any, dataType: DataType): Boolean = dataType match {
+  case BooleanType => v.isInstanceOf[Boolean]
+  case ByteType => v.isInstanceOf[Byte]
+  case ShortType => v.isInstanceOf[Short]
+  case IntegerType | DateType => v.isInstanceOf[Int]
+  case LongType | TimestampType => v.isInstanceOf[Long]
+  case FloatType => v.isInstanceOf[Float]
+  case DoubleType => v.isInstanceOf[Double]
+  case _: DecimalType => v.isInstanceOf[Decimal]
+  case CalendarIntervalType => v.isInstanceOf[CalendarInterval]
+  case BinaryType => v.isInstanceOf[Array[Byte]]
+  case StringType => v.isInstanceOf[UTF8String]
+  case st: StructType =>
+v.isInstanceOf[InternalRow] && {
--- End diff --

can we do the same for array and map?


---




[GitHub] spark pull request #22724: [SPARK-25734][SQL] Literal should have a value co...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22724#discussion_r225392872
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
 ---
@@ -196,6 +197,48 @@ object Literal {
 case other =>
   throw new RuntimeException(s"no default for type $dataType")
   }
+
+  private[expressions] def validateLiteralValue(value: Any, dataType: 
DataType): Unit = {
+def doValidate(v: Any, dataType: DataType): Boolean = dataType match {
+  case BooleanType => v.isInstanceOf[Boolean]
+  case ByteType => v.isInstanceOf[Byte]
+  case ShortType => v.isInstanceOf[Short]
+  case IntegerType | DateType => v.isInstanceOf[Int]
+  case LongType | TimestampType => v.isInstanceOf[Long]
+  case FloatType => v.isInstanceOf[Float]
+  case DoubleType => v.isInstanceOf[Double]
+  case _: DecimalType => v.isInstanceOf[Decimal]
+  case CalendarIntervalType => v.isInstanceOf[CalendarInterval]
+  case BinaryType => v.isInstanceOf[Array[Byte]]
+  case StringType => v.isInstanceOf[UTF8String]
+  case st: StructType =>
+v.isInstanceOf[InternalRow] && {
+  val row = v.asInstanceOf[InternalRow]
+  st.fields.map(_.dataType).zipWithIndex.forall {
+case (dt, i) => doValidate(row.get(i, dt), dt)
+  }
+}
+  case at: ArrayType =>
+v.isInstanceOf[GenericArrayData] && {
+  val ar = v.asInstanceOf[GenericArrayData].array
+  ar.isEmpty || doValidate(ar.head, at.elementType)
--- End diff --

I think we need to validate all the elements
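
A small sketch of why checking only the head is not enough, using hypothetical toy types (`ToyInt`, `ToyArray`) in place of Spark's `DataType` and `GenericArrayData`: a sequence whose first element is valid but whose later element is not passes the head-only check and fails the all-elements check.

```scala
object ValidateAllElements {
  // Toy stand-ins for Spark's DataType hierarchy (hypothetical names).
  sealed trait ToyType
  case object ToyInt extends ToyType
  case object ToyString extends ToyType
  final case class ToyArray(elementType: ToyType) extends ToyType

  // Recursively checks that a value matches the toy type, visiting every element.
  def doValidate(v: Any, dt: ToyType): Boolean = dt match {
    case ToyInt => v.isInstanceOf[Int]
    case ToyString => v.isInstanceOf[String]
    case ToyArray(elem) => v match {
      case xs: Seq[_] => xs.forall(doValidate(_, elem))
      case _ => false
    }
  }

  // The head-only variant from the diff, kept for comparison.
  def headOnly(xs: Seq[Any], elem: ToyType): Boolean =
    xs.isEmpty || doValidate(xs.head, elem)
}
```

With `Seq(1, "oops", 3)` declared as an array of `ToyInt`, `headOnly` accepts the value while `doValidate` rejects it.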


---




[GitHub] spark pull request #22597: [SPARK-25579][SQL] Use quoted attribute names if ...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22597#discussion_r225373279
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
 ---
@@ -383,4 +385,17 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
   )).get.toString
 }
   }
+
+  test("SPARK-25579 ORC PPD should support column names with dot") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempDir { dir =>
+        val path = new File(dir, "orc").getCanonicalPath
+        Seq((1, 2), (3, 4)).toDF("col.dot.1", "col.dot.2").write.orc(path)
+        val df = spark.read.orc(path).where("`col.dot.1` = 1 and `col.dot.2` = 2")
+        checkAnswer(stripSparkFilter(df), Row(1, 2))
--- End diff --

ORC data source doesn't support nested column pruning yet.


---




[GitHub] spark pull request #22597: [SPARK-25579][SQL] Use quoted attribute names if ...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22597#discussion_r225373223
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
 ---
@@ -383,4 +385,17 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext {
   )).get.toString
 }
   }
+
+  test("SPARK-25579 ORC PPD should support column names with dot") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempDir { dir =>
+        val path = new File(dir, "orc").getCanonicalPath
+        Seq((1, 2), (3, 4)).toDF("col.dot.1", "col.dot.2").write.orc(path)
--- End diff --

Do not rely on implicit environment values; let's make the test as explicit 
as possible.


---




[GitHub] spark issue #22719: [SPARK-25714] [BACKPORT-2.2] Fix Null Handling in the Op...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22719
  
thanks, merging to 2.2!


---




[GitHub] spark issue #22718: [SPARK-25714] [BACKPORT-2.3] Fix Null Handling in the Op...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22718
  
thanks, merging to 2.3!


---




[GitHub] spark pull request #22724: [SPARK-25734][SQL] Literal should have a value co...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22724#discussion_r225368754
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
 ---
@@ -196,6 +197,48 @@ object Literal {
 case other =>
   throw new RuntimeException(s"no default for type $dataType")
   }
+
+  private[expressions] def validateLiteralValue(value: Any, dataType: 
DataType): Unit = {
+def doValidate(v: Any, dataType: DataType): Boolean = dataType match {
+  case BooleanType => v.isInstanceOf[Boolean]
+  case ByteType => v.isInstanceOf[Byte]
+  case ShortType => v.isInstanceOf[Short]
+  case IntegerType | DateType => v.isInstanceOf[Int]
+  case LongType | TimestampType => v.isInstanceOf[Long]
+  case FloatType => v.isInstanceOf[Float]
+  case DoubleType => v.isInstanceOf[Double]
+  case _: DecimalType => v.isInstanceOf[Decimal]
+  case CalendarIntervalType => v.isInstanceOf[CalendarInterval]
+  case BinaryType => v.isInstanceOf[Array[Byte]]
+  case StringType => v.isInstanceOf[UTF8String]
+  case st: StructType =>
+v.isInstanceOf[GenericInternalRow] && {
--- End diff --

This is definitely not true; the value can be any `InternalRow`. Can we do
```
v.isInstanceOf[InternalRow] && {
  val row = v.asInstanceOf[InternalRow]
  // forall (not foreach) so the block yields the Boolean that && needs
  st.fields.map(_.dataType).zipWithIndex.forall {
    case (dt, i) => doValidate(row.get(i, dt), dt)
  }
}
```
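
A minimal sketch of why the check should target the abstract row type, using hypothetical toy classes (`ToyRow`, `ToyGenericRow`, `ToyCompactRow`) rather than Spark's own: a check against one concrete implementation rejects every other valid implementation of the same interface.

```scala
object AbstractVsConcreteCheck {
  // Plays the role of the abstract row interface (hypothetical).
  trait ToyRow { def get(i: Int): Any }

  // One concrete implementation backed by an array.
  final class ToyGenericRow(values: Array[Any]) extends ToyRow {
    def get(i: Int): Any = values(i)
  }

  // A different, equally valid implementation with two fixed fields.
  final class ToyCompactRow(a: Int, b: Int) extends ToyRow {
    def get(i: Int): Any = if (i == 0) a else b
  }

  // Too strict: only accepts one implementation.
  def tooStrict(v: Any): Boolean = v.isInstanceOf[ToyGenericRow]

  // Accepts any implementation of the interface.
  def lenient(v: Any): Boolean = v.isInstanceOf[ToyRow]
}
```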


---




[GitHub] spark issue #22731: [SPARK-25674][FOLLOW-UP] Update the stats for each Colum...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22731
  
LGTM, merging to master!


---




[GitHub] spark issue #22733: [SPARK-25738][SQL] Fix LOAD DATA INPATH for hdfs port

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22733
  
LGTM


---




[GitHub] spark issue #22728: [SPARK-25736][SQL][TEST] add tests to verify the behavio...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22728
  
BTW, MySQL doesn't support `count(a, b)` but does support `count(distinct a, 
b)`, and the result is the same as in Spark.
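
As a rough model of the shared behaviour, assuming the usual semantics in which `count(distinct a, b)` counts distinct `(a, b)` pairs and skips any row where either column is NULL (this reading is an assumption, not stated in the thread), a plain-Scala sketch with hypothetical names:

```scala
object MultiColumnCountDistinct {
  // Each row holds two nullable columns, modelled as Options.
  type Row = (Option[Int], Option[Int])

  // Counts distinct (a, b) pairs, ignoring rows where either column is NULL.
  def countDistinct(rows: Seq[Row]): Long =
    rows.collect { case (Some(a), Some(b)) => (a, b) }.distinct.size.toLong
}
```

For example, the rows `(1, 2)`, `(1, 2)`, `(1, NULL)`, `(NULL, NULL)`, `(3, 4)` would yield 2 under this model.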


---




[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r225218562
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -777,7 +777,6 @@ case class SchemaOfJson(
 }
 
 object JsonExprUtils {
-
   def evalSchemaExpr(exp: Expression): DataType = exp match {
--- End diff --

makes sense, thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22724: [SPARK-25734][SQL] Literal should have a value co...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22724#discussion_r225218350
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
 ---
@@ -196,6 +197,31 @@ object Literal {
 case other =>
   throw new RuntimeException(s"no default for type $dataType")
   }
+
+  private[expressions] def validateLiteralValue(v: Any, dataType: 
DataType): Unit = {
+def doValidate(v: Any, dataType: DataType): Boolean = dataType match {
+  case BooleanType => v.isInstanceOf[Boolean]
+  case ByteType => v.isInstanceOf[Byte]
+  case ShortType => v.isInstanceOf[Short]
+  case IntegerType | DateType => v.isInstanceOf[Int]
+  case LongType | TimestampType => v.isInstanceOf[Long]
+  case FloatType => v.isInstanceOf[Float]
+  case DoubleType => v.isInstanceOf[Double]
+  case _: DecimalType => v.isInstanceOf[Decimal]
+  case CalendarIntervalType => v.isInstanceOf[CalendarInterval]
+  case BinaryType => v.isInstanceOf[Array[Byte]]
+  case StringType => v.isInstanceOf[UTF8String]
+  case _: StructType => v.isInstanceOf[InternalRow]
+  case _: ArrayType => v.isInstanceOf[ArrayData]
+  case _: MapType => v.isInstanceOf[MapData]
--- End diff --

ah good point!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22728: [SPARK-25736][SQL][TEST] add tests to verify the behavio...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22728
  
cc @gatorsmile @mgaido91 @viirya 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22728: [SPARK-25736][SQL][TEST] add tests to verify the ...

2018-10-15 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/22728

[SPARK-25736][SQL][TEST] add tests to verify the behavior of multi-column 
count

## What changes were proposed in this pull request?

AFAIK multi-column count is not widely supported by the mainstream 
databases (Postgres doesn't support it), and the SQL standard doesn't define it 
clearly.

Since Spark supports it, we should clearly document the current behavior 
and add tests to verify it.
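
As a rough model of the behavior to be documented: my understanding of Spark's function docs is that `count(a, b)` counts the rows in which all listed expressions are non-null, and `count(distinct a, b)` counts distinct tuples among those rows. The sketch below is plain Scala over `Option` values, not Spark code, and the names in it are made up for illustration.

```scala
object MultiColumnCountSketch {
  // Each row holds nullable column values, modelled as Option (None stands for NULL).
  type Row = Seq[Option[Any]]

  // count(c1, c2, ...): number of rows in which every listed column is non-null.
  def count(rows: Seq[Row], cols: Int*): Long =
    rows.count(r => cols.forall(i => r(i).isDefined)).toLong

  // count(DISTINCT c1, c2, ...): distinct tuples among those same rows.
  def countDistinct(rows: Seq[Row], cols: Int*): Long =
    rows
      .filter(r => cols.forall(i => r(i).isDefined))
      .map(r => cols.map(i => r(i)))
      .distinct
      .size
      .toLong
}
```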

## How was this patch tested?

N/A


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark doc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22728.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22728


commit 62b4b84f135c2f71ecc8192deabec3d694b6bbc9
Author: Wenchen Fan 
Date:   2018-10-15T15:25:14Z

add tests to verify the behavior of count for corner cases




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22379
  
have you addressed 
https://github.com/apache/spark/pull/22379/files#r225033808 ?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r225183977
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -777,7 +777,6 @@ case class SchemaOfJson(
 }
 
 object JsonExprUtils {
-
   def evalSchemaExpr(exp: Expression): DataType = exp match {
--- End diff --

do we still need it?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22708: [SPARK-21402] Fix java array/map of structs deser...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22708#discussion_r225182746
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
 ---
@@ -271,32 +272,41 @@ object JavaTypeInference {
 
   case c if listType.isAssignableFrom(typeToken) =>
 val et = elementType(typeToken)
-MapObjects(
+UnresolvedMapObjects(
--- End diff --

yes please, thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22029: [SPARK-24395][SQL] IN operator should return NULL when c...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22029
  
I don't have a strong opinion. cc @gatorsmile @hvanhovell @juliuszsompolski 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22029: [SPARK-24395][SQL] IN operator should return NULL when c...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22029
  
So, the goal here is to make the behavior consistent between multi-column 
IN-subquery and multi-column normal IN for Spark.

That said, I feel it's reasonable to change the behavior of `(a, b) in 
(struct_col1, struct_col2, ...)` to return null if a field is null, but it 
seems pretty weird to also apply this behavior to `input_struct_col in 
(struct_col1, struct_col2, ...)`. It's OK to treat the `(...)` syntax 
specially, but it's strange to treat the struct type differently.
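
To make "return null if a field is null" concrete, here is a small model of SQL three-valued logic applied to a row-value `IN`. It is a sketch of the semantics under discussion for the `(a, b)` syntax, not Spark's implementation, and every name in it is hypothetical.

```scala
object RowInSketch {
  // SQL three-valued Boolean: None stands for NULL (unknown).
  type Tri = Option[Boolean]
  type Fields = Seq[Option[Int]]

  def and(l: Tri, r: Tri): Tri = (l, r) match {
    case (Some(false), _) | (_, Some(false)) => Some(false)
    case (Some(true), Some(true))            => Some(true)
    case _                                   => None
  }

  def or(l: Tri, r: Tri): Tri = (l, r) match {
    case (Some(true), _) | (_, Some(true)) => Some(true)
    case (Some(false), Some(false))        => Some(false)
    case _                                 => None
  }

  // Field equality is NULL if either side is NULL.
  def fieldEq(l: Option[Int], r: Option[Int]): Tri =
    for (a <- l; b <- r) yield a == b

  // (a, b) = (c, d) is a = c AND b = d.
  def rowEq(l: Fields, r: Fields): Tri =
    l.zip(r).map { case (a, b) => fieldEq(a, b) }.foldLeft[Tri](Some(true))(and)

  // value IN (r1, r2, ...) is value = r1 OR value = r2 OR ...
  def in(value: Fields, list: Seq[Fields]): Tri =
    list.map(rowEq(value, _)).foldLeft[Tri](Some(false))(or)
}
```

Under this model a null field only makes the result null when no other field already decides the comparison.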


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22724: [SPARK-25734][SQL] Literal should have a value correspon...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22724
  
LGTM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22029: [SPARK-24395][SQL] IN operator should return NULL when c...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22029
  
> Oracle does the same.

So what's the behavior? return false?

> Hive behaves like Spark now (before and after the PR) for this case.

Again what's the behavior? return false?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Support par...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/20999
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22715: [SPARK-25727][SQL] Add outputOrdering to otherCopyArgs i...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22715
  
Hi @mgaido91 , since you are the major author of this part, do you have 
time to open a PR and move `outputOrdering` to the main constructor? Thanks in 
advance!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22713: [SPARK-25691][SQL] Use semantic equality in Alias...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22713#discussion_r225119359
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala 
---
@@ -112,8 +112,8 @@ object EliminateView extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
 // The child should have the same output attributes with the View 
operator, so we simply
 // remove the View operator.
-case View(_, output, child) =>
-  assert(output == child.output,
+case v @ View(_, output, child) =>
+  assert(v.sameOutput(child),
--- End diff --

+1 for adding a test case. BTW does it impact end-users? If it does we need 
to backport it to 2.4.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22708: [SPARK-21402] Fix java array/map of structs deser...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22708#discussion_r225118613
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
 ---
@@ -271,32 +272,41 @@ object JavaTypeInference {
 
   case c if listType.isAssignableFrom(typeToken) =>
 val et = elementType(typeToken)
-MapObjects(
+UnresolvedMapObjects(
--- End diff --

Can we exclude the other changes and keep only this one? This one is very easy 
to reason about. We did the same thing in `ScalaReflection`.

We need more time to think about the map case, and fix it in 
`ScalaReflection` as well.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22561: [SPARK-25548][SQL]In the PruneFileSourcePartitions optim...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22561
  
Your rewrite is applied to the entire predicate expression tree. How about 
the case `NOT(p AND x)`? If `p` is true and `x` is false, then `NOT(p AND x)` 
is true. But the rewrite replaces `x` with `true`, and `NOT(true AND true)` is 
false, so a partition that should be kept gets pruned.
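
The counterexample can be checked mechanically. The sketch below uses plain Booleans and hypothetical function names: it enumerates all inputs and collects those where the original predicate keeps a row but the rewritten one drops it.

```scala
object NotAndCounterexample {
  // The original predicate.
  def original(p: Boolean, x: Boolean): Boolean = !(p && x)

  // The rewrite under discussion: the non-partition operand x is replaced by true.
  def rewritten(p: Boolean): Boolean = !(p && true)

  // Inputs for which the original is true but the rewritten predicate is false.
  val unsafeCases: Seq[(Boolean, Boolean)] =
    for {
      p <- Seq(true, false)
      x <- Seq(true, false)
      if original(p, x) && !rewritten(p)
    } yield (p, x)
}
```

Any non-empty `unsafeCases` means the rewrite can prune data that the original predicate would have returned.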


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r225108214
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -523,35 +523,114 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
 tableName: TableIdentifier,
-specs: Seq[TablePartitionSpec],
+partitionsFilters: Seq[Seq[Expression]],
 ifExists: Boolean,
 purge: Boolean,
 retainData: Boolean)
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
 val catalog = sparkSession.sessionState.catalog
+val timeZone = 
Option(sparkSession.sessionState.conf.sessionLocalTimeZone)
 val table = catalog.getTableMetadata(tableName)
+val partitionColumns = table.partitionColumnNames
+val partitionAttributes = table.partitionSchema.toAttributes.map(a => 
a.name -> a).toMap
 DDLUtils.verifyAlterTableType(catalog, table, isView = false)
 DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER 
TABLE DROP PARTITION")
 
-val normalizedSpecs = specs.map { spec =>
-  PartitioningUtils.normalizePartitionSpec(
-spec,
-table.partitionColumnNames,
-table.identifier.quotedString,
-sparkSession.sessionState.conf.resolver)
+val resolvedSpecs = partitionsFilters.flatMap { filtersSpec =>
+  if (hasComplexFilters(filtersSpec)) {
+generatePartitionSpec(filtersSpec,
+  partitionColumns,
+  partitionAttributes,
+  table.identifier,
+  catalog,
+  sparkSession.sessionState.conf.resolver,
+  timeZone,
+  ifExists)
+  } else {
+val partitionSpec = filtersSpec.map {
+  case EqualTo(key: Attribute, Literal(value, StringType)) =>
+key.name -> value.toString
+}.toMap
+PartitioningUtils.normalizePartitionSpec(
+  partitionSpec,
+  partitionColumns,
+  table.identifier.quotedString,
+  sparkSession.sessionState.conf.resolver) :: Nil
+  }
 }
 
 catalog.dropPartitions(
--- End diff --

So the implementation here is similar to how Hive implements it?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22029: [SPARK-24395][SQL] IN operator should return NULL when c...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22029
  
Do you know how other databases behave for `(a, b) in (struct_col1, 
struct_col2, ...)` and `input_struct_col in (struct_col1, struct_col2, ...)`? 
Since `(a, b)` may be treated specially, we need to look at the behaviors 
carefully.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22713: [SPARK-25691][SQL] Use semantic equality in Alias...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22713#discussion_r225103000
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
 ---
@@ -124,4 +124,11 @@ class RemoveRedundantAliasAndProjectSuite extends 
PlanTest with PredicateHelper
 val expected = Subquery(relation.select('a as "a", 'b).where('b < 
10).select('a).analyze)
 comparePlans(optimized, expected)
   }
+
+  test("SPARK-25691: RemoveRedundantProject works also with different 
cases") {
+val relation = LocalRelation('a.int, 'b.int)
+val query = relation.select('A, 'b).analyzeCaseInsensitive
+val optimized = Optimize.execute(query)
+comparePlans(optimized, relation)
--- End diff --

Spark can be case-sensitive or not, depending on the config, but Spark should 
always be case-preserving.
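
One way to read this distinction, as a sketch with hypothetical names rather than Spark's analyzer code: the resolver decides whether a requested name matches a schema column, while the output keeps the name exactly as the query wrote it.

```scala
object CasePreservingSketch {
  type Resolver = (String, String) => Boolean

  val caseSensitive: Resolver = _ == _
  val caseInsensitive: Resolver = _.equalsIgnoreCase(_)

  // Returns the output name if the requested column resolves against the schema.
  // The match depends on the resolver, but the spelling of the request is preserved.
  def resolve(requested: String, schema: Seq[String], resolver: Resolver): Option[String] =
    schema.find(resolver(_, requested)).map(_ => requested)
}
```

Under that reading, selecting `A` from a relation with column `a` resolves in case-insensitive mode but still yields an output named `A`, so the plan is not identical to the bare relation.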


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22514: [SPARK-25271][SQL] Hive ctas commands should use ...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22514#discussion_r225079174
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala ---
@@ -34,11 +34,16 @@ import org.apache.spark.sql.types._
  * @param tableDesc the metadata of the table to be created.
  * @param mode the data writing mode
  * @param query an optional logical plan representing data to write into 
the created table.
+ * @param useExternalSerde whether to use external serde to write data, 
e.g., Hive Serde. Currently
--- End diff --

This is too hacky. We should not leak Hive-specific knowledge to general 
logical plans.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r225074416
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 ---
@@ -293,6 +293,31 @@ class AstBuilder(conf: SQLConf) extends 
SqlBaseBaseVisitor[AnyRef] with Logging
 }
   }
 
+  /**
+   * Create a partition specification map with filters.
+   */
+  override def visitDropPartitionSpec(
+  ctx: DropPartitionSpecContext): Seq[Expression] = {
--- End diff --

nit: can we move `withOrigin(ctx)` here? i.e.
```
def xxx(): T = withOrigin(ctx) {
  ...
}
```
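
A simplified sketch of what such a wrapper does, assuming it records the source position for the duration of the block and restores the previous one afterwards. `Ctx`, `currentLine` and `visitSpec` are hypothetical. The real helper tracks more than a line number.

```scala
object WithOriginSketch {
  // Hypothetical stand-in for a parser context carrying a source position.
  final case class Ctx(line: Int)

  // Position currently being parsed; a simplified stand-in for a thread-local origin.
  private var currentLine: Option[Int] = None

  // Sets the origin for the duration of f and restores the previous one afterwards.
  def withOrigin[T](ctx: Ctx)(f: => T): T = {
    val previous = currentLine
    currentLine = Some(ctx.line)
    try f finally currentLine = previous
  }

  // Anything built inside the block can record where it came from.
  def visitSpec(ctx: Ctx): String = withOrigin(ctx) {
    s"spec@line${currentLine.getOrElse(-1)}"
  }
}
```

Wrapping the whole method body means every node created by the visitor sees the same origin.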


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r225073862
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
 ---
@@ -382,6 +382,30 @@ case class OuterReference(e: NamedExpression)
   override def newInstance(): NamedExpression = 
OuterReference(e.newInstance())
 }
 
+/**
+ * A place holder used to hold the name of the partition attributes 
specified when running commands
+ * involving partitions, eg. ALTER TABLE ... DROP PARTITIONS.
+ */
+case class PartitioningAttribute(name: String)
+  extends Attribute with Unevaluable {
+  override val exprId: ExprId = NamedExpression.newExprId
--- End diff --

Even if it's a fake attribute, we should not change the `exprId` when this 
expression gets copied. Can we move `exprId` to the constructor?
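
The difference shows up with plain case classes, independent of Catalyst. In this sketch (all names hypothetical), a `val` initialized in the class body is recomputed on every `copy()`, while a constructor parameter is carried over.

```scala
object ExprIdCopySketch {
  private val counter = new java.util.concurrent.atomic.AtomicLong(0)
  def newId(): Long = counter.incrementAndGet()

  // Id assigned in the body: every copy() runs the body again and gets a fresh id.
  final case class BodyId(name: String) {
    val id: Long = newId()
  }

  // Id as a constructor parameter: copy() carries it over unless it is overridden.
  final case class CtorId(name: String, id: Long = newId())
}
```

Since tree transformations copy nodes routinely, only the second form keeps the identifier stable.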


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r225074108
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
 ---
@@ -382,6 +382,30 @@ case class OuterReference(e: NamedExpression)
   override def newInstance(): NamedExpression = 
OuterReference(e.newInstance())
 }
 
+/**
+ * A place holder used to hold the name of the partition attributes 
specified when running commands
+ * involving partitions, eg. ALTER TABLE ... DROP PARTITIONS.
+ */
+case class PartitioningAttribute(name: String)
+  extends Attribute with Unevaluable {
+  override val exprId: ExprId = NamedExpression.newExprId
+  // Not really needed and used. We just need a dataType to be used during 
analysis for resolving
+  // the expressions. The String type is used because all the literals in 
PARTITION operations are
+  // parsed as strings and eventually casted later.
+  override def dataType: DataType = StringType
--- End diff --

If it's not needed, can we throw an exception here? We may need to override 
`toString` though.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20999: [SPARK-14922][SPARK-17732][SPARK-23866][SQL] Supp...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20999#discussion_r225076055
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -523,35 +523,114 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
 tableName: TableIdentifier,
-specs: Seq[TablePartitionSpec],
+partitionsFilters: Seq[Seq[Expression]],
 ifExists: Boolean,
 purge: Boolean,
 retainData: Boolean)
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
 val catalog = sparkSession.sessionState.catalog
+val timeZone = 
Option(sparkSession.sessionState.conf.sessionLocalTimeZone)
 val table = catalog.getTableMetadata(tableName)
+val partitionColumns = table.partitionColumnNames
+val partitionAttributes = table.partitionSchema.toAttributes.map(a => 
a.name -> a).toMap
 DDLUtils.verifyAlterTableType(catalog, table, isView = false)
 DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER 
TABLE DROP PARTITION")
 
-val normalizedSpecs = specs.map { spec =>
-  PartitioningUtils.normalizePartitionSpec(
-spec,
-table.partitionColumnNames,
-table.identifier.quotedString,
-sparkSession.sessionState.conf.resolver)
+val resolvedSpecs = partitionsFilters.flatMap { filtersSpec =>
+  if (hasComplexFilters(filtersSpec)) {
+generatePartitionSpec(filtersSpec,
+  partitionColumns,
+  partitionAttributes,
+  table.identifier,
+  catalog,
+  sparkSession.sessionState.conf.resolver,
+  timeZone,
+  ifExists)
+  } else {
+val partitionSpec = filtersSpec.map {
+  case EqualTo(key: Attribute, Literal(value, StringType)) =>
+key.name -> value.toString
+}.toMap
+PartitioningUtils.normalizePartitionSpec(
+  partitionSpec,
+  partitionColumns,
+  table.identifier.quotedString,
+  sparkSession.sessionState.conf.resolver) :: Nil
+  }
 }
 
 catalog.dropPartitions(
--- End diff --

Does Hive have an API to drop partitions with a predicate? I think the 
current approach is very inefficient with non-equality partition predicates.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22263: [SPARK-25269][SQL] SQL interface support specify Storage...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22263
  
LGTM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/21322
  
Introducing a new concept which is similar to broadcast seems like 
overkill. We can just update broadcast to allow it to be memory-only.

However, there might be simpler solutions to fit your case, without 
broadcast. e.g.
```
val myObj = ...
rdd.mapPartitions { it =>
  try {
    // process data
  } finally {
    myObj.close()
  }
}
```

I think we need to clearly define the use case and think about whether we 
need a new API or not.
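
One caveat when sketching this pattern: Scala iterators are lazy, so if the processing is a lazy transformation such as `it.map(...)`, a surrounding `try`/`finally` runs `close()` before any element is consumed. The sketch below, in plain Scala with hypothetical names, shows one possible alternative that closes the resource once the iterator is exhausted.

```scala
object CloseAfterIterationSketch {
  // Hypothetical closeable resource that refuses to work after close().
  final class Resource {
    var closed = false
    def use(x: Int): Int = {
      require(!closed, "resource already closed")
      x * 2
    }
    def close(): Unit = closed = true
  }

  // Wraps an iterator so that onDone runs once the underlying data is exhausted.
  def closingIterator[A](it: Iterator[A])(onDone: => Unit): Iterator[A] =
    new Iterator[A] {
      private var done = false
      def hasNext: Boolean = {
        val more = it.hasNext
        if (!more && !done) { done = true; onDone }
        more
      }
      def next(): A = it.next()
    }

  // Lazy processing: the resource is closed only after the last element is read.
  def process(data: Iterator[Int], res: Resource): Iterator[Int] =
    closingIterator(data.map(res.use))(res.close())
}
```

This version does not close the resource if the consumer stops early, so it is a starting point rather than a complete answer.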


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22561: [SPARK-25548][SQL]In the PruneFileSourcePartitions optim...

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22561
  
Please describe the idea clearly in the PR description.

After reading the code, I think the idea is, for any predicate `p`, we can 
rewrite it to `p'` that only contains partition columns. The requirement is, 
partitions returned by the partition predicate `p`, must also be returned by 
`p'`.

We need to be careful here, to not introduce correctness issues. Can you 
prove that your rewrite is safe?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-15 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22379
  
I wouldn't make schema a `Column`. `Column` means it's dynamic, while I 
think it should be a static value, to simplify the implementation.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-14 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22379
  
BTW how would the `schema_of_csv` function help with `from_csv`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-14 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22379
  
Looks pretty good! Just one minor comment.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r225033899
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
 ---
@@ -254,7 +256,7 @@ object TextInputCSVDataSource extends CSVDataSource {
 val header = makeSafeHeader(firstRow, caseSensitive, parsedOptions)
 val sampled: Dataset[String] = CSVUtils.sample(csv, parsedOptions)
 val tokenRDD = sampled.rdd.mapPartitions { iter =>
-  val filteredLines = CSVUtils.filterCommentAndEmpty(iter, 
parsedOptions)
+  val filteredLines = filterCommentAndEmpty(iter, parsedOptions)
--- End diff --

+1


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22379: [SPARK-25393][SQL] Adding new function from_csv()

2018-10-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22379#discussion_r225033808
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.csv._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * Converts a CSV input string to a [[StructType]] with the specified 
schema.
+ */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(csvStr, schema[, options]) - Returns a struct value with 
the given `csvStr` and `schema`.",
+  examples = """
+Examples:
+  > SELECT _FUNC_('1, 0.8', 'a INT, b DOUBLE');
+   {"a":1, "b":0.8}
+  > SELECT _FUNC_('26/08/2015', 'time Timestamp', 
map('timestampFormat', 'dd/MM/'))
+   {"time":2015-08-26 00:00:00.0}
+  """,
+  since = "3.0.0")
+// scalastyle:on line.size.limit
+case class CsvToStructs(
+schema: StructType,
+options: Map[String, String],
+child: Expression,
+timeZoneId: Option[String] = None)
+  extends UnaryExpression
+with TimeZoneAwareExpression
+with CodegenFallback
+with ExpectsInputTypes
+with NullIntolerant {
+
+  override def nullable: Boolean = child.nullable
+
+  // The CSV input data might be missing certain fields. We force the 
nullability
+  // of the user-provided schema to avoid data corruptions.
+  val nullableSchema: StructType = schema.asNullable
+
+  // Used in `FunctionRegistry`
+  def this(child: Expression, schema: Expression, options: Map[String, 
String]) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = options,
+  child = child,
+  timeZoneId = None)
+
+  def this(child: Expression, schema: Expression) = this(child, schema, 
Map.empty[String, String])
+
+  def this(child: Expression, schema: Expression, options: Expression) =
+this(
+  schema = ExprUtils.evalSchemaExpr(schema),
+  options = ExprUtils.convertToMapData(options),
+  child = child,
+  timeZoneId = None)
+
+  // This converts parsed rows to the desired output by the given schema.
+  @transient
+  lazy val converter = (rows: Iterator[InternalRow]) => {
+if (rows.hasNext) {
+  rows.next()
--- End diff --

What if `rows` has more than one row? Shall we fail or shall we return 
null?

To my understanding it should fail. The parser should only return one 
row for struct type. If it doesn't, there must be a bug.
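
A minimal sketch of the "fail" option, in plain Scala with a hypothetical helper name. Returning `None` for empty input is an assumption made here for illustration, since the quoted diff does not show that branch.

```scala
object SingleRowConverterSketch {
  // Returns the only row, None for empty input (an assumption), and fails on extra rows.
  def single[A](rows: Iterator[A]): Option[A] =
    if (!rows.hasNext) {
      None
    } else {
      val first = rows.next()
      if (rows.hasNext) {
        throw new IllegalArgumentException(
          "Expected at most one row, but the parser returned more")
      }
      Some(first)
    }
}
```

Failing loudly here surfaces a parser bug instead of silently dropping rows.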


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22029: [SPARK-24395][SQL] IN operator should return NULL when c...

2018-10-14 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22029
  
It looks to me that this is another instance of special handling `(a, b, 
..)`, like https://github.com/apache/spark/pull/21403

`(a, b) in (struct_col1, struct_col2, ...)` is different from 
`input_struct_col in (struct_col1, struct_col2, ...)`, right?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22708: [SPARK-21402] Fix java array/map of structs deserializat...

2018-10-14 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22708
  
Can you explain how this happens? Why do the fields of structs get mixed up?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22561: [SPARK-25548][SQL]In the PruneFileSourcePartition...

2018-10-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22561#discussion_r225030929
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
 ---
@@ -39,21 +40,31 @@ private[sql] object PruneFileSourcePartitions extends 
Rule[LogicalPlan] {
 _,
 _))
 if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined 
=>
+
+  val sparkSession = fsRelation.sparkSession
+  val partitionColumns =
+logicalRelation.resolve(
+  partitionSchema, sparkSession.sessionState.analyzer.resolver)
+  val partitionSet = AttributeSet(partitionColumns)
   // The attribute name of predicate could be different than the one 
in schema in case of
   // case insensitive, we should change them to match the one in 
schema, so we donot need to
   // worry about case sensitivity anymore.
   val normalizedFilters = filters.map { e =>
-e transform {
+e transformUp {
   case a: AttributeReference =>
 
a.withName(logicalRelation.output.find(_.semanticEquals(a)).get.name)
+  // Replace the nonPartitionOps field with true in the 
And(partitionOps, nonPartitionOps)
+  // to make the partition can be pruned
+  case and @And(left, right) =>
+val leftPartition = 
left.references.filter(partitionSet.contains(_))
+val rightPartition = 
right.references.filter(partitionSet.contains(_))
+if (leftPartition.size == left.references.size && 
rightPartition.size == 0) {
+  and.withNewChildren(Seq(left, Literal(true, BooleanType)))
+} else if (leftPartition.size == 0 && rightPartition.size == 
right.references.size) {
+  and.withNewChildren(Seq(Literal(true, BooleanType), right))
+} else and
 }
   }
-
-  val sparkSession = fsRelation.sparkSession
-  val partitionColumns =
-logicalRelation.resolve(
-  partitionSchema, sparkSession.sessionState.analyzer.resolver)
-  val partitionSet = AttributeSet(partitionColumns)
   val partitionKeyFilters =
--- End diff --

I think a simpler change is
```
val partitionKeyFilters =
  ExpressionSet(normalizedFilters.map(splitConjunctivePredicates).filter...)
```
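
To make the idea concrete, here is a minimal, self-contained sketch on a toy expression model. It does not use Spark's Catalyst classes: `Expr`, `Pred`, `And`, `splitConjuncts` and the subset test in the filter are all hypothetical stand-ins, and it assumes the conjuncts are flattened (`flatMap`) before filtering. Each filter is split into its conjuncts, and only conjuncts that reference partition columns exclusively are kept, so nothing has to be rewritten to `true`.

```scala
// Toy model for illustration only; none of these names are Spark APIs.
object PartitionFilterSketch {
  sealed trait Expr { def references: Set[String] }
  // A leaf predicate, described by its text and the columns it reads.
  case class Pred(sql: String, references: Set[String]) extends Expr
  case class And(left: Expr, right: Expr) extends Expr {
    def references: Set[String] = left.references ++ right.references
  }

  // Flatten nested ANDs into a list of conjuncts
  // (the role splitConjunctivePredicates plays in the suggestion).
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // Keep only the conjuncts that reference partition columns exclusively.
  def partitionKeyFilters(filters: Seq[Expr], partitionCols: Set[String]): Set[Expr] =
    filters.flatMap(splitConjuncts).filter(_.references.subsetOf(partitionCols)).toSet
}
```

With a filter such as `And(p = 1, c > 2)` where only `p` is a partition column, the sketch keeps `p = 1` and drops `c > 2` from the pruning predicates.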


---




[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...

2018-10-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22309#discussion_r225030497
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala 
---
@@ -125,6 +125,17 @@ object ScalaReflection extends ScalaReflection {
 case _ => false
   }
 
+  def isValueClass(tpe: `Type`): Boolean = {
+val notNull = !(tpe <:< localTypeOf[Null])
+notNull && definedByConstructorParams(tpe) && tpe <:< 
localTypeOf[AnyVal]
--- End diff --

Why must a value class be a `Product`? Is it because there is otherwise no 
way to get the value class's field name and type?
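
For context, a small sketch of the distinction I am asking about. The types `UserId` and `Meters` are hypothetical, and this is plain Scala rather than anything in `ScalaReflection`: a case value class is a `Product`, so its single field is reachable through the `Product` interface, while a plain value class is an `AnyVal` but not a `Product`.

```scala
// Hypothetical types, for illustration only.
object ValueClassSketch {
  // A case value class: the compiler makes it a Product.
  case class UserId(id: Int) extends AnyVal

  // A plain value class: still an AnyVal, but not a Product.
  class Meters(val value: Double) extends AnyVal

  // Runtime check on the boxed instance.
  def isProduct(x: Any): Boolean = x.isInstanceOf[Product]

  // The single field's value, read through the Product interface.
  def fieldOf(p: Product): Any = p.productElement(0)
}
```

So the current check would accept `UserId` but not `Meters`, if I read it correctly.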


---




[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...

2018-10-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22309#discussion_r225030384
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala 
---
@@ -376,6 +387,23 @@ object ScalaReflection extends ScalaReflection {
   dataType = ObjectType(udt.getClass))
 Invoke(obj, "deserialize", ObjectType(udt.userClass), getPath :: 
Nil)
 
+  case t if isValueClass(t) =>
+// nested value class is treated as its underlying type
+// top level value class must be treated as a product
+val underlyingType = getUnderlyingTypeOf(t)
+val underlyingClsName = getClassNameFromType(underlyingType)
+val clsName = t.typeSymbol.asClass.fullName
--- End diff --

How about `getClassNameFromType(t)`?


---




[GitHub] spark pull request #22309: [SPARK-20384][SQL] Support value class in schema ...

2018-10-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22309#discussion_r225030352
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala 
---
@@ -376,6 +387,23 @@ object ScalaReflection extends ScalaReflection {
   dataType = ObjectType(udt.getClass))
 Invoke(obj, "deserialize", ObjectType(udt.userClass), getPath :: 
Nil)
 
+  case t if isValueClass(t) =>
+// nested value class is treated as its underlying type
+// top level value class must be treated as a product
+val underlyingType = getUnderlyingTypeOf(t)
+val underlyingClsName = getClassNameFromType(underlyingType)
+val clsName = t.typeSymbol.asClass.fullName
+val newTypePath = s"""- Scala value class: 
$clsName($underlyingClsName)""" +:
+  walkedTypePath
+
+val arg = deserializerFor(underlyingType, path, newTypePath)
+if (path.isDefined) {
+  arg
--- End diff --

Why can we skip the `NewInstance`?


---




[GitHub] spark issue #22697: [SPARK-25700][SQL][BRANCH-2.4] Partially revert append m...

2018-10-14 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/22697
  
thanks, merging to 2.4!


---




[GitHub] spark pull request #22713: [SPARK-25691][SQL] Use semantic equality in Optim...

2018-10-14 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22713#discussion_r225025827
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
 ---
@@ -124,4 +124,11 @@ class RemoveRedundantAliasAndProjectSuite extends 
PlanTest with PredicateHelper
 val expected = Subquery(relation.select('a as "a", 'b).where('b < 
10).select('a).analyze)
 comparePlans(optimized, expected)
   }
+
+  test("SPARK-25691: RemoveRedundantProject works also with different 
cases") {
+val relation = LocalRelation('a.int, 'b.int)
+val query = relation.select('A, 'b).analyzeCaseInsensitive
+val optimized = Optimize.execute(query)
+comparePlans(optimized, relation)
--- End diff --

I agree that using `==` on attributes is error-prone, but we should update 
them one by one, to narrow down the scope and make sure each change is 
reasonable.

For instance, I don't think this is a valid case. If we optimize it, the 
final schema field names will change, which is a breaking change if this plan 
is the input of a Parquet writing plan (the resulting Parquet files will have 
a different schema).
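
A toy sketch of the concern, assuming the resolved attribute keeps the spelling used in the query. `Attr` here is a hypothetical stand-in, not Catalyst's `AttributeReference`: the project's output is semantically equal to the relation's output, yet the field names differ, so dropping the project changes the schema that a writer would see.

```scala
// Toy model for illustration only; Attr is not a Spark class.
object AliasRemovalSketch {
  // An attribute is identified by its exprId; the name is only a label.
  case class Attr(name: String, exprId: Long) {
    def semanticEquals(other: Attr): Boolean = exprId == other.exprId
  }

  // Output of LocalRelation('a.int, 'b.int).
  val relationOutput: Seq[Attr] = Seq(Attr("a", 1L), Attr("b", 2L))

  // Output of select('A, 'b) after case-insensitive resolution:
  // same exprIds, different spelling of the first name.
  val projectOutput: Seq[Attr] = Seq(Attr("A", 1L), Attr("b", 2L))

  // Pairwise semantic comparison, ignoring names.
  def sameSemantics(x: Seq[Attr], y: Seq[Attr]): Boolean =
    x.length == y.length && x.zip(y).forall { case (l, r) => l.semanticEquals(r) }

  // The field names a downstream writer would use.
  def fieldNames(attrs: Seq[Attr]): Seq[String] = attrs.map(_.name)
}
```

Here `sameSemantics` holds, but `fieldNames` gives `A, b` for the project and `a, b` for the relation, which is exactly the difference that would end up in the written files.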


---



