[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r212843948

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---
@@ -191,6 +195,48 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
     assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size == 1)
   }

+  test("SPARK-25121 Supports multi-part names for broadcast hint resolution") {
+    val (table1Name, table2Name) = ("t1", "t2")
+    withTempDatabase { dbName =>
+      withTable(table1Name, table2Name) {
+        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
+          spark.range(50).write.saveAsTable(s"$dbName.$table1Name")
+          spark.range(100).write.saveAsTable(s"$dbName.$table2Name")
+          // First, makes sure a join is not broadcastable
+          val plan = sql(s"SELECT * FROM $dbName.$table1Name, $dbName.$table2Name " +
+            s"WHERE $table1Name.id = $table2Name.id")
+            .queryExecution.executedPlan
+          assert(plan.collect { case p: BroadcastHashJoinExec => p }.size == 0)
+
+          // Uses multi-part table names for broadcast hints
+          def checkIfHintApplied(tableName: String, hintTableName: String): Unit = {
--- End diff --

yea, I'll fix.
[GitHub] spark issue #22204: [SPARK-25196][SQL] Analyze column statistics in cached q...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22204

ping
[GitHub] spark issue #22198: [SPARK-25121][SQL] Supports multi-part table names for b...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22198

Thanks, @dongjoon-hyun! I'll check and merge that.
[GitHub] spark issue #22198: [SPARK-25121][SQL] Supports multi-part table names for b...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22198

aha, I see. IMO we need to apply the hint in that case, too. I'll fix.
[GitHub] spark issue #21931: [SPARK-24978][SQL]Add spark.sql.fast.hash.aggregate.row....
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21931

LGTM
[GitHub] spark pull request #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileS...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22232#discussion_r212796191

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala ---
@@ -208,7 +199,6 @@ class FileScanRDD(
     override def close(): Unit = {
       updateBytesRead()
-      updateBytesReadWithFileSize()
--- End diff --

aha, I see.
[GitHub] spark issue #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileSize bec...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22232

I'm not sure we can test this case, but how about, for example, the sequence below?
```
import org.apache.spark.TaskContext
spark.range(10).selectExpr("id AS c0", "rand() AS c1").write.parquet("/tmp/t1")
val df = spark.read.parquet("/tmp/t1")
val fileScanRdd = df.repartition(1).queryExecution.executedPlan.children(0).children(0).execute()
fileScanRdd.mapPartitions { part =>
  println(s"Initial bytesRead=${TaskContext.get.taskMetrics().inputMetrics.bytesRead}")
  TaskContext.get.addTaskCompletionListener[Unit] { taskCtx =>
    // Check if the metric is correct?
    println(s"Total bytesRead=${TaskContext.get.taskMetrics().inputMetrics.bytesRead}")
  }
  part
}.collect
```
[GitHub] spark issue #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileSize bec...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22232

btw, can you clean up the title and the description..?
[GitHub] spark issue #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileSize bec...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22232

Is it difficult to add tests for checking the metric in the case of `select * from t limit 1`?
[GitHub] spark pull request #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileS...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22232#discussion_r212793049

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala ---
@@ -208,7 +199,6 @@ class FileScanRDD(
     override def close(): Unit = {
       updateBytesRead()
-      updateBytesReadWithFileSize()
--- End diff --

If we just remove this `updateBytesReadWithFileSize`, can the issue in the description be solved? Do we need to remove the `updateBytesReadWithFileSize` on line 142, too?
[GitHub] spark issue #22198: [SPARK-25121][SQL] Supports multi-part table names for b...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22198

@dilipbiswal @gatorsmile ping
[GitHub] spark issue #22204: [SPARK-25196][SQL] Analyze column statistics in cached q...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22204

@gatorsmile ping
[GitHub] spark pull request #22141: [SPARK-25154][SQL] Support NOT IN sub-queries ins...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22141#discussion_r212785260

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala ---
@@ -137,13 +137,21 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
       plan: LogicalPlan): (Option[Expression], LogicalPlan) = {
     var newPlan = plan
     val newExprs = exprs.map { e =>
-      e transformUp {
+      e transformDown {
         case Exists(sub, conditions, _) =>
           val exists = AttributeReference("exists", BooleanType, nullable = false)()
           // Deduplicate conflicting attributes if any.
           newPlan = dedupJoin(
             Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)))
           exists
+        case Not(InSubquery(values, ListQuery(sub, conditions, _, _))) =>
+          val exists = AttributeReference("exists", BooleanType, nullable = false)()
--- End diff --

yea, it's ok to keep the current one. Thanks!
[GitHub] spark pull request #22218: [SPARK-25228][CORE]Add executor CPU time metric.
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22218#discussion_r212784518

--- Diff: core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala ---
@@ -73,6 +75,13 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends
     registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
   }

+  // Dropwizard metrics gauge measuring the executor's process (JVM) CPU time.
+  // The value is returned in nanoseconds; the method returns -1 if this operation is not supported.
+  val osMXBean = ManagementFactory.getOperatingSystemMXBean.asInstanceOf[OperatingSystemMXBean]
+  metricRegistry.register(MetricRegistry.name("executorCPUTime"), new Gauge[Long] {
+    override def getValue: Long = osMXBean.getProcessCpuTime()
--- End diff --

Is this metric useful for users?
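As a side note, the MXBean call quoted in the diff can be exercised standalone; a minimal sketch, assuming a HotSpot/OpenJDK JVM that exposes `com.sun.management.OperatingSystemMXBean`:
```
import java.lang.management.ManagementFactory
import com.sun.management.OperatingSystemMXBean

// getProcessCpuTime returns the JVM's cumulative CPU time in nanoseconds,
// or -1 if the platform does not support this operation
val osMXBean = ManagementFactory.getOperatingSystemMXBean.asInstanceOf[OperatingSystemMXBean]
println(s"process CPU time: ${osMXBean.getProcessCpuTime} ns")
```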
[GitHub] spark pull request #22227: [SPARK-25202] [Core] Implements split with limit ...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22227#discussion_r212783216

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala ---
@@ -232,30 +232,41 @@ case class RLike(left: Expression, right: Expression) extends StringRegexExpression
  * Splits str around pat (pattern is a regular expression).
  */
 @ExpressionDescription(
-  usage = "_FUNC_(str, regex) - Splits `str` around occurrences that match `regex`.",
+  usage = "_FUNC_(str, regex, limit) - Splits `str` around occurrences that match `regex`." +
+    "The `limit` parameter controls the number of times the pattern is applied and " +
+    "therefore affects the length of the resulting array. If the limit n is " +
+    "greater than zero then the pattern will be applied at most n - 1 times, " +
+    "the array's length will be no greater than n, and the array's last entry " +
+    "will contain all input beyond the last matched delimiter. If n is " +
+    "non-positive then the pattern will be applied as many times as " +
+    "possible and the array can have any length. If n is zero then the " +
+    "pattern will be applied as many times as possible, the array can " +
+    "have any length, and trailing empty strings will be discarded.",
   examples = """
     Examples:
-      > SELECT _FUNC_('oneAtwoBthreeC', '[ABC]');
+      > SELECT _FUNC_('oneAtwoBthreeC', '[ABC]', -1);
       ["one","two","three",""]
+      > SELECT _FUNC_('oneAtwoBthreeC', '[ABC]', 2);
+      ["one","twoBthreeC"]
   """)
-case class StringSplit(str: Expression, pattern: Expression)
-  extends BinaryExpression with ImplicitCastInputTypes {
+case class StringSplit(str: Expression, pattern: Expression, limit: Expression)
--- End diff --

For test coverage, better to add tests in `string-functions.sql` for the two cases: two arguments and three arguments.
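The limit semantics described in the diff match `java.lang.String#split(regex, limit)`, so the expected outputs for such tests can be sketched in plain Scala (assuming the SQL function mirrors the Java behaviour, as the doc text suggests):
```
val s = "oneAtwoBthreeC"
s.split("[ABC]", -1).toSeq  // Seq("one", "two", "three", "")  -- trailing empty string kept
s.split("[ABC]", 2).toSeq   // Seq("one", "twoBthreeC")        -- at most 2 elements
s.split("[ABC]", 0).toSeq   // Seq("one", "two", "three")      -- trailing empty strings dropped
```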
[GitHub] spark pull request #22227: [SPARK-25202] [Core] Implements split with limit ...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22227#discussion_r212783068

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -2554,7 +2554,27 @@ object functions {
    * @since 1.5.0
    */
   def split(str: Column, pattern: String): Column = withExpr {
-    StringSplit(str.expr, lit(pattern).expr)
+    StringSplit(str.expr, lit(pattern).expr, lit(-1).expr)
   }
+
+  /**
+   * Splits str around pattern (pattern is a regular expression) up to `limit-1` times.
+   *
+   * The limit parameter controls the number of times the pattern is applied and therefore
+   * affects the length of the resulting array. If the limit n is greater than zero then the
+   * pattern will be applied at most n - 1 times, the array's length will be no greater than
+   * n, and the array's last entry will contain all input beyond the last matched delimiter.
+   * If n is non-positive then the pattern will be applied as many times as possible and the
+   * array can have any length. If n is zero then the pattern will be applied as many times as
+   * possible, the array can have any length, and trailing empty strings will be discarded.
+   *
+   * @note Pattern is a string representation of the regular expression.
+   *
+   * @group string_funcs
+   * @since 1.5.0
+   */
+  def split(str: Column, pattern: String, limit: Int): Column = withExpr {
+    StringSplit(str.expr, lit(pattern).expr, lit(limit).expr)
--- End diff --

nit: better to directly use `Literal`
[GitHub] spark pull request #22227: [SPARK-25202] [Core] Implements split with limit ...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22227#discussion_r212782784

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala ---
@@ -232,30 +232,41 @@ case class RLike(left: Expression, right: Expression) extends StringRegexExpression
  * Splits str around pat (pattern is a regular expression).
  */
 @ExpressionDescription(
-  usage = "_FUNC_(str, regex) - Splits `str` around occurrences that match `regex`.",
+  usage = "_FUNC_(str, regex, limit) - Splits `str` around occurrences that match `regex`." +
--- End diff --

Can you refine the description and the format to match the others, e.g., `RLike`? https://github.com/apache/spark/blob/ceb3f41238c8731606164cea5c45a0b87bb5d6f2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala#L78
[GitHub] spark pull request #22227: [SPARK-25202] [Core] Implements split with limit ...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22227#discussion_r212782576

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -2554,7 +2554,27 @@ object functions {
    * @since 1.5.0
    */
   def split(str: Column, pattern: String): Column = withExpr {
-    StringSplit(str.expr, lit(pattern).expr)
+    StringSplit(str.expr, lit(pattern).expr, lit(-1).expr)
   }
+
+  /**
+   * Splits str around pattern (pattern is a regular expression) up to `limit-1` times.
--- End diff --

Drop "up to `limit-1` times" in the first line? That's because the behaviour depends on the values described below.
[GitHub] spark pull request #22227: [SPARK-25202] [Core] Implements split with limit ...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22227#discussion_r212781563

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -2554,7 +2554,27 @@ object functions {
    * @since 1.5.0
    */
   def split(str: Column, pattern: String): Column = withExpr {
-    StringSplit(str.expr, lit(pattern).expr)
+    StringSplit(str.expr, lit(pattern).expr, lit(-1).expr)
   }
+
+  /**
+   * Splits str around pattern (pattern is a regular expression) up to `limit-1` times.
+   *
+   * The limit parameter controls the number of times the pattern is applied and therefore
+   * affects the length of the resulting array. If the limit n is greater than zero then the
+   * pattern will be applied at most n - 1 times, the array's length will be no greater than
+   * n, and the array's last entry will contain all input beyond the last matched delimiter.
+   * If n is non-positive then the pattern will be applied as many times as possible and the
+   * array can have any length. If n is zero then the pattern will be applied as many times as
+   * possible, the array can have any length, and trailing empty strings will be discarded.
+   *
+   * @note Pattern is a string representation of the regular expression.
+   *
+   * @group string_funcs
+   * @since 1.5.0
--- End diff --

`1.5.0` -> `2.4.0`
[GitHub] spark issue #22227: [SPARK-25202] [Core] Implements split with limit sql fun...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22227

Can you add tests in `StringFunctionsSuite`, too?
[GitHub] spark issue #22227: [SPARK-25202] [Core] Implements split with limit sql fun...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22227

@gatorsmile @ueshin can you trigger this test?
[GitHub] spark issue #22227: [SPARK-25202] [Core] Implements split with limit sql fun...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22227

not `[CORE]` but `[SQL]` in the title.
[GitHub] spark pull request #22204: [SPARK-25196][SQL] Analyze column statistics in c...
GitHub user maropu opened a pull request: https://github.com/apache/spark/pull/22204

[SPARK-25196][SQL] Analyze column statistics in cached query

## What changes were proposed in this pull request?
This pr proposes a new API to analyze column statistics in a cached query. In common use cases, users read catalog table data, join/aggregate them, and then cache the result for subsequent queries. But, the current optimization of such queries depends on non-existent or inaccurate column statistics of the cached data because we are only allowed to analyze column statistics in catalog tables via ANALYZE commands. To solve this issue, this pr added `analyzeColumnCacheQuery` in `CacheManager` to analyze column statistics in an already-cached query;
```
scala> spark.range(1000).selectExpr("id % 33 AS c0", "rand() AS c1", "0 AS c2").write.saveAsTable("t")
scala> sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS c0, c1, c2")
scala> val cacheManager = spark.sharedState.cacheManager
scala> def printColumnStats(data: org.apache.spark.sql.DataFrame) = {
     |   data.queryExecution.optimizedPlan.stats.attributeStats.foreach {
     |     case (k, v) => println(s"[$k]: $v")
     |   }
     | }
scala> def df() = spark.table("t").groupBy("c0").agg(count("c1").as("v1"), sum("c2").as("v2"))

// Prints column statistics in catalog table `t`
scala> printColumnStats(spark.table("t"))
[c0#7073L]: ColumnStat(Some(33),Some(0),Some(32),Some(0),Some(8),Some(8),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@209c0be5)))
[c1#7074]: ColumnStat(Some(997),Some(5.958619423369615E-4),Some(0.9988009488973438),Some(0),Some(8),Some(8),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@4ef69c53)))
[c2#7075]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(4),Some(4),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@7cbaf548)))

// Prints column statistics on query result `df`
scala> printColumnStats(df())
[c0#7073L]: ColumnStat(Some(33),Some(0),Some(32),Some(0),Some(8),Some(8),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@209c0be5)))

// Prints column statistics on cached data of `df`
scala> printColumnStats(df().cache)

// A new API described above
scala> cacheManager.analyzeColumnCacheQuery(df(), "v1" :: "v2" :: Nil)

// Then, prints again
scala> printColumnStats(df())
[v1#7101L]: ColumnStat(Some(2),Some(30),Some(31),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@e2ff893)))
[v2#7103L]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@1498a4d)))

scala> cacheManager.analyzeColumnCacheQuery(df(), "c0" :: Nil)
scala> printColumnStats(df())
[v1#7101L]: ColumnStat(Some(2),Some(30),Some(31),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@e2ff893)))
[v2#7103L]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@1498a4d)))
[c0#7073L]: ColumnStat(Some(33),Some(0),Some(32),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@626bcfc8)))
```

## How was this patch tested?
Added tests in `CachedTableSuite`.
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/maropu/spark SPARK-25196

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22204.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22204

commit fcc53c71c3d623a559e499e1148efc54d0e6
Author: Takeshi Yamamuro
Date: 2018-08-22T11:59:29Z

    Fix
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22198#discussion_r212249623

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---
@@ -191,6 +195,39 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
     assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size == 1)
   }

+  test("SPARK-25121 Supports multi-part names for broadcast hint resolution") {
--- End diff --

Would it be better to move the three tests below into `DataFrameHintSuite`?
- test("broadcast join hint using broadcast function")
- test("broadcast join hint using Dataset.hint")
- test("SPARK-25121 Supports multi-part names for broadcast hint resolution")
[GitHub] spark pull request #22198: [SPARK-25121][SQL] Supports multi-part table name...
GitHub user maropu opened a pull request: https://github.com/apache/spark/pull/22198

[SPARK-25121][SQL] Supports multi-part table names for broadcast hint resolution

## What changes were proposed in this pull request?
This pr fixes the code to respect a database name for broadcast table hint resolution. Currently, Spark ignores a database name in multi-part names;
```
scala> sql("CREATE DATABASE testDb")
scala> spark.range(10).write.saveAsTable("testDb.t")

// without this patch
scala> spark.range(10).join(spark.table("testDb.t"), "id").hint("broadcast", "testDb.t").explain
== Physical Plan ==
*(2) Project [id#24L]
+- *(2) BroadcastHashJoin [id#24L], [id#26L], Inner, BuildLeft
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   :  +- *(1) Range (0, 10, step=1, splits=4)
   +- *(2) Project [id#26L]
      +- *(2) Filter isnotnull(id#26L)
         +- *(2) FileScan parquet testdb.t[id#26L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-2.3.1-bin-hadoop2.7/spark-warehouse..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct

// with this patch
scala> spark.range(10).join(spark.table("testDb.t"), "id").hint("broadcast", "testDb.t").explain
== Physical Plan ==
*(2) Project [id#3L]
+- *(2) BroadcastHashJoin [id#3L], [id#5L], Inner, BuildRight
   :- *(2) Range (0, 10, step=1, splits=4)
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
      +- *(1) Project [id#5L]
         +- *(1) Filter isnotnull(id#5L)
            +- *(1) FileScan parquet testdb.t[id#5L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/testdb.db/t], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct
```

## How was this patch tested?
Added tests in `DataFrameJoinSuite`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/maropu/spark SPARK-25121

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22198.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22198

commit d2be6920ba1cc052e9d5d8364cf48375cea8ba44
Author: Takeshi Yamamuro
Date: 2018-08-23T07:20:51Z

    Fix
[GitHub] spark issue #22153: [SPARK-23034][SQL] Show RDD/relation names in RDD/In-Mem...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22153

my bad, this pr doesn't affect cached tables in the web UI. I'll drop these. Actually, this affects Hive tables and RDDs only;
```
>> Hive table case
sql("CREATE TABLE t(c1 int) USING hive")
sql("INSERT INTO t VALUES(1)")
spark.table("t").show()

>> RDD case
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val data = spark.sparkContext.parallelize(Row(1, "abc") :: Nil).setName("existing RDD1")
val df = spark.createDataFrame(data, StructType.fromDDL("c0 int, c1 string"))
df.show()
```

> spark-v2.3.1 for hive tables
https://user-images.githubusercontent.com/692303/44500677-cb55d180-a6c4-11e8-97e9-25b88b351b0a.png

> master w/this pr for hive tables
https://user-images.githubusercontent.com/692303/44500676-cb55d180-a6c4-11e8-9602-1cfbea6d8267.png

> spark-v2.3.1 for rdds
https://user-images.githubusercontent.com/692303/44500731-05bf6e80-a6c5-11e8-83dd-ed7f1ab2d658.png

> master w/this pr for rdds
https://user-images.githubusercontent.com/692303/44500741-11ab3080-a6c5-11e8-8c18-e1cc66be0f09.png
[GitHub] spark issue #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to handle...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/20345

retest this please
[GitHub] spark issue #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to handle...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/20345

retest this please
[GitHub] spark issue #21770: [SPARK-24806][SQL] Brush up generated code so that JDK c...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21770

retest this please
[GitHub] spark issue #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to handle...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/20345

retest this please
[GitHub] spark issue #22154: [SPARK-23711][SPARK-25140][SQL] Catch correct exceptions...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22154

(it is just a note) btw, currently, if expr codegen fails, many error messages can appear on both the driver side and the executor sides. I feel this is a little noisy for users. I think it'd be super nice if we could validate whether all the expr codegen works well on the driver side, in a similar way to `WholeStageCodegen`. I don't have a nice idea now though...
[GitHub] spark pull request #22141: [SPARK-25154][SQL] Support NOT IN sub-queries ins...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22141#discussion_r211801302

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala ---
@@ -137,13 +137,21 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
       plan: LogicalPlan): (Option[Expression], LogicalPlan) = {
     var newPlan = plan
     val newExprs = exprs.map { e =>
-      e transformUp {
+      e transformDown {
         case Exists(sub, conditions, _) =>
           val exists = AttributeReference("exists", BooleanType, nullable = false)()
           // Deduplicate conflicting attributes if any.
           newPlan = dedupJoin(
             Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)))
           exists
+        case Not(InSubquery(values, ListQuery(sub, conditions, _, _))) =>
+          val exists = AttributeReference("exists", BooleanType, nullable = false)()
--- End diff --

nit: There is duplicated code in `case Not(InSubquery...` and `case InSubquery...`. Can we make a simple helper method to remove it?
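A rough sketch of the kind of helper the comment asks for, factoring the shared `ExistenceJoin` construction out of both branches (the helper name is hypothetical; written as a local function so it can close over the surrounding `newPlan` var from the diff):
```
// hypothetical local helper: builds the ExistenceJoin and returns the exists attribute
def addExistenceJoin(sub: LogicalPlan, conditions: Seq[Expression]): AttributeReference = {
  val exists = AttributeReference("exists", BooleanType, nullable = false)()
  // Deduplicate conflicting attributes if any.
  newPlan = dedupJoin(
    Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)))
  exists
}
// then each case branch only keeps its own condition handling and calls
// addExistenceJoin(sub, conditions) instead of repeating the join construction
```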
[GitHub] spark issue #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to handle...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/20345

retest this please
[GitHub] spark pull request #22154: [SPARK-23711][SPARK-25140][SQL] Catch correct exc...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22154#discussion_r211792988

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala ---
@@ -63,7 +49,10 @@ abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] {
     try {
       createCodeGeneratedObject(in)
     } catch {
-      case CodegenError(_) => createInterpretedObject(in)
+      case _: Exception =>
+        // We should have already seen the error message in `CodeGenerator`
+        logWarning("Expr codegen error and falling back to interpreter mode")
--- End diff --

+1 to keeping the current message. `CodeGenerator` has already printed a lot of info for errors.
[GitHub] spark pull request #22154: [SPARK-23711][SPARK-25140][SQL] Catch correct exc...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22154#discussion_r211787035

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala ---
@@ -63,7 +49,10 @@ abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] {
     try {
       createCodeGeneratedObject(in)
     } catch {
-      case CodegenError(_) => createInterpretedObject(in)
+      case _: Exception =>
--- End diff --

ok. Better to fix `WholeStageCodegenExec`, too? https://github.com/apache/spark/blob/60af2501e1afc00192c779f2736a4e3de12428fa/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L585
[GitHub] spark issue #22154: [SPARK-23711][SPARK-25140][SQL] Catch correct exceptions...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22154

retest this please
[GitHub] spark issue #22153: [SPARK-23034][SQL] Show RDD/relation names in RDD/In-Mem...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22153

@gatorsmile @cloud-fan
[GitHub] spark issue #22163: [SPARK-25166][CORE]Reduce the number of write operations...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22163

retest this please
[GitHub] spark pull request #22168: [SPARK-24985][SQL][WIP] Fix OOM in Full Outer Joi...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22168#discussion_r211577003

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -1099,7 +1099,7 @@ private class SortMergeFullOuterJoinScanner(
   def advanceNext(): Boolean = {
     // If we already buffered some matching rows, use them directly
-    if (leftIndex <= leftMatches.size || rightIndex <= rightMatches.size) {
+    if (leftIndex <= leftMatches.length || rightIndex <= rightMatches.length) {
--- End diff --

Why did you change `size` -> `length`?
[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/19691

@DazhuangSu Can you resolve the conflict?
[GitHub] spark issue #22154: [SPARK-23711][SPARK-25140][SQL] Catch correct exceptions...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22154

@rednaxelafx Thanks for your checks ;) addressed.
[GitHub] spark pull request #22154: [SPARK-23711][SPARK-25140][SQL] Catch correct exc...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22154#discussion_r211565872

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala ---
@@ -40,4 +55,13 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanTest {
       assert(obj.isInstanceOf[InterpretedUnsafeProjection])
     }
   }
+
+  test("fallback to the interpreter mode") {
+    val input = Seq(IntegerType).zipWithIndex.map(x => BoundReference(x._2, x._1, true))
+    val fallback = CodegenObjectFactoryMode.FALLBACK.toString
+    withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> fallback) {
+      val obj = FailedCodegenProjection.createObject(input)
+      assert(obj.isInstanceOf[InterpretedUnsafeProjection])
--- End diff --

yea, we could. If other reviewers say +1, I'll update (either's fine to me).
[GitHub] spark pull request #22154: [SPARK-23711][SPARK-25140][SQL] Catch correct exc...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22154#discussion_r211565120

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala ---
@@ -40,4 +55,13 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanTest {
       assert(obj.isInstanceOf[InterpretedUnsafeProjection])
     }
   }
+
+  test("fallback to the interpreter mode") {
+    val input = Seq(IntegerType).zipWithIndex.map(x => BoundReference(x._2, x._1, true))
--- End diff --

ok
[GitHub] spark pull request #17400: [SPARK-19981][SQL] Respect aliases in output part...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/17400#discussion_r211561522

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala ---
@@ -321,6 +321,58 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
     }
   }

+  private def updatePartitioningByAliases(exprs: Seq[NamedExpression], partioning: Partitioning)
+    : Partitioning = {
+    val aliasSeq = exprs.flatMap(_.collectFirst {
--- End diff --

This pr only focuses on aliases, so the point you described above is out of scope for this pr. IMO more complicated cases should be fixed in follow-ups.
[GitHub] spark issue #22153: [SPARK-23034][SQL] Show RDD/relation names in RDD/In-Mem...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22153

retest this please
[GitHub] spark pull request #22154: [SPARK-23711][SPARK-25140][SQL] Catch correct exc...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22154#discussion_r211451705

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala ---
@@ -180,7 +180,10 @@ object UnsafeProjection
     try {
       GenerateUnsafeProjection.generate(unsafeExprs, subexpressionEliminationEnabled)
     } catch {
-      case CodegenError(_) => InterpretedUnsafeProjection.createProjection(unsafeExprs)
+      case _: Exception =>
+        // We should have already see error message in `CodeGenerator`
+        logError("Expr codegen error and falls back to interpreter mode")
--- End diff --

oh, I missed...
[GitHub] spark issue #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to handle...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/20345

retest this please
[GitHub] spark pull request #22154: [SPARK-23711][SPARK-25140][SQL] Catch correct exc...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22154#discussion_r211435227

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala ---
@@ -63,7 +49,10 @@ abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] {
     try {
       createCodeGeneratedObject(in)
     } catch {
-      case CodegenError(_) => createInterpretedObject(in)
+      case _: Exception =>
+        // We should have already see error message in `CodeGenerator`
+        logError("Expr codegen disabled and falls back to the interpreter mode")
--- End diff --

ok
[GitHub] spark issue #22153: [SPARK-23034][SQL] Show RDD/relation names in RDD/In-Mem...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22153

retest this please
[GitHub] spark issue #22154: [SPARK-23711][SPARK-25140][SQL] Catch correct exceptions...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22154

cc: @gatorsmile @cloud-fan @viirya @rednaxelafx
[GitHub] spark issue #22154: [SPARK-23711][SPARK-25140][SQL] Catch correct exceptions...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22154

IIUC `CacheLoader` throws `ExecutionException` when `CodeGenerator.doCompile` throws `InternalCompilerException` or `CompileException`; https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L1305
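A tiny standalone sketch of that wrapping behaviour in Guava's cache (hypothetical key/value types; any exception thrown by the loader surfaces as `ExecutionException` from `get`):
```
import java.util.concurrent.ExecutionException
import com.google.common.cache.{CacheBuilder, CacheLoader}

val cache = CacheBuilder.newBuilder().build(new CacheLoader[String, Integer] {
  // stand-in for CodeGenerator.doCompile throwing CompileException
  override def load(key: String): Integer = throw new Exception("compile failed")
})
try cache.get("some-source") catch {
  // the loader's exception is the cause, not the thrown type itself
  case e: ExecutionException => println(s"wrapped cause: ${e.getCause}")
}
```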
[GitHub] spark pull request #22154: [SPARK-23711][SPARK-25140][SQL] Catch correct exc...
GitHub user maropu opened a pull request: https://github.com/apache/spark/pull/22154

[SPARK-23711][SPARK-25140][SQL] Catch correct exceptions when expr codegen fails

## What changes were proposed in this pull request?
This pr is to fix bugs when expr codegen fails; we need to catch `java.util.concurrent.ExecutionException` instead of `InternalCompilerException` and `CompileException`. This handling is the same as the `WholeStageCodegenExec` one: https://github.com/apache/spark/blob/60af2501e1afc00192c779f2736a4e3de12428fa/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L585

## How was this patch tested?
Added tests in `CodeGeneratorWithInterpretedFallbackSuite`

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/maropu/spark SPARK-25140

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22154.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22154

commit b31dd28afbdd8d1078d9acf7785c18dc55afd9c2
Author: Takeshi Yamamuro
Date: 2018-08-20T15:11:45Z

    Fix
[GitHub] spark pull request #22153: [SPARK-23034][SQL] Show RDD/relation names in RDD...
GitHub user maropu opened a pull request: https://github.com/apache/spark/pull/22153

[SPARK-23034][SQL] Show RDD/relation names in RDD/In-Memory/Hive table scan nodes

## What changes were proposed in this pull request?
This pr proposes to show RDD/relation names in RDD/In-Memory/Hive table scan nodes. This change makes these names show up in the web UI and explain results. For example;
```
scala> Seq((1, 2)).toDF("a", "b").write.saveAsTable("t")
scala> spark.catalog.cacheTable("t")
scala> spark.table("t").explain()
== Physical Plan ==
*(1) Scan in-memory t [a#11, b#12]
                  ^^^
+- InMemoryRelation [a#11, b#12], StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *(1) FileScan parquet default.t[a#11,b#12] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/maropu/Repositories/spark/spark-master/spark-warehouse/t], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
```
https://user-images.githubusercontent.com/692303/44336267-ef8d9480-a4b1-11e8-8b0b-25df55aa2208.png

Closes #20226

## How was this patch tested?
Added tests in `DataFrameSuite`, `DatasetSuite`, and `HiveExplainSuite`

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/maropu/spark pr20226

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22153.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22153

commit edb998387ee477fb54b2939d712ba3961ebc42c3
Author: Tejas Patil
Date: 2018-01-11T00:11:06Z

    [SPARK-23034][Hive][UI] Display tablename for `HiveTableScan` node in UI

commit 3de116a3b99e62bd12c2762641a0c5bd88a53977
Author: Takeshi Yamamuro
Date: 2018-08-20T04:03:14Z

    Fix
[GitHub] spark issue #20226: [SPARK-23034][SQL] Override `nodeName` for all *ScanExec...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/20226

sure, will do, too.
[GitHub] spark issue #17400: [SPARK-19981][SQL] Update output partitioning info. when...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/17400

ok, thanks. I'll resume this work after the freeze.
[GitHub] spark issue #17400: [SPARK-19981][SQL] Update output partitioning info. when...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/17400

If possible, could you describe that problem in your case to encourage this work?
[GitHub] spark issue #17400: [SPARK-19981][SQL] Update output partitioning info. when...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/17400

I think that's because the priority is not that high. Does this issue cause any problem in your query?
[GitHub] spark pull request #22008: [SPARK-24928][SQL] Optimize cross join according ...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22008#discussion_r208802495

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -158,8 +158,9 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
       ConvertToLocalRelation,
       PropagateEmptyRelation) :+
     // The following batch should be executed after batch "Join Reorder" and "LocalRelation".
-    Batch("Check Cartesian Products", Once,
-      CheckCartesianProducts) :+
+    Batch("Check and Optimize Cartesian Products", Once,
+      CheckCartesianProducts,
+      ReorderCrossJoinOperands) :+
--- End diff --

IMO this optimization should be located in `CostBasedJoinReorder` or `ReorderJoin`. Why is this an independent rule?
[GitHub] spark pull request #22036: [SPARK-25028][SQL] Avoid NPE when analyzing parti...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22036#discussion_r208795446

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala ---
@@ -204,6 +204,24 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with SharedSQLContext {
     }
   }

+  test("SPARK-25028: column stats collection for null partitioning columns") {
+    val table = "analyze_partition_with_null"
+    withTempDir { dir =>
+      withTable(table) {
+        sql(s"""
+          |CREATE TABLE $table (name string, value string)
+          |USING PARQUET
+          |PARTITIONED BY (name)
+          |LOCATION '${dir.toURI}'""".stripMargin)
+        val df = Seq(("a", null), ("b", null)).toDF("value", "name")
--- End diff --

super nit: better to add a non-null partition value, e.g., `val df = Seq(("a", null), ("b", null), ("c", "1")).toDF("value", "name")`? btw, why is this in reverse column order (not "name", "value", but "value", "name")?
[GitHub] spark issue #21860: [SPARK-24901][SQL]Merge the codegen of RegularHashMap an...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21860

btw, do we still need the impl. of the vectorized hash map (the comment says it is for tests and benchmarks only) in future releases? @hvanhovell @cloud-fan
[GitHub] spark pull request #21860: [SPARK-24901][SQL]Merge the codegen of RegularHas...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21860#discussion_r208788502

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala ---
@@ -232,6 +232,25 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
     }
   }

+  test("SPARK-24901 check merge FastHashMap and RegularHashMap generate code max size") {
+    var twoLevelMaxCodeSize: Int = 0
+    val caseNumber = 80
+    // merge fastHashMap and regularHashMap generate code max size
+    val codeWithLongFunctions = genGroupByCode(caseNumber)
+    val (_, maxCodeSize) = CodeGenerator.compile(codeWithLongFunctions)
+
+    // master fastHashMap and regularHashMap generate code max size
+    withSQLConf("spark.sql.codegen.aggregate.map.twolevel.enabled" -> "true",
+      "spark.sql.codegen.aggregate.map.vectorized.enable" -> "true") {
+      val codeWithLongFunction1 = genGroupByCode(caseNumber)
+      val (_, maxCodeSize1) = CodeGenerator.compile(codeWithLongFunction1)
+      // maxCodeSize1: 27062
+      twoLevelMaxCodeSize = maxCodeSize1
+    }
+
+    assert(2 * maxCodeSize < twoLevelMaxCodeSize)
+  }
--- End diff --

Do we need this test? I think it's ok to just pass the existing tests.
[GitHub] spark issue #21868: [SPARK-24906][SQL] Adaptively enlarge split / partition ...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21868

Is this a Parquet-specific issue? e.g., how about ORC?
[GitHub] spark issue #22018: [SPARK-25038][SQL] Accelerate Spark Plan generation when...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22018

Can you narrow down the title and description? I think the current ones are obscure..
[GitHub] spark pull request #22018: [SPARK-25038][SQL] Accelerate Spark Plan generati...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22018#discussion_r208783652

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala ---
@@ -297,7 +297,7 @@ object InMemoryFileIndex extends Logging {
     val missingFiles = mutable.ArrayBuffer.empty[String]
     val filteredLeafStatuses = allLeafStatuses.filterNot(
       status => shouldFilterOut(status.getPath.getName))
-    val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
+    val resolvedLeafStatuses = filteredLeafStatuses.par.flatMap {
--- End diff --

btw, is this the right approach? I'm a little confused about how this interacts with the current parallel partition discovery path..
[GitHub] spark pull request #21860: [SPARK-24901][SQL]Merge the codegen of RegularHas...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21860#discussion_r207809333

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala ---
@@ -232,6 +232,23 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
     }
   }

+  test("SPARK-24901 check merge FastHashMap and RegularHashMap generate code max size") {
+    val caseNumber = 80
+    // merge fastHashMap and regularHashMap generate code max size
+    val codeWithLongFunctions = genGroupByCode(caseNumber)
+    val (_, maxCodeSize) = CodeGenerator.compile(codeWithLongFunctions)
+    assert(maxCodeSize < 13500)
--- End diff --

What does `13500` mean? The current max code size?
[GitHub] spark issue #21931: [SPARK-24978][SQL]Add spark.sql.fast.hash.aggregate.row....
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21931

cc: @cloud-fan @hvanhovell
[GitHub] spark pull request #21931: [SPARK-24978][SQL]Add spark.sql.fast.hash.aggrega...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21931#discussion_r207802603

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1437,6 +1437,15 @@ object SQLConf {
       .intConf
       .createWithDefault(20)

+  val FAST_HASH_AGGREGATE_MAX_ROWS_CAPACITY_BIT =
+    buildConf("spark.sql.fast.hash.aggregate.row.max.capacity.bit")
+      .internal()
+      .doc("Capacity for the max number of rows to be held in memory by the fast hash aggregate " +
+        "product operator (e.g: configuration 16 capacity size is 65536).")
+      .intConf
+      .checkValue(bit => bit >= 1 && bit <= 30, "The bit value must be in [1, 30].")
--- End diff --

Do we need to accept these small values, e.g., 2^1, 2^2, ..? I think these are meaningless...
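For context, the capacity implied by the bit value is `1 << bit`, so the low end of the allowed range is tiny; a quick sketch in plain Scala:
```
// capacity = 2^bit; the doc's example: bit 16 -> 65536 slots
Seq(1, 2, 16, 30).foreach { bit =>
  println(s"bit=$bit capacity=${1 << bit}")
}
// bit=1 capacity=2
// bit=2 capacity=4
// bit=16 capacity=65536
// bit=30 capacity=1073741824
```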
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r207758427

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -1647,6 +1647,60 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
     assert(result10.first.schema(0).dataType === expectedType10)
   }

+  test("array_intersect functions") {
+    val df1 = Seq((Array(1, 2, 4), Array(4, 2))).toDF("a", "b")
+    val ans1 = Row(Seq(2, 4))
+    checkAnswer(df1.select(array_intersect($"a", $"b")), ans1)
+    checkAnswer(df1.selectExpr("array_intersect(a, b)"), ans1)
+
+    val df2 = Seq((Array[Integer](1, 2, null, 4, 5), Array[Integer](-5, 4, null, 2, -1)))
+      .toDF("a", "b")
+    val ans2 = Row(Seq(2, null, 4))
+    checkAnswer(df2.select(array_intersect($"a", $"b")), ans2)
+    checkAnswer(df2.selectExpr("array_intersect(a, b)"), ans2)
+
+    val df3 = Seq((Array(1L, 2L, 4L), Array(4L, 2L))).toDF("a", "b")
+    val ans3 = Row(Seq(2L, 4L))
+    checkAnswer(df3.select(array_intersect($"a", $"b")), ans3)
+    checkAnswer(df3.selectExpr("array_intersect(a, b)"), ans3)
+
+    val df4 = Seq(
+      (Array[java.lang.Long](1L, 2L, null, 4L, 5L), Array[java.lang.Long](-5L, 4L, null, 2L, -1L)))
+      .toDF("a", "b")
+    val ans4 = Row(Seq(2L, null, 4L))
+    checkAnswer(df4.select(array_intersect($"a", $"b")), ans4)
+    checkAnswer(df4.selectExpr("array_intersect(a, b)"), ans4)
+
+    val df5 = Seq((Array("c", null, "a", "f"), Array("b", "a", null, "g"))).toDF("a", "b")
+    val ans5 = Row(Seq(null, "a"))
+    checkAnswer(df5.select(array_intersect($"a", $"b")), ans5)
+    checkAnswer(df5.selectExpr("array_intersect(a, b)"), ans5)
+
+    val df6 = Seq((null, null)).toDF("a", "b")
+    intercept[AnalysisException] {
--- End diff --

Could you also check the error message?
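For reference, a hedged sketch of also asserting on the message, continuing the quoted test (the exact message fragment is an assumption; verify it against what the analyzer actually reports):
```
val e = intercept[AnalysisException] {
  df6.select(array_intersect($"a", $"b"))
}
// assumed fragment of the type-check error for null-typed inputs
assert(e.getMessage.contains("data type mismatch"))
```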
[GitHub] spark issue #21608: [SPARK-24626] [SQL] Improve location size calculation in...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21608

retest this please
[GitHub] spark issue #22002: [FOLLOW-UP][SPARK-23772][SQL] Provide an option to ignor...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22002

LGTM cc: @HyukjinKwon
[GitHub] spark pull request #22002: [FOLLOW-UP][SPARK-23772][SQL] Provide an option t...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22002#discussion_r207754359

--- Diff: python/pyspark/sql/readwriter.py ---
@@ -267,7 +267,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
             timestampFormat=timestampFormat, multiLine=multiLine,
             allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep,
-            samplingRatio=samplingRatio, encoding=encoding)
+            samplingRatio=samplingRatio, dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding)
--- End diff --

oh... good catch. thanks.
[GitHub] spark issue #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to handle...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/20345

retest this please
[GitHub] spark issue #21668: [SPARK-24690][SQL] Add a new config to control plan stat...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21668

@cloud-fan ping
[GitHub] spark issue #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to handle...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/20345 retest this please
[GitHub] spark issue #21964: [SPARK-24788][SQL] RelationalGroupedDataset.toString wit...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21964 @gatorsmile ping
[GitHub] spark issue #21933: [SPARK-24917][CORE] make chunk size configurable
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21933 retest this please
[GitHub] spark issue #21964: [SPARK-24788][SQL] RelationalGroupedDataset.toString wit...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21964 retest this please
[GitHub] spark pull request #21754: [SPARK-24705][SQL] ExchangeCoordinator broken whe...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21754#discussion_r207207259

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala ---
@@ -83,16 +83,17 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan}
  * - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MB)
  */
 class ExchangeCoordinator(
-    numExchanges: Int,
     advisoryTargetPostShuffleInputSize: Long,
     minNumPostShufflePartitions: Option[Int] = None)
   extends Logging {

   // The registered Exchange operators.
   private[this] val exchanges = ArrayBuffer[ShuffleExchangeExec]()

+  private[this] lazy val numExchanges = exchanges.size
--- End diff --

ok, I'll update
[GitHub] spark issue #21969: [SPARK-24945][SQL] Switching to uniVocity 2.7.3
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21969 LGTM
[GitHub] spark pull request #21754: [SPARK-24705][SQL] ExchangeCoordinator broken whe...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21754#discussion_r207195382

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala ---
@@ -83,16 +83,17 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan}
  * - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MB)
  */
 class ExchangeCoordinator(
-    numExchanges: Int,
     advisoryTargetPostShuffleInputSize: Long,
     minNumPostShufflePartitions: Option[Int] = None)
   extends Logging {

   // The registered Exchange operators.
   private[this] val exchanges = ArrayBuffer[ShuffleExchangeExec]()

+  private[this] lazy val numExchanges = exchanges.size
--- End diff --

ya, we can do so. But I used `lazy val` there because IMO it makes it easy to notice bugs caused by an illegal method-call order. For example, all the exchanges should be registered before `ExchangeCoordinator.postShuffleRDD` is called for the first time. If a new exchange is wrongly registered after `ExchangeCoordinator.postShuffleRDD` has been called, the assertion fails. WDYT?
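To make the intent concrete, here is a toy sketch of the invariant (simplified names; not the actual `ExchangeCoordinator` code):

```
import scala.collection.mutable.ArrayBuffer

class Coordinator {
  private val exchanges = ArrayBuffer[String]()
  // Snapshot taken on first access; later registrations make the sizes diverge.
  private lazy val numExchanges = exchanges.size

  def registerExchange(e: String): Unit = exchanges += e

  def postShuffleRDD(): Unit = {
    // Fails if an exchange was registered after the first postShuffleRDD call.
    assert(exchanges.size == numExchanges)
  }
}

val c = new Coordinator
c.registerExchange("e1")
c.postShuffleRDD()       // ok: numExchanges is fixed at 1 here
c.registerExchange("e2")
c.postShuffleRDD()       // assertion failure: 2 != 1
```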
[GitHub] spark pull request #21754: [SPARK-24705][SQL] ExchangeCoordinator broken whe...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21754#discussion_r207192928

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala ---
@@ -117,10 +118,6 @@ class ExchangeCoordinator(
    */
   def estimatePartitionStartIndices(
       mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = {
-    // If we have mapOutputStatistics.length < numExchange, it is because we do not submit
-    // a stage when the number of partitions of this dependency is 0.
-    assert(mapOutputStatistics.length <= numExchanges)
--- End diff --

To pass the existing tests for `estimatePartitionStartIndices`, e.g., https://github.com/apache/spark/pull/21754/files#diff-3cd46a3f60c5352282bd3f2c9efff7fcR61. As another approach, we might add a dummy `ShuffleExchange` in `ExchangeCoordinator` there. But building a `ShuffleExchange` is troublesome in the test suite without `SharedSparkSession`.
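Roughly, those tests drive the method directly with synthetic statistics and no registered exchanges, so any assertion tied to the registered-exchange count would trip. A hedged sketch of that pattern (values are illustrative, and `MapOutputStatistics` is `private[spark]`, so this only compiles inside an `org.apache.spark` package):

```
// A coordinator with no exchanges registered, as the unit tests construct it
// (an advisory target size of 100 bytes is illustrative).
val coordinator = new ExchangeCoordinator(100L)
// Synthetic stats for one shuffle with two pre-shuffle partitions.
val stats = Array(new MapOutputStatistics(0, Array(60L, 70L)))
// The old assert would fail here: stats.length (1) > registered exchanges (0).
val partitionStartIndices = coordinator.estimatePartitionStartIndices(stats)
```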
[GitHub] spark issue #21964: [SPARK-24788][SQL] RelationalGroupedDataset.toString wit...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21964 retest this please
[GitHub] spark issue #21931: [SPARK-24978][SQL]Add spark.sql.fast.hash.aggregate.row....
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21931 What does the benchmark result suggest? Does it mean `1048576` should be the default?
[GitHub] spark issue #21754: [SPARK-24705][SQL] ExchangeCoordinator broken when dupli...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21754 retest this please
[GitHub] spark pull request #21754: [SPARK-24705][SQL] ExchangeCoordinator broken whe...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21754#discussion_r207178639

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala ---
@@ -83,16 +83,17 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan}
  * - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MB)
  */
 class ExchangeCoordinator(
-    numExchanges: Int,
     advisoryTargetPostShuffleInputSize: Long,
     minNumPostShufflePartitions: Option[Int] = None)
   extends Logging {

   // The registered Exchange operators.
   private[this] val exchanges = ArrayBuffer[ShuffleExchangeExec]()
--- End diff --

With the current fix, `exchanges` no longer contains reused exchanges; e.g., in the example case described above, `exchanges.size` is already `1`.
[GitHub] spark pull request #21944: [SPARK-24988][SQL]Add a castBySchema method which...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21944#discussion_r207160479

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -1367,6 +1367,22 @@ class Dataset[T] private[sql](
     }: _*)
   }

+  /**
+   * Casts all the values of the current Dataset following the types of a specific StructType.
+   * This method works also with nested structTypes.
+   *
+   * @group typedrel
+   * @since 2.4.0
+   */
+  def castBySchema(schema: StructType): DataFrame = {
+    assert(schema.fields.map(_.name).toList.sameElements(this.schema.fields.map(_.name).toList),
+      "schema should have the same fields as the original schema")
+
+    selectExpr(schema.map(
--- End diff --

-1 (I think it is a pretty sensitive issue to add a new API in `Dataset`)
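For what it's worth, the same effect is achievable with a small user-side helper instead of a new `Dataset` API; a rough sketch under that assumption (the helper name is illustrative, and backtick-quoting of special field names is left out):

```
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Casts each column of `df` to the type of the same-named field in `schema`.
def castBySchema(df: DataFrame, schema: StructType): DataFrame = {
  require(schema.fieldNames.sameElements(df.schema.fieldNames),
    "schema should have the same fields as the original schema")
  df.selectExpr(schema.fields.map(f =>
    s"CAST(${f.name} AS ${f.dataType.sql}) AS ${f.name}"): _*)
}
```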
[GitHub] spark pull request #21957: [SPARK-24994][SQL] When the data type of the fiel...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21957#discussion_r207155086

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---
@@ -436,8 +436,9 @@ object DataSourceStrategy {
    *
    * @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
    */
-  protected[sql] def translateFilter(predicate: Expression): Option[Filter] = {
-    predicate match {
+  protected[sql] def translateFilter(predicate: Expression,
+      filterCast: Boolean = true): Option[Filter] = {
--- End diff --

nit: style issue (you'd better check the style in the other places again, too);
```
protected[sql] def translateFilter(
    predicate: Expression,
    filterCast: Boolean = true): Option[Filter] = {
```
[GitHub] spark issue #21754: [SPARK-24705][SQL] ExchangeCoordinator broken when dupli...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21754 How about the fix based on [the suggestion](https://github.com/apache/spark/pull/21754/commits/f961760f64ceabd582bd78fd2b383f1405988816)?
[GitHub] spark issue #21754: [SPARK-24705][SQL] ExchangeCoordinator broken when dupli...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21754 yea, I think that is another approach to fix this issue.
[GitHub] spark issue #21964: [SPARK-24788][SQL] RelationalGroupedDataset.toString wit...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21964 retest this please
[GitHub] spark issue #21941: [SPARK-24966][SQL] Implement precedence rules for set op...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21941 retest this please
[GitHub] spark issue #21941: [SPARK-24966][SQL] Implement precedence rules for set op...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21941 no idea, but `HiveClientSuites` seems flaky: https://issues.apache.org/jira/browse/SPARK-23622 (the error message is different though...)
[GitHub] spark pull request #21754: [SPARK-24705][SQL] ExchangeCoordinator broken whe...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21754#discussion_r207116871

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala ---
@@ -278,6 +278,25 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
     try f(spark) finally spark.stop()
   }

+  def withSparkSession(pairs: (String, String)*)(f: SparkSession => Unit): Unit = {
--- End diff --

ok
[GitHub] spark issue #21754: [SPARK-24705][SQL] ExchangeCoordinator broken when dupli...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21754 Oh, my bad. I just wanted to say: `EnsureRequirements` sets `2` in `ExchangeCoordinator`, and then the number changes from `2` to `1`.
[GitHub] spark issue #21892: [SPARK-24945][SQL] Switching to uniVocity 2.7.2
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21892 Also, can you update the description?
[GitHub] spark issue #21963: [SPARK-21274][FOLLOW-UP][SQL] Enable support of MINUS AL...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21963 IMO we probably need a new JIRA for this.
[GitHub] spark issue #21964: [SPARK-24788][SQL] RelationalGroupedDataset.toString wit...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21964 @gatorsmile Does `KeyValueGroupedDataset` have the same issue? It seems there is no chance for `KeyValueGroupedDataset` to have unresolved exprs. https://github.com/apache/spark/pull/21752#discussion_r204883667
[GitHub] spark pull request #21964: [SPARK-24788][SQL] RelationalGroupedDataset.toStr...
GitHub user maropu opened a pull request: https://github.com/apache/spark/pull/21964

[SPARK-24788][SQL] RelationalGroupedDataset.toString with unresolved exprs should not fail

## What changes were proposed in this pull request?

In the current master, `toString` throws an exception when `RelationalGroupedDataset` has unresolved expressions;
```
scala> spark.range(0, 10).groupBy("id")
res4: org.apache.spark.sql.RelationalGroupedDataset = RelationalGroupedDataset: [grouping expressions: [id: bigint], value: [id: bigint], type: GroupBy]

scala> spark.range(0, 10).groupBy('id)
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'id
  at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
  at org.apache.spark.sql.RelationalGroupedDataset$$anonfun$12.apply(RelationalGroupedDataset.scala:474)
  at org.apache.spark.sql.RelationalGroupedDataset$$anonfun$12.apply(RelationalGroupedDataset.scala:473)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.RelationalGroupedDataset.toString(RelationalGroupedDataset.scala:473)
  at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:332)
  at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:337)
  at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:345)
```

Closes #21752

## How was this patch tested?

Added tests in `DataFrameAggregateSuite`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/maropu/spark SPARK-24788

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21964.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21964

commit 465e7624073016d01ae6d3c5df501bf9b2c6410b
Author: Chris Horn
Date: 2018-07-11T21:25:26Z

    SPARK-24788 failing test case

commit e995b0bf2824593532056ff0048e65e8a33e5aad
Author: Chris Horn
Date: 2018-07-11T21:25:54Z

    SPARK-24788 fixed UnresolvedException when toString an unresolved grouping expression

commit 5213635d595f76261a8387e5a5135ebd9bcfa8d9
Author: Chris Horn
Date: 2018-07-13T19:09:35Z

    simplify test description; remove whitespace

commit 2e48604ff9aadebc4f7f3f8edeee252722967da9
Author: Chris Horn
Date: 2018-07-13T19:22:07Z

    do not use Matchers

commit c4e7490f1762aff5ae5b7126adb9ddd8d987a77d
Author: Takeshi Yamamuro
Date: 2018-08-02T06:20:34Z

    Fix
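The gist of the fix, sketched below (simplified; not the exact patch, which lives in `RelationalGroupedDataset.toString`): only describe an expression's data type once it is resolved, and fall back to its raw string form otherwise.

```
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}

// Hedged sketch: render an expression safely for toString purposes, so an
// unresolved attribute never triggers a dataType call.
private def toDebugString(expr: Expression): String = expr match {
  case ne: NamedExpression if ne.resolved => s"${ne.name}: ${ne.dataType.simpleString}"
  case e => e.toString
}
```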
[GitHub] spark pull request #21754: [SPARK-24705][SQL] ExchangeCoordinator broken whe...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21754#discussion_r207106352

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   // Ignore this wrapper for canonicalizing.
   override def doCanonicalize(): SparkPlan = child.canonicalized

+  override protected def doPrepare(): Unit = {
+    child match {
+      case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
+        coordinator.registerExchange(shuffleExchange)
--- End diff --

updated
[GitHub] spark pull request #21754: [SPARK-24705][SQL] Cannot reuse an exchange opera...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21754#discussion_r207105078

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   // Ignore this wrapper for canonicalizing.
   override def doCanonicalize(): SparkPlan = child.canonicalized

+  override protected def doPrepare(): Unit = {
+    child match {
+      case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
+        coordinator.registerExchange(shuffleExchange)
--- End diff --

Sorry to confuse you, but I'm working on the issue only in this PR. The title is probably obscure, so I'll update it soon.