[GitHub] spark pull request #21472: [SPARK-24445][SQL] Schema in json format for from...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21472#discussion_r192339647

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
@@ -747,8 +748,13 @@ case class StructsToJson(
 
 object JsonExprUtils {
 
-  def validateSchemaLiteral(exp: Expression): StructType = exp match {
-    case Literal(s, StringType) => CatalystSqlParser.parseTableSchema(s.toString)
+  def validateSchemaLiteral(exp: Expression): DataType = exp match {
+    case Literal(s, StringType) =>
+      try {
+        DataType.fromJson(s.toString)
--- End diff --

> How do they get the metadata ...

The metadata is stored together with the data in a distributed file system and is loaded by the language's standard facilities.

> and how do they insert it into SQL?

The SQL statements are formed programmatically as strings, and the loaded schemas are inserted at particular positions in those strings (you can think of this as quasiquotes in Scala). The formed SQL statements are then sent to Spark via JDBC.

> Is that the only way to do it?

It is probably possible to convert schemas from JSON format to DDL format, but:
- it would require much more effort and time than just modifying the 5 lines proposed in this PR
- a DDL schema supports only `StructType` as the root type, so it is not possible to specify a `MapType` root as in this test: https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala#L330-L345
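For context, a minimal sketch of the difference under discussion. It assumes only the public `DataType` API; the JSON string is a hypothetical example of a schema whose root is a `MapType`, which `DataType.fromJson` can parse but a DDL schema cannot express as a root type:

```scala
import org.apache.spark.sql.types.{DataType, IntegerType, MapType, StringType}

// A JSON-serialized schema whose root type is a map, not a struct.
val json = """{"type":"map","keyType":"string","valueType":"integer","valueContainsNull":true}"""

// DataType.fromJson round-trips any DataType, including non-struct roots,
// whereas CatalystSqlParser.parseTableSchema always yields a StructType.
val parsed: DataType = DataType.fromJson(json)
assert(parsed == MapType(StringType, IntegerType, valueContainsNull = true))
```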
[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21477 **[Test build #91386 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91386/testReport)** for PR 21477 at commit [`f40dff6`](https://github.com/apache/spark/commit/f40dff64d968a8102d4e061602d007b9aaa63abd).
[GitHub] spark issue #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration ...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21467

@e-dorigatti, can you maybe rebase and then start this with a revert commit? That should make it less confusing for everyone. I would squash the commits first, revert that, and then bring the squashed commit back. For example: `git reset HEAD^`, `git stash`, `git revert 0ebb0c0d4dd3e192464dc5e0e6f01efa55b945ed`, `git stash pop` with manual conflict resolution, and `git push origin fix_udf_hack --force`.
[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21477 **[Test build #91385 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91385/testReport)** for PR 21477 at commit [`0920260`](https://github.com/apache/spark/commit/0920260158ee85c6b1437378b505f74797a61ec9).
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r192336364

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -1882,3 +1882,311 @@ case class ArrayRepeat(left: Expression, right: Expression)
   }
 }
+
+object ArraySetLike {
+  val kindUnion = 1
+
+  private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = {
+    val array = new Array[Int](hs.size)
+    var pos = hs.nextPos(0)
+    var i = 0
+    while (pos != OpenHashSet.INVALID_POS) {
+      array(i) = hs.getValue(pos)
+      pos = hs.nextPos(pos + 1)
+      i += 1
+    }
+
+    val numBytes = 4L * array.length
+    val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+      org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+    // Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used
+    if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
+      UnsafeArrayData.fromPrimitiveArray(array)
+    } else {
+      new GenericArrayData(array)
+    }
+  }
+
+  def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = {
+    val array = new Array[Long](hs.size)
+    var pos = hs.nextPos(0)
+    var i = 0
+    while (pos != OpenHashSet.INVALID_POS) {
+      array(i) = hs.getValue(pos)
+      pos = hs.nextPos(pos + 1)
+      i += 1
+    }
+
+    val numBytes = 8L * array.length
+    val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+      org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+    // Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used
+    if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
+      UnsafeArrayData.fromPrimitiveArray(array)
+    } else {
+      new GenericArrayData(array)
+    }
+  }
+
+  def arrayUnion(
+      array1: ArrayData,
+      array2: ArrayData,
+      et: DataType,
+      ordering: Ordering[Any]): ArrayData = {
+    if (ordering == null) {
+      new GenericArrayData(array1.toObjectArray(et).union(array2.toObjectArray(et))
+        .distinct.asInstanceOf[Array[Any]])
+    } else {
+      val length = math.min(array1.numElements().toLong + array2.numElements().toLong,
+        ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
+      val array = new Array[Any](length.toInt)
+      var pos = 0
+      var hasNull = false
+      Seq(array1, array2).foreach(_.foreach(et, (_, v) => {
+        var found = false
+        if (v == null) {
+          if (hasNull) {
+            found = true
+          } else {
+            hasNull = true
+          }
+        } else {
+          var j = 0
+          while (!found && j < pos) {
+            val va = array(j)
+            if (va != null && ordering.equiv(va, v)) {
+              found = true
+            }
+            j = j + 1
+          }
+        }
+        if (!found) {
+          if (pos > MAX_ARRAY_LENGTH) {
+            throw new RuntimeException(s"Unsuccessful try to union arrays with $pos" +
+              s" elements due to exceeding the array size limit $MAX_ARRAY_LENGTH.")
+          }
+          array(pos) = v
+          pos = pos + 1
+        }
+      }))
+      new GenericArrayData(array.slice(0, pos))
+    }
+  }
+}
+
+abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
+  def typeId: Int
+
+  override def dataType: DataType = left.dataType
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val typeCheckResult = super.checkInputDataTypes()
+    if (typeCheckResult.isSuccess) {
+      TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType,
+        s"function $prettyName")
+    } else {
+      typeCheckResult
+    }
+  }
+
+  private def cn = left.dataType.asInstanceOf[ArrayType].containsNull ||
+    right.dataType.asInstanceOf[ArrayType].containsNull
+
+  @transient private lazy val ordering: Ordering[Any] =
+    TypeUtils.getInterpretedOrdering(elementType)
+
+  @transient private lazy val elementTypeSupportEquals = elementType match {
+    case BinaryType => false
+    case _: AtomicType => true
+    case _ => false
+  }
+
+  def intEval(ary: ArrayData, hs2: OpenHashSet[Int]): OpenHashSet[Int]
+  def longEval(ary: ArrayData, hs2: OpenHashSet[Long]): OpenHa
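For context, a hedged sketch of how the function added by this PR would be used (assuming a `SparkSession` named `spark` and the SQL function name from the PR title):

```scala
// Hypothetical usage of the array_union function this PR adds; like a set
// union, duplicates are removed from the result.
val df = spark.sql("SELECT array_union(array(1, 2, 3), array(3, 5)) AS u")
df.show()  // expected: [1, 2, 3, 5]
```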
[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21477 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3755/ Test PASSed.
[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21477 Merged build finished. Test PASSed.
[GitHub] spark issue #21467: [SPARK-23754][PYTHON][FOLLOWUP] Move UDF stop iteration ...
Github user e-dorigatti commented on the issue: https://github.com/apache/spark/pull/21467

@viirya we only want to revert `udf.py` and the hack in `_get_argspec`. Did I miss anything there?
[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19602 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91383/ Test FAILed.
[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19602 Merged build finished. Test FAILed.
[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19602

**[Test build #91383 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91383/testReport)** for PR 19602 at commit [`98c2512`](https://github.com/apache/spark/commit/98c251235a1d0924a9606be82abf1005dca03e1a).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21477 **[Test build #91384 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91384/testReport)** for PR 21477 at commit [`701a455`](https://github.com/apache/spark/commit/701a45506d75169455384ec8eebd30e509591c30).
[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/21477 jenkins retest this please.
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r192336364

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -1882,3 +1882,311 @@ case class ArrayRepeat(left: Expression, right: Expression)
   }
 }
+
+object ArraySetLike {
+  val kindUnion = 1
+
+  private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = {
+    val array = new Array[Int](hs.size)
+    var pos = hs.nextPos(0)
+    var i = 0
+    while (pos != OpenHashSet.INVALID_POS) {
+      array(i) = hs.getValue(pos)
+      pos = hs.nextPos(pos + 1)
+      i += 1
+    }
+
+    val numBytes = 4L * array.length
+    val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+      org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+    // Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used
+    if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
+      UnsafeArrayData.fromPrimitiveArray(array)
+    } else {
+      new GenericArrayData(array)
+    }
+  }
+
+  def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = {
+    val array = new Array[Long](hs.size)
+    var pos = hs.nextPos(0)
+    var i = 0
+    while (pos != OpenHashSet.INVALID_POS) {
+      array(i) = hs.getValue(pos)
+      pos = hs.nextPos(pos + 1)
+      i += 1
+    }
+
+    val numBytes = 8L * array.length
+    val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+      org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+    // Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used
+    if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
--- End diff --

As I wrote in a comment, since `UnsafeArrayData.fromPrimitiveArray()` uses `long[]`, this method can accept up to `Integer.MAX_VALUE * 8` bytes in total (8 being `sizeof(long)`). Of course, conservatively, we limit the length to at most `Integer.MAX_VALUE`.
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r192330635

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -1882,3 +1882,311 @@ case class ArrayRepeat(left: Expression, right: Expression)
   }
 }
+
+object ArraySetLike {
+  val kindUnion = 1
+
+  private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = {
+    val array = new Array[Int](hs.size)
+    var pos = hs.nextPos(0)
+    var i = 0
+    while (pos != OpenHashSet.INVALID_POS) {
+      array(i) = hs.getValue(pos)
+      pos = hs.nextPos(pos + 1)
+      i += 1
+    }
+
+    val numBytes = 4L * array.length
+    val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+      org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+    // Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used
+    if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
+      UnsafeArrayData.fromPrimitiveArray(array)
+    } else {
+      new GenericArrayData(array)
+    }
+  }
+
+  def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = {
+    val array = new Array[Long](hs.size)
+    var pos = hs.nextPos(0)
+    var i = 0
+    while (pos != OpenHashSet.INVALID_POS) {
+      array(i) = hs.getValue(pos)
+      pos = hs.nextPos(pos + 1)
+      i += 1
+    }
+
+    val numBytes = 8L * array.length
+    val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+      org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+    // Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used
+    if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
--- End diff --

`8` here means `sizeof(long)` for the Java primitive type.
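As an aside, a minimal sketch of the discussed guard (a hypothetical helper, not the PR's code); note that the product needs `Long` arithmetic, since `Integer.MAX_VALUE * 8` evaluated as `Int` overflows to `-8`:

```scala
// Hypothetical sketch of the size guard discussed above.
// UnsafeArrayData.fromPrimitiveArray() backs the array with long[], so the
// total byte size may reach Integer.MAX_VALUE * 8 (8 = sizeof(long)).
def fitsInUnsafeArray(unsafeArraySizeInBytes: Long): Boolean = {
  // Use a Long literal: Integer.MAX_VALUE * 8 in Int arithmetic overflows to -8.
  unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8L
}
```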
[GitHub] spark issue #21452: [MINOR][CORE] Log committer class used by HadoopMapRedCo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21452 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91379/ Test PASSed.
[GitHub] spark issue #21452: [MINOR][CORE] Log committer class used by HadoopMapRedCo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21452 Merged build finished. Test PASSed.
[GitHub] spark issue #21452: [MINOR][CORE] Log committer class used by HadoopMapRedCo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21452

**[Test build #91379 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91379/testReport)** for PR 21452 at commit [`9881d9c`](https://github.com/apache/spark/commit/9881d9c6a2b1d56e69bb06ee27fd8706f6e0fe43).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `logInfo(s"Using output committer class $`
[GitHub] spark pull request #21472: [SPARK-24445][SQL] Schema in json format for from...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21472#discussion_r192323643

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
@@ -747,8 +748,13 @@ case class StructsToJson(
 
 object JsonExprUtils {
 
-  def validateSchemaLiteral(exp: Expression): StructType = exp match {
-    case Literal(s, StringType) => CatalystSqlParser.parseTableSchema(s.toString)
+  def validateSchemaLiteral(exp: Expression): DataType = exp match {
+    case Literal(s, StringType) =>
+      try {
+        DataType.fromJson(s.toString)
--- End diff --

Usually they should be consistent, but we don't necessarily have to add new support for obsolete functionality just for consistency. I'm not sure how common it is to write a JSON literal as a schema via SQL. How do they get the metadata, and how do they insert it into SQL? Is that the only way to do it?
[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19602

In general, a cast is hard to push into a data source. E.g. for `cast(a as string) = string` where `a` is an int, how should the data source handle it? In the meantime, I think we can omit most of the casts in predicates of the form `attribute = literal`. E.g. for `cast(byteCol as int) = 0`, we know `0` is within the byte range, so we can convert it to `byteCol = (byte) 0`.
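As an illustration of the second point, a hypothetical simplification rule, sketched under the assumption of catalyst's `Cast(child, dataType, timeZoneId)`, `EqualTo`, and `Literal` shapes from this era (not code from any PR): when an int literal compared against `cast(byteCol as int)` fits in a byte, the cast can be dropped and the literal narrowed.

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, EqualTo, Expression, Literal}
import org.apache.spark.sql.types.{ByteType, IntegerType}

// Hypothetical rewrite: cast(byteCol as int) = intLiteral  =>  byteCol = (byte) literal,
// which is only safe when the literal value fits in the narrower type.
def simplifyCastComparison(e: Expression): Expression = e match {
  case EqualTo(Cast(attr: Attribute, IntegerType, _), Literal(v: Int, IntegerType))
      if attr.dataType == ByteType && v >= Byte.MinValue && v <= Byte.MaxValue =>
    EqualTo(attr, Literal(v.toByte, ByteType))
  case other => other  // e.g. cast(a as string) = s cannot be simplified this way
}
```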
[GitHub] spark pull request #19602: [SPARK-22384][SQL] Refine partition pruning when ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19602#discussion_r192319924

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala ---
@@ -207,65 +271,68 @@ class HiveClientSuite(version: String)
   }
 
   private def testMetastorePartitionFiltering(
-      filterString: String,
+      table: String,
+      filterExpr: Expression,
       expectedDs: Seq[Int],
       expectedH: Seq[Int],
       expectedChunks: Seq[String]): Unit = {
     testMetastorePartitionFiltering(
-      filterString,
-      (expectedDs, expectedH, expectedChunks) :: Nil,
+      table,
+      filterExpr,
+      Map("ds" -> expectedDs, "h" -> expectedH, "chunk" -> expectedChunks) :: Nil,
       identity)
   }
 
   private def testMetastorePartitionFiltering(
-      filterString: String,
+      table: String,
+      filterExpr: Expression,
       expectedDs: Seq[Int],
       expectedH: Seq[Int],
       expectedChunks: Seq[String],
       transform: Expression => Expression): Unit = {
     testMetastorePartitionFiltering(
-      filterString,
-      (expectedDs, expectedH, expectedChunks) :: Nil,
+      table,
+      filterExpr,
+      Map("ds" -> expectedDs, "h" -> expectedH, "chunk" -> expectedChunks) :: Nil,
       identity)
   }
 
   private def testMetastorePartitionFiltering(
-      filterString: String,
-      expectedPartitionCubes: Seq[(Seq[Int], Seq[Int], Seq[String])]): Unit = {
-    testMetastorePartitionFiltering(filterString, expectedPartitionCubes, identity)
+      table: String,
+      filterExpr: Expression,
+      expectedPartitionCubes: Seq[Map[String, Seq[Any]]]): Unit = {
+    testMetastorePartitionFiltering(table, filterExpr, expectedPartitionCubes, identity)
   }
 
   private def testMetastorePartitionFiltering(
-      filterString: String,
-      expectedPartitionCubes: Seq[(Seq[Int], Seq[Int], Seq[String])],
+      table: String,
+      filterExpr: Expression,
+      expectedPartitionCubes: Seq[Map[String, Seq[Any]]],
       transform: Expression => Expression): Unit = {
-    val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"),
+    val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", table),
       Seq(
-        transform(parseExpression(filterString))
+        transform(filterExpr)
       ))
 
-    val expectedPartitionCount = expectedPartitionCubes.map {
-      case (expectedDs, expectedH, expectedChunks) =>
-        expectedDs.size * expectedH.size * expectedChunks.size
-    }.sum
-
-    val expectedPartitions = expectedPartitionCubes.map {
-      case (expectedDs, expectedH, expectedChunks) =>
-        for {
-          ds <- expectedDs
-          h <- expectedH
-          chunk <- expectedChunks
-        } yield Set(
-          "ds" -> ds.toString,
-          "h" -> h.toString,
-          "chunk" -> chunk
-        )
-    }.reduce(_ ++ _)
+    val expectedPartitionCount = expectedPartitionCubes.map(_.map(_._2.size).product).sum
+
+    val expectedPartitions = expectedPartitionCubes.map(getPartitionsFromCube(_)).reduce(_ ++ _)
 
     val actualFilteredPartitionCount = filteredPartitions.size
 
     assert(actualFilteredPartitionCount == expectedPartitionCount,
       s"Expected $expectedPartitionCount partitions but got $actualFilteredPartitionCount")
-    assert(filteredPartitions.map(_.spec.toSet).toSet == expectedPartitions.toSet)
+    assert(filteredPartitions.map(_.spec).toSet == expectedPartitions.toSet)
+  }
+
+  private def getPartitionsFromCube(cube: Map[String, Seq[Any]]): Seq[Map[String, String]] = {
+    cube.map {
+      case (k: String, pts: Seq[Any]) => pts.map(pt => (k, pt.toString))
+    }.foldLeft(Seq(Seq[(String, String)]()))((seq0, seq1) => {
--- End diff --

this is hard to read; please use a loop and mutable state directly.
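For illustration, a loop-based rewrite along the lines the reviewer asks for (a hedged sketch, not the code that eventually landed): the cartesian product is accumulated in a mutable variable instead of a `foldLeft`.

```scala
// Hypothetical loop-based rewrite of getPartitionsFromCube, in the spirit of the
// review comment; builds the cartesian product of the partition values.
def getPartitionsFromCube(cube: Map[String, Seq[Any]]): Seq[Map[String, String]] = {
  var specs: Seq[Map[String, String]] = Seq(Map.empty)
  for ((name, values) <- cube) {
    // Extend every partial spec with each value of the current partition column.
    specs = for {
      partial <- specs
      value <- values
    } yield partial + (name -> value.toString)
  }
  specs
}

// Example: Map("ds" -> Seq(1, 2), "h" -> Seq(0)) yields
//   Seq(Map("ds" -> "1", "h" -> "0"), Map("ds" -> "2", "h" -> "0"))
```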
[GitHub] spark pull request #19602: [SPARK-22384][SQL] Refine partition pruning when ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19602#discussion_r192319393

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala ---
@@ -59,38 +61,62 @@ class HiveClientSuite(version: String)
         "h" -> h.toString,
         "chunk" -> chunk
       ), storageFormat)
-    assert(partitions.size == testPartitionCount)
+    assert(partitions0.size == testPartitionCount0)
 
     client.createPartitions(
-      "default", "test", partitions, ignoreIfExists = false)
+      "default", "test0", partitions0, ignoreIfExists = false)
+
+    val partitions1 =
+      for {
+        pt <- 0 until 10
+        chunk <- Seq("aa", "ab", "ba", "bb")
+      } yield CatalogTablePartition(Map(
+        "pt" -> pt.toString,
+        "chunk" -> chunk
+      ), storageFormat)
+    assert(partitions1.size == testPartitionCount1)
+
+    client.createPartitions(
+      "default", "test1", partitions1, ignoreIfExists = false)
+
     client
   }
 
+  private def pAttr(table: String, name: String): Attribute = {
+    val partTypes = client.getTable("default", table).partitionSchema.fields
+      .map(field => (field.name, field.dataType)).toMap
+    partTypes.get(name) match {
+      case Some(dt) => AttributeReference(name, dt)()
+      case None =>
+        fail(s"Illegal name of partition attribute: $name")
+    }
+  }
+
   override def beforeAll() {
     super.beforeAll()
     client = init(true)
   }
 
   test(s"getPartitionsByFilter returns all partitions when $tryDirectSqlKey=false") {
     val client = init(false)
-    val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"),
-      Seq(parseExpression("ds=20170101")))
+    val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test0"),
+      Seq(EqualTo(pAttr("test0", "ds"), Literal(20170101, IntegerType
--- End diff --

we can import `org.apache.spark.sql.catalyst.dsl.expressions._` to simplify expression creation
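For example, with the DSL in scope, the expression built by hand above could presumably be written much more compactly (a sketch; `'ds.int` and `===` come from the catalyst DSL implicits):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._

// Instead of building the tree by hand:
//   EqualTo(AttributeReference("ds", IntegerType)(), Literal(20170101, IntegerType))
// the DSL lets the test write:
val filter = 'ds.int === 20170101
```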
[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/19602

I also think we have the same problem for data source tables.
[GitHub] spark pull request #21470: [SPARK-24443][SQL] comparison should accept struc...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21470#discussion_r192314729

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
@@ -803,18 +803,60 @@ object TypeCoercion {
           e.copy(left = Cast(e.left, TimestampType))
       }
 
-      case b @ BinaryOperator(left, right) if left.dataType != right.dataType =>
-        findTightestCommonType(left.dataType, right.dataType).map { commonType =>
-          if (b.inputType.acceptsType(commonType)) {
-            // If the expression accepts the tightest common type, cast to that.
-            val newLeft = if (left.dataType == commonType) left else Cast(left, commonType)
-            val newRight = if (right.dataType == commonType) right else Cast(right, commonType)
-            b.withNewChildren(Seq(newLeft, newRight))
-          } else {
-            // Otherwise, don't do anything with the expression.
-            b
-          }
-        }.getOrElse(b)  // If there is no applicable conversion, leave expression unchanged.
+      case b @ BinaryOperator(left, right)
+          if !BinaryOperator.sameType(left.dataType, right.dataType) =>
+        (left.dataType, right.dataType) match {
+          case (StructType(fields1), StructType(fields2)) =>
+            val commonTypes = scala.collection.mutable.ArrayBuffer.empty[DataType]
+            val len = fields1.length
+            var i = 0
+            var continue = fields1.length == fields2.length
+            while (i < len && continue) {
+              val commonType = findTightestCommonType(fields1(i).dataType, fields2(i).dataType)
+              if (commonType.isDefined) {
+                commonTypes += commonType.get
+              } else {
+                continue = false
+              }
+              i += 1
+            }
+
+            if (continue) {
+              val newLeftST = new StructType(fields1.zip(commonTypes).map {
+                case (f, commonType) => f.copy(dataType = commonType)
+              })
+              val newLeft = if (left.dataType == newLeftST) left else Cast(left, newLeftST)
+
+              val newRightST = new StructType(fields2.zip(commonTypes).map {
+                case (f, commonType) => f.copy(dataType = commonType)
+              })
+              val newRight = if (right.dataType == newRightST) right else Cast(right, newRightST)
+
+              if (b.inputType.acceptsType(newLeftST) && b.inputType.acceptsType(newRightST)) {
--- End diff --

Is it possible that `b` accepts only one side (e.g., only `newLeftST`) but not the other side?
[GitHub] spark pull request #21470: [SPARK-24443][SQL] comparison should accept struc...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21470#discussion_r192314292

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
@@ -803,18 +803,60 @@ object TypeCoercion {
           e.copy(left = Cast(e.left, TimestampType))
       }
 
-      case b @ BinaryOperator(left, right) if left.dataType != right.dataType =>
-        findTightestCommonType(left.dataType, right.dataType).map { commonType =>
-          if (b.inputType.acceptsType(commonType)) {
-            // If the expression accepts the tightest common type, cast to that.
-            val newLeft = if (left.dataType == commonType) left else Cast(left, commonType)
-            val newRight = if (right.dataType == commonType) right else Cast(right, commonType)
-            b.withNewChildren(Seq(newLeft, newRight))
-          } else {
-            // Otherwise, don't do anything with the expression.
-            b
-          }
-        }.getOrElse(b)  // If there is no applicable conversion, leave expression unchanged.
+      case b @ BinaryOperator(left, right)
+          if !BinaryOperator.sameType(left.dataType, right.dataType) =>
+        (left.dataType, right.dataType) match {
+          case (StructType(fields1), StructType(fields2)) =>
+            val commonTypes = scala.collection.mutable.ArrayBuffer.empty[DataType]
+            val len = fields1.length
+            var i = 0
+            var continue = fields1.length == fields2.length
+            while (i < len && continue) {
+              val commonType = findTightestCommonType(fields1(i).dataType, fields2(i).dataType)
--- End diff --

What about nested structs?
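To make the question concrete, a hypothetical pair of types (not from the PR) where the field-by-field loop meets a nested struct:

```scala
import org.apache.spark.sql.types._

// Two struct types that differ only in the type of the nested field "x".
val leftType = StructType(Seq(
  StructField("s", StructType(Seq(StructField("x", IntegerType))))))
val rightType = StructType(Seq(
  StructField("s", StructType(Seq(StructField("x", LongType))))))

// The loop above calls findTightestCommonType on the field types, here two
// unequal nested StructTypes; unless that function recurses into struct
// fields, no common type is found and the comparison stays uncoerced,
// which is presumably the point of the question.
```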
[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/19602

@cloud-fan Sorry for the late reply; I've been busy these days. In the current change:
1. I follow `Cast.mayTruncate` strictly when extracting the partition attribute;
2. I created new test data in `HiveClientSuite`, and `testMetastorePartitionFiltering` can now validate tables with different partition schemas. If needed, I can create more tests -- different binary comparisons and different data types.
[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19602 Merged build finished. Test PASSed.
[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19602 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3754/ Test PASSed.
[GitHub] spark pull request #19602: [SPARK-22384][SQL] Refine partition pruning when ...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/19602#discussion_r192312477

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala ---
@@ -207,65 +271,68 @@ class HiveClientSuite(version: String)
   }
 
   private def testMetastorePartitionFiltering(
-      filterString: String,
+      table: String,
+      filterExpr: Expression,
       expectedDs: Seq[Int],
       expectedH: Seq[Int],
       expectedChunks: Seq[String]): Unit = {
     testMetastorePartitionFiltering(
-      filterString,
-      (expectedDs, expectedH, expectedChunks) :: Nil,
+      table,
+      filterExpr,
+      Map("ds" -> expectedDs, "h" -> expectedH, "chunk" -> expectedChunks) :: Nil,
       identity)
   }
 
   private def testMetastorePartitionFiltering(
-      filterString: String,
+      table: String,
+      filterExpr: Expression,
       expectedDs: Seq[Int],
       expectedH: Seq[Int],
       expectedChunks: Seq[String],
       transform: Expression => Expression): Unit = {
     testMetastorePartitionFiltering(
-      filterString,
-      (expectedDs, expectedH, expectedChunks) :: Nil,
+      table,
+      filterExpr,
+      Map("ds" -> expectedDs, "h" -> expectedH, "chunk" -> expectedChunks) :: Nil,
       identity)
   }
 
   private def testMetastorePartitionFiltering(
-      filterString: String,
-      expectedPartitionCubes: Seq[(Seq[Int], Seq[Int], Seq[String])]): Unit = {
-    testMetastorePartitionFiltering(filterString, expectedPartitionCubes, identity)
+      table: String,
+      filterExpr: Expression,
+      expectedPartitionCubes: Seq[Map[String, Seq[Any]]]): Unit = {
+    testMetastorePartitionFiltering(table, filterExpr, expectedPartitionCubes, identity)
   }
 
   private def testMetastorePartitionFiltering(
-      filterString: String,
-      expectedPartitionCubes: Seq[(Seq[Int], Seq[Int], Seq[String])],
+      table: String,
+      filterExpr: Expression,
+      expectedPartitionCubes: Seq[Map[String, Seq[Any]]],
--- End diff --

With this change, the number of partition names in `expectedPartitionCubes` is no longer necessarily 3, and its schema is `Seq[Map[partition name, partition values]]`.
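Concretely, a sketch of the new argument shape with hypothetical values:

```scala
// One "cube" per expected partition group; keys are partition column names.
val expectedPartitionCubes: Seq[Map[String, Seq[Any]]] = Seq(
  Map("ds" -> Seq(20170101, 20170102), "h" -> Seq(0, 1), "chunk" -> Seq("aa", "ab")),
  // A table with a different partition schema is no longer forced into three columns:
  Map("pt" -> Seq(0, 1), "chunk" -> Seq("ba", "bb"))
)
```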