[GitHub] spark pull request #23045: [SPARK-26071][SQL] disallow map as map key
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23045#discussion_r234494542 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -521,13 +521,18 @@ case class MapEntries(child: Expression) extends UnaryExpression with ExpectsInp case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpression { override def checkInputDataTypes(): TypeCheckResult = { -var funcName = s"function $prettyName" +val funcName = s"function $prettyName" if (children.exists(!_.dataType.isInstanceOf[MapType])) { TypeCheckResult.TypeCheckFailure( s"input to $funcName should all be of type map, but it's " + children.map(_.dataType.catalogString).mkString("[", ", ", "]")) } else { - TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), funcName) + val sameTypeCheck = TypeUtils.checkForSameTypeInputExpr(children.map(_.dataType), funcName) + if (sameTypeCheck.isFailure) { +sameTypeCheck + } else { +TypeUtils.checkForMapKeyType(dataType.keyType) --- End diff -- see https://github.com/apache/spark/pull/23045/files#diff-3f19ec3d15dcd8cd42bb25dde1c5c1a9R20 . The child may be read from parquet files, so map of map is still possible. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23054: [SPARK-26085][SQL] Key attribute of non-struct ty...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23054#discussion_r234476607 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala --- @@ -459,7 +460,11 @@ class KeyValueGroupedDataset[K, V] private[sql]( columns.map(_.withInputType(vExprEnc, dataAttributes).named) val keyColumn = if (!kExprEnc.isSerializedAsStruct) { assert(groupingAttributes.length == 1) - groupingAttributes.head + if (SQLConf.get.aliasNonStructGroupingKey) { --- End diff -- we should do the alias when the config is true... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23043: [SPARK-26021][SQL] replace minus zero with zero i...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23043#discussion_r234476361 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala --- @@ -723,4 +723,32 @@ class DataFrameAggregateSuite extends QueryTest with SharedSQLContext { "grouping expressions: [current_date(None)], value: [key: int, value: string], " + "type: GroupBy]")) } + + test("SPARK-26021: Double and Float 0.0/-0.0 should be equal when grouping") { +val colName = "i" +def groupByCollect(df: DataFrame): Array[Row] = { + df.groupBy(colName).count().collect() +} +def assertResult[T](result: Array[Row], zero: T)(implicit ordering: Ordering[T]): Unit = { + assert(result.length == 1) + // using compare since 0.0 == -0.0 is true + assert(ordering.compare(result(0).getAs[T](0), zero) == 0) --- End diff -- Instead of checking the result, I prefer the code snippet in the JIRA ticket, which makes it more obvious where the problem is. Let's run a group-by query with both 0.0 and -0.0 in the input, and then check the number of result rows: since 0.0 and -0.0 should ideally be the same, we should only have one group (one result row). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
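A rough sketch of the end-to-end check described above (assuming the usual `SharedSQLContext` test helpers and `spark.implicits._` in scope):

```
// Sketch only: put both zeros in the input and expect a single group.
val df = Seq(0.0d, -0.0d).toDF("d")
val result = df.groupBy("d").count().collect()
assert(result.length == 1) // 0.0 and -0.0 should fall into one group
```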
[GitHub] spark pull request #23043: [SPARK-26021][SQL] replace minus zero with zero i...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23043#discussion_r234475978 --- Diff: common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java --- @@ -157,4 +159,15 @@ public void heapMemoryReuse() { Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7); Assert.assertEquals(obj3, onheap4.getBaseObject()); } + + @Test + // SPARK-26021 + public void writeMinusZeroIsReplacedWithZero() { +byte[] doubleBytes = new byte[Double.BYTES]; +byte[] floatBytes = new byte[Float.BYTES]; +Platform.putDouble(doubleBytes, Platform.BYTE_ARRAY_OFFSET, -0.0d); +Platform.putFloat(floatBytes, Platform.BYTE_ARRAY_OFFSET, -0.0f); +Assert.assertEquals(0, Double.compare(0.0d, ByteBuffer.wrap(doubleBytes).getDouble())); --- End diff -- are you sure this test fails before the fix? IIUC `0.0 == -0.0` is true, but they have different binary formats --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23043: [SPARK-26021][SQL] replace minus zero with zero i...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23043#discussion_r234476055 --- Diff: common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java --- @@ -157,4 +159,15 @@ public void heapMemoryReuse() { Assert.assertEquals(onheap4.size(), 1024 * 1024 + 7); Assert.assertEquals(obj3, onheap4.getBaseObject()); } + + @Test + // SPARK-26021 + public void writeMinusZeroIsReplacedWithZero() { +byte[] doubleBytes = new byte[Double.BYTES]; +byte[] floatBytes = new byte[Float.BYTES]; +Platform.putDouble(doubleBytes, Platform.BYTE_ARRAY_OFFSET, -0.0d); +Platform.putFloat(floatBytes, Platform.BYTE_ARRAY_OFFSET, -0.0f); +Assert.assertEquals(0, Double.compare(0.0d, ByteBuffer.wrap(doubleBytes).getDouble())); --- End diff -- BTW thanks for adding the unit test! It's a good complementary to the end-to-end test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23043: [SPARK-26021][SQL] replace minus zero with zero i...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23043#discussion_r234475858 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java --- @@ -120,6 +120,9 @@ public static float getFloat(Object object, long offset) { } public static void putFloat(Object object, long offset, float value) { +if(value == -0.0f) { --- End diff -- I'm fine with putting this trick here. Shall we also move the IsNaN logic here as well? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
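The normalize-on-write idea, sketched in Scala for illustration (the real change lives in the Java `Platform` class; the helper below is hypothetical): both `-0.0` and any `NaN` bit pattern are mapped to a canonical value before the bytes are written.

```
// Hypothetical sketch, not the actual Platform.java code.
def normalizeDouble(value: Double): Double = {
  // -0.0 == 0.0 is true, so this maps both zeros to the canonical +0.0 bit pattern.
  if (value == -0.0d) 0.0d
  else if (value.isNaN) Double.NaN // canonical NaN bit pattern
  else value
}
```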
[GitHub] spark pull request #23025: [SPARK-26024][SQL]: Update documentation for repa...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23025#discussion_r234475550 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -2789,6 +2789,12 @@ class Dataset[T] private[sql]( * When no explicit sort order is specified, "ascending nulls first" is assumed. * Note, the rows are not sorted in each partition of the resulting Dataset. * + * + * Note that due to performance reasons this method uses sampling to estimate the ranges. + * Hence, the output may not be consistent, since sampling can return different values. + * The sample size can be controlled by setting the value of the parameter + * `spark.sql.execution.rangeExchange.sampleSizePerPartition`. --- End diff -- It's not a parameter but a config. So I'd like to propose ``` The sample size can be controlled by the config `xxx` ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23025: [SPARK-26024][SQL]: Update documentation for repartition...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23025 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23054: [SPARK-26085][SQL] Key attribute of primitive typ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23054#discussion_r234475321 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1594,6 +1594,15 @@ object SQLConf { "WHERE, which does not follow SQL standard.") .booleanConf .createWithDefault(false) + + val LEGACY_ATOMIC_KEY_ATTRIBUTE_GROUP_BY_KEY = +buildConf("spark.sql.legacy.atomicKeyAttributeGroupByKey") --- End diff -- `spark.sql.legacy.dataset.aliasNonStructGroupingKey`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23054: [SPARK-26085][SQL] Key attribute of primitive typ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23054#discussion_r234475156 --- Diff: docs/sql-migration-guide-upgrade.md --- @@ -17,6 +17,9 @@ displayTitle: Spark SQL Upgrading Guide - The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set. + - In Spark version 2.4 and earlier, `Dataset.groupByKey` results to a grouped dataset with key attribute wrongly named as "value", if the key is atomic type, e.g. int, string, etc. This is counterintuitive and makes the schema of aggregation queries weird. For example, the schema of `ds.groupByKey(...).count()` is `(value, count)`. Since Spark 3.0, we name the grouping attribute to "key". The old behaviour is preserved under a newly added configuration `spark.sql.legacy.atomicKeyAttributeGroupByKey` with a default value of `false`. --- End diff -- I realized that, only struct type key has the `key` alias. So here we should say: `if the key is non-struct type, e.g. int, string, array, etc.` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23079: [SPARK-26107][SQL] Extend ReplaceNullWithFalseInP...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23079#discussion_r234474562 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -767,6 +767,15 @@ object ReplaceNullWithFalse extends Rule[LogicalPlan] { replaceNullWithFalse(cond) -> value } cw.copy(branches = newBranches) + case af @ ArrayFilter(_, lf @ LambdaFunction(func, _, _)) => --- End diff -- shall we add a `withNewFunctions` method in `HigherOrderFunction`? Then we can simplify this rule to ``` case f: HigherOrderFunction => f.withNewFunctions(f.functions.map(replaceNullWithFalse)) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
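A rough sketch of the proposed shape (`withNewFunctions` is only a suggestion here, not an existing method; `functions` already exists on the trait):

```
// Hypothetical addition to the HigherOrderFunction trait.
trait HigherOrderFunction extends Expression {
  /** The lambda functions this expression takes, e.g. ArrayFilter's predicate. */
  def functions: Seq[Expression]
  /** A copy of this expression with the lambda functions replaced. */
  def withNewFunctions(newFunctions: Seq[Expression]): HigherOrderFunction
}
```

With something like that in place, the rule body collapses to the single `case f: HigherOrderFunction` shown in the comment above, instead of one case per higher-order function.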
[GitHub] spark pull request #23042: [SPARK-26070][SQL] add rule for implicit type coe...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23042#discussion_r234401696 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -138,6 +138,11 @@ object TypeCoercion { case (DateType, TimestampType) => if (conf.compareDateTimestampInTimestamp) Some(TimestampType) else Some(StringType) +// to support a popular use case of tables using Decimal(X, 0) for long IDs instead of strings +// see SPARK-26070 for more details +case (n: DecimalType, s: StringType) if n.scale == 0 => Some(DecimalType(n.precision, n.scale)) --- End diff -- > no implicit cast at all Is that too strict? I feel it's OK to compare an int with long. Maybe we should come up with a list of "definitely safe" type coercions, and allow them only. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
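Purely as an illustration of that idea (this is not Spark's actual coercion table), a "definitely safe" list could be restricted to lossless widenings, with everything else requiring an explicit cast:

```
import org.apache.spark.sql.types._

// Hypothetical whitelist: implicit comparisons allowed only for lossless widenings.
val safeCoercions: Map[(DataType, DataType), DataType] = Map(
  (ByteType, ShortType) -> ShortType,
  (ShortType, IntegerType) -> IntegerType,
  (IntegerType, LongType) -> LongType,
  (FloatType, DoubleType) -> DoubleType)

def safeCommonType(a: DataType, b: DataType): Option[DataType] =
  if (a == b) Some(a) else safeCoercions.get((a, b)).orElse(safeCoercions.get((b, a)))
```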
[GitHub] spark pull request #23054: [SPARK-26085][SQL] Key attribute of primitive typ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23054#discussion_r234401319 --- Diff: docs/sql-migration-guide-upgrade.md --- @@ -17,6 +17,8 @@ displayTitle: Spark SQL Upgrading Guide - The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set. + - In Spark version 2.4 and earlier, the key attribute is wrongly named as "value" for primitive key type when doing typed aggregation on Dataset. This attribute is now named as "key" since Spark 3.0 like complex key type. --- End diff -- ``` In Spark version 2.4 and earlier, `Dataset.groupByKey` results to a grouped dataset with key attribute wrongly named as "value", if the `Dataset` element is of atomic type, e.g. int, string, etc. This is counterintuitive and makes the schema of aggregation queries weird. For example, the schema of `ds.groupByKey(...).count()` is `(value, count)`. Since Spark 3.0, we name the grouping attribute to "key". ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22547: [SPARK-25528][SQL] data source V2 read side API refactor...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22547 I was stuck with some personal business recently, I'll send a PR for batch source after the weekend. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23040: [SPARK-26068][Core]ChunkedByteBufferInputStream should h...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23040 also cc @jiangxb1987 @zsxwing --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23040: [SPARK-26068][Core]ChunkedByteBufferInputStream should h...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23040 LGTM except one comment --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23040: [SPARK-26068][Core]ChunkedByteBufferInputStream s...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23040#discussion_r234395227 --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala --- @@ -222,7 +222,7 @@ private[spark] class ChunkedByteBufferInputStream( dispose: Boolean) extends InputStream { - private[this] var chunks = chunkedByteBuffer.getChunks().iterator + private[this] var chunks = chunkedByteBuffer.getChunks().filter(_.hasRemaining).iterator --- End diff -- can you add a comment above, saying that we do this filter because `read` assumes `chunks` has no empty chunk? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
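Something along these lines would capture the requested explanation inline (the comment wording is only a suggestion):

```
// `read` assumes every chunk in `chunks` has remaining bytes, so drop empty chunks
// up front to keep that invariant.
private[this] var chunks = chunkedByteBuffer.getChunks().filter(_.hasRemaining).iterator
```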
[GitHub] spark issue #23043: [SPARK-26021][SQL] replace minus zero with zero in Unsaf...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23043 `UnsafeRow.set` is not the only place that writes float/double as binary data; can you check other places, like `UnsafeWriter`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23043: [SPARK-26021][SQL] replace minus zero with zero in Unsaf...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23043 Looking at `UnsafeRow.putFloat`, it normalizes the value of `Float.NaN`. I think we should do the same there for `-0.0`, and other related places (check how we handle Float.NaN). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23043: [SPARK-26021][SQL] replace minus zero with zero in Unsaf...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23043 Before rushing to a fix that replaces -0.0 with 0.0, I'd like to know how this bug happens. One possible reason might be that 0.0 and -0.0 have different binary formats. Spark uses the unsafe API to write float/double; maybe we can investigate that first. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
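For reference, the two zeros compare equal but do not share an IEEE 754 bit pattern, which is easy to confirm from a Scala REPL:

```
// 0.0 and -0.0 compare equal...
assert(0.0d == -0.0d)
// ...but their raw bit patterns differ: -0.0 has the sign bit set.
assert(java.lang.Double.doubleToRawLongBits(0.0d) == 0L)
assert(java.lang.Double.doubleToRawLongBits(-0.0d) == java.lang.Long.MIN_VALUE)
assert(java.lang.Float.floatToRawIntBits(-0.0f) == Integer.MIN_VALUE)
```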
[GitHub] spark issue #23054: [SPARK-26085][SQL] Key attribute of primitive type under...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23054 makes sense to me. This is a behavior change right? Shall we write a migration guide? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23042: [SPARK-26070][SQL] add rule for implicit type coe...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23042#discussion_r234091858 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -138,6 +138,11 @@ object TypeCoercion { case (DateType, TimestampType) => if (conf.compareDateTimestampInTimestamp) Some(TimestampType) else Some(StringType) +// to support a popular use case of tables using Decimal(X, 0) for long IDs instead of strings +// see SPARK-26070 for more details +case (n: DecimalType, s: StringType) if n.scale == 0 => Some(DecimalType(n.precision, n.scale)) --- End diff -- CC @gatorsmile @mgaido91 I think it's time to look at the SQL standard and other mainstream databases, and see how we should update the type coercion rules with safe mode. What do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23044: [SPARK-26073][SQL][FOLLOW-UP] remove invalid comment as ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23044 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23046: [SPARK-23207][SQL][FOLLOW-UP] Use `SQLConf.get.enableRad...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23046 thanks, merging to master/2.4! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23046: [SPARK-23207][SQL][FOLLOW-UP] Use `SQLConf.get.en...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23046#discussion_r234088968 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala --- @@ -280,7 +280,7 @@ object ShuffleExchangeExec { } // The comparator for comparing row hashcode, which should always be Integer. val prefixComparator = PrefixComparators.LONG - val canUseRadixSort = SparkEnv.get.conf.get(SQLConf.RADIX_SORT_ENABLED) + val canUseRadixSort = SQLConf.get.enableRadixSort --- End diff -- It's a small bug fix, so no need to backport to all the branches. I think 2.4 is good enough --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23040: [SPARK-26068][Core]ChunkedByteBufferInputStream should h...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23040 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23029: [SPARK-26055][CORE] InterfaceStability annotations shoul...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23029 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22989: [SPARK-25986][Build] Add rules to ban throw Error...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22989#discussion_r233821654 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/UnsafeAlignedOffset.java --- @@ -39,7 +39,9 @@ public static int getSize(Object object, long offset) { case 8: return (int)Platform.getLong(object, offset); default: +// checkstyle.off: RegexpSinglelineJava throw new AssertionError("Illegal UAO_SIZE"); --- End diff -- yea, that's exactly the use case of `IllegalStateException`, which can also pass the style check here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23043: [SPARK-26021][SQL] replace minus zero with zero in Unsaf...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23043 This only works for attributes, not literals or intermediate results. Is there a better place to fix it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23040: [SPARK-26068][Core]ChunkedByteBufferInputStream should h...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23040 It's good to fix a potential bug, can you add a unit test? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23044: [SPARK-26073][SQL][FOLLOW-UP] remove invalid comment as ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23044 add to whitelist --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23044: [SPARK-26073][SQL][FOLLOW-UP] remove invalid comment as ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23044 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23044: [SPARK-26073][SQL][FOLLOW-UP] remove invalid comm...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23044#discussion_r233819066 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala --- @@ -76,8 +76,6 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR row: String, ordering: Seq[SortOrder]): Seq[ExprCode] = { ctx.INPUT_ROW = row -// to use INPUT_ROW we must make sure currentVars is null -ctx.currentVars = null --- End diff -- I think the conclusion was to keep it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23046: [SPARK-23207][SQL][FOLLOW-UP] Use `SQLConf.get.enableRad...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23046 good catch! LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23035: [SPARK-26057][SQL] Transform also analyzed plans when de...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23035 thanks, merging to master/2.4! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23042: [SPARK-26070][SQL] add rule for implicit type coe...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23042#discussion_r233816966 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -138,6 +138,11 @@ object TypeCoercion { case (DateType, TimestampType) => if (conf.compareDateTimestampInTimestamp) Some(TimestampType) else Some(StringType) +// to support a popular use case of tables using Decimal(X, 0) for long IDs instead of strings +// see SPARK-26070 for more details +case (n: DecimalType, s: StringType) if n.scale == 0 => Some(DecimalType(n.precision, n.scale)) --- End diff -- what if the decimal is (1, 0) and the string is something like `.`? The string can be anything: a very big integer, a fraction with many digits after the dot, etc. I don't think there is a perfect solution, casting to double is the best we can do here. I'd suggest end users to manually do the cast which fits their data best. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23045: [SPARK-26071][SQL] disallow map as map key
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23045 cc @gatorsmile @dongjoon-hyun @viirya --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23045: [SPARK-26071][SQL] disallow map as map key
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/23045 [SPARK-26071][SQL] disallow map as map key ## What changes were proposed in this pull request? Due to an implementation limitation, Spark currently can't compare or do equality checks between map types. As a result, map values can't appear in EQUAL or comparison expressions, can't be grouping keys, etc. More importantly, a map lookup needs to do an equality check on the map key, so a map can't be supported as a map key when looking up values from a map. Thus it's not useful to have a map as a map key. This PR proposes to stop users from creating maps that use a map type as the key. The list of expressions that are updated: `CreateMap`, `MapFromArrays`, `MapFromEntries`, `MapConcat`, `TransformKeys`. I manually checked all the places that create `MapType`, and came up with this list. Note that maps with a map-type key still exist, via reading from parquet files, converting from scala/java maps, etc. This PR is not meant to completely forbid maps as map keys, but to avoid Spark itself creating them. Motivation: when I was trying to fix the duplicate key problem, I found it's impossible to do with a map-type map key. I think it's reasonable to avoid map-type map keys for builtin functions. ## How was this patch tested? updated test You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark map-key Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23045.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23045 commit 3ff0cd592c52839d0aac739b44cee0cf02e951bc Author: Wenchen Fan Date: 2018-11-15T10:23:58Z disallow map as map key --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
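The key-type check mentioned elsewhere in this thread (`TypeUtils.checkForMapKeyType`) can be sketched roughly as follows; the exact error message and implementation in the merged patch may differ:

```
// Rough sketch of the validation added to map-creating expressions.
def checkForMapKeyType(keyType: DataType): TypeCheckResult = {
  if (keyType.existsRecursively(_.isInstanceOf[MapType])) {
    // Reject keys that are maps or contain maps anywhere in their structure.
    TypeCheckResult.TypeCheckFailure("The key of map cannot be/contain map.")
  } else {
    TypeCheckResult.TypeCheckSuccess
  }
}
```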
[GitHub] spark pull request #22976: [SPARK-25974][SQL]Optimizes Generates bytecode fo...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22976#discussion_r233787377 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala --- @@ -133,30 +126,26 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR returnType = "int", makeSplitFunction = { body => s""" - InternalRow ${ctx.INPUT_ROW} = null; // Holds current row being evaluated. - $body - return 0; -""" + |$body + |return 0; +""".stripMargin }, foldFunctions = { funCalls => funCalls.zipWithIndex.map { case (funCall, i) => val comp = ctx.freshName("comp") s""" -int $comp = $funCall; -if ($comp != 0) { - return $comp; -} - """ +|int $comp = $funCall; +|if ($comp != 0) { +| return $comp; +|} + """.stripMargin }.mkString }) ctx.currentVars = oldCurrentVars ctx.INPUT_ROW = oldInputRow // make sure INPUT_ROW is declared even if splitExpressions // returns an inlined block --- End diff -- sorry didn't see this comment when merging. feel free to send a follow-up. thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22976: [SPARK-25974][SQL]Optimizes Generates bytecode for order...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22976 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22989: [SPARK-25986][Build] Add rules to ban throw Error...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22989#discussion_r233706605 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/UnsafeAlignedOffset.java --- @@ -52,7 +54,9 @@ public static void putSize(Object object, long offset, int value) { Platform.putLong(object, offset, value); break; default: +// checkstyle.off: RegexpSinglelineJava throw new AssertionError("Illegal UAO_SIZE"); --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22989: [SPARK-25986][Build] Add rules to ban throw Error...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22989#discussion_r233706517 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/UnsafeAlignedOffset.java --- @@ -39,7 +39,9 @@ public static int getSize(Object object, long offset) { case 8: return (int)Platform.getLong(object, offset); default: +// checkstyle.off: RegexpSinglelineJava throw new AssertionError("Illegal UAO_SIZE"); --- End diff -- shall we throw `IllegalStateException` here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23035: [SPARK-26057][SQL] Transform also analyzed plans ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23035#discussion_r233696401 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala --- @@ -2554,4 +2554,34 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { checkAnswer(swappedDf.filter($"key"($"map") > "a"), Row(2, Map(2 -> "b"))) } + + test("SPARK-26057: attribute deduplication on already analyzed plans") { +withTempView("cc", "p", "c") { + val df1 = Seq(("1-1", "sp", 6)).toDF("id", "layout", "n") + df1.createOrReplaceTempView("cc") + val df2 = Seq(("sp", 1)).toDF("layout", "ts") + df2.createOrReplaceTempView("p") + val df3 = Seq(("1-1", "sp", 3)).toDF("id", "layout", "ts") + df3.createOrReplaceTempView("c") + spark.sql( +""" + |SELECT cc.id, cc.layout, count(*) as m + |FROM cc + |JOIN p USING(layout) + |WHERE EXISTS( + | SELECT 1 + | FROM c + | WHERE c.id = cc.id AND c.layout = cc.layout AND c.ts > p.ts) + |GROUP BY cc.id, cc.layout +""".stripMargin).createOrReplaceTempView("pcc") + val res = spark.sql( --- End diff -- good catch on the problem! Do you think it's possible to simplify the test? I think we just need a temp view with subquery, and use it in a join. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
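One possible minimal shape for such a test (a sketch only; names and data are placeholders, and it may need tweaking to actually hit the attribute-deduplication path):

```
// Sketch: a temp view whose definition contains a subquery, reused in a self-join.
withTempView("t", "v") {
  Seq((1, 1)).toDF("a", "b").createOrReplaceTempView("t")
  spark.sql("SELECT a FROM t WHERE b > (SELECT max(b) FROM t) - 1")
    .createOrReplaceTempView("v")
  // Joining the view with itself forces deduplication on an already analyzed plan.
  checkAnswer(spark.sql("SELECT * FROM v JOIN v USING (a)"), Row(1))
}
```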
[GitHub] spark pull request #23035: [SPARK-26057][SQL] Transform also analyzed plans ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23035#discussion_r233695765 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala --- @@ -2554,4 +2554,34 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { checkAnswer(swappedDf.filter($"key"($"map") > "a"), Row(2, Map(2 -> "b"))) } + + test("SPARK-26057: attribute deduplication on already analyzed plans") { +withTempView("cc", "p", "c") { --- End diff -- if we don't care about naming, how about `a, b, c` instead of `cc, p, c`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23029: [SPARK-26055][CORE] InterfaceStability annotations shoul...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23029 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23029: [SPARK-26055][CORE] InterfaceStability annotation...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23029#discussion_r233322079 --- Diff: common/tags/src/main/java/org/apache/spark/annotation/InterfaceStability.java --- @@ -17,7 +17,7 @@ package org.apache.spark.annotation; -import java.lang.annotation.Documented; +import java.lang.annotation.*; --- End diff -- Actually we need to import 5 classes, that's why my IDE turned it into * automatically... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23029: [SPARK-26055][CORE] InterfaceStability annotation...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/23029 [SPARK-26055][CORE] InterfaceStability annotations should be retained at runtime ## What changes were proposed in this pull request? It's good to have annotations available at runtime, so that tools like MiMa can detect them and deal with then specially. e.g. we don't want to track compatibility for unstable classes. This PR makes `InterfaceStability` annotations to be retained at runtime, to be consistent with `Experimental` and `DeveloperApi` ## How was this patch tested? N/A You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark annotation Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23029.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23029 commit b71dd6d86378a086ea66915b4041d844248eaacb Author: Wenchen Fan Date: 2018-11-14T05:39:00Z InterfaceStability annotations should be retained at runtime --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
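To illustrate why runtime retention matters (a hedged example; `SparkSession` is just a plausible carrier of an `InterfaceStability` annotation): with `RetentionPolicy.RUNTIME` the marker is visible to reflection, while the default class-file retention would make `getAnnotations` return nothing at runtime.

```
// Sketch: a runtime-retained annotation can be discovered via reflection, so tools
// like MiMa could skip compatibility checks for classes marked Unstable/Evolving.
val annotationNames = classOf[org.apache.spark.sql.SparkSession]
  .getAnnotations.map(_.annotationType().getName)
println(annotationNames.mkString(", ")) // expected to include an InterfaceStability marker
```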
[GitHub] spark issue #23029: [SPARK-26055][CORE] InterfaceStability annotations shoul...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23029 cc @rxin @srowen @vanzin @gatorsmile --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21957: [SPARK-24994][SQL] When the data type of the fiel...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21957#discussion_r233287374 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -269,7 +269,8 @@ case class FileSourceScanExec( } @transient - private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter) + private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy. +translateFilter(_, !relation.fileFormat.isInstanceOf[ParquetSource])) --- End diff -- I don't think we accept changes like this. If this is specific to parquet, do it in `ParquetFilters`. And I still prefer to normalize the filters and remove unnecessary cast, before pushing filters down to data sources. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22518: [SPARK-25482][SQL] Avoid pushdown of subqueries to data ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22518 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22961: [SPARK-25947][SQL] Reduce memory usage in ShuffleExchang...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22961 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22962#discussion_r233117222 --- Diff: python/pyspark/taskcontext.py --- @@ -147,8 +147,8 @@ def __init__(self): @classmethod def _getOrCreate(cls): """Internal function to get or create global BarrierTaskContext.""" -if cls._taskContext is None: -cls._taskContext = BarrierTaskContext() +if not isinstance(cls._taskContext, BarrierTaskContext): +cls._taskContext = object.__new__(cls) --- End diff -- could you add some comments to explain it? so that people won't get confused again. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22961: [SPARK-25947][SQL] Reduce memory usage in ShuffleExchang...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22961 cool thanks! LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22962: [SPARK-25921][PySpark] Fix barrier task run witho...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22962#discussion_r233033846 --- Diff: python/pyspark/taskcontext.py --- @@ -147,8 +147,8 @@ def __init__(self): @classmethod def _getOrCreate(cls): """Internal function to get or create global BarrierTaskContext.""" -if cls._taskContext is None: -cls._taskContext = BarrierTaskContext() +if not isinstance(cls._taskContext, BarrierTaskContext): +cls._taskContext = object.__new__(cls) --- End diff -- ah good point! @xuanyuanking can you send a small followup? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23004: [SPARK-26004][SQL] InMemoryTable support StartsWi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23004#discussion_r233033597 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala --- @@ -237,6 +237,13 @@ case class InMemoryTableScanExec( if list.forall(ExtractableLiteral.unapply(_).isDefined) && list.nonEmpty => list.map(l => statsFor(a).lowerBound <= l.asInstanceOf[Literal] && l.asInstanceOf[Literal] <= statsFor(a).upperBound).reduce(_ || _) + +case StartsWith(a: AttributeReference, ExtractableLiteral(l)) => + statsFor(a).lowerBound.substr(0, Length(l)) <= l && +l <= statsFor(a).upperBound.substr(0, Length(l)) +case StartsWith(ExtractableLiteral(l), a: AttributeReference) => --- End diff -- same question --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22518: [SPARK-25482][SQL] Avoid pushdown of subqueries t...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22518#discussion_r233032721 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala --- @@ -1268,4 +1269,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext { assert(getNumSortsInQuery(query5) == 1) } } + + test("SPARK-25482: Forbid pushdown to dattasources of filters containing subqueries") { --- End diff -- `dattasources` typo --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22518: [SPARK-25482][SQL] Avoid pushdown of subqueries t...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22518#discussion_r233032650 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala --- @@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{expressions, InternalRow} -import org.apache.spark.sql.catalyst.expressions.{Expression, ExprId, InSet, Literal, PlanExpression} +import org.apache.spark.sql.catalyst.expressions.{Expression, ExprId, InSet, Literal, NamedExpression, PlanExpression} --- End diff -- unnecessary change --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22962: [SPARK-25921][PySpark] Fix barrier task run without Barr...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22962 LGTM, merging to master/2.4! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23004: [SPARK-26004][SQL] InMemoryTable support StartsWi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23004#discussion_r232945864 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala --- @@ -237,6 +237,13 @@ case class InMemoryTableScanExec( if list.forall(ExtractableLiteral.unapply(_).isDefined) && list.nonEmpty => list.map(l => statsFor(a).lowerBound <= l.asInstanceOf[Literal] && l.asInstanceOf[Literal] <= statsFor(a).upperBound).reduce(_ || _) + +case StartsWith(a: AttributeReference, ExtractableLiteral(l)) => --- End diff -- can you add some comment to explain it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
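A possible shape for that comment, attached to the case from the diff (the wording is only a suggestion):

```
// A cached batch can contain strings starting with prefix `l` only if `l` sits between
// the prefixes (truncated to Length(l)) of the batch's lower and upper string bounds,
// so compare the truncated bounds against the literal to decide whether the batch
// can be skipped.
case StartsWith(a: AttributeReference, ExtractableLiteral(l)) =>
  statsFor(a).lowerBound.substr(0, Length(l)) <= l &&
    l <= statsFor(a).upperBound.substr(0, Length(l))
```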
[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r232941317 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala --- @@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext { df.where($"city".contains(new java.lang.Character('A'))), Seq(Row("Amsterdam"))) } + + test("SPARK-25942: typed aggregation on primitive type") { +val ds = Seq(1, 2, 3).toDS() + +val agg = ds.groupByKey(_ >= 2) + .agg(sum("value").as[Long], sum($"value" + 1).as[Long]) +assert(agg.collect() === Seq((false, 1, 2), (true, 5, 7))) + } + + test("SPARK-25942: typed aggregation on product type") { +val ds = Seq((1, 2), (2, 3), (3, 4)).toDS() +val agg = ds.groupByKey(x => x).agg(sum("_1").as[Long], sum($"_2" + 1).as[Long]) +assert(agg.collect().sorted === Seq(((1, 2), 1, 3), ((2, 3), 2, 4), ((3, 4), 3, 5))) --- End diff -- can you try `((1, 2), 1L, 3L)` instead of `((1, 2), 1, 3)`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22977: [SPARK-26030][BUILD] Bump previousSparkVersion in MimaBu...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22977 since this PR only touches mima, and the jenkins already passed the mima check, I'm going to merge it to master, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22518: [SPARK-25482][SQL] Avoid pushdown of subqueries to data ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22518 BTW can you include a simple benchmark to show this problem? e.g. just run a query in spark-shell, and post the result before and after this PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22518: [SPARK-25482][SQL] Avoid pushdown of subqueries to data ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22518 I'd like to merge this simple PR first, to address the performance problem (unnecessary subquery execution). Let's create a new ticket for subquery filter pushing to data source, and have more people to attend the discussion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22518: [SPARK-25482][SQL] Avoid pushdown of subqueries t...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22518#discussion_r232906707 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala --- @@ -47,7 +47,8 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] { case a: AttributeReference => a.withName(logicalRelation.output.find(_.semanticEquals(a)).get.name) } - } + }.filterNot(SubqueryExpression.hasSubquery) --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22518: [SPARK-25482][SQL] Avoid pushdown of subqueries t...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22518#discussion_r232906743 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala --- @@ -1268,4 +1269,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext { assert(getNumSortsInQuery(query5) == 1) } } + + test("SPARK-25482: Reuse same Subquery in order to execute it only once") { --- End diff -- let's update the test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22518: [SPARK-25482][SQL] Avoid pushdown of subqueries t...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22518#discussion_r232906652 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala --- @@ -155,15 +155,14 @@ object FileSourceStrategy extends Strategy with Logging { case a: AttributeReference => a.withName(l.output.find(_.semanticEquals(a)).get.name) } - } + }.filterNot(SubqueryExpression.hasSubquery) --- End diff -- shall we do the filter before the `map`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22961: [SPARK-25947][SQL] Reduce memory usage in Shuffle...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22961#discussion_r232906123 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala --- @@ -214,13 +214,22 @@ object ShuffleExchangeExec { override def getPartition(key: Any): Int = key.asInstanceOf[Int] } case RangePartitioning(sortingExpressions, numPartitions) => -// Internally, RangePartitioner runs a job on the RDD that samples keys to compute -// partition bounds. To get accurate samples, we need to copy the mutable keys. +// Extract only fields used for sorting to avoid collecting large fields that does not +// affect sorting result when deciding partition bounds in RangePartitioner val rddForSampling = rdd.mapPartitionsInternal { iter => + val projection = +UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes) val mutablePair = new MutablePair[InternalRow, Null]() - iter.map(row => mutablePair.update(row.copy(), null)) + // Internally, RangePartitioner runs a job on the RDD that samples keys to compute + // partition bounds. To get accurate samples, we need to copy the mutable keys. + iter.map(row => mutablePair.update(projection(row).copy(), null)) } -implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes) +// Construct ordering on extracted sort key. +val orderingAttributes = sortingExpressions.zipWithIndex.map { case (ord, i) => + ord.copy(child = BoundReference(i, ord.dataType, ord.nullable)) +} +implicit val ordering: Ordering[InternalRow] = + new LazilyGeneratedOrdering(orderingAttributes) --- End diff -- yea, let's follow the previous style: https://github.com/apache/spark/pull/22961/files#diff-3ceee31a3da1b7c7132f666126fbL223 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r232905784 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala --- @@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext { df.where($"city".contains(new java.lang.Character('A'))), Seq(Row("Amsterdam"))) } + + test("SPARK-25942: typed aggregation on primitive type") { +val ds = Seq(1, 2, 3).toDS() + +val agg = ds.groupByKey(_ >= 2) + .agg(sum("value").as[Long], sum($"value" + 1).as[Long]) +assert(agg.collect() === Seq((false, 1, 2), (true, 5, 7))) + } + + test("SPARK-25942: typed aggregation on product type") { +val ds = Seq((1, 2), (2, 3), (3, 4)).toDS() +val agg = ds.groupByKey(x => x).agg(sum("_1").as[Long], sum($"_2" + 1).as[Long]) +assert(agg.collect().sorted === Seq(((1, 2), 1, 3), ((2, 3), 2, 4), ((3, 4), 3, 5))) --- End diff -- can we use `checkAnswer`/`CheckDataset`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23002: [SPARK-26003] Improve SQLAppStatusListener.aggregateMetr...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23002 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22977: [SPARK-26030][BUILD] Bump previousSparkVersion in...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22977#discussion_r232886260 --- Diff: project/MimaExcludes.scala --- @@ -164,7 +212,50 @@ object MimaExcludes { ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol"), // [SPARK-23042] Use OneHotEncoderModel to encode labels in MultilayerPerceptronClassifier - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.classification.LabelConverter") + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.classification.LabelConverter"), + +// [SPARK-21842][MESOS] Support Kerberos ticket renewal and creation in Mesos --- End diff -- these changes are cherry-picked from https://github.com/apache/spark/pull/23015 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23015: [SPARK-26029][BUILD][2.4] Bump previousSparkVersion in M...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23015 thanks, merging to 2.4! Since we have more violations in the master branch, I did not forward-port it, and I'll cherry-pick it in another PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23015: [SPARK-26029][BUILD][2.4] Bump previousSparkVersi...
Github user cloud-fan closed the pull request at: https://github.com/apache/spark/pull/23015 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22518: [SPARK-25482][SQL] Avoid pushdown of subqueries t...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22518#discussion_r232737458 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala --- @@ -1268,4 +1269,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext { assert(getNumSortsInQuery(query5) == 1) } } + + test("SPARK-25482: Reuse same Subquery in order to execute it only once") { +withTempView("t1", "t2") { + sql("create temporary view t1(a int) using parquet") + sql("create temporary view t2(b int) using parquet") + val plan = sql("select * from t2 where b > (select max(a) from t1)") --- End diff -- ah sorry, I misread the code. Unless the subquery is rewritten into a join, we must wait for all subqueries to finish before executing the plan. We can rewrite scalar subqueries in data source filters into literals, to make them work with the filter pushdown API. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22518: [SPARK-25482][SQL] Avoid pushdown of subqueries t...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22518#discussion_r232729788 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala --- @@ -1268,4 +1269,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext { assert(getNumSortsInQuery(query5) == 1) } } + + test("SPARK-25482: Reuse same Subquery in order to execute it only once") { +withTempView("t1", "t2") { + sql("create temporary view t1(a int) using parquet") + sql("create temporary view t2(b int) using parquet") + val plan = sql("select * from t2 where b > (select max(a) from t1)") --- End diff -- > The subquery should be executed anyway sooner or later, right? Yes, but we could execute the scan and the subquery at the same time (2 Spark jobs running together), instead of executing them serially. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22518: [SPARK-25482][SQL] ReuseSubquery can be useless w...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22518#discussion_r232720903 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala --- @@ -1268,4 +1269,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext { assert(getNumSortsInQuery(query5) == 1) } } + + test("SPARK-25482: Reuse same Subquery in order to execute it only once") { +withTempView("t1", "t2") { + sql("create temporary view t1(a int) using parquet") + sql("create temporary view t2(b int) using parquet") + val plan = sql("select * from t2 where b > (select max(a) from t1)") --- End diff -- I think you are right about it, but it also means the data source scan must wait until the subquery is finished. We need to make tradeoffs carefully. I'd suggest we open a new ticket about scalar subquery filter pushdown to data source, and forbid it here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r232699342 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala --- @@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext { df.where($"city".contains(new java.lang.Character('A'))), Seq(Row("Amsterdam"))) } + + test("SPARK-25942: typed aggregation on primitive type") { +val ds = Seq(1, 2, 3).toDS() + +val agg = ds.groupByKey(_ >= 2) + .agg(sum("value").as[Long], sum($"value" + 1).as[Long]) --- End diff -- BTW, if we have to do it in analyzer, can we remove `TypedAggregateExpression.withInputInfo` and put all the logic in analyzer? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r232698607 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala --- @@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext { df.where($"city".contains(new java.lang.Character('A'))), Seq(Row("Amsterdam"))) } + + test("SPARK-25942: typed aggregation on primitive type") { +val ds = Seq(1, 2, 3).toDS() + +val agg = ds.groupByKey(_ >= 2) + .agg(sum("value").as[Long], sum($"value" + 1).as[Long]) --- End diff -- ah i see. Can you add a comment in `TypedColumn.withInputType` and say that untyped normal aggregate expressions are handled in the analyzer directly? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23002: [SPARK-26003] Improve SQLAppStatusListener.aggregateMetr...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23002 LGTM, also cc @gengliangwang --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22518: [SPARK-25482][SQL] ReuseSubquery can be useless w...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22518#discussion_r232688360

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala ---
@@ -1268,4 +1269,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
     assert(getNumSortsInQuery(query5) == 1)
   }
 }
+
+  test("SPARK-25482: Reuse same Subquery in order to execute it only once") {
+    withTempView("t1", "t2") {
+      sql("create temporary view t1(a int) using parquet")
+      sql("create temporary view t2(b int) using parquet")
+      val plan = sql("select * from t2 where b > (select max(a) from t1)")
--- End diff --

Spark pushing filters down doesn't mean the data source can always handle them and deliver a perf improvement.

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22518: [SPARK-25482][SQL] ReuseSubquery can be useless w...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22518#discussion_r232687865

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala ---
@@ -1268,4 +1269,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
     assert(getNumSortsInQuery(query5) == 1)
   }
 }
+
+  test("SPARK-25482: Reuse same Subquery in order to execute it only once") {
+    withTempView("t1", "t2") {
+      sql("create temporary view t1(a int) using parquet")
+      sql("create temporary view t2(b int) using parquet")
+      val plan = sql("select * from t2 where b > (select max(a) from t1)")
--- End diff --

Is there any data source that can support subquery filters? For data source v1/v2, the public `Filter` API does not support subqueries. File sources don't support subquery filters either.

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
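For context, a small sketch of why the public filter API cannot carry a subquery: the shape below is simplified from `org.apache.spark.sql.sources`, and the specific predicate and helper name are illustrative only.

    import org.apache.spark.sql.sources.{Filter, GreaterThan}

    // A pushed-down predicate pairs an attribute with a concrete value, e.g.
    // GreaterThan("b", 10). A predicate like `b > (select max(a) from t1)` has
    // no concrete value until the subquery has run, so it can only be expressed
    // as a Filter after the subquery result is materialized.
    def pushableOnceSubqueryIsDone(maxOfA: Int): Filter = GreaterThan("b", maxOfA)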
[GitHub] spark pull request #22518: [SPARK-25482][SQL] ReuseSubquery can be useless w...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22518#discussion_r232668569

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala ---
@@ -1268,4 +1269,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
     assert(getNumSortsInQuery(query5) == 1)
   }
 }
+
+  test("SPARK-25482: Reuse same Subquery in order to execute it only once") {
+    withTempView("t1", "t2") {
+      sql("create temporary view t1(a int) using parquet")
+      sql("create temporary view t2(b int) using parquet")
+      val plan = sql("select * from t2 where b > (select max(a) from t1)")
--- End diff --

Do we only have a problem when we have a subquery in a data source filter? If that's the case, I would suggest not pushing subquery filters down into data sources.

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r232666996

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
@@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       df.where($"city".contains(new java.lang.Character('A'))),
       Seq(Row("Amsterdam")))
   }
+
+  test("SPARK-25942: typed aggregation on primitive type") {
+    val ds = Seq(1, 2, 3).toDS()
+
+    val agg = ds.groupByKey(_ >= 2)
+      .agg(sum("value").as[Long], sum($"value" + 1).as[Long])
--- End diff --

Looking at `KeyValueGroupedDataset.dataAttributes`, it seems we do want to resolve things based on the plan below `AppendColumns`. And in `KeyValueGroupedDataset#aggUntyped`, we do resolve expressions based on `dataAttributes`. Do you know why it doesn't work?

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r232665302

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
@@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       df.where($"city".contains(new java.lang.Character('A'))),
       Seq(Row("Amsterdam")))
   }
+
+  test("SPARK-25942: typed aggregation on primitive type") {
+    val ds = Seq(1, 2, 3).toDS()
+
+    val agg = ds.groupByKey(_ >= 2)
+      .agg(sum("value").as[Long], sum($"value" + 1).as[Long])
--- End diff --

Ah, I see your point. For the untyped API, `df.groupBy...agg...` produces one plan node, but the typed API `df.groupByKey...agg...` produces two plan nodes. Makes sense to me.

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
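To make the one-node vs. two-node difference concrete, here is a hedged sketch mirroring the test above; it assumes a local `SparkSession` and the usual implicits, and the plan-node names (`Aggregate`, `AppendColumns`) reflect my reading of how these APIs are planned.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.sum

    val spark = SparkSession.builder().master("local").getOrCreate()
    import spark.implicits._

    // Untyped API: grouping and aggregation are planned as a single Aggregate node.
    val df = Seq(1, 2, 3).toDF("value")
    df.groupBy($"value" >= 2).agg(sum($"value"))

    // Typed API: groupByKey first appends the key through an AppendColumns node,
    // and agg then builds the aggregate on top of it -- two plan nodes.
    val ds = Seq(1, 2, 3).toDS()
    ds.groupByKey(_ >= 2).agg(sum($"value" + 1).as[Long])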
[GitHub] spark pull request #23015: [BUILD][2.4] Bump previousSparkVersion in MimaBui...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23015#discussion_r232661121

--- Diff: project/MimaExcludes.scala ---
@@ -105,7 +105,50 @@ object MimaExcludes {
     ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasValidationIndicatorCol.validationIndicatorCol"),
     // [SPARK-23042] Use OneHotEncoderModel to encode labels in MultilayerPerceptronClassifier
-    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.classification.LabelConverter")
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.classification.LabelConverter"),
+
+    // [SPARK-21842][MESOS] Support Kerberos ticket renewal and creation in Mesos
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.getDateOfNextUpdate"),
+
+    // [SPARK-23366] Improve hot reading path in ReadAheadInputStream
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.io.ReadAheadInputStream.this"),
+
+    // [SPARK-22941][CORE] Do not exit JVM when submit fails with in-process launcher.
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkSubmit.addJarToClasspath"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkSubmit.mergeFileLists"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment$default$2"),
+
+    // Data Source V2 API changes
--- End diff --

I tried to exclude them from the MiMa check, but unfortunately the Unstable/Evolving/Stable annotations have a bug: they are not retained in the class files the way the `Experimental` annotation is, so MiMa is not aware of them. I'll fix it in another PR.

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #23015: [BUILD][2.4] Bump previousSparkVersion in MimaBuild.scal...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/23015 cc @srowen @gatorsmile --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #23015: [BUILD][2.4] Bump previousSparkVersion in MimaBui...
GitHub user cloud-fan opened a pull request:

    https://github.com/apache/spark/pull/23015

    [BUILD][2.4] Bump previousSparkVersion in MimaBuild.scala to be 2.3.0

## What changes were proposed in this pull request?

Although it's a little late, we should still update MiMa for branch 2.4 to avoid future breaking changes. Note that, when merging, we should forward-port this to the master branch so that the exclusion rules stay in `v24excludes`.

## How was this patch tested?

N/A

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark mima-2.4

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23015.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #23015

commit 3d5e7b0ff8c23b51ae84d15f045a503954721d5d
Author: Wenchen Fan
Date:   2018-11-12T13:38:37Z

    Bump previousSparkVersion in MimaBuild.scala to be 2.3.0

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
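The change the title describes is essentially a one-liner; roughly, and with the surrounding build code elided (the exact context in `project/MimaBuild.scala` may differ):

    // project/MimaBuild.scala
    val previousSparkVersion = "2.3.0"  // have MiMa compare branch-2.4 against the 2.3.0 artifacts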
[GitHub] spark pull request #22200: [SPARK-25208][SQL] Loosen Cast.forceNullable for ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22200#discussion_r232572693

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala ---
@@ -154,6 +154,15 @@ object Cast {
     fromPrecedence >= 0 && fromPrecedence < toPrecedence
   }
+
+  def canNullSafeCastToDecimal(from: DataType, to: DecimalType): Boolean = from match {
+    case from: BooleanType if to.isWiderThan(DecimalType.BooleanDecimal) => true
+    case from: NumericType if to.isWiderThan(from) => true
+    case from: DecimalType =>
+      // truncating or precision lose
+      (to.precision - to.scale) > (from.precision - from.scale)
--- End diff --

Why is it `>` rather than `>=`?

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
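A worked example of what the `>` vs. `>=` difference means here; the concrete decimal types are chosen only for illustration.

    // DecimalType(5, 2) holds values up to 999.99  -> 5 - 2 = 3 integral digits
    // DecimalType(6, 3) holds values up to 999.999 -> 6 - 3 = 3 integral digits
    // Every (5, 2) value fits into (6, 3), yet with `>` the check is false:
    assert(!((6 - 3) > (5 - 2)))   // 3 > 3  -> cast not considered null-safe
    assert((6 - 3) >= (5 - 2))     // 3 >= 3 -> `>=` would accept this cast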
[GitHub] spark issue #21732: [SPARK-24762][SQL] Enable Option of Product encoders
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21732 LGTM except a few comments, good job! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22961: [SPARK-25947][SQL] Reduce memory usage in ShuffleExchang...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22961 do you have some benchmark numbers? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22961: [SPARK-25947][SQL] Reduce memory usage in ShuffleExchang...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22961 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22961: [SPARK-25947][SQL] Reduce memory usage in Shuffle...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22961#discussion_r232564430 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala --- @@ -214,13 +214,22 @@ object ShuffleExchangeExec { override def getPartition(key: Any): Int = key.asInstanceOf[Int] } case RangePartitioning(sortingExpressions, numPartitions) => -// Internally, RangePartitioner runs a job on the RDD that samples keys to compute -// partition bounds. To get accurate samples, we need to copy the mutable keys. +// Extract only fields used for sorting to avoid collecting large fields that does not +// affect sorting result when deciding partition bounds in RangePartitioner val rddForSampling = rdd.mapPartitionsInternal { iter => + val projection = +UnsafeProjection.create(sortingExpressions.map(_.child), outputAttributes) val mutablePair = new MutablePair[InternalRow, Null]() - iter.map(row => mutablePair.update(row.copy(), null)) + // Internally, RangePartitioner runs a job on the RDD that samples keys to compute + // partition bounds. To get accurate samples, we need to copy the mutable keys. + iter.map(row => mutablePair.update(projection(row).copy(), null)) } -implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes) +// Construct ordering on extracted sort key. +val orderingAttributes = sortingExpressions.zipWithIndex.map { case (ord, i) => + ord.copy(child = BoundReference(i, ord.dataType, ord.nullable)) +} +implicit val ordering: Ordering[InternalRow] = + new LazilyGeneratedOrdering(orderingAttributes) --- End diff -- style nit: this can be merged to the previous line --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21732: [SPARK-24762][SQL] Enable Option of Product encod...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21732#discussion_r232563132 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala --- @@ -1556,6 +1547,69 @@ class DatasetSuite extends QueryTest with SharedSQLContext { df.where($"city".contains(new java.lang.Character('A'))), Seq(Row("Amsterdam"))) } + + test("SPARK-24762: Enable top-level Option of Product encoders") { +val data = Seq(Some((1, "a")), Some((2, "b")), None) +val ds = data.toDS() + +checkDataset( + ds, + data: _*) + +val schema = new StructType().add( + "value", + new StructType() +.add("_1", IntegerType, nullable = false) +.add("_2", StringType, nullable = true), + nullable = true) + +assert(ds.schema == schema) + +val nestedOptData = Seq(Some((Some((1, "a")), 2.0)), Some((Some((2, "b")), 3.0))) +val nestedDs = nestedOptData.toDS() + +checkDataset( + nestedDs, + nestedOptData: _*) + +val nestedSchema = StructType(Seq( + StructField("value", StructType(Seq( +StructField("_1", StructType(Seq( + StructField("_1", IntegerType, nullable = false), + StructField("_2", StringType, nullable = true, +StructField("_2", DoubleType, nullable = false) + )), nullable = true) +)) +assert(nestedDs.schema == nestedSchema) + } + + test("SPARK-24762: Resolving Option[Product] field") { +val ds = Seq((1, ("a", 1.0)), (2, ("b", 2.0))).toDS().as[(Int, Option[(String, Double)])] +checkDataset(ds, + (1, Some(("a", 1.0))), (2, Some(("b", 2.0 + } + + test("SPARK-24762: select Option[Product] field") { +val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS() + .select(expr("struct(_2, _2 + 1)").as[Option[(Int, Int)]]) +checkDataset(ds, + Some((1, 2)), Some((2, 3)), Some((3, 4))) + } + + test("SPARK-24762: joinWith on Option[Product]") { +val ds1 = Seq(Some((1, 2)), Some((2, 3))).toDS().as("a") --- End diff -- ditto, let's test None --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21732: [SPARK-24762][SQL] Enable Option of Product encod...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21732#discussion_r232562929 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala --- @@ -1556,6 +1547,69 @@ class DatasetSuite extends QueryTest with SharedSQLContext { df.where($"city".contains(new java.lang.Character('A'))), Seq(Row("Amsterdam"))) } + + test("SPARK-24762: Enable top-level Option of Product encoders") { +val data = Seq(Some((1, "a")), Some((2, "b")), None) +val ds = data.toDS() + +checkDataset( + ds, + data: _*) + +val schema = new StructType().add( + "value", + new StructType() +.add("_1", IntegerType, nullable = false) +.add("_2", StringType, nullable = true), + nullable = true) + +assert(ds.schema == schema) + +val nestedOptData = Seq(Some((Some((1, "a")), 2.0)), Some((Some((2, "b")), 3.0))) +val nestedDs = nestedOptData.toDS() + +checkDataset( + nestedDs, + nestedOptData: _*) + +val nestedSchema = StructType(Seq( + StructField("value", StructType(Seq( +StructField("_1", StructType(Seq( + StructField("_1", IntegerType, nullable = false), + StructField("_2", StringType, nullable = true, +StructField("_2", DoubleType, nullable = false) + )), nullable = true) +)) +assert(nestedDs.schema == nestedSchema) + } + + test("SPARK-24762: Resolving Option[Product] field") { +val ds = Seq((1, ("a", 1.0)), (2, ("b", 2.0))).toDS().as[(Int, Option[(String, Double)])] +checkDataset(ds, + (1, Some(("a", 1.0))), (2, Some(("b", 2.0 + } + + test("SPARK-24762: select Option[Product] field") { +val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS() + .select(expr("struct(_2, _2 + 1)").as[Option[(Int, Int)]]) --- End diff -- can we also test null values here? e.g. `if(_2 > 2, struct..., null)` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
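One possible shape for the suggested null case, sketched against the test above; it assumes the same test scaffolding (`toDS`, `expr`, `checkDataset` in scope), and the expected rows are my guess at the intended behaviour rather than something verified here.

    val withNulls = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
      .select(expr("if(_2 > 2, struct(_2, _2 + 1), null)").as[Option[(Int, Int)]])
    // rows where the condition is false would be expected to decode to None
    checkDataset(withNulls, None, None, Some((3, 4)))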
[GitHub] spark pull request #21732: [SPARK-24762][SQL] Enable Option of Product encod...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21732#discussion_r232561288 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala --- @@ -1556,6 +1547,69 @@ class DatasetSuite extends QueryTest with SharedSQLContext { df.where($"city".contains(new java.lang.Character('A'))), Seq(Row("Amsterdam"))) } + + test("SPARK-24762: Enable top-level Option of Product encoders") { +val data = Seq(Some((1, "a")), Some((2, "b")), None) +val ds = data.toDS() + +checkDataset( + ds, + data: _*) + +val schema = new StructType().add( + "value", + new StructType() +.add("_1", IntegerType, nullable = false) +.add("_2", StringType, nullable = true), + nullable = true) + +assert(ds.schema == schema) + +val nestedOptData = Seq(Some((Some((1, "a")), 2.0)), Some((Some((2, "b")), 3.0))) +val nestedDs = nestedOptData.toDS() + +checkDataset( + nestedDs, + nestedOptData: _*) + +val nestedSchema = StructType(Seq( + StructField("value", StructType(Seq( +StructField("_1", StructType(Seq( + StructField("_1", IntegerType, nullable = false), + StructField("_2", StringType, nullable = true, +StructField("_2", DoubleType, nullable = false) + )), nullable = true) +)) +assert(nestedDs.schema == nestedSchema) + } + + test("SPARK-24762: Resolving Option[Product] field") { +val ds = Seq((1, ("a", 1.0)), (2, ("b", 2.0))).toDS().as[(Int, Option[(String, Double)])] --- End diff -- we can put a null value in the input data? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21732: [SPARK-24762][SQL] Enable Option of Product encod...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21732#discussion_r232560471 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala --- @@ -198,7 +189,7 @@ case class ExpressionEncoder[T]( val serializer: Seq[NamedExpression] = { val clsName = Utils.getSimpleName(clsTag.runtimeClass) -if (isSerializedAsStruct) { +if (isSerializedAsStruct && !isOptionType) { --- End diff -- if we always check them together, how about putting `!isOptionType` inside `isSerializedAsStruct`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
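A self-contained analogue of the suggestion, not the real `ExpressionEncoder` internals (the member names below are made up):

    // Fold the Option check into the struct check so call sites stop repeating
    // `isSerializedAsStruct && !isOptionType` themselves.
    final case class EncoderShape(structLike: Boolean, isOptionType: Boolean) {
      def isSerializedAsStruct: Boolean = structLike && !isOptionType
    }

A caller would then just test `shape.isSerializedAsStruct`, which is the point of the comment above.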
[GitHub] spark pull request #22518: [SPARK-25482][SQL] ReuseSubquery can be useless w...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22518#discussion_r232558384

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala ---
@@ -1268,4 +1269,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
     assert(getNumSortsInQuery(query5) == 1)
   }
 }
+
+  test("SPARK-25482: Reuse same Subquery in order to execute it only once") {
+    withTempView("t1", "t2") {
+      sql("create temporary view t1(a int) using parquet")
+      sql("create temporary view t2(b int) using parquet")
+      val plan = sql("select * from t2 where b > (select max(a) from t1)")
--- End diff --

Sorry, it has been a long time and I don't quite remember the context. What was the problem we were trying to fix? This test doesn't look related to subquery reuse at all.

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22887: [SPARK-25880][CORE] user set's hadoop conf should not ov...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22887 looks reasonable, cc @gatorsmile --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22944: [SPARK-25942][SQL] Aggregate expressions shouldn'...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r232556359

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
@@ -1556,6 +1556,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       df.where($"city".contains(new java.lang.Character('A'))),
       Seq(Row("Amsterdam")))
   }
+
+  test("SPARK-25942: typed aggregation on primitive type") {
+    val ds = Seq(1, 2, 3).toDS()
+
+    val agg = ds.groupByKey(_ >= 2)
+      .agg(sum("value").as[Long], sum($"value" + 1).as[Long])
--- End diff --

I think we should not make decisions for users. For the untyped APIs, users can refer to the grouping columns in the aggregate expressions, and I think the typed APIs should be the same. For this particular case, Spark currently allows grouping columns inside aggregate functions, so the `value` here is indeed ambiguous. There is nothing we can do but fail and ask users to add an alias. BTW, we should check other databases and see whether "grouping columns inside aggregate functions" should be allowed at all.

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
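For illustration, here is roughly where the ambiguity comes from; it assumes the usual `spark.implicits._` and `functions.sum` imports, and the column-naming detail for the appended key is my reading of this case rather than a confirmed fact.

    val ds = Seq(1, 2, 3).toDS()               // the data column is named "value"
    // For a primitive grouping key, groupByKey appends a key column that is
    // (by my reading) also named "value", so the reference below could mean
    // either the appended key or the original data column:
    ds.groupByKey(_ >= 2).agg(sum("value").as[Long])

    // Spark SQL likewise accepts a grouping column inside an aggregate function
    // (hypothetical table and column names):
    spark.sql("SELECT key, sum(key) FROM t GROUP BY key")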
[GitHub] spark issue #22429: [SPARK-25440][SQL] Dumping query execution info to a fil...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22429 This is hard to review. Do you mean we should add `maxFields: Option[Int]` to all the string-related methods? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
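If that is the intent, the shape of such a change would presumably be something like the sketch below; the helper name and behaviour are purely illustrative, not the PR's actual code.

    // Illustrative only: thread an optional field-count limit through the
    // string-producing helpers instead of relying on one global truncation limit.
    def truncatedFieldString(fields: Seq[String], maxFields: Option[Int]): String = {
      val shown = maxFields.map(n => fields.take(n)).getOrElse(fields)
      val suffix =
        if (shown.length < fields.length) s", ... ${fields.length - shown.length} more fields" else ""
      shown.mkString("[", ", ", suffix + "]")
    }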
[GitHub] spark issue #22976: [SPARK-25974][SQL]Optimizes Generates bytecode for order...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/22976 LGTM except one comment, cc @rednaxelafx --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org