[GitHub] spark pull request #21472: [SPARK-24445][SQL] Schema in json format for from...

2018-06-01 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21472#discussion_r192339647
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -747,8 +748,13 @@ case class StructsToJson(
 
 object JsonExprUtils {
 
-  def validateSchemaLiteral(exp: Expression): StructType = exp match {
-    case Literal(s, StringType) => CatalystSqlParser.parseTableSchema(s.toString)
+  def validateSchemaLiteral(exp: Expression): DataType = exp match {
+    case Literal(s, StringType) =>
+      try {
+        DataType.fromJson(s.toString)
--- End diff --

> How do they get the metadata ...

The metadata is stored together with the data in a distributed file system and loaded by standard facilities of the language.

> and how do they insert it into SQL?

SQL statements are formed programmatically as strings, and the loaded schemas are inserted at particular positions in the string (you can think of it as quasiquotes in Scala). The formed SQL statements are sent to Spark via JDBC.

> Is that the only way to do it?

It is probably possible to convert schemas from JSON format to DDL format, but:
- it requires much more effort and time than just modifying the 5 lines proposed in the PR
- a schema in DDL format supports only `StructType` as the root type, so it is not possible to specify a `MapType` like in the test below (a short illustrative sketch follows the link):
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala#L330-L345
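
For illustration only (my own sketch, not part of the PR's diff): a root-level `MapType` round-trips through the JSON schema format, while the DDL parser can only produce a `StructType`.

```scala
import org.apache.spark.sql.types._

// A MapType schema serialized to Spark's JSON schema format and parsed back.
val mapSchema = MapType(StringType, IntegerType)
val json = mapSchema.json            // {"type":"map","keyType":"string","valueType":"integer","valueContainsNull":true}
val parsed = DataType.fromJson(json) // MapType(StringType, IntegerType, true)

// By contrast, CatalystSqlParser.parseTableSchema("a INT, b STRING") always returns a
// StructType, so a MapType cannot be the root of a DDL-format schema string.
```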
 


---




[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...

2018-06-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21477
  
**[Test build #91386 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91386/testReport)**
 for PR 21477 at commit 
[`f40dff6`](https://github.com/apache/spark/commit/f40dff64d968a8102d4e061602d007b9aaa63abd).


---




[GitHub] spark issue #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration ...

2018-06-01 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/21467
  
@e-dorigatti, can you maybe rebase and then start this with a revert commit? That should make reviewers less confused. I would squash the commits first, revert it, and then bring the squashed commit back. For example: `git reset HEAD^`, `git stash`, `git revert 0ebb0c0d4dd3e192464dc5e0e6f01efa55b945ed`, `git stash pop` with manual conflict resolution, and `git push origin fix_udf_hack --force`.


---




[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...

2018-06-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21477
  
**[Test build #91385 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91385/testReport)**
 for PR 21477 at commit 
[`0920260`](https://github.com/apache/spark/commit/0920260158ee85c6b1437378b505f74797a61ec9).


---




[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function

2018-06-01 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21061#discussion_r192336364
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -1882,3 +1882,311 @@ case class ArrayRepeat(left: Expression, right: 
Expression)
   }
 
 }
+
+object ArraySetLike {
+  val kindUnion = 1
+
+  private val MAX_ARRAY_LENGTH: Int = 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = {
+val array = new Array[Int](hs.size)
+var pos = hs.nextPos(0)
+var i = 0
+while (pos != OpenHashSet.INVALID_POS) {
+  array(i) = hs.getValue(pos)
+  pos = hs.nextPos(pos + 1)
+  i += 1
+}
+
+val numBytes = 4L * array.length
+val unsafeArraySizeInBytes = 
UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+  
org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max 
elements * 8 bytes can be used
+if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
+  UnsafeArrayData.fromPrimitiveArray(array)
+} else {
+  new GenericArrayData(array)
+}
+  }
+
+  def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = {
+val array = new Array[Long](hs.size)
+var pos = hs.nextPos(0)
+var i = 0
+while (pos != OpenHashSet.INVALID_POS) {
+  array(i) = hs.getValue(pos)
+  pos = hs.nextPos(pos + 1)
+  i += 1
+}
+
+val numBytes = 8L * array.length
+val unsafeArraySizeInBytes = 
UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+  
org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max 
elements * 8 bytes can be used
+if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
+  UnsafeArrayData.fromPrimitiveArray(array)
+} else {
+  new GenericArrayData(array)
+}
+  }
+
+  def arrayUnion(
+  array1: ArrayData,
+  array2: ArrayData,
+  et: DataType,
+  ordering: Ordering[Any]): ArrayData = {
+if (ordering == null) {
+  new 
GenericArrayData(array1.toObjectArray(et).union(array2.toObjectArray(et))
+.distinct.asInstanceOf[Array[Any]])
+} else {
+  val length = math.min(array1.numElements().toLong + 
array2.numElements().toLong,
+ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
+  val array = new Array[Any](length.toInt)
+  var pos = 0
+  var hasNull = false
+  Seq(array1, array2).foreach(_.foreach(et, (_, v) => {
+var found = false
+if (v == null) {
+  if (hasNull) {
+found = true
+  } else {
+hasNull = true
+  }
+} else {
+  var j = 0
+  while (!found && j < pos) {
+val va = array(j)
+if (va != null && ordering.equiv(va, v)) {
+  found = true
+}
+j = j + 1
+  }
+}
+if (!found) {
+  if (pos > MAX_ARRAY_LENGTH) {
+throw new RuntimeException(s"Unsuccessful try to union arrays 
with $pos" +
+  s" elements due to exceeding the array size limit 
$MAX_ARRAY_LENGTH.")
+  }
+  array(pos) = v
+  pos = pos + 1
+}
+  }))
+  new GenericArrayData(array.slice(0, pos))
+}
+  }
+}
+
+abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
+  def typeId: Int
+
+  override def dataType: DataType = left.dataType
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val typeCheckResult = super.checkInputDataTypes()
+if (typeCheckResult.isSuccess) {
+  
TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType,
+s"function $prettyName")
+} else {
+  typeCheckResult
+}
+  }
+
+  private def cn = left.dataType.asInstanceOf[ArrayType].containsNull ||
+right.dataType.asInstanceOf[ArrayType].containsNull
+
+  @transient private lazy val ordering: Ordering[Any] =
+TypeUtils.getInterpretedOrdering(elementType)
+
+  @transient private lazy val elementTypeSupportEquals = elementType match 
{
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  def intEval(ary: ArrayData, hs2: OpenHashSet[Int]): OpenHashSet[Int]
+  def longEval(ary: ArrayData, hs2: OpenHashSet[Long]): OpenHa

[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...

2018-06-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21477
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3755/
Test PASSed.


---




[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...

2018-06-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21477
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration ...

2018-06-01 Thread e-dorigatti
Github user e-dorigatti commented on the issue:

https://github.com/apache/spark/pull/21467
  
@viirya we only want to revert `udf.py` and the hack in `_get_argspec`. Did 
I miss anything there?


---




[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...

2018-06-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19602
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91383/
Test FAILed.


---




[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...

2018-06-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19602
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...

2018-06-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19602
  
**[Test build #91383 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91383/testReport)**
 for PR 19602 at commit 
[`98c2512`](https://github.com/apache/spark/commit/98c251235a1d0924a9606be82abf1005dca03e1a).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...

2018-06-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21477
  
**[Test build #91384 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91384/testReport)**
 for PR 21477 at commit 
[`701a455`](https://github.com/apache/spark/commit/701a45506d75169455384ec8eebd30e509591c30).


---




[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...

2018-06-01 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/21477
  
jenkins retest this please.


---




[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function

2018-06-01 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21061#discussion_r192331296
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -1882,3 +1882,311 @@ case class ArrayRepeat(left: Expression, right: 
Expression)
   }
 
 }
+
+object ArraySetLike {
+  val kindUnion = 1
+
+  private val MAX_ARRAY_LENGTH: Int = 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = {
+val array = new Array[Int](hs.size)
+var pos = hs.nextPos(0)
+var i = 0
+while (pos != OpenHashSet.INVALID_POS) {
+  array(i) = hs.getValue(pos)
+  pos = hs.nextPos(pos + 1)
+  i += 1
+}
+
+val numBytes = 4L * array.length
+val unsafeArraySizeInBytes = 
UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+  
org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max 
elements * 8 bytes can be used
+if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
+  UnsafeArrayData.fromPrimitiveArray(array)
+} else {
+  new GenericArrayData(array)
+}
+  }
+
+  def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = {
+val array = new Array[Long](hs.size)
+var pos = hs.nextPos(0)
+var i = 0
+while (pos != OpenHashSet.INVALID_POS) {
+  array(i) = hs.getValue(pos)
+  pos = hs.nextPos(pos + 1)
+  i += 1
+}
+
+val numBytes = 8L * array.length
+val unsafeArraySizeInBytes = 
UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+  
org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max 
elements * 8 bytes can be used
+if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
--- End diff --

As I wrote in the comment, since `UnsafeArrayData.fromPrimitiveArray()` uses a 
`long[]`, this method can accept up to `Integer.MAX_VALUE * 8` bytes in total (8 being 
`sizeof(long)`). Of course, we conservatively limit the length to at most `Integer.MAX_VALUE`.
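
A minimal sketch of the size reasoning behind that check (my own illustration, not the PR's code); note that the bound has to be computed in `Long`, since `Integer.MAX_VALUE * 8` overflows in plain `Int` arithmetic.

```scala
// UnsafeArrayData.fromPrimitiveArray backs the data with a long[], so the hard cap on
// the total byte size is Integer.MAX_VALUE slots * 8 bytes (sizeof(long)) per slot.
val maxUnsafeBytes: Long = Integer.MAX_VALUE.toLong * 8L // 17179869176, not the Int-overflowed -8

// Hypothetical helper: does an array of `numElements` elements of `elementSize` bytes,
// plus `headerBytes` of header, still fit under that cap?
def fitsInUnsafeArray(numElements: Int, elementSize: Int, headerBytes: Long): Boolean =
  headerBytes + elementSize.toLong * numElements <= maxUnsafeBytes
```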


---




[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function

2018-06-01 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21061#discussion_r192330635
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -1882,3 +1882,311 @@ case class ArrayRepeat(left: Expression, right: 
Expression)
   }
 
 }
+
+object ArraySetLike {
+  val kindUnion = 1
+
+  private val MAX_ARRAY_LENGTH: Int = 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = {
+val array = new Array[Int](hs.size)
+var pos = hs.nextPos(0)
+var i = 0
+while (pos != OpenHashSet.INVALID_POS) {
+  array(i) = hs.getValue(pos)
+  pos = hs.nextPos(pos + 1)
+  i += 1
+}
+
+val numBytes = 4L * array.length
+val unsafeArraySizeInBytes = 
UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+  
org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max 
elements * 8 bytes can be used
+if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
+  UnsafeArrayData.fromPrimitiveArray(array)
+} else {
+  new GenericArrayData(array)
+}
+  }
+
+  def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = {
+val array = new Array[Long](hs.size)
+var pos = hs.nextPos(0)
+var i = 0
+while (pos != OpenHashSet.INVALID_POS) {
+  array(i) = hs.getValue(pos)
+  pos = hs.nextPos(pos + 1)
+  i += 1
+}
+
+val numBytes = 8L * array.length
+val unsafeArraySizeInBytes = 
UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+  
org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max 
elements * 8 bytes can be used
+if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
--- End diff --

`8` means `sizeof(long)` for the Java primitive type.


---




[GitHub] spark issue #21452: [MINOR][CORE] Log committer class used by HadoopMapRedCo...

2018-06-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21452
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91379/
Test PASSed.


---




[GitHub] spark issue #21452: [MINOR][CORE] Log committer class used by HadoopMapRedCo...

2018-06-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21452
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21452: [MINOR][CORE] Log committer class used by HadoopMapRedCo...

2018-06-01 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21452
  
**[Test build #91379 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91379/testReport)**
 for PR 21452 at commit 
[`9881d9c`](https://github.com/apache/spark/commit/9881d9c6a2b1d56e69bb06ee27fd8706f6e0fe43).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `logInfo(s\"Using output committer class $`


---




[GitHub] spark pull request #21472: [SPARK-24445][SQL] Schema in json format for from...

2018-06-01 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21472#discussion_r192323643
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -747,8 +748,13 @@ case class StructsToJson(
 
 object JsonExprUtils {
 
-  def validateSchemaLiteral(exp: Expression): StructType = exp match {
-    case Literal(s, StringType) => CatalystSqlParser.parseTableSchema(s.toString)
+  def validateSchemaLiteral(exp: Expression): DataType = exp match {
+    case Literal(s, StringType) =>
+      try {
+        DataType.fromJson(s.toString)
--- End diff --

Usually they should be consistent, but we don't necessarily have to newly support the 
obsolete functionality just for consistency. I'm not sure how common it is to 
write a JSON literal as a schema via SQL. How do they get the metadata, and 
how do they insert it into SQL? Is that the only way to do it?


---




[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...

2018-06-01 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19602
  
In general a cast is hard to push into a data source, e.g. for `cast(a as 
string) = string` when `a` is an int, how should the data source handle it?

In the meantime, I think we can remove most of the casts in predicates of the form 
`attribute = literal`, e.g. for `cast(byteCol as int) = 0` we know `0` is within 
byte range, so we can convert it to `byteCol = (byte) 0`, as in the sketch below.
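
A hedged sketch of that rewrite (my own illustration, not an actual Spark rule): strip the widening cast when the integer literal fits into the narrower byte type.

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, EqualTo, Expression, Literal}
import org.apache.spark.sql.types.{ByteType, IntegerType}

// Rewrite cast(byteCol as int) = intLiteral into byteCol = byteLiteral when the literal
// is representable as a byte; otherwise leave the expression unchanged.
def simplifyCastComparison(e: Expression): Expression = e match {
  case EqualTo(c: Cast, Literal(v: Int, IntegerType))
      if c.dataType == IntegerType && c.child.dataType == ByteType &&
        v >= Byte.MinValue && v <= Byte.MaxValue =>
    EqualTo(c.child, Literal(v.toByte, ByteType))
  case other => other
}
```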


---




[GitHub] spark pull request #19602: [SPARK-22384][SQL] Refine partition pruning when ...

2018-06-01 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19602#discussion_r192319924
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala 
---
@@ -207,65 +271,68 @@ class HiveClientSuite(version: String)
   }
 
   private def testMetastorePartitionFiltering(
-  filterString: String,
+  table: String,
+  filterExpr: Expression,
   expectedDs: Seq[Int],
   expectedH: Seq[Int],
   expectedChunks: Seq[String]): Unit = {
 testMetastorePartitionFiltering(
-  filterString,
-  (expectedDs, expectedH, expectedChunks) :: Nil,
+  table,
+  filterExpr,
+  Map("ds" -> expectedDs, "h" -> expectedH, "chunk" -> expectedChunks) 
:: Nil,
   identity)
   }
 
   private def testMetastorePartitionFiltering(
-  filterString: String,
+  table: String,
+  filterExpr: Expression,
   expectedDs: Seq[Int],
   expectedH: Seq[Int],
   expectedChunks: Seq[String],
   transform: Expression => Expression): Unit = {
 testMetastorePartitionFiltering(
-  filterString,
-  (expectedDs, expectedH, expectedChunks) :: Nil,
+  table,
+  filterExpr,
+  Map("ds" -> expectedDs, "h" -> expectedH, "chunk" -> expectedChunks) 
:: Nil,
   identity)
   }
 
   private def testMetastorePartitionFiltering(
-  filterString: String,
-  expectedPartitionCubes: Seq[(Seq[Int], Seq[Int], Seq[String])]): 
Unit = {
-testMetastorePartitionFiltering(filterString, expectedPartitionCubes, 
identity)
+  table: String,
+  filterExpr: Expression,
+  expectedPartitionCubes: Seq[Map[String, Seq[Any]]]): Unit = {
+testMetastorePartitionFiltering(table, filterExpr, 
expectedPartitionCubes, identity)
   }
 
   private def testMetastorePartitionFiltering(
-  filterString: String,
-  expectedPartitionCubes: Seq[(Seq[Int], Seq[Int], Seq[String])],
+  table: String,
+  filterExpr: Expression,
+  expectedPartitionCubes: Seq[Map[String, Seq[Any]]],
   transform: Expression => Expression): Unit = {
-val filteredPartitions = 
client.getPartitionsByFilter(client.getTable("default", "test"),
+val filteredPartitions = 
client.getPartitionsByFilter(client.getTable("default", table),
   Seq(
-transform(parseExpression(filterString))
+transform(filterExpr)
   ))
 
-val expectedPartitionCount = expectedPartitionCubes.map {
-  case (expectedDs, expectedH, expectedChunks) =>
-expectedDs.size * expectedH.size * expectedChunks.size
-}.sum
-
-val expectedPartitions = expectedPartitionCubes.map {
-  case (expectedDs, expectedH, expectedChunks) =>
-for {
-  ds <- expectedDs
-  h <- expectedH
-  chunk <- expectedChunks
-} yield Set(
-  "ds" -> ds.toString,
-  "h" -> h.toString,
-  "chunk" -> chunk
-)
-}.reduce(_ ++ _)
+val expectedPartitionCount = 
expectedPartitionCubes.map(_.map(_._2.size).product).sum
+
+val expectedPartitions = 
expectedPartitionCubes.map(getPartitionsFromCube(_)).reduce(_ ++ _)
 
 val actualFilteredPartitionCount = filteredPartitions.size
 
 assert(actualFilteredPartitionCount == expectedPartitionCount,
   s"Expected $expectedPartitionCount partitions but got 
$actualFilteredPartitionCount")
-assert(filteredPartitions.map(_.spec.toSet).toSet == 
expectedPartitions.toSet)
+assert(filteredPartitions.map(_.spec).toSet == 
expectedPartitions.toSet)
+  }
+
+  private def getPartitionsFromCube(cube: Map[String, Seq[Any]]): 
Seq[Map[String, String]] = {
+cube.map {
+  case (k: String, pts: Seq[Any]) => pts.map(pt => (k, pt.toString))
+}.foldLeft(Seq(Seq[(String, String)]()))((seq0, seq1) => {
--- End diff --

this is hard to read; please use a loop and mutable state directly.
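
One possible loop-based rewrite of that fold (a sketch of the suggestion under my own assumptions, not necessarily what the PR will end up with):

```scala
// Expand a cube such as Map("ds" -> Seq(20170101), "h" -> Seq(0, 1)) into the cartesian
// product of partition specs: Seq(Map("ds" -> "20170101", "h" -> "0"), ...).
def getPartitionsFromCube(cube: Map[String, Seq[Any]]): Seq[Map[String, String]] = {
  var result: Seq[Map[String, String]] = Seq(Map.empty)
  for ((name, values) <- cube) {
    val next = scala.collection.mutable.Buffer.empty[Map[String, String]]
    for (partial <- result; v <- values) {
      next += partial + (name -> v.toString)
    }
    result = next.toSeq
  }
  result
}
```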


---




[GitHub] spark pull request #19602: [SPARK-22384][SQL] Refine partition pruning when ...

2018-06-01 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19602#discussion_r192319393
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala 
---
@@ -59,38 +61,62 @@ class HiveClientSuite(version: String)
 "h" -> h.toString,
 "chunk" -> chunk
   ), storageFormat)
-assert(partitions.size == testPartitionCount)
+assert(partitions0.size == testPartitionCount0)
 
 client.createPartitions(
-  "default", "test", partitions, ignoreIfExists = false)
+  "default", "test0", partitions0, ignoreIfExists = false)
+
+val partitions1 =
+  for {
+pt <- 0 until 10
+chunk <- Seq("aa", "ab", "ba", "bb")
+  } yield CatalogTablePartition(Map(
+"pt" -> pt.toString,
+"chunk" -> chunk
+  ), storageFormat)
+assert(partitions1.size == testPartitionCount1)
+
+client.createPartitions(
+  "default", "test1", partitions1, ignoreIfExists = false)
+
 client
   }
 
+  private def pAttr(table: String, name: String): Attribute = {
+val partTypes = client.getTable("default", 
table).partitionSchema.fields
+.map(field => (field.name, field.dataType)).toMap
+partTypes.get(name) match {
+  case Some(dt) => AttributeReference(name, dt)()
+  case None =>
+fail(s"Illegal name of partition attribute: $name")
+}
+  }
+
   override def beforeAll() {
 super.beforeAll()
 client = init(true)
   }
 
   test(s"getPartitionsByFilter returns all partitions when 
$tryDirectSqlKey=false") {
 val client = init(false)
-val filteredPartitions = 
client.getPartitionsByFilter(client.getTable("default", "test"),
-  Seq(parseExpression("ds=20170101")))
+val filteredPartitions = 
client.getPartitionsByFilter(client.getTable("default", "test0"),
+  Seq(EqualTo(pAttr("test0", "ds"), Literal(20170101, IntegerType
--- End diff --

we can import `org.apache.spark.sql.catalyst.dsl.expressions._` to simplify 
the expression creation, e.g. as sketched below.
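
For illustration (my own example, assuming the DSL applies in this suite; the attribute and literal are made up), the DSL shortens the construction to something like:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._

// 'ds.int builds an AttributeReference named "ds" of IntegerType, and === builds an
// EqualTo against an implicitly lifted Literal, roughly equivalent to
// EqualTo(AttributeReference("ds", IntegerType)(), Literal(20170101)).
val filter = 'ds.int === 20170101
```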


---




[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...

2018-06-01 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/19602
  
I also think we have the same problem for data source tables.


---




[GitHub] spark pull request #21470: [SPARK-24443][SQL] comparison should accept struc...

2018-06-01 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21470#discussion_r192314729
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -803,18 +803,60 @@ object TypeCoercion {
   e.copy(left = Cast(e.left, TimestampType))
 }
 
-  case b @ BinaryOperator(left, right) if left.dataType != 
right.dataType =>
-findTightestCommonType(left.dataType, right.dataType).map { 
commonType =>
-  if (b.inputType.acceptsType(commonType)) {
-// If the expression accepts the tightest common type, cast to 
that.
-val newLeft = if (left.dataType == commonType) left else 
Cast(left, commonType)
-val newRight = if (right.dataType == commonType) right else 
Cast(right, commonType)
-b.withNewChildren(Seq(newLeft, newRight))
-  } else {
-// Otherwise, don't do anything with the expression.
-b
-  }
-}.getOrElse(b)  // If there is no applicable conversion, leave 
expression unchanged.
+  case b @ BinaryOperator(left, right)
+  if !BinaryOperator.sameType(left.dataType, right.dataType) =>
+(left.dataType, right.dataType) match {
+  case (StructType(fields1), StructType(fields2)) =>
+val commonTypes = 
scala.collection.mutable.ArrayBuffer.empty[DataType]
+val len = fields1.length
+var i = 0
+var continue = fields1.length == fields2.length
+while (i < len && continue) {
+  val commonType = findTightestCommonType(fields1(i).dataType, 
fields2(i).dataType)
+  if (commonType.isDefined) {
+commonTypes += commonType.get
+  } else {
+continue = false
+  }
+  i += 1
+}
+
+if (continue) {
+  val newLeftST = new StructType(fields1.zip(commonTypes).map {
+case (f, commonType) => f.copy(dataType = commonType)
+  })
+  val newLeft = if (left.dataType == newLeftST) left else 
Cast(left, newLeftST)
+
+  val newRightST = new StructType(fields2.zip(commonTypes).map 
{
+case (f, commonType) => f.copy(dataType = commonType)
+  })
+  val newRight = if (right.dataType == newRightST) right else 
Cast(right, newRightST)
+
+  if (b.inputType.acceptsType(newLeftST) && 
b.inputType.acceptsType(newRightST)) {
--- End diff --

Is it possible that `b` accepts only one side (e.g., only `newLeftST`) but 
not the other?


---




[GitHub] spark pull request #21470: [SPARK-24443][SQL] comparison should accept struc...

2018-06-01 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21470#discussion_r192314292
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -803,18 +803,60 @@ object TypeCoercion {
   e.copy(left = Cast(e.left, TimestampType))
 }
 
-  case b @ BinaryOperator(left, right) if left.dataType != 
right.dataType =>
-findTightestCommonType(left.dataType, right.dataType).map { 
commonType =>
-  if (b.inputType.acceptsType(commonType)) {
-// If the expression accepts the tightest common type, cast to 
that.
-val newLeft = if (left.dataType == commonType) left else 
Cast(left, commonType)
-val newRight = if (right.dataType == commonType) right else 
Cast(right, commonType)
-b.withNewChildren(Seq(newLeft, newRight))
-  } else {
-// Otherwise, don't do anything with the expression.
-b
-  }
-}.getOrElse(b)  // If there is no applicable conversion, leave 
expression unchanged.
+  case b @ BinaryOperator(left, right)
+  if !BinaryOperator.sameType(left.dataType, right.dataType) =>
+(left.dataType, right.dataType) match {
+  case (StructType(fields1), StructType(fields2)) =>
+val commonTypes = 
scala.collection.mutable.ArrayBuffer.empty[DataType]
+val len = fields1.length
+var i = 0
+var continue = fields1.length == fields2.length
+while (i < len && continue) {
+  val commonType = findTightestCommonType(fields1(i).dataType, 
fields2(i).dataType)
--- End diff --

What about nested structs?


---




[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...

2018-06-01 Thread jinxing64
Github user jinxing64 commented on the issue:

https://github.com/apache/spark/pull/19602
  
@cloud-fan 
Sorry for the late reply; I've been busy these days.
In the current change:
1. I follow `Cast.mayTruncate` strictly when extracting the partition attribute;
2. I created new test data in `HiveClientSuite`, and 
`testMetastorePartitionFiltering` can be used to validate tables with 
different partition schemas.

If needed, I can create more tests -- different binary comparisons and 
different data types.


---




[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...

2018-06-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19602
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...

2018-06-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19602
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3754/
Test PASSed.


---




[GitHub] spark pull request #19602: [SPARK-22384][SQL] Refine partition pruning when ...

2018-06-01 Thread jinxing64
Github user jinxing64 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19602#discussion_r192312477
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala 
---
@@ -207,65 +271,68 @@ class HiveClientSuite(version: String)
   }
 
   private def testMetastorePartitionFiltering(
-  filterString: String,
+  table: String,
+  filterExpr: Expression,
   expectedDs: Seq[Int],
   expectedH: Seq[Int],
   expectedChunks: Seq[String]): Unit = {
 testMetastorePartitionFiltering(
-  filterString,
-  (expectedDs, expectedH, expectedChunks) :: Nil,
+  table,
+  filterExpr,
+  Map("ds" -> expectedDs, "h" -> expectedH, "chunk" -> expectedChunks) 
:: Nil,
   identity)
   }
 
   private def testMetastorePartitionFiltering(
-  filterString: String,
+  table: String,
+  filterExpr: Expression,
   expectedDs: Seq[Int],
   expectedH: Seq[Int],
   expectedChunks: Seq[String],
   transform: Expression => Expression): Unit = {
 testMetastorePartitionFiltering(
-  filterString,
-  (expectedDs, expectedH, expectedChunks) :: Nil,
+  table,
+  filterExpr,
+  Map("ds" -> expectedDs, "h" -> expectedH, "chunk" -> expectedChunks) 
:: Nil,
   identity)
   }
 
   private def testMetastorePartitionFiltering(
-  filterString: String,
-  expectedPartitionCubes: Seq[(Seq[Int], Seq[Int], Seq[String])]): 
Unit = {
-testMetastorePartitionFiltering(filterString, expectedPartitionCubes, 
identity)
+  table: String,
+  filterExpr: Expression,
+  expectedPartitionCubes: Seq[Map[String, Seq[Any]]]): Unit = {
+testMetastorePartitionFiltering(table, filterExpr, 
expectedPartitionCubes, identity)
   }
 
   private def testMetastorePartitionFiltering(
-  filterString: String,
-  expectedPartitionCubes: Seq[(Seq[Int], Seq[Int], Seq[String])],
+  table: String,
+  filterExpr: Expression,
+  expectedPartitionCubes: Seq[Map[String, Seq[Any]]],
--- End diff --

With this change, the number of partition names in `expectedPartitionCubes` is 
not necessarily 3, and the shape of `expectedPartitionCubes` is 
`Seq[Map[partition name, partition values]]`.
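
For example (illustrative values only), a single cube in that shape could look like:

```scala
// One entry of expectedPartitionCubes: partition column name -> expected partition values.
val cube: Map[String, Seq[Any]] = Map(
  "ds" -> Seq(20170101, 20170102),
  "h" -> Seq(0, 1),
  "chunk" -> Seq("aa", "ab", "ba", "bb"))
```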


---



