[GitHub] [spark] AnywalkerGiser opened a new pull request, #36559: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AnywalkerGiser opened a new pull request, #36559:
URL: https://github.com/apache/spark/pull/36559

### What changes were proposed in this pull request?
Fix problems with PySpark on Windows:
1. Fixed datetime conversion to timestamp for dates before 1970;
2. Fixed datetime conversion when the timestamp is negative;
3. Added a test script.

### Why are the changes needed?
PySpark has problems serializing pre-1970 times on Windows.

An exception occurs when executing the following code on Windows:
```python
from datetime import datetime

# `sc` and `spark` are assumed to be an existing SparkContext and SparkSession.
rdd = sc.parallelize([('a', datetime(1957, 1, 9, 0, 0)),
                      ('b', datetime(2014, 1, 27, 0, 0))])
df = spark.createDataFrame(rdd, ["id", "date"])

df.show()
df.printSchema()

print(df.collect())
```

```bash
  File "...\spark\python\lib\pyspark.zip\pyspark\sql\types.py", line 195, in toInternal
    else time.mktime(dt.timetuple()))
OverflowError: mktime argument out of range

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more
```
and
```bash
File ...\spark\python\lib\pyspark.zip\pyspark\sql\types.py, in fromInternal:
Line 207: return datetime.datetime.fromtimestamp(ts // 100).replace(microsecond=ts % 100)

OSError: [Errno 22] Invalid argument
```
After updating the code, the above code ran successfully:
```bash
+---+-------------------+
| id|               date|
+---+-------------------+
|  a|1957-01-08 16:00:00|
|  b|2014-01-26 16:00:00|
+---+-------------------+

root
 |-- id: string (nullable = true)
 |-- date: timestamp (nullable = true)

[Row(id='a', date=datetime.datetime(1957, 1, 9, 0, 0)), Row(id='b', date=datetime.datetime(2014, 1, 27, 0, 0))]
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New and existing test suites

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
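For readers following the two tracebacks above: both `time.mktime` and `datetime.fromtimestamp` delegate to the platform C library, which is why they can reject pre-1970 values on Windows. The following is a minimal sketch of one way to sidestep those calls with plain `timedelta` arithmetic. It is not the code from this pull request. The helper names `naive_utc_to_micros` and `micros_to_naive_utc` are hypothetical, and the sketch assumes naive datetimes are interpreted as UTC, whereas the `time.mktime` path in the traceback interprets them in the local time zone.

```python
from datetime import datetime, timedelta

# Hypothetical helpers for illustration only; not part of PySpark.
EPOCH = datetime(1970, 1, 1)


def naive_utc_to_micros(dt: datetime) -> int:
    """Microseconds since 1970-01-01 for a naive datetime, ASSUMING it is UTC.

    Uses timedelta arithmetic only, so no C-library time function is involved.
    """
    delta = dt - EPOCH  # negative for pre-1970 values
    return (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds


def micros_to_naive_utc(ts: int) -> datetime:
    """Inverse of naive_utc_to_micros; accepts negative timestamps."""
    return EPOCH + timedelta(microseconds=ts)


if __name__ == "__main__":
    before_epoch = datetime(1957, 1, 9, 0, 0)
    micros = naive_utc_to_micros(before_epoch)
    # The round trip should give back the original value.
    print(micros < 0, micros_to_naive_utc(micros) == before_epoch)
```

A real fix would also have to preserve the local-time semantics of the existing conversion, which this sketch deliberately leaves out.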
[GitHub] [spark] cloud-fan commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
cloud-fan commented on code in PR #36541:
URL: https://github.com/apache/spark/pull/36541#discussion_r873386778

## sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala: ##
@@ -82,52 +82,45 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object SpecialLimits extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ReturnAnswer(rootPlan) => rootPlan match {
-        case Limit(IntegerLiteral(limit), Sort(order, true, child))

Review Comment:
   handled by `planTakeOrdered`
[GitHub] [spark] AnywalkerGiser closed pull request #36559: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AnywalkerGiser closed pull request #36559: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows
URL: https://github.com/apache/spark/pull/36559
[GitHub] [spark] beliefer commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
beliefer commented on code in PR #36541:
URL: https://github.com/apache/spark/pull/36541#discussion_r873401681

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala: ##
@@ -1303,12 +1303,25 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr
     copy(child = newChild)
 }

+object OffsetAndLimit {
+  def unapply(p: GlobalLimit): Option[(Int, Int, LogicalPlan)] = {
+    p match {
+      // Optimizer pushes local limit through offset, so we need to match the plan this way.
+      case GlobalLimit(IntegerLiteral(globalLimit),
+             Offset(IntegerLiteral(offset),
+               LocalLimit(IntegerLiteral(localLimit), child)))
+          if globalLimit + offset == localLimit =>
+        Some((offset, globalLimit, child))

Review Comment:
   In fact, we do not use `globalLimit` in the physical plan. It seems we can return `localLimit` here. Then we can avoid the `+` in the physical plan.
[GitHub] [spark] beliefer commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
beliefer commented on code in PR #36541:
URL: https://github.com/apache/spark/pull/36541#discussion_r873402602

## sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala: ##
@@ -81,55 +81,56 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    */
   object SpecialLimits extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case ReturnAnswer(rootPlan) => rootPlan match {
-        case Limit(IntegerLiteral(limit), Sort(order, true, child))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
-        case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
+      // Call `planTakeOrdered` first which matches a larger plan.
+      case ReturnAnswer(rootPlan) => planTakeOrdered(rootPlan).getOrElse(rootPlan match {
+        // We should match the combination of limit and offset first, to get the optimal physical
+        // plan, instead of planning limit and offset separately.
+        case LimitAndOffset(limit, offset, child) =>
+          CollectLimitExec(limit = limit, child = planLater(child), offset = offset)
+        case OffsetAndLimit(offset, limit, child) =>
+          // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
+          CollectLimitExec(limit = offset + limit, child = planLater(child), offset = offset)
         case Limit(IntegerLiteral(limit), child) =>
-          CollectLimitExec(limit, planLater(child)) :: Nil
-        case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
-          Sort(order, true, child)) if limit + offset < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(
-            limit, order, child.output, planLater(child), offset) :: Nil
-        case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
-          Project(projectList, Sort(order, true, child)))
-            if limit + offset < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(
-            limit, order, projectList, planLater(child), offset) :: Nil
-        case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
-          CollectLimitExec(limit, planLater(child), offset) :: Nil
+          CollectLimitExec(limit = limit, child = planLater(child))
         case logical.Offset(IntegerLiteral(offset), child) =>
-          CollectLimitExec(child = planLater(child), offset = offset) :: Nil
+          CollectLimitExec(child = planLater(child), offset = offset)
         case Tail(IntegerLiteral(limit), child) =>
-          CollectTailExec(limit, planLater(child)) :: Nil
-        case other => planLater(other) :: Nil
-      }
+          CollectTailExec(limit, planLater(child))
+        case other => planLater(other)
+      }) :: Nil
+
+      case other => planTakeOrdered(other).toSeq
+    }
+
+    private def planTakeOrdered(plan: LogicalPlan): Option[SparkPlan] = plan match {
+      // We should match the combination of limit and offset first, to get the optimal physical
+      // plan, instead of planning limit and offset separately.
+      case LimitAndOffset(limit, offset, Sort(order, true, child))
+          if limit < conf.topKSortFallbackThreshold =>
+        Some(TakeOrderedAndProjectExec(
+          limit, order, child.output, planLater(child), offset))
+      case LimitAndOffset(limit, offset, Project(projectList, Sort(order, true, child)))
+          if limit < conf.topKSortFallbackThreshold =>
+        Some(TakeOrderedAndProjectExec(
+          limit, order, projectList, planLater(child), offset))
+      // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
+      case OffsetAndLimit(offset, limit, Sort(order, true, child))
+          if offset + limit < conf.topKSortFallbackThreshold =>

Review Comment:
   If we use `localLimit` directly, we can avoid `offset + limit` here.
[GitHub] [spark] beliefer commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
beliefer commented on code in PR #36541:
URL: https://github.com/apache/spark/pull/36541#discussion_r873403382

## sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala: ##
@@ -814,12 +815,19 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.LocalRelation(output, data, _) =>
         LocalTableScanExec(output, data) :: Nil
       case CommandResult(output, _, plan, data) => CommandResultExec(output, plan, data) :: Nil
+      // We should match the combination of limit and offset first, to get the optimal physical
+      // plan, instead of planning limit and offset separately.
+      case LimitAndOffset(limit, offset, child) =>
+        GlobalLimitExec(limit, planLater(child), offset) :: Nil
+      case OffsetAndLimit(offset, limit, child) =>
+        // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
+        GlobalLimitExec(limit = offset + limit, child = planLater(child), offset = offset) :: Nil

Review Comment:
   ditto

## sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala: ##
@@ -81,55 +81,56 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    */
   object SpecialLimits extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case ReturnAnswer(rootPlan) => rootPlan match {
-        case Limit(IntegerLiteral(limit), Sort(order, true, child))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
-        case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
+      // Call `planTakeOrdered` first which matches a larger plan.
+      case ReturnAnswer(rootPlan) => planTakeOrdered(rootPlan).getOrElse(rootPlan match {
+        // We should match the combination of limit and offset first, to get the optimal physical
+        // plan, instead of planning limit and offset separately.
+        case LimitAndOffset(limit, offset, child) =>
+          CollectLimitExec(limit = limit, child = planLater(child), offset = offset)
+        case OffsetAndLimit(offset, limit, child) =>
+          // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
+          CollectLimitExec(limit = offset + limit, child = planLater(child), offset = offset)
         case Limit(IntegerLiteral(limit), child) =>
-          CollectLimitExec(limit, planLater(child)) :: Nil
-        case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
-          Sort(order, true, child)) if limit + offset < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(
-            limit, order, child.output, planLater(child), offset) :: Nil
-        case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset),
-          Project(projectList, Sort(order, true, child)))
-            if limit + offset < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(
-            limit, order, projectList, planLater(child), offset) :: Nil
-        case LimitAndOffset(IntegerLiteral(limit), IntegerLiteral(offset), child) =>
-          CollectLimitExec(limit, planLater(child), offset) :: Nil
+          CollectLimitExec(limit = limit, child = planLater(child))
         case logical.Offset(IntegerLiteral(offset), child) =>
-          CollectLimitExec(child = planLater(child), offset = offset) :: Nil
+          CollectLimitExec(child = planLater(child), offset = offset)
         case Tail(IntegerLiteral(limit), child) =>
-          CollectTailExec(limit, planLater(child)) :: Nil
-        case other => planLater(other) :: Nil
-      }
+          CollectTailExec(limit, planLater(child))
+        case other => planLater(other)
+      }) :: Nil
+
+      case other => planTakeOrdered(other).toSeq
+    }
+
+    private def planTakeOrdered(plan: LogicalPlan): Option[SparkPlan] = plan match {
+      // We should match the combination of limit and offset first, to get the optimal physical
+      // plan, instead of planning limit and offset separately.
+      case LimitAndOffset(limit, offset, Sort(order, true, child))
+          if limit < conf.topKSortFallbackThreshold =>
+        Some(TakeOrderedAndProjectExec(
+          limit, order, child.output, planLater(child), offset))
+      case LimitAndOffset(limit, offset, Project(projectList, Sort(order, true, child)))
+          if limit < conf.topKSortFallbackThreshold =>
+        Some(TakeOrderedAndProjectExec(
+          limit, order, projectList, planLater(child), offset))
+      // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
+      case OffsetAndLimit(offset, limit, Sort(order, true, child))
+          if offset + limit < conf.topKSortFallbackThre
[GitHub] [spark] beliefer commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
beliefer commented on code in PR #36541:
URL: https://github.com/apache/spark/pull/36541#discussion_r873406676

## sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala: ##
@@ -81,55 +81,56 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    */
   object SpecialLimits extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case ReturnAnswer(rootPlan) => rootPlan match {
-        case Limit(IntegerLiteral(limit), Sort(order, true, child))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
-        case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child)))
-            if limit < conf.topKSortFallbackThreshold =>
-          TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
+      // Call `planTakeOrdered` first which matches a larger plan.
+      case ReturnAnswer(rootPlan) => planTakeOrdered(rootPlan).getOrElse(rootPlan match {
+        // We should match the combination of limit and offset first, to get the optimal physical
+        // plan, instead of planning limit and offset separately.
+        case LimitAndOffset(limit, offset, child) =>
+          CollectLimitExec(limit = limit, child = planLater(child), offset = offset)
+        case OffsetAndLimit(offset, limit, child) =>
+          // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
+          CollectLimitExec(limit = offset + limit, child = planLater(child), offset = offset)

Review Comment:
   ditto
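The code comment quoted in the hunks above states that 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'. A minimal sketch of that equivalence on plain Python lists follows. The function names `offset_then_limit` and `limit_then_offset` are hypothetical and only model the row-level semantics; they are not Spark APIs.

```python
from itertools import islice


def offset_then_limit(rows, offset, limit):
    """Skip `offset` rows, then keep at most `limit` of what remains."""
    return list(islice(islice(rows, offset, None), limit))


def limit_then_offset(rows, limit, offset):
    """Keep at most `limit` rows, then skip the first `offset` of those."""
    return list(islice(islice(rows, limit), offset, None))


if __name__ == "__main__":
    rows = list(range(10))
    a, b = 3, 4
    # 'Offset a' then 'Limit b' ...
    print(offset_then_limit(rows, a, b))
    # ... versus 'Limit a + b' then 'Offset a'.
    print(limit_then_offset(rows, a + b, a))
```

This is why the planner hunk passes `limit = offset + limit` when it matches `OffsetAndLimit`: the physical operator in this PR takes the limit first and then drops the offset.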
[GitHub] [spark] beliefer commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
beliefer commented on code in PR #36541:
URL: https://github.com/apache/spark/pull/36541#discussion_r873409826

## sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala: ##
@@ -215,61 +211,52 @@ case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
 }

 /**
- * Take the first `limit` elements of the child's single output partition.
- */
-case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
-
-  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
-
-  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
-    copy(child = newChild)
-}
-
-/**
- * Skip the first `offset` elements then take the first `limit` of the following elements in
- * the child's single output partition.
+ * Take the first `limit` elements and then drop the first `offset` elements in the child's single
+ * output partition.
  */
-case class GlobalLimitAndOffsetExec(
-    limit: Int = -1,
-    offset: Int,
-    child: SparkPlan) extends BaseLimitExec {
-  assert(offset > 0)
+case class GlobalLimitExec(limit: Int = -1, child: SparkPlan, offset: Int = 0)
+  extends BaseLimitExec {
+  assert(limit >= 0 || (limit == -1 && offset > 0))

   override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil

-  override def doExecute(): RDD[InternalRow] = if (limit >= 0) {
-    child.execute().mapPartitionsInternal(iter => iter.slice(offset, limit + offset))
-  } else {
-    child.execute().mapPartitionsInternal(iter => iter.drop(offset))
+  override def doExecute(): RDD[InternalRow] = {
+    if (offset > 0) {
+      if (limit >= 0) {
+        child.execute().mapPartitionsInternal(iter => iter.slice(offset, limit))
+      } else {
+        child.execute().mapPartitionsInternal(iter => iter.drop(offset))
+      }
+    } else {
+      super.doExecute()
+    }
   }

-  private lazy val skipTerm = BaseLimitExec.newLimitCountTerm()
-
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
-    ctx.addMutableState(
-      CodeGenerator.JAVA_INT, skipTerm, forceInline = true, useFreshName = false)
-    if (limit >= 0) {
-      // The counter name is already obtained by the upstream operators via `limitNotReachedChecks`.
-      // Here we have to inline it to not change its name. This is fine as we won't have many limit
-      // operators in one query.
-      ctx.addMutableState(
-        CodeGenerator.JAVA_INT, countTerm, forceInline = true, useFreshName = false)
-      s"""
-         | if ($skipTerm < $offset) {
-         |   $skipTerm += 1;
-         | } else if ($countTerm < $limit) {
-         |   $countTerm += 1;
-         |   ${consume(ctx, input)}
-         | }
+    if (offset > 0) {
+      val skipTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "rowsSkipped", forceInline = true)
+      if (limit > 0) {
+        // In codegen, we skip the first `offset` rows, then take the first `limit - offset` rows.
+        val finalLimit = limit - offset
+        s"""
+           | if ($skipTerm < $offset) {
+           |   $skipTerm += 1;
+           | } else if ($countTerm < $finalLimit) {
+           |   $countTerm += 1;
+           |   ${consume(ctx, input)}
+           | }
        """.stripMargin
-    } else {
-      s"""
-         | if ($skipTerm < $offset) {
-         |   $skipTerm += 1;
-         | } else {
-         |   ${consume(ctx, input)}
-         | }
+      } else {
+        s"""
+           | if ($skipTerm < $offset) {
+           |   $skipTerm += 1;
+           | } else {
+           |   ${consume(ctx, input)}
+           | }
        """.stripMargin

Review Comment:
   ```suggestion
            """.stripMargin
   ```
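The hunk above expresses the same operation twice: `doExecute` uses `iter.slice(offset, limit)`, where `limit` acts as an end index, while the generated code skips `offset` rows with one counter and then emits `limit - offset` rows with another. A minimal Python sketch of why the two agree is shown below. `slice_rows` and `counter_rows` are hypothetical names, and the sketch models only the `offset > 0` with non-negative `limit` branch, not Spark's actual codegen.

```python
from itertools import islice


def slice_rows(rows, offset, limit):
    """Model of iter.slice(offset, limit): `limit` is the end index."""
    return list(islice(rows, offset, limit))


def counter_rows(rows, offset, limit):
    """Model of the two-counter loop in the generated code."""
    skipped = 0                   # plays the role of skipTerm
    count = 0                     # plays the role of countTerm
    final_limit = limit - offset  # rows to emit after skipping
    out = []
    for row in rows:
        if skipped < offset:
            skipped += 1
        elif count < final_limit:
            count += 1
            out.append(row)
    return out


if __name__ == "__main__":
    rows = list(range(10))
    print(slice_rows(rows, 3, 7))
    print(counter_rows(rows, 3, 7))
```

Both functions treat `limit` as already including the offset, which matches the "take the first `limit` elements and then drop the first `offset` elements" wording in the new doc comment.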
[GitHub] [spark] beliefer commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
beliefer commented on code in PR #36541:
URL: https://github.com/apache/spark/pull/36541#discussion_r873410416

## sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala: ##
@@ -278,9 +265,9 @@ case class GlobalLimitAndOffsetExec(
 }

 /**
- * Take the first limit elements as defined by the sortOrder, and do projection if needed.
- * This is logically equivalent to having a Limit operator after a [[SortExec]] operator,
- * or having a [[ProjectExec]] operator between them.
+ * Take the first `limit`` elements as defined by the sortOrder, then drop the first `offset`

Review Comment:
   ```suggestion
    * Take the first `limit` elements as defined by the sortOrder, then drop the first `offset`
   ```
[GitHub] [spark] cloud-fan commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
cloud-fan commented on code in PR #36541:
URL: https://github.com/apache/spark/pull/36541#discussion_r873423506

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala: ##
@@ -1303,12 +1303,25 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr
     copy(child = newChild)
 }

+object OffsetAndLimit {
+  def unapply(p: GlobalLimit): Option[(Int, Int, LogicalPlan)] = {
+    p match {
+      // Optimizer pushes local limit through offset, so we need to match the plan this way.
+      case GlobalLimit(IntegerLiteral(globalLimit),
+             Offset(IntegerLiteral(offset),
+               LocalLimit(IntegerLiteral(localLimit), child)))
+          if globalLimit + offset == localLimit =>
+        Some((offset, globalLimit, child))

Review Comment:
   This pattern match is to match a logical offset + limit, and we care more about semantics here. Returning `localLimit` is super confusing.
[GitHub] [spark] cloud-fan commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
cloud-fan commented on code in PR #36541:
URL: https://github.com/apache/spark/pull/36541#discussion_r873425084

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala: ##
@@ -1303,12 +1303,25 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr
     copy(child = newChild)
 }

+object OffsetAndLimit {
+  def unapply(p: GlobalLimit): Option[(Int, Int, LogicalPlan)] = {
+    p match {
+      // Optimizer pushes local limit through offset, so we need to match the plan this way.
+      case GlobalLimit(IntegerLiteral(globalLimit),
+             Offset(IntegerLiteral(offset),
+               LocalLimit(IntegerLiteral(localLimit), child)))
+          if globalLimit + offset == localLimit =>
+        Some((offset, globalLimit, child))

Review Comment:
   It's better to go for readability instead of saving a bit of typing.
[GitHub] [spark] beliefer commented on a diff in pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
beliefer commented on code in PR #36541:
URL: https://github.com/apache/spark/pull/36541#discussion_r873425906

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala: ##
@@ -1303,12 +1303,25 @@ case class LocalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPr
     copy(child = newChild)
 }

+object OffsetAndLimit {
+  def unapply(p: GlobalLimit): Option[(Int, Int, LogicalPlan)] = {
+    p match {
+      // Optimizer pushes local limit through offset, so we need to match the plan this way.
+      case GlobalLimit(IntegerLiteral(globalLimit),
+             Offset(IntegerLiteral(offset),
+               LocalLimit(IntegerLiteral(localLimit), child)))
+          if globalLimit + offset == localLimit =>
+        Some((offset, globalLimit, child))

Review Comment:
   OK
[GitHub] [spark] Yikun commented on a diff in pull request #36464: [SPARK-38947][PYTHON] Supports groupby positional indexing
Yikun commented on code in PR #36464:
URL: https://github.com/apache/spark/pull/36464#discussion_r873426625

## python/pyspark/pandas/groupby.py: ##
@@ -2110,22 +2110,79 @@ def _limit(self, n: int, asc: bool) -> FrameLike:
         groupkey_scols = [psdf._internal.spark_column_for(label) for label in groupkey_labels]

         sdf = psdf._internal.spark_frame
-        tmp_col = verify_temp_column_name(sdf, "__row_number__")
+        tmp_row_num_col = verify_temp_column_name(sdf, "__row_number__")

+        window = Window.partitionBy(*groupkey_scols)
         # This part is handled differently depending on whether it is a tail or a head.
-        window = (
-            Window.partitionBy(*groupkey_scols).orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).asc())
+        ordered_window = (
+            window.orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).asc())
             if asc
-            else Window.partitionBy(*groupkey_scols).orderBy(
-                F.col(NATURAL_ORDER_COLUMN_NAME).desc()
-            )
+            else window.orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).desc())
         )

-        sdf = (
-            sdf.withColumn(tmp_col, F.row_number().over(window))
-            .filter(F.col(tmp_col) <= n)
-            .drop(tmp_col)
-        )
+        if n >= 0 or LooseVersion(pd.__version__) < LooseVersion("1.4.0"):
+
+            sdf = (
+                sdf.withColumn(tmp_row_num_col, F.row_number().over(ordered_window))
+                .filter(F.col(tmp_row_num_col) <= n)
+                .drop(tmp_row_num_col)
+            )
+        else:
+            # Pandas supports Groupby positional indexing since v1.4.0
+            # https://pandas.pydata.org/docs/whatsnew/v1.4.0.html#groupby-positional-indexing
+            #
+            # To support groupby positional indexing, we need add two columns to help we filter
+            # target rows:
+            # - Add `__row_number__` and `__group_count__` columns.
+            # - Use `F.col(tmp_row_num_col) - F.col(tmp_cnt_col) <= positional_index_number` to
+            #   filter target rows.
+            # - Then drop `__row_number__` and `__group_count__` columns.
+            #
+            # For example for the dataframe:
+            # >>> df = ps.DataFrame([["g", "g0"],
+            # ...                   ["g", "g1"],
+            # ...                   ["g", "g2"],
+            # ...                   ["g", "g3"],
+            # ...                   ["h", "h0"],
+            # ...                   ["h", "h1"]], columns=["A", "B"])
+            # >>> df.groupby("A").head(-1)
+            #
+            # Below is an example to show the `__row_number__` column and `__group_count__` column
+            # for above df:
+            # >>> sdf.withColumn(tmp_row_num_col, F.row_number().over(window))
+            #        .withColumn(tmp_cnt_col, F.count("*").over(window)).show()
+            # +---------------+------------+---+---+------------+--------------+---------------+
+            # |__index_level..|__groupkey..|  A|  B|__natural_..|__row_number__|__group_count__|
+            # +---------------+------------+---+---+------------+--------------+---------------+
+            # |              0|           g|  g| g0| 17179869184|             1|              4|
+            # |              1|           g|  g| g1| 42949672960|             2|              4|
+            # |              2|           g|  g| g2| 60129542144|             3|              4|
+            # |              3|           g|  g| g3| 85899345920|             4|              4|
+            # |              4|           h|  h| h0|111669149696|             1|              2|
+            # |              5|           h|  h| h1|128849018880|             2|              2|
+            # +---------------+------------+---+---+------------+--------------+---------------+
+            #
+            # The limit n is `-1`, we need to filter rows[:-1] in each group:
+            #
+            # >>> sdf.withColumn(tmp_row_num_col, F.row_number().over(window))
+            #        .withColumn(tmp_cnt_col, F.count("*").over(window))
+            #        .filter(F.col(tmp_row_num_col) - F.col(tmp_cnt_col) <= -1).show()

Review Comment:
   @zhengruifeng I think `lag` is better here: 1 `WindowExec` + 2 `sort` + 1 `shuffle`, the same cost as the original implementation.

   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- Project [__index_level_0__#0, __groupkey_0__#19L, a#1L, b#2L, c#3L, __natural_order__#8L]
      +- Filter isnull(__tmp_lag__#447)
         +- Window [lag(0, -2, null) windowspecdefinition(__groupkey_0__#19L, __natural_order__#8L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -2, -2)) AS __tmp_lag__#447], [__groupkey_0__#19L], [__natural_order__#8L ASC NULLS FIRST]
            +- Sort [__groupkey_0__#19L ASC NULLS FIRST, __natural_order__#8L ASC NULLS FIRST], false, 0
   ```
[GitHub] [spark] Yikun commented on a diff in pull request #36464: [SPARK-38947][PYTHON] Supports groupby positional indexing
Yikun commented on code in PR #36464: URL: https://github.com/apache/spark/pull/36464#discussion_r873437306 ## python/pyspark/pandas/groupby.py: ## @@ -2110,22 +2110,60 @@ def _limit(self, n: int, asc: bool) -> FrameLike: groupkey_scols = [psdf._internal.spark_column_for(label) for label in groupkey_labels] sdf = psdf._internal.spark_frame -tmp_col = verify_temp_column_name(sdf, "__row_number__") +window = Window.partitionBy(*groupkey_scols) # This part is handled differently depending on whether it is a tail or a head. -window = ( - Window.partitionBy(*groupkey_scols).orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).asc()) +ordered_window = ( +window.orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).asc()) if asc -else Window.partitionBy(*groupkey_scols).orderBy( -F.col(NATURAL_ORDER_COLUMN_NAME).desc() -) +else window.orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).desc()) ) -sdf = ( -sdf.withColumn(tmp_col, F.row_number().over(window)) -.filter(F.col(tmp_col) <= n) -.drop(tmp_col) -) +if n >= 0 or LooseVersion(pd.__version__) < LooseVersion("1.4.0"): +tmp_row_num_col = verify_temp_column_name(sdf, "__row_number__") +sdf = ( +sdf.withColumn(tmp_row_num_col, F.row_number().over(ordered_window)) +.filter(F.col(tmp_row_num_col) <= n) +.drop(tmp_row_num_col) +) Review Comment: BTW, we could also consider unifying this branch to use the `lag` approach: ```python sdf = ( sdf.withColumn(tmp_lag_col, F.lag(F.lit(0), n).over(ordered_window)) # for the positive case; `lag` needs the ordered window .where(F.isnull(F.col(tmp_lag_col))) .drop(tmp_lag_col) ) ``` I can submit a separate PR to address it if you think it's necessary. In theory, `lag` performs better than `row_number`, especially when the number of rows is very large. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
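The `lag` idea in the comment above can be checked without a Spark session. The following is a minimal pure-Python sketch, not the PySpark implementation: `lag` and `group_head` are hypothetical helpers that mimic a per-group, ordered `lag(lit(0), n)` followed by a null filter. The negative-`n` branch (keep rows whose marker is not null) is an assumption of this sketch; the review comment only covers the positive case.

```python
from collections import defaultdict


def lag(values, offset):
    """Mimic SQL LAG: element `offset` rows back, or None when out of range."""
    size = len(values)
    return [
        values[i - offset] if 0 <= i - offset < size else None
        for i in range(size)
    ]


def group_head(rows, key, n):
    """Emulate groupby(...).head(n) with a lag marker instead of row numbers."""
    groups = defaultdict(list)
    for row in rows:  # insertion order stands in for the natural order column
        groups[key(row)].append(row)

    result = []
    for members in groups.values():
        marker = lag([0] * len(members), n)
        for row, mark in zip(members, marker):
            if n >= 0:
                keep = mark is None       # no n-th predecessor: one of the first n rows
            else:
                keep = mark is not None   # assumption: a |n|-th successor exists
            if keep:
                result.append(row)
    return result


rows = [("g", "g0"), ("g", "g1"), ("g", "g2"), ("g", "g3"), ("h", "h0"), ("h", "h1")]
print(group_head(rows, key=lambda r: r[0], n=2))
print(group_head(rows, key=lambda r: r[0], n=-1))
```

With the example frame from the diff, `n=-1` keeps `g0`, `g1`, `g2` and `h0`, which matches `rows[:-1]` per group.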
[GitHub] [spark] AmplabJenkins commented on pull request #36559: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AmplabJenkins commented on PR #36559: URL: https://github.com/apache/spark/pull/36559#issuecomment-1127364481 Can one of the admins verify this patch?
[GitHub] [spark] zhengruifeng opened a new pull request, #36560: [SPARK-39192][PYTHON] make pandas-on-spark's kurt consistent with pandas
zhengruifeng opened a new pull request, #36560: URL: https://github.com/apache/spark/pull/36560

### What changes were proposed in this pull request?
Make pandas-on-Spark's `kurt` consistent with pandas.

### Why are the changes needed?
1. The kurtosis formulas differ between Spark SQL and pandas.
2. pandas zeroes out a small `numerator` and `denominator` for better numerical stability.

### Does this PR introduce _any_ user-facing change?
Yes, the logic of `kurt` changed.

### How was this patch tested?
Added unit tests.
[GitHub] [spark] zhengruifeng commented on pull request #36560: [SPARK-39192][PYTHON] make pandas-on-spark's kurt consistent with pandas
zhengruifeng commented on PR #36560: URL: https://github.com/apache/spark/pull/36560#issuecomment-1127378942

Before this PR:

```
In [2]: df = ps.DataFrame({'a': [1, 2, 3, np.nan], 'b': [0.1, 0.2, 0.3, np.nan]}, columns=['a', 'b'])

In [3]: df.kurtosis()
Out[3]:
a   -1.5
b   -1.5
dtype: float64

In [4]: df.to_pandas().kurtosis()
/d0/Dev/Opensource/spark/python/pyspark/pandas/utils.py:976: PandasAPIOnSparkAdviceWarning: `to_pandas` loads all data into the driver's memory. It should only be used if the resulting pandas DataFrame is expected to be small.
  warnings.warn(message, PandasAPIOnSparkAdviceWarning)
Out[4]:
a   NaN
b   NaN
dtype: float64

In [5]: df = ps.DataFrame({'a': [1, 2, 3, np.nan, 6], 'b': [0.1, 0.2, 0.3, np.nan, 0.8]}, columns=['a', 'b'])

In [6]: df.kurtosis()
Out[6]:
a   -1.00
b   -0.839477
dtype: float64

In [7]: df.to_pandas().kurtosis()
/d0/Dev/Opensource/spark/python/pyspark/pandas/utils.py:976: PandasAPIOnSparkAdviceWarning: `to_pandas` loads all data into the driver's memory. It should only be used if the resulting pandas DataFrame is expected to be small.
  warnings.warn(message, PandasAPIOnSparkAdviceWarning)
Out[7]:
a    1.50
b    2.703924
dtype: float64
```

After this PR, pandas and pandas-on-Spark return the same results.
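The gap between the two outputs above is consistent with two different estimators: a population excess kurtosis (which reproduces the pandas-on-Spark numbers before the PR) and the bias-corrected sample excess kurtosis (which reproduces the pandas numbers). The sketch below is a plain-Python illustration under that reading; the function names are hypothetical and the code is not taken from either library.

```python
import math


def population_excess_kurtosis(values):
    """n * sum(d^4) / sum(d^2)^2 - 3 over the non-NaN values."""
    xs = [x for x in values if not math.isnan(x)]
    n = len(xs)
    if n == 0:
        return math.nan
    mean = sum(xs) / n
    m2 = sum((x - mean) ** 2 for x in xs)
    m4 = sum((x - mean) ** 4 for x in xs)
    if m2 == 0:
        return math.nan
    return n * m4 / (m2 * m2) - 3.0


def sample_excess_kurtosis(values):
    """Bias-corrected estimator; undefined (NaN) for fewer than 4 observations."""
    xs = [x for x in values if not math.isnan(x)]
    n = len(xs)
    if n < 4:
        return math.nan
    g2 = population_excess_kurtosis(xs)
    return (n - 1) / ((n - 2) * (n - 3)) * ((n + 1) * g2 + 6.0)


a = [1, 2, 3, math.nan, 6]
b = [0.1, 0.2, 0.3, math.nan, 0.8]
print(population_excess_kurtosis(a), sample_excess_kurtosis(a))
print(population_excess_kurtosis(b), sample_excess_kurtosis(b))
```

For column `a` the two estimators give -1.0 and 1.5, and with only three non-NaN values the sample estimator is NaN while the population one is -1.5, which is the same pattern as in the session above.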
[GitHub] [spark] panbingkun opened a new pull request, #36561: [SPARK-37939][SQL] Use error classes in the parsing errors of properties
panbingkun opened a new pull request, #36561: URL: https://github.com/apache/spark/pull/36561

## What changes were proposed in this pull request?
Migrate the following errors in `QueryParsingErrors` to use error classes:
- cannotCleanReservedNamespacePropertyError => UNSUPPORTED_FEATURE.CLEAN_RESERVED_NAMESPACE_PROPERTY
- cannotCleanReservedTablePropertyError => UNSUPPORTED_FEATURE.CLEAN_RESERVED_TABLE_PROPERTY
- invalidPropertyKeyForSetQuotedConfigurationError => INVALID_PROPERTY_KEY
- invalidPropertyValueForSetQuotedConfigurationError => INVALID_PROPERTY_VALUE
- propertiesAndDbPropertiesBothSpecifiedError => UNSUPPORTED_FEATURE.PROPERTIES_AND_DBPROPERTIES_BOTH_SPECIFIED_CONFLICT

### Why are the changes needed?
Porting the parsing errors of properties to the new error framework improves test coverage and documents the expected error messages in tests.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
By running the new test:
```
$ build/sbt "sql/testOnly *QueryParsingErrorsSuite*"
```
[GitHub] [spark] gengliangwang opened a new pull request, #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang opened a new pull request, #36562: URL: https://github.com/apache/spark/pull/36562

### What changes were proposed in this pull request?
When reading JSON/CSV files with timestamp type inference enabled (`.option("inferTimestamp", true)`), the timestamp conversion throws and catches exceptions. Because we put detailed error messages in the exception:
```
def cannotCastToDateTimeError(
    value: Any,
    from: DataType,
    to: DataType,
    errorContext: String): Throwable = {
  val valueString = toSQLValue(value, from)
  new SparkDateTimeException("INVALID_SYNTAX_FOR_CAST",
    Array(toSQLType(to), valueString, SQLConf.ANSI_ENABLED.key, errorContext))
}
```
creating these exceptions is not cheap: it consumes more than 90% of the type inference time.
We can instead use the parsing methods that return optional results, which avoids creating the exceptions. With this PR, the schema inference time is reduced by 90% in a local benchmark.

### Why are the changes needed?
Performance improvement

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit tests. I also manually measured the time to infer the schema of a 624 MB JSON file with timestamp inference enabled:
```
spark.read.option("inferTimestamp", true).json(file)
```
- Before the change, it takes 166 seconds.
- After the change, it takes only 16 seconds.
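The trade-off described above can be sketched in plain Python. This is an illustration of the pattern only, not Spark's parser: `try_parse`, `parse_or_raise` and the two `infer_*` functions are hypothetical names, and the fixed `yyyy-MM-dd HH:mm:ss` pattern is an assumption made to keep the example short. The exception-based path builds a detailed message for every value that fails, while the optional path just returns `None`.

```python
import calendar
import re
from datetime import datetime
from typing import Iterable, Optional

_PATTERN = re.compile(r"(\d{4})-(\d{2})-(\d{2}) (\d{2}):(\d{2}):(\d{2})")


def try_parse(text: str) -> Optional[datetime]:
    """Return a datetime, or None for invalid input, without raising."""
    match = _PATTERN.fullmatch(text)
    if match is None:
        return None
    year, month, day, hour, minute, second = map(int, match.groups())
    valid = (
        year >= 1
        and 1 <= month <= 12
        and 1 <= day <= calendar.monthrange(year, month)[1]
        and hour < 24
        and minute < 60
        and second < 60
    )
    return datetime(year, month, day, hour, minute, second) if valid else None


def parse_or_raise(text: str) -> datetime:
    """Exception-based variant: pays for a detailed message on every failure."""
    parsed = try_parse(text)
    if parsed is None:
        raise ValueError(f"Invalid syntax for cast: cannot cast {text!r} to TIMESTAMP")
    return parsed


def infer_with_exceptions(values: Iterable[str]) -> str:
    for value in values:
        try:
            parse_or_raise(value)
        except ValueError:
            return "string"
    return "timestamp"


def infer_with_optional(values: Iterable[str]) -> str:
    ok = all(try_parse(value) is not None for value in values)
    return "timestamp" if ok else "string"


print(infer_with_optional(["2022-05-16 10:00:00", "1957-01-09 00:00:00"]))
print(infer_with_optional(["2022-05-16 10:00:00", "not a timestamp"]))
```

Both inference functions return the same answer; the difference is only in how much work a failed parse costs.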
[GitHub] [spark] zhengruifeng commented on pull request #36560: [SPARK-39192][PYTHON] make pandas-on-spark's kurt consistent with pandas
zhengruifeng commented on PR #36560: URL: https://github.com/apache/spark/pull/36560#issuecomment-1127383653 cc @HyukjinKwon
[GitHub] [spark] gengliangwang commented on pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on PR #36562: URL: https://github.com/apache/spark/pull/36562#issuecomment-1127384475 cc @sadikovi who reported this.
[GitHub] [spark] yaooqinn opened a new pull request, #36563: [SPARK-39194][SQL] Add a pre resolution builder for spark session extensions
yaooqinn opened a new pull request, #36563: URL: https://github.com/apache/spark/pull/36563

### What changes were proposed in this pull request?
This PR introduces an extension point for pre-resolution.

### Why are the changes needed?
A pre-resolution phase lets developers or users do some preparation before the actual resolution phase. For example, the current catalog v2 implementations require SQL configurations to be set ahead of time to activate them, which is inflexible. The current relation resolution always falls back to the current/default catalog, so extra resolution rules cannot handle this issue. With this feature, we have the opportunity to inject catalogs early.

### Does this PR introduce _any_ user-facing change?
It is a developer-oriented change.

### How was this patch tested?
New tests
[GitHub] [spark] MaxGekk commented on pull request #36558: [SPARK-39187][SQL][3.3] Remove `SparkIllegalStateException`
MaxGekk commented on PR #36558: URL: https://github.com/apache/spark/pull/36558#issuecomment-1127392276 Merging to 3.3. Thank you, @HyukjinKwon for review.
[GitHub] [spark] MaxGekk closed pull request #36558: [SPARK-39187][SQL][3.3] Remove `SparkIllegalStateException`
MaxGekk closed pull request #36558: [SPARK-39187][SQL][3.3] Remove `SparkIllegalStateException` URL: https://github.com/apache/spark/pull/36558
[GitHub] [spark] yaooqinn commented on pull request #36563: [SPARK-39194][SQL] Add a pre resolution builder for spark session extensions
yaooqinn commented on PR #36563: URL: https://github.com/apache/spark/pull/36563#issuecomment-1127402365 cc @cloud-fan @HyukjinKwon thanks
[GitHub] [spark] gengliangwang commented on pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on PR #36562: URL: https://github.com/apache/spark/pull/36562#issuecomment-1127404489 @MaxGekk Sure. BTW I suggest including this one in the RC2. @sadikovi found that the perf is 30% slower with https://github.com/apache/spark/pull/36362. So this one is actually fixing a perf regression in 3.3.
[GitHub] [spark] AngersZhuuuu opened a new pull request, #36564: [WIP][SPARK-39195][SQL] Spark should use two step update of outputCommitCoordinator
AngersZhuuuu opened a new pull request, #36564: URL: https://github.com/apache/spark/pull/36564 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested?
[GitHub] [spark] ulysses-you commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
ulysses-you commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873491017 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala: ## @@ -139,6 +139,14 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { * SELECT t1.c1, max(t1.c2) FROM t1 GROUP BY t1.c1 * }}} * + * 3. Remove outer join if all output comes from streamed side and the join keys from buffered side + * exist unique key. Review Comment: changed it to ``` * 3. Remove outer join if: * - For a left outer join with only left-side columns being selected and the right side join * keys are unique. * - For a right outer join with only right-side columns being selected and the left side join * keys are unique. ```
[GitHub] [spark] AnywalkerGiser closed pull request #36559: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AnywalkerGiser closed pull request #36559: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows URL: https://github.com/apache/spark/pull/36559
[GitHub] [spark] AnywalkerGiser opened a new pull request, #36565: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AnywalkerGiser opened a new pull request, #36565: URL: https://github.com/apache/spark/pull/36565 ### What changes were proposed in this pull request? Fix problems with pyspark in Windows: 1. Fixed datetime conversion to timestamp before 1970; 2. Fixed datetime conversion when timestamp is negative; 3. Adding a test script. ### Why are the changes needed? Pyspark has problems serializing pre-1970 times in Windows. An exception occurs when executing the following code under Windows: ```python rdd = sc.parallelize([('a', datetime(1957, 1, 9, 0, 0)), ('b', datetime(2014, 1, 27, 0, 0))]) df = spark.createDataFrame(rdd, ["id", "date"]) df.show() df.printSchema() print(df.collect()) ``` ```bash File "...\spark\python\lib\pyspark.zip\pyspark\sql\types.py", line 195, in toInternal else time.mktime(dt.timetuple())) OverflowError: mktime argument out of range at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872) at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) at org.apache.spark.rdd.RDD.iterator(RDD.scala:313) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ... 1 more ```

and

```bash
File ...\spark\python\lib\pyspark.zip\pyspark\sql\types.py, in fromInternal:
Line 207: return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)
OSError: [Errno 22] Invalid argument
```

After updating the code, the example above runs successfully:

```bash
+---+-------------------+
| id|               date|
+---+-------------------+
|  a|1957-01-08 16:00:00|
|  b|2014-01-26 16:00:00|
+---+-------------------+

root
 |-- id: string (nullable = true)
 |-- date: timestamp (nullable = true)

[Row(id='a', date=datetime.datetime(1957, 1, 9, 0, 0)), Row(id='b', date=datetime.datetime(2014, 1, 27, 0, 0))]
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
New and existing test suites
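Both failures above come from platform calls (`time.mktime` and `datetime.fromtimestamp`) that reject pre-1970 values on Windows. One way to avoid them is plain `timedelta` arithmetic against the epoch, sketched below. This is not the code from the PR: `to_internal` and `from_internal` are hypothetical stand-ins, and treating naive datetimes as UTC is an assumption of this sketch (PySpark interprets naive datetimes in the local time zone).

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_internal(dt: datetime) -> int:
    """Microseconds since the epoch, computed without time.mktime."""
    if dt.tzinfo is None:
        # Assumption of this sketch: naive values are taken as UTC.
        dt = dt.replace(tzinfo=timezone.utc)
    delta = dt - EPOCH
    return (delta.days * 86400 + delta.seconds) * 1_000_000 + delta.microseconds


def from_internal(ts: int) -> datetime:
    """Inverse conversion, computed without datetime.fromtimestamp."""
    return EPOCH + timedelta(microseconds=ts)


ts = to_internal(datetime(1957, 1, 9, 0, 0))
print(ts)                 # negative, because the date precedes 1970
print(from_internal(ts))  # round-trips to 1957-01-09 00:00:00+00:00
```

Because the arithmetic never goes through the C runtime's time functions, negative timestamps behave the same on every platform.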
[GitHub] [spark] AnywalkerGiser closed pull request #36565: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AnywalkerGiser closed pull request #36565: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows URL: https://github.com/apache/spark/pull/36565
[GitHub] [spark] cloud-fan commented on a diff in pull request #36531: [SPARK-39171][SQL] Unify the Cast expression
cloud-fan commented on code in PR #36531: URL: https://github.com/apache/spark/pull/36531#discussion_r873508975 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala: ## @@ -275,6 +376,55 @@ object Cast { case _ => null } } + + // Show suggestion on how to complete the disallowed explicit casting with built-in type + // conversion functions. + private def suggestionOnConversionFunctions ( + from: DataType, + to: DataType, + functionNames: String): String = { +// scalastyle:off line.size.limit +s"""cannot cast ${from.catalogString} to ${to.catalogString}. + |To convert values from ${from.catalogString} to ${to.catalogString}, you can use $functionNames instead. + |""".stripMargin +// scalastyle:on line.size.limit + } + + def typeCheckFailureMessage( + from: DataType, + to: DataType, + fallbackConfKey: Option[String], + fallbackConfValue: Option[String]): String = +(from, to) match { + case (_: NumericType, TimestampType) => +suggestionOnConversionFunctions(from, to, + "functions TIMESTAMP_SECONDS/TIMESTAMP_MILLIS/TIMESTAMP_MICROS") + + case (TimestampType, _: NumericType) => +suggestionOnConversionFunctions(from, to, "functions UNIX_SECONDS/UNIX_MILLIS/UNIX_MICROS") + + case (_: NumericType, DateType) => +suggestionOnConversionFunctions(from, to, "function DATE_FROM_UNIX_DATE") + + case (DateType, _: NumericType) => +suggestionOnConversionFunctions(from, to, "function UNIX_DATE") + + // scalastyle:off line.size.limit + case _ if fallbackConfKey.isDefined && fallbackConfValue.isDefined && Cast.canCast(from, to) => +s""" + | cannot cast ${from.catalogString} to ${to.catalogString} with ANSI mode on. + | If you have to cast ${from.catalogString} to ${to.catalogString}, you can set ${fallbackConfKey.get} as ${fallbackConfValue.get}. Review Comment: Now I see the value of `AnsiCast`: it identifies the cast added by the table insertion resolver so that we can provide a different error message here. 
I think it's a bit overkill to have a class `AnsiCast` for this purpose. We can have a boolean `TreeNodeTag` on `Cast` to indicate whether it was added by the table insertion resolver.
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r873511627 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/ScanBuilder.java: ## @@ -23,7 +23,7 @@ * An interface for building the {@link Scan}. Implementations can mixin SupportsPushDownXYZ * interfaces to do operator push down, and keep the operator push down result in the returned * {@link Scan}. When pushing down operators, the push down order is: - * sample -> filter -> aggregate -> limit -> column pruning. + * sample -> filter -> aggregate -> offset -> limit or top N -> column pruning. Review Comment: Top N is a bit tricky, as it is sort + limit. How about `aggregate -> limit/top-n(sort + limit) -> offset`? The order of limit and offset doesn't matter, as we can always switch the order and adjust the value.
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r873511627 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/ScanBuilder.java: ## @@ -23,7 +23,7 @@ * An interface for building the {@link Scan}. Implementations can mixin SupportsPushDownXYZ * interfaces to do operator push down, and keep the operator push down result in the returned * {@link Scan}. When pushing down operators, the push down order is: - * sample -> filter -> aggregate -> offset -> limit or top N -> column pruning. + * sample -> filter -> aggregate -> offset -> limit or top N -> column pruning. Review Comment: Top N is a bit tricky, as it is sort + limit. How about `aggregate -> limit/top-N(sort + limit) -> offset`? The order of limit and offset doesn't matter, as we can always switch the order and adjust the value. This order also matches the physical plan more closely.
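The claim that limit and offset can be swapped by adjusting the value can be checked with list slicing. The sketch below is illustrative only; the function names are hypothetical and a Python list stands in for an ordered result set.

```python
def offset_then_limit(rows, offset, limit):
    """OFFSET first, then LIMIT."""
    return rows[offset:][:limit]


def limit_then_offset(rows, offset, limit):
    """Equivalent plan with LIMIT first: the limit grows by the offset."""
    return rows[: offset + limit][offset:]


def limit_before_offset(rows, limit, offset):
    """LIMIT n followed by OFFSET m, as written in a query."""
    return rows[:limit][offset:]


def offset_before_limit(rows, limit, offset):
    """Same result with OFFSET first: the limit shrinks by the offset."""
    return rows[offset:][: max(limit - offset, 0)]


rows = list(range(10))
print(offset_then_limit(rows, offset=3, limit=4))
print(limit_then_offset(rows, offset=3, limit=4))
```

In both directions only the limit value changes, which is why a data source can apply the two operators in whichever order it supports.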
[GitHub] [spark] cloud-fan commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
cloud-fan commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r873515998 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -419,6 +420,30 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit } } + private def pushDownOffset(plan: LogicalPlan, offset: Int): (LogicalPlan, Boolean) = plan match { +case operation @ ScanOperation(_, filter, sHolder: ScanBuilderHolder) if filter.isEmpty => + val isPushed = PushDownUtils.pushOffset(sHolder.builder, offset) + if (isPushed) { +sHolder.pushedOffset = Some(offset) + } + (operation, isPushed) +case p: Project => + val (newChild, isPushed) = pushDownOffset(p.child, offset) + (p.withNewChildren(Seq(newChild)), isPushed) +case other => (other, false) + } + + def pushDownOffsets(plan: LogicalPlan): LogicalPlan = plan.transform { +// TODO supports push down Limit append Offset or Offset append Limit +case offset @ Offset(IntegerLiteral(n), child) => Review Comment: We can match offset, limit + offset and offset + limit, similar to the planner rule (after https://github.com/apache/spark/pull/36541)
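To make the three shapes mentioned in this comment concrete, here is a toy model in Python. It is only a sketch: `Scan`, `Limit`, `Offset` and `to_offset_and_limit` are hypothetical stand-ins, not Spark's logical plan classes, and the rule in the PR may handle the cases differently.

```python
from typing import NamedTuple


class Scan(NamedTuple):
    name: str


class Limit(NamedTuple):
    n: int
    child: object


class Offset(NamedTuple):
    n: int
    child: object


def to_offset_and_limit(plan):
    """Return the (offset, limit) pair to push to the scan, or None if the shape is not handled."""
    # Shape 1: offset only.
    if isinstance(plan, Offset) and isinstance(plan.child, Scan):
        return plan.n, None
    # Shape 2: limit on top of offset (skip m rows, then keep n rows).
    if isinstance(plan, Limit) and isinstance(plan.child, Offset) \
            and isinstance(plan.child.child, Scan):
        return plan.child.n, plan.n
    # Shape 3: offset on top of limit (keep n rows, then skip m of them),
    # which leaves at most n - m rows after the skip.
    if isinstance(plan, Offset) and isinstance(plan.child, Limit) \
            and isinstance(plan.child.child, Scan):
        return plan.n, max(plan.child.n - plan.n, 0)
    # Anything else (including a bare limit) is outside this sketch.
    return None


print(to_offset_and_limit(Offset(3, Scan("t"))))            # (3, None)
print(to_offset_and_limit(Limit(4, Offset(3, Scan("t")))))  # (3, 4)
print(to_offset_and_limit(Offset(3, Limit(7, Scan("t")))))  # (3, 4)
```

The last two plans describe the same rows, which is the "switch the order and adjust the value" idea from the earlier review comment.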
[GitHub] [spark] cloud-fan commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
cloud-fan commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873517334 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala: ## @@ -139,6 +139,17 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { * SELECT t1.c1, max(t1.c2) FROM t1 GROUP BY t1.c1 * }}} * + * 3. Remove outer join if: + * - For a left outer join with only left-side columns being selected and the right side join + * keys are unique. + * - For a right outer join with only right-side columns being selected and the left side join + * keys are unique. + * + * {{{ + * SELECT t1.* FROM t1 LEFT JOIN (SELECT DISTINCT c1 as c1 FROM t)t2 ON t1.c1 = t2.c1 ==> Review Comment: ```suggestion * SELECT t1.* FROM t1 LEFT JOIN (SELECT DISTINCT c1 as c1 FROM t) t2 ON t1.c1 = t2.c1 ==> ```
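The reasoning behind the rule in this doc comment can be illustrated with a naive join over Python lists. This is a sketch only: `left_outer_join` and `left_columns` are hypothetical helpers, rows are plain dicts, and SQL NULL semantics for join keys are ignored.

```python
def left_outer_join(left, right, key):
    """Naive left outer join; returns (left_row, right_row_or_None) pairs."""
    out = []
    for left_row in left:
        matches = [r for r in right if r[key] == left_row[key]]
        if matches:
            out.extend((left_row, r) for r in matches)
        else:
            # No match: the left row is kept once, padded with None.
            out.append((left_row, None))
    return out


def left_columns(pairs):
    """Project only the left-side columns, as in SELECT t1.*."""
    return [left_row for left_row, _ in pairs]


t1 = [{"c1": 1, "c2": "a"}, {"c1": 2, "c2": "b"},
      {"c1": 2, "c2": "c"}, {"c1": 5, "c2": "d"}]
unique_right = [{"c1": 1}, {"c1": 2}, {"c1": 3}]  # join keys are unique
dup_right = [{"c1": 2}, {"c1": 2}]                # join keys are duplicated

print(left_columns(left_outer_join(t1, unique_right, "c1")) == t1)  # True
print(len(left_columns(left_outer_join(t1, dup_right, "c1"))))      # 6
```

With unique right-side keys every left row appears exactly once, so the join contributes nothing to the selected columns and can be dropped. With duplicated keys the left rows are multiplied, so the join has to stay.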
[GitHub] [spark] gengliangwang commented on pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on PR #36562: URL: https://github.com/apache/spark/pull/36562#issuecomment-1127443975 @MaxGekk I tried generating the benchmark files for CSV. There is no significant improvement since the timestamp inputs are all valid timestamp strings. Do I need to continue with it?
[GitHub] [spark] cloud-fan commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
cloud-fan commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873517906 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala: ## @@ -268,4 +268,54 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, originalQuery.analyze) } + + test("SPARK-39172: Remove outer join if all output come from streamed side and buffered side " + Review Comment: Please update the test name as well. ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala: ## @@ -268,4 +268,54 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, originalQuery.analyze) } + + test("SPARK-39172: Remove outer join if all output come from streamed side and buffered side " + +"keys exist unique key") { +val x = testRelation.subquery(Symbol("x")) Review Comment: ```suggestion val x = testRelation.subquery("x") ```
[GitHub] [spark] cloud-fan commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
cloud-fan commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873520469 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala: ## @@ -268,4 +268,54 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, originalQuery.analyze) } + + test("SPARK-39172: Remove outer join if all output come from streamed side and buffered side " + +"keys exist unique key") { +val x = testRelation.subquery(Symbol("x")) +val y = testRelation1.subquery(Symbol("y")) + +// left outer +comparePlans(Optimize.execute( + x.join(y.groupBy($"d")($"d"), LeftOuter, Some($"a" === $"d")) +.select($"a", $"b", $"c").analyze), + x.select($"a", $"b", $"c").analyze +) + +comparePlans(Optimize.execute( + x.join(y.groupBy($"d")($"d", count($"d").as("x")), LeftOuter, +Some($"a" === $"d" && $"b" === $"x")) +.select($"a", $"b", $"c").analyze), + x.select($"a", $"b", $"c").analyze +) + +// right outer +comparePlans(Optimize.execute( + x.groupBy($"a")($"a").join(y, RightOuter, Some($"a" === $"d")) +.select($"d", $"e", $"f").analyze), + y.select($"d", $"e", $"f").analyze +) + +comparePlans(Optimize.execute( + x.groupBy($"a")($"a", count($"a").as("x")).join(y, RightOuter, +Some($"a" === $"d" && $"x" === $"e")) +.select($"d", $"e", $"f").analyze), + y.select($"d", $"e", $"f").analyze +) + +// negative case +// not a equi-join +val p1 = x.join(y.groupBy($"d")($"d"), LeftOuter, Some($"a" > $"d")) + .select($"a").analyze +comparePlans(Optimize.execute(p1), p1) + +// do not exist unique key +val p2 = x.join(y.groupBy($"d", $"e")($"d", $"e"), LeftOuter, Some($"a" === $"d")) + .select($"a").analyze +comparePlans(Optimize.execute(p2), p2) + +// output comes from buffered side Review Comment: ```suggestion // output comes from the right side of a left outer join ``` -- This is an automated message from the Apache Git Service. 
[GitHub] [spark] AnywalkerGiser opened a new pull request, #36566: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AnywalkerGiser opened a new pull request, #36566: URL: https://github.com/apache/spark/pull/36566 ### What changes were proposed in this pull request? Fix problems with pyspark in Windows: 1. Fixed datetime conversion to timestamp before 1970; 2. Fixed datetime conversion when timestamp is negative; 3. Adding a test script. ### Why are the changes needed? Pyspark has problems serializing pre-1970 times in Windows. An exception occurs when executing the following code under Windows: ```python rdd = sc.parallelize([('a', datetime(1957, 1, 9, 0, 0)), ('b', datetime(2014, 1, 27, 0, 0))]) df = spark.createDataFrame(rdd, ["id", "date"]) df.show() df.printSchema() print(df.collect()) ``` ```bash File "...\spark\python\lib\pyspark.zip\pyspark\sql\types.py", line 195, in toInternal else time.mktime(dt.timetuple())) OverflowError: mktime argument out of range at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638) at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872) at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349) at org.apache.spark.rdd.RDD.iterator(RDD.scala:313) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:127) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ... 1 more ``` and ```bash File ...\spark\python\lib\pyspark.zip\pyspark\sql\types.py, in fromInternal: Line 207: return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000) OSError: [Errno 22] Invalid argument ``` After updating the code, the example above runs successfully: ```bash +---+-------------------+ | id| date| +---+-------------------+ | a|1957-01-08 16:00:00| | b|2014-01-26 16:00:00| +---+-------------------+ root |-- id: string (nullable = true) |-- date: timestamp (nullable = true) [Row(id='a', date=datetime.datetime(1957, 1, 9, 0, 0)), Row(id='b', date=datetime.datetime(2014, 1, 27, 0, 0))] ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New and existing test suites
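Both tracebacks come from calls (`time.mktime`, `datetime.fromtimestamp`) that depend on the platform C library, which on Windows rejects pre-1970 values. One common way around that is plain `timedelta` arithmetic against a fixed epoch. The sketch below shows that general technique and is not necessarily the change made in this PR; `to_internal` and `from_internal` are hypothetical names, and the datetimes are assumed to be timezone-aware (UTC) so the result does not depend on the local time zone.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_internal(dt):
    """Microseconds since the Unix epoch, using integer arithmetic only."""
    delta = dt - EPOCH
    # timedelta normalises negatives as days < 0 with 0 <= seconds and 0 <= microseconds.
    return (delta.days * 86400 + delta.seconds) * 1_000_000 + delta.microseconds


def from_internal(ts):
    """Inverse of to_internal; works for negative (pre-1970) values as well."""
    return EPOCH + timedelta(microseconds=ts)


old = datetime(1957, 1, 9, tzinfo=timezone.utc)
ts = to_internal(old)
print(ts < 0)                    # True
print(from_internal(ts) == old)  # True
```

Because no C library time function is involved, the conversion behaves the same on every platform within the range supported by `datetime`.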
[GitHub] [spark] gengliangwang commented on pull request #36445: [SPARK-39096][SQL] Support MERGE commands with DEFAULT values
gengliangwang commented on PR #36445: URL: https://github.com/apache/spark/pull/36445#issuecomment-1127454108 Thanks, merging to master!
[GitHub] [spark] gengliangwang closed pull request #36445: [SPARK-39096][SQL] Support MERGE commands with DEFAULT values
gengliangwang closed pull request #36445: [SPARK-39096][SQL] Support MERGE commands with DEFAULT values URL: https://github.com/apache/spark/pull/36445
[GitHub] [spark] gengliangwang commented on pull request #36557: [SPARK-39190][SQL] Provide query context for decimal precision overflow error when WSCG is off
gengliangwang commented on PR #36557: URL: https://github.com/apache/spark/pull/36557#issuecomment-1127456145 Merging to master/3.3
[GitHub] [spark] gengliangwang closed pull request #36557: [SPARK-39190][SQL] Provide query context for decimal precision overflow error when WSCG is off
gengliangwang closed pull request #36557: [SPARK-39190][SQL] Provide query context for decimal precision overflow error when WSCG is off URL: https://github.com/apache/spark/pull/36557
[GitHub] [spark] beliefer commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
beliefer commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r873551016 ## sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/ScanBuilder.java: ## @@ -23,7 +23,7 @@ * An interface for building the {@link Scan}. Implementations can mixin SupportsPushDownXYZ * interfaces to do operator push down, and keep the operator push down result in the returned * {@link Scan}. When pushing down operators, the push down order is: - * sample -> filter -> aggregate -> limit -> column pruning. + * sample -> filter -> aggregate -> offset -> limit or top N -> column pruning. Review Comment: OK
[GitHub] [spark] beliefer commented on a diff in pull request #36295: [SPARK-38978][SQL] Support push down OFFSET to JDBC data source V2
beliefer commented on code in PR #36295: URL: https://github.com/apache/spark/pull/36295#discussion_r873554764 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala: ## @@ -419,6 +420,30 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper wit } } + private def pushDownOffset(plan: LogicalPlan, offset: Int): (LogicalPlan, Boolean) = plan match { +case operation @ ScanOperation(_, filter, sHolder: ScanBuilderHolder) if filter.isEmpty => + val isPushed = PushDownUtils.pushOffset(sHolder.builder, offset) + if (isPushed) { +sHolder.pushedOffset = Some(offset) + } + (operation, isPushed) +case p: Project => + val (newChild, isPushed) = pushDownOffset(p.child, offset) + (p.withNewChildren(Seq(newChild)), isPushed) +case other => (other, false) + } + + def pushDownOffsets(plan: LogicalPlan): LogicalPlan = plan.transform { +// TODO supports push down Limit append Offset or Offset append Limit +case offset @ Offset(IntegerLiteral(n), child) => Review Comment: OK -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] gengliangwang commented on pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on PR #36562: URL: https://github.com/apache/spark/pull/36562#issuecomment-1127484622 I will upload the regenerated benchmark results for JSON later; generating them takes more than 1 hour.
[GitHub] [spark] gengliangwang commented on a diff in pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on code in PR #36562: URL: https://github.com/apache/spark/pull/36562#discussion_r873559626 ## sql/core/benchmarks/CSVBenchmark-results.txt: ## @@ -2,66 +2,66 @@ Benchmark to measure CSV read/write performance -OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure -Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 12.2.1 +Apple M1 Pro Parsing quoted values: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -One quoted string 41610 42902 1598 0.0 832194.2 1.0X +One quoted string 16964 16981 15 0.0 339281.1 1.0X Review Comment: Not directly related to this PR, but M1 chips are so fast! cc @dbtsai @dongjoon-hyun @viirya @sunchao @huaxingao
[GitHub] [spark] EnricoMi commented on pull request #35965: [SPARK-38647][SQL] Add SupportsReportOrdering mix in interface for Scan (DataSourceV2)
EnricoMi commented on PR #35965: URL: https://github.com/apache/spark/pull/35965#issuecomment-1127488590 @sunchao @HyukjinKwon @aokolnychyi @cloud-fan I have addressed comments and rebased.
[GitHub] [spark] EnricoMi commented on pull request #36150: [SPARK-38864][SQL] Add melt / unpivot to Dataset
EnricoMi commented on PR #36150: URL: https://github.com/apache/spark/pull/36150#issuecomment-1127495090 @HyukjinKwon @awdavidson @aray I have addressed all comments, rebased and removed the `[WIP]`. Please let me know what you think.
[GitHub] [spark] cloud-fan commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
cloud-fan commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873576622 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala: ## @@ -462,6 +462,7 @@ package object dsl { Window(windowExpressions, partitionSpec, orderSpec, logicalPlan) def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan) Review Comment: I don't know why we need to accept a `Symbol` here. We can probably do a cleanup later and remove this method. The same goes for `def as(alias: Symbol): NamedExpression = Alias(expr, alias.name)()` in this file.
[GitHub] [spark] cloud-fan commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
cloud-fan commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873577805 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala: ## @@ -268,4 +268,59 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, originalQuery.analyze) } + + test("SPARK-39172: Remove left outer join if only left-side columns being selected and " + Review Comment: Remove left outer join if only left-side columns are selected and the join keys on the other side are unique.
[GitHub] [spark] cloud-fan commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
cloud-fan commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873578814 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala: ## @@ -268,4 +268,59 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, originalQuery.analyze) } + + test("SPARK-39172: Remove left outer join if only left-side columns being selected and " + Review Comment: The PR title can be `Remove left/right outer join if only left/right side columns are selected and the join keys on the other side are unique`
[GitHub] [spark] cloud-fan commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
cloud-fan commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873577805 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala: ## @@ -268,4 +268,59 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, originalQuery.analyze) } + + test("SPARK-39172: Remove left outer join if only left-side columns being selected and " + Review Comment: Remove left/right outer join if only left/right side columns are selected and the join keys on the other side are unique. ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala: ## @@ -268,4 +268,59 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, originalQuery.analyze) } + + test("SPARK-39172: Remove left outer join if only left-side columns being selected and " + Review Comment: The same goes for the PR title.
[GitHub] [spark] ulysses-you commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
ulysses-you commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873584312 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala: ## @@ -462,6 +462,7 @@ package object dsl { Window(windowExpressions, partitionSpec, orderSpec, logicalPlan) def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan) Review Comment: I looked up the history; it was added at the beginning of SQL.
[GitHub] [spark] ulysses-you commented on a diff in pull request #36530: [SPARK-39172][SQL] Remove outer join if all output come from streamed side and buffered side keys exist unique key
ulysses-you commented on code in PR #36530: URL: https://github.com/apache/spark/pull/36530#discussion_r873584492 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala: ## @@ -462,6 +462,7 @@ package object dsl { Window(windowExpressions, partitionSpec, orderSpec, logicalPlan) def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan) Review Comment: sure, will do a cleanup later
[GitHub] [spark] AnywalkerGiser commented on pull request #36566: [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows
AnywalkerGiser commented on PR #36566: URL: https://github.com/apache/spark/pull/36566#issuecomment-1127516999 @HyukjinKwon I closed the PR against the 3.0 branch (https://github.com/apache/spark/pull/36537) and raised a new PR against the master branch.
[GitHub] [spark] gengliangwang commented on a diff in pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on code in PR #36562: URL: https://github.com/apache/spark/pull/36562#discussion_r873559626 ## sql/core/benchmarks/CSVBenchmark-results.txt: ## @@ -2,66 +2,66 @@ Benchmark to measure CSV read/write performance -OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure -Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 12.2.1 +Apple M1 Pro Parsing quoted values: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -One quoted string 41610 42902 1598 0.0 832194.2 1.0X +One quoted string 16964 16981 15 0.0 339281.1 1.0X Review Comment: Not directly related to this PR, but the M1 MacBook is so fast! cc @dbtsai @dongjoon-hyun @viirya @sunchao @huaxingao
[GitHub] [spark] gengliangwang commented on pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on PR #36562: URL: https://github.com/apache/spark/pull/36562#issuecomment-1127600354 @MaxGekk Actually the benchmark doesn't show a significant improvement for the timestamp inference, since the inputs are all valid timestamp strings.
[GitHub] [spark] MaxGekk commented on a diff in pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
MaxGekk commented on code in PR #36562: URL: https://github.com/apache/spark/pull/36562#discussion_r873663460 ## sql/core/benchmarks/CSVBenchmark-results.txt: ## @@ -2,66 +2,66 @@ Benchmark to measure CSV read/write performance -OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure -Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 12.2.1 +Apple M1 Pro Parsing quoted values:Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -One quoted string 41610 42902 1598 0.0 832194.2 1.0X +One quoted string 16964 16981 15 0.0 339281.1 1.0X Review Comment: > the M1 Macbook is so fast! You should run the benchmark on GitHub Actions (GA).
[GitHub] [spark] MaxGekk commented on pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
MaxGekk commented on PR #36562: URL: https://github.com/apache/spark/pull/36562#issuecomment-1127610949 > Actually the benchmark doesn't show a significant improvement for the timestamp inference OK. Could you open a JIRA to add new benchmarks for CSV/JSON?
[GitHub] [spark] gengliangwang commented on pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on PR #36562: URL: https://github.com/apache/spark/pull/36562#issuecomment-1127654867 @MaxGekk So I will revert the benchmark results in this one. There will be regenerated results in the new benchmark PR.
[GitHub] [spark] gengliangwang commented on a diff in pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on code in PR #36562: URL: https://github.com/apache/spark/pull/36562#discussion_r873709851 ## sql/core/benchmarks/CSVBenchmark-results.txt: ## @@ -2,66 +2,66 @@ Benchmark to measure CSV read/write performance -OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure -Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 12.2.1 +Apple M1 Pro Parsing quoted values:Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -One quoted string 41610 42902 1598 0.0 832194.2 1.0X +One quoted string 16964 16981 15 0.0 339281.1 1.0X Review Comment: OK I will try it
[GitHub] [spark] gengliangwang commented on a diff in pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on code in PR #36562: URL: https://github.com/apache/spark/pull/36562#discussion_r873559626 ## sql/core/benchmarks/CSVBenchmark-results.txt: ## @@ -2,66 +2,66 @@ Benchmark to measure CSV read/write performance -OpenJDK 64-Bit Server VM 1.8.0_322-b06 on Linux 5.13.0-1021-azure -Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 12.2.1 +Apple M1 Pro Parsing quoted values:Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -One quoted string 41610 42902 1598 0.0 832194.2 1.0X +One quoted string 16964 16981 15 0.0 339281.1 1.0X Review Comment: Not directly related to this PR, but the M1 Macbook is so fast!
[GitHub] [spark] panbingkun commented on pull request #36561: [SPARK-37939][SQL] Use error classes in the parsing errors of properties
panbingkun commented on PR #36561: URL: https://github.com/apache/spark/pull/36561#issuecomment-1127713688 pinging @MaxGekk
[GitHub] [spark] dcoliversun opened a new pull request, #36567: [SPARK-39196][CORE][SQL][K8S] replace `getOrElse(null)` with `orNull`
dcoliversun opened a new pull request, #36567: URL: https://github.com/apache/spark/pull/36567 ### What changes were proposed in this pull request? This PR aims to replace `getOrElse(null)` with `orNull`. ### Why are the changes needed? Code simplification. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the GA.
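As a minimal illustration of the simplification described above (a sketch, not code taken from the PR), `Option#orNull` yields the wrapped value or `null`, which matches `getOrElse(null)` for reference types. The object and value names below are hypothetical.

```scala
object OrNullSketch {
  // Hypothetical values, for illustration only.
  val present: Option[String] = Some("spark")
  val absent: Option[String] = None

  // Before: an explicit null default.
  val beforePresent: String = present.getOrElse(null)
  val beforeAbsent: String = absent.getOrElse(null)

  // After: the same results via orNull.
  val afterPresent: String = present.orNull
  val afterAbsent: String = absent.orNull
}
```

Both forms return the same values; `orNull` just states the intent more directly.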
[GitHub] [spark] pan3793 commented on a diff in pull request #36496: [SPARK-39104][SQL] Synchronize isCachedColumnBuffersLoaded to avoid concurrency issue
pan3793 commented on code in PR #36496: URL: https://github.com/apache/spark/pull/36496#discussion_r873803127 ## sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala: ## @@ -238,7 +238,12 @@ case class CachedRDDBuilder( } def isCachedColumnBuffersLoaded: Boolean = { -_cachedColumnBuffers != null && isCachedRDDLoaded +if (_cachedColumnBuffers != null) { + _cachedColumnBuffers.synchronized { +return _cachedColumnBuffers != null && isCachedRDDLoaded + } +} +false } def isCachedRDDLoaded: Boolean = { Review Comment: On second thought, I don't think it makes sense to add a `synchronized` block in `isCachedRDDLoaded`, because we would need to check `_cachedColumnBuffers != null` again, which would make `isCachedRDDLoaded` the same as `isCachedColumnBuffersLoaded`. Instead, I changed `isCachedRDDLoaded` to private to avoid accidental non-thread-safe invocations.
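The check, lock, re-check shape discussed in this comment can be sketched in isolation as follows. This is not the PR's code: `BufferHolder`, `load`, `clear` and `isLoaded` are hypothetical stand-ins, and the sketch locks on the holder itself rather than on the field, which is an assumption made to keep the example simple.

```scala
object CheckLockRecheckSketch {
  // Hypothetical stand-in for a builder that lazily holds cached buffers.
  final class BufferHolder {
    @volatile private var buffers: Seq[Int] = null

    def load(data: Seq[Int]): Unit = synchronized { buffers = data }

    def clear(): Unit = synchronized { buffers = null }

    // Cheap unsynchronized null check first; then re-check under the lock so
    // the field cannot be cleared between the null test and its use.
    def isLoaded: Boolean =
      buffers != null && synchronized {
        buffers != null && buffers.nonEmpty
      }
  }
}
```

Keeping the inner check private to the holder, as the comment proposes for `isCachedRDDLoaded`, means callers can only reach it through the synchronized path.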
[GitHub] [spark] pan3793 commented on pull request #36496: [SPARK-39104][SQL] Synchronize isCachedColumnBuffersLoaded to avoid concurrency issue
pan3793 commented on PR #36496: URL: https://github.com/apache/spark/pull/36496#issuecomment-1127755192 > Is there a chance to add a new test? Sorry I missed this comment, added.
[GitHub] [spark] srowen commented on pull request #36499: [SPARK-38846][SQL] Add explicit data mapping between Teradata Numeric Type and Spark DecimalType
srowen commented on PR #36499: URL: https://github.com/apache/spark/pull/36499#issuecomment-1127758808 I don't know anything about Teradata. Is it documented that this should be the result, and is it specific to Teradata?
[GitHub] [spark] pan3793 commented on a diff in pull request #36485: [SPARK-39128][SQL][HIVE] Log cost time for getting FileStatus in HadoopTableReader
pan3793 commented on code in PR #36485: URL: https://github.com/apache/spark/pull/36485#discussion_r873808592 ## sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala: ## @@ -175,7 +175,8 @@ class HadoopTableReader( def updateExistPathSetByPathPattern(pathPatternStr: String): Unit = { val pathPattern = new Path(pathPatternStr) val fs = pathPattern.getFileSystem(hadoopConf) - val matches = fs.globStatus(pathPattern) + val (matches, timeCost) = Utils.timeTakenMs(fs.globStatus(pathPattern)) + logInfo(s"Get FileStatus for path '$pathPatternStr' costs $timeCost ms.") Review Comment: @srowen Do you have any concerns before merging it?
[GitHub] [spark] srowen commented on a diff in pull request #36485: [SPARK-39128][SQL][HIVE] Log cost time for getting FileStatus in HadoopTableReader
srowen commented on code in PR #36485: URL: https://github.com/apache/spark/pull/36485#discussion_r873811257 ## sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala: ## @@ -175,7 +175,8 @@ class HadoopTableReader( def updateExistPathSetByPathPattern(pathPatternStr: String): Unit = { val pathPattern = new Path(pathPatternStr) val fs = pathPattern.getFileSystem(hadoopConf) - val matches = fs.globStatus(pathPattern) + val (matches, timeCost) = Utils.timeTakenMs(fs.globStatus(pathPattern)) + logInfo(s"Get FileStatus for path '$pathPatternStr' costs $timeCost ms.") Review Comment: I'm neutral, would not merge it but do not object to it
[GitHub] [spark] pan3793 commented on a diff in pull request #36485: [SPARK-39128][SQL][HIVE] Log cost time for getting FileStatus in HadoopTableReader
pan3793 commented on code in PR #36485: URL: https://github.com/apache/spark/pull/36485#discussion_r873816933 ## sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala: ## @@ -175,7 +175,8 @@ class HadoopTableReader( def updateExistPathSetByPathPattern(pathPatternStr: String): Unit = { val pathPattern = new Path(pathPatternStr) val fs = pathPattern.getFileSystem(hadoopConf) - val matches = fs.globStatus(pathPattern) + val (matches, timeCost) = Utils.timeTakenMs(fs.globStatus(pathPattern)) + logInfo(s"Get FileStatus for path '$pathPatternStr' costs $timeCost ms.") Review Comment: emm
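The timing pattern in the diff under review wraps a call in a helper that returns the result together with the elapsed milliseconds. Below is a self-contained sketch of that shape; the local `timeTakenMs` is a hypothetical stand-in written for this example (not Spark's `Utils`), and `slowLookup` is an invented placeholder for the file-system glob.

```scala
import java.util.concurrent.TimeUnit

object TimingSketch {
  // Hypothetical stand-in helper: evaluates `body` exactly once and returns
  // the result paired with the elapsed wall-clock time in milliseconds.
  def timeTakenMs[T](body: => T): (T, Long) = {
    val startNs = System.nanoTime()
    val result = body
    val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
    (result, math.max(elapsedMs, 0L))
  }

  // Invented placeholder for a slow lookup such as a file-system glob.
  def slowLookup(pattern: String): Seq[String] = {
    Thread.sleep(5)
    Seq(s"$pattern/part-0", s"$pattern/part-1")
  }

  def main(args: Array[String]): Unit = {
    val path = "/tmp/table"
    val (matches, timeCost) = timeTakenMs(slowLookup(path))
    println(s"Get FileStatus for path '$path' costs $timeCost ms (${matches.size} matches).")
  }
}
```

Because the body is passed by name, the measured call runs inside the helper, so the caller keeps the original result and gains only one extra value to log.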
[GitHub] [spark] LuciferYang commented on a diff in pull request #36496: [SPARK-39104][SQL] InMemoryRelation#isCachedColumnBuffersLoaded should be thread-safe
LuciferYang commented on code in PR #36496: URL: https://github.com/apache/spark/pull/36496#discussion_r873817057 ## sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala: ## @@ -563,4 +563,33 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSparkSession { } } } + + test("SPARK-39104: InMemoryRelation#isCachedColumnBuffersLoaded should be thread-safe") { Review Comment: I ran this UT manually, and it doesn't seem to reproduce the issue?
[GitHub] [spark] pan3793 commented on pull request #36485: [SPARK-39128][SQL][HIVE] Log cost time for getting FileStatus in HadoopTableReader
pan3793 commented on PR #36485: URL: https://github.com/apache/spark/pull/36485#issuecomment-1127767757 cc @yaooqinn would you please take a look if you have time?
[GitHub] [spark] gengliangwang commented on pull request #36562: [SPARK-39193][SQL] Fasten Timestamp type inference of JSON/CSV data sources
gengliangwang commented on PR #36562: URL: https://github.com/apache/spark/pull/36562#issuecomment-1127883660 FYI I will fix the test failures tomorrow.
[GitHub] [spark] huaxingao commented on a diff in pull request #36556: [SPARK-39162][SQL][3.3] Jdbc dialect should decide which function could be pushed down
huaxingao commented on code in PR #36556: URL: https://github.com/apache/spark/pull/36556#discussion_r873942747 ## sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala: ## @@ -240,8 +240,27 @@ abstract class JdbcDialect extends Serializable with Logging{ getJDBCType(dataType).map(_.databaseTypeDefinition).getOrElse(dataType.typeName) s"CAST($l AS $databaseTypeDefinition)" } + +override def visitSQLFunction(funcName: String, inputs: Array[String]): String = { + if (isSupportedFunction(funcName)) { +s"""$funcName(${inputs.mkString(", ")})""" + } else { +// The framework will catch the error and give up the push-down. +// Please see `JdbcDialect.compileExpression(expr: Expression)` for more details. +throw new UnsupportedOperationException( + s"${this.getClass.getSimpleName} does not support function: $funcName") + } +} } + /** + * Returns whether the database supports function. + * @param funcName Upper-cased function name + * @return True if the database supports function. + */ + @Since("3.3.0") Review Comment: Let's hope this can still make RC2.
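The idea in this diff, where each dialect decides which functions may be pushed down and an unsupported one aborts the push-down, can be sketched without Spark as follows. The `Dialect` trait, `DemoDialect`, `compile` and the supported-function set are hypothetical simplifications for illustration; they are not Spark's `JdbcDialect` API.

```scala
object PushDownSketch {
  // Hypothetical, simplified stand-in for a dialect.
  trait Dialect {
    def name: String

    // Function names are expected to be upper-cased.
    def isSupportedFunction(funcName: String): Boolean

    def visitSQLFunction(funcName: String, inputs: Array[String]): String =
      if (isSupportedFunction(funcName)) {
        s"""$funcName(${inputs.mkString(", ")})"""
      } else {
        throw new UnsupportedOperationException(
          s"$name does not support function: $funcName")
      }

    // Mimics a caller that catches the error and gives up the push-down.
    def compile(funcName: String, inputs: Array[String]): Option[String] =
      try Some(visitSQLFunction(funcName, inputs))
      catch { case _: UnsupportedOperationException => None }
  }

  object DemoDialect extends Dialect {
    private val supported = Set("ABS", "COALESCE")
    override val name: String = "DemoDialect"
    override def isSupportedFunction(funcName: String): Boolean =
      supported.contains(funcName)
  }
}
```

In this sketch a `None` from `compile` means the expression would be evaluated outside the database instead of being pushed down.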
[GitHub] [spark] dtenedor commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…
dtenedor commented on code in PR #36527: URL: https://github.com/apache/spark/pull/36527#discussion_r873944972 ## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirstSuite.scala: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Max} +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical._ + +class RewriteNonAggregateFirstSuite extends PlanTest { + val testRelation: LocalRelation = LocalRelation($"a".string, $"b".string) + + private def checkRewrite(rewrite: LogicalPlan): Unit = rewrite match { +case Aggregate(_, _, GlobalLimit(_, _)) => +case _ => fail(s"Plan is not rewritten:\n$rewrite") + } + + test("single FIRST aggregate and no group by") { +val input = testRelation.select( + First($"a", ignoreNulls = false).toAggregateExpression()).analyze +val rewrite = RewriteNonAggregateFirst(input.analyze) +checkRewrite(rewrite) + } + + test("multiple FIRST aggregates and no group by") { +val input = testRelation.select( + First($"a", ignoreNulls = false).toAggregateExpression(), + First($"b", ignoreNulls = false).toAggregateExpression()).analyze +val rewrite = RewriteNonAggregateFirst(input.analyze) +checkRewrite(rewrite) + } + + test("ignoreNulls set to true blocks rewrite") { +val input = testRelation.select( + First($"a", ignoreNulls = false).toAggregateExpression(), + First($"b", ignoreNulls = true).toAggregateExpression()).analyze +val rewrite = RewriteNonAggregateFirst(input.analyze) +comparePlans(input, rewrite) + } + + test("FIRST aggregate with group by") { +val input = testRelation + .groupBy($"a")(First($"a", ignoreNulls = false).toAggregateExpression()) + .analyze +val rewrite = RewriteNonAggregateFirst(input) +comparePlans(input, rewrite) + } + + test("mixed aggregates with group by") { +val input = testRelation + .groupBy('a)( +First($"a", ignoreNulls = false).toAggregateExpression().as('agg1), +Max($"b").toAggregateExpression().as('agg2)) + .analyze +val rewrite = RewriteNonAggregateFirst(input) 
+comparePlans(input, rewrite) + } + + test("mixed aggregates without group by") { Review Comment: Let's also add a test case like the one Bart described, where the input is empty?
[GitHub] [spark] xinrong-databricks commented on pull request #36547: Implement `skipna` parameter of `Groupby.all`
xinrong-databricks commented on PR #36547: URL: https://github.com/apache/spark/pull/36547#issuecomment-1127916884 @bjornjorgensen We may want to reach parity with pandas https://github.com/pandas-dev/pandas/blob/v1.4.2/pandas/core/groupby/groupby.py#L1810-L1828. I may not see how `all_to_skip` saves us from defining `all`. Would you elaborate? Thanks!
[GitHub] [spark] cloud-fan commented on pull request #36530: [SPARK-39172][SQL] Remove left/right outer join if only left/right side columns are selected and the join keys on the other side are unique
cloud-fan commented on PR #36530: URL: https://github.com/apache/spark/pull/36530#issuecomment-1127920799 thanks, merging to master!
[GitHub] [spark] cloud-fan closed pull request #36530: [SPARK-39172][SQL] Remove left/right outer join if only left/right side columns are selected and the join keys on the other side are unique
cloud-fan closed pull request #36530: [SPARK-39172][SQL] Remove left/right outer join if only left/right side columns are selected and the join keys on the other side are unique URL: https://github.com/apache/spark/pull/36530
[GitHub] [spark] xinrong-databricks commented on a diff in pull request #36547: Implement `skipna` parameter of `Groupby.all`
xinrong-databricks commented on code in PR #36547: URL: https://github.com/apache/spark/pull/36547#discussion_r873967446 ## python/pyspark/pandas/groupby.py: ## @@ -2879,7 +2898,43 @@ def _reduce_for_stat_function( psdf = psdf.reset_index(level=should_drop_index, drop=True) if len(should_drop_index) < len(self._groupkeys): psdf = psdf.reset_index() -return self._cleanup_and_return(psdf) +psdf = self._cleanup_and_return(psdf) +return psdf + +def _prepare_reduce( Review Comment: Code of `_prepare_reduce` is extracted from `_reduce_for_stat_function`.
[GitHub] [spark] xinrong-databricks commented on a diff in pull request #36547: Implement `skipna` parameter of `Groupby.all`
xinrong-databricks commented on code in PR #36547: URL: https://github.com/apache/spark/pull/36547#discussion_r873968124 ## python/pyspark/pandas/groupby.py: ## @@ -2862,6 +2879,9 @@ def _reduce_for_stat_function( ) psdf = DataFrame(internal) +return self._prepare_return(psdf) + +def _prepare_return(self, psdf: DataFrame) -> DataType: Review Comment: Code of `_prepare_return` is extracted from `_reduce_for_stat_function`.
[GitHub] [spark] cloud-fan commented on pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
cloud-fan commented on PR #36541: URL: https://github.com/apache/spark/pull/36541#issuecomment-1127922415 thanks for the review, merging to master!
[GitHub] [spark] cloud-fan closed pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset
cloud-fan closed pull request #36541: [SPARK-39180][SQL] Simplify the planning of limit and offset URL: https://github.com/apache/spark/pull/36541
[GitHub] [spark] MaxGekk commented on a diff in pull request #36561: [SPARK-37939][SQL] Use error classes in the parsing errors of properties
MaxGekk commented on code in PR #36561: URL: https://github.com/apache/spark/pull/36561#discussion_r874007811 ## sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala: ## @@ -267,16 +267,26 @@ object QueryParsingErrors extends QueryErrorsBase { def cannotCleanReservedNamespacePropertyError( property: String, ctx: ParserRuleContext, msg: String): Throwable = { -new ParseException(s"$property is a reserved namespace property, $msg.", ctx) +new ParseException( + errorClass = "UNSUPPORTED_FEATURE", + messageParameters = Array("CLEAN_RESERVED_NAMESPACE_PROPERTY", s"$property", s"$msg"), Review Comment: ```suggestion messageParameters = Array("CLEAN_RESERVED_NAMESPACE_PROPERTY", property, msg), ``` ## sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala: ## @@ -267,16 +267,26 @@ object QueryParsingErrors extends QueryErrorsBase { def cannotCleanReservedNamespacePropertyError( property: String, ctx: ParserRuleContext, msg: String): Throwable = { -new ParseException(s"$property is a reserved namespace property, $msg.", ctx) +new ParseException( + errorClass = "UNSUPPORTED_FEATURE", + messageParameters = Array("CLEAN_RESERVED_NAMESPACE_PROPERTY", s"$property", s"$msg"), + ctx) } def propertiesAndDbPropertiesBothSpecifiedError(ctx: CreateNamespaceContext): Throwable = { -new ParseException("Either PROPERTIES or DBPROPERTIES is allowed.", ctx) +new ParseException( + errorClass = "UNSUPPORTED_FEATURE", + messageParameters = Array("PROPERTIES_AND_DBPROPERTIES_BOTH_SPECIFIED_CONFLICT"), + ctx +) } def cannotCleanReservedTablePropertyError( property: String, ctx: ParserRuleContext, msg: String): Throwable = { -new ParseException(s"$property is a reserved table property, $msg.", ctx) +new ParseException( + errorClass = "UNSUPPORTED_FEATURE", + messageParameters = Array("CLEAN_RESERVED_TABLE_PROPERTY", s"$property", s"$msg"), Review Comment: ```suggestion messageParameters = Array("CLEAN_RESERVED_TABLE_PROPERTY", property, 
msg), ``` ## sql/core/src/test/scala/org/apache/spark/sql/errors/QueryParsingErrorsSuite.scala: ## @@ -642,4 +642,92 @@ class QueryParsingErrorsSuite extends QueryTest with QueryErrorsSuiteBase { |^^^ |""".stripMargin) } + + test("UNSUPPORTED_FEATURE: cannot clean reserved namespace property") { +val sql = "CREATE NAMESPACE IF NOT EXISTS a.b.c WITH PROPERTIES ('location'='/home/user/db')" +validateParsingError( + sqlText = sql, + errorClass = "UNSUPPORTED_FEATURE", + errorSubClass = Some("CLEAN_RESERVED_NAMESPACE_PROPERTY"), Review Comment: I think the name of the error sub-class might confuse users. Could you simplify it: CLEAN_RESERVED_NAMESPACE_PROPERTY -> SET_NAMESPACE_PROPERTY? Users would then see: ``` [UNSUPPORTED_FEATURE.SET_NAMESPACE_PROPERTY] ```
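The two `suggestion` blocks in this review drop the `s"$property"` and `s"$msg"` wrappers. As a small sketch of why that is safe: interpolating a `String` on its own yields an equal `String`, so passing the value directly builds the same array. The names and sample values below are hypothetical, chosen only for the example.

```scala
object InterpolationSketch {
  // Hypothetical sample values, for illustration only.
  val property: String = "location"
  val msg: String = "please use the LOCATION clause to specify it"

  // Redundant: each interpolation just rebuilds an equal string.
  val wrapped: Array[String] =
    Array("CLEAN_RESERVED_NAMESPACE_PROPERTY", s"$property", s"$msg")

  // Equivalent and simpler: pass the strings as they are.
  val direct: Array[String] =
    Array("CLEAN_RESERVED_NAMESPACE_PROPERTY", property, msg)
}
```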
[GitHub] [spark] vli-databricks commented on a diff in pull request #36527: [SPARK-39169][SQL] Optimize FIRST when used as a single aggregate fun…
vli-databricks commented on code in PR #36527:
URL: https://github.com/apache/spark/pull/36527#discussion_r874028670

## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteNonAggregateFirstSuite.scala:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Max}
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical._
+
+class RewriteNonAggregateFirstSuite extends PlanTest {
+  val testRelation: LocalRelation = LocalRelation($"a".string, $"b".string)
+
+  private def checkRewrite(rewrite: LogicalPlan): Unit = rewrite match {
+    case Aggregate(_, _, GlobalLimit(_, _)) =>
+    case _ => fail(s"Plan is not rewritten:\n$rewrite")
+  }
+
+  test("single FIRST aggregate and no group by") {
+    val input = testRelation.select(
+      First($"a", ignoreNulls = false).toAggregateExpression()).analyze
+    val rewrite = RewriteNonAggregateFirst(input.analyze)
+    checkRewrite(rewrite)
+  }
+
+  test("multiple FIRST aggregates and no group by") {
+    val input = testRelation.select(
+      First($"a", ignoreNulls = false).toAggregateExpression(),
+      First($"b", ignoreNulls = false).toAggregateExpression()).analyze
+    val rewrite = RewriteNonAggregateFirst(input.analyze)
+    checkRewrite(rewrite)
+  }
+
+  test("ignoreNulls set to true blocks rewrite") {
+    val input = testRelation.select(
+      First($"a", ignoreNulls = false).toAggregateExpression(),
+      First($"b", ignoreNulls = true).toAggregateExpression()).analyze
+    val rewrite = RewriteNonAggregateFirst(input.analyze)
+    comparePlans(input, rewrite)
+  }
+
+  test("FIRST aggregate with group by") {
+    val input = testRelation
+      .groupBy($"a")(First($"a", ignoreNulls = false).toAggregateExpression())
+      .analyze
+    val rewrite = RewriteNonAggregateFirst(input)
+    comparePlans(input, rewrite)
+  }
+
+  test("mixed aggregates with group by") {
+    val input = testRelation
+      .groupBy('a)(
+        First($"a", ignoreNulls = false).toAggregateExpression().as('agg1),
+        Max($"b").toAggregateExpression().as('agg2))
+      .analyze
+    val rewrite = RewriteNonAggregateFirst(input)
+    comparePlans(input, rewrite)
+  }
+
+  test("mixed aggregates without group by") {

Review Comment:
added in `misc-aggregate.sql`
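The test cases above hint at when the rewrite is safe. A minimal pure-Python sketch of that reasoning follows, assuming the rule places a one-row limit beneath the aggregate (as the `Aggregate(_, _, GlobalLimit(_, _))` pattern in `checkRewrite` implies). The helpers `first` and `first_after_limit` are hypothetical illustrations, not Spark APIs.

```python
from typing import List, Optional


def first(values: List[Optional[str]], ignore_nulls: bool = False) -> Optional[str]:
    """Return the first value, optionally skipping None entries."""
    for v in values:
        if v is not None or not ignore_nulls:
            return v
    return None


def first_after_limit(values: List[Optional[str]], ignore_nulls: bool = False) -> Optional[str]:
    """Apply a limit of one row before aggregating (the assumed rewritten form)."""
    return first(values[:1], ignore_nulls)


rows = [None, "x", "y"]

# ignore_nulls=False: only the first row matters, so the limit changes nothing.
same_without_skip = first(rows) == first_after_limit(rows)

# ignore_nulls=True: the first non-null value may sit beyond the first row,
# so limiting to one row would change the answer.
same_with_skip = first(rows, ignore_nulls=True) == first_after_limit(rows, ignore_nulls=True)
```

Under these assumptions the two forms agree only when nulls are not skipped, which is consistent with the "ignoreNulls set to true blocks rewrite" test expecting an unchanged plan.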
[GitHub] [spark] pan3793 commented on a diff in pull request #36496: [SPARK-39104][SQL] InMemoryRelation#isCachedColumnBuffersLoaded should be thread-safe
pan3793 commented on code in PR #36496:
URL: https://github.com/apache/spark/pull/36496#discussion_r874040979

## sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala:
##
@@ -563,4 +563,33 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-39104: InMemoryRelation#isCachedColumnBuffersLoaded should be thread-safe") {

Review Comment:
I updated the UT and reproduced the issue locally, though it is not easy to reproduce:
```
[info] InMemoryColumnarQuerySuite:
01:58:25.558 WARN org.apache.spark.util.Utils: Your hostname, Chengs-Mac-mini.local resolves to a loopback address: 127.0.0.1; using 10.221.96.10 instead (on interface en1)
01:58:25.562 WARN org.apache.spark.util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
01:58:25.808 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception: org.scalatest.exceptions.TestFailedException thrown from the UncaughtExceptionHandler in thread "Thread-10"
```
[GitHub] [spark] bjornjorgensen commented on pull request #36547: [SPARK-39197][PYTHON] Implement `skipna` parameter of `GroupBy.all`
bjornjorgensen commented on PR #36547: URL: https://github.com/apache/spark/pull/36547#issuecomment-1128007756 Yes, `all` is a built-in function in Python. The Spark Python codebase has no more than 56 of these now, which can be a problem. Sometimes shadowing Python built-in functions is the right way, and in this case it is. But if you look at the code that pandas uses, they [decorate the function](https://github.com/pandas-dev/pandas/blob/v1.4.2/pandas/core/groupby/groupby.py#L1810) with `@final`. This is [PEP 591 – Adding a final qualifier to typing](https://peps.python.org/pep-0591/).
[GitHub] [spark] eswardhinakaran-toast opened a new pull request, #36568: Analytics flavor spark
eswardhinakaran-toast opened a new pull request, #36568: URL: https://github.com/apache/spark/pull/36568 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested?
[GitHub] [spark] eswardhinakaran-toast closed pull request #36568: Analytics flavor spark
eswardhinakaran-toast closed pull request #36568: Analytics flavor spark URL: https://github.com/apache/spark/pull/36568
[GitHub] [spark] xinrong-databricks commented on pull request #36547: [SPARK-39197][PYTHON] Implement `skipna` parameter of `GroupBy.all`
xinrong-databricks commented on PR #36547: URL: https://github.com/apache/spark/pull/36547#issuecomment-1128134042 The comment on PySpark's shadowing of Python built-in functions really makes sense. The PEP targets Python 3.8, so it seems fine to introduce it to the master branch. May I get some opinions on that? FYI @ueshin @HyukjinKwon @itholic @zero323 @Yikun @zhengruifeng
[GitHub] [spark] ueshin commented on pull request #36547: [SPARK-39197][PYTHON] Implement `skipna` parameter of `GroupBy.all`
ueshin commented on PR #36547:
URL: https://github.com/apache/spark/pull/36547#issuecomment-1128165133

I'm afraid I'm not sure what the point is here. I don't think methods affect the builtin functions:

```py
>>> class A:
...     def all(self):
...         print("all")
...
>>>
>>> A().all()
all
>>> all
<built-in function all>
>>> all()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: all() takes exactly one argument (0 given)
>>> all([True])
True
>>> all([False])
False
```

Also, I'm not sure how `@final` helps avoid the shadowing issue.

Btw,

> The PEP targets Python 3.8 so it seems fine to introduce it to the master branch.

We are still supporting Python 3.7. https://github.com/apache/spark/blob/47d237c74ccb1836e3de82dc583499ffd3f25755/python/setup.py#L276
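A minimal sketch of the two points raised in this comment, assuming Python 3.8+ so that `typing.final` is importable. The `GroupBy` and `Child` classes are hypothetical stand-ins, not PySpark code.

```python
import builtins
from typing import final  # added in Python 3.8 (PEP 591)


class GroupBy:
    """Hypothetical class that exposes a method named `all`."""

    def __init__(self, values):
        self.values = values

    @final  # a hint for static type checkers; not enforced by the interpreter
    def all(self):
        # Inside a method body, the bare name `all` resolves through module
        # globals and then builtins, never through the class namespace.
        return all(bool(v) for v in self.values)


class Child(GroupBy):
    # A static type checker would report this override of a @final method,
    # but the interpreter accepts it.
    def all(self):
        return "overridden"


result_truthy = GroupBy([1, 2, 3]).all()
result_falsy = GroupBy([1, 0]).all()
builtin_untouched = all is builtins.all
child_result = Child([0]).all()
```

Defining a method called `all` leaves the module-level name bound to the builtin, and `@final` only constrains overriding as seen by a type checker, so it does not interact with name shadowing either way.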
[GitHub] [spark] xinrong-databricks opened a new pull request, #36569: Implement `ignore_index` of `DataFrame.explode` and `DataFrame.drop_duplicates`
xinrong-databricks opened a new pull request, #36569:
URL: https://github.com/apache/spark/pull/36569

### What changes were proposed in this pull request?
Implement `ignore_index` of `DataFrame.explode` and `DataFrame.drop_duplicates`.

### Why are the changes needed?
Increase pandas API coverage.

### Does this PR introduce _any_ user-facing change?
Yes. `ignore_index` of `DataFrame.explode` and `DataFrame.drop_duplicates` is supported as below.

### How was this patch tested?
Unit tests.
[GitHub] [spark] srowen closed pull request #36544: [SPARK-39183][BUILD] Upgrade Apache Xerces Java to 2.12.2
srowen closed pull request #36544: [SPARK-39183][BUILD] Upgrade Apache Xerces Java to 2.12.2 URL: https://github.com/apache/spark/pull/36544
[GitHub] [spark] srowen commented on pull request #36544: [SPARK-39183][BUILD] Upgrade Apache Xerces Java to 2.12.2
srowen commented on PR #36544: URL: https://github.com/apache/spark/pull/36544#issuecomment-1128225167 Merged to master/3.3/3.2
[GitHub] [spark] warrenzhu25 commented on a diff in pull request #35498: [SPARK-34777][UI] StagePage input/output size records not show when r…
warrenzhu25 commented on code in PR #35498:
URL: https://github.com/apache/spark/pull/35498#discussion_r874254004

## core/src/main/resources/org/apache/spark/ui/static/stagepage.js:
##
@@ -404,8 +404,8 @@ $(document).ready(function () {
 var responseBody = response;
 var dataToShow = {};
-dataToShow.showInputData = responseBody.inputBytes > 0;
-dataToShow.showOutputData = responseBody.outputBytes > 0;
+dataToShow.showInputData = responseBody.inputBytes > 0 || responseBody.inputRecords > 0 ;
+dataToShow.showOutputData = responseBody.outputBytes > 0 || responseBody.outputRecords > 0;

Review Comment:
This can be easily reproduced by any Spark Structured Streaming job reading from a Kafka source.
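The changed condition in the diff can be sketched in Python as follows. This is an illustration only: `StageMetrics` and `columns_to_show` are hypothetical names, and the example assumes a stage that reports records while its byte counters stay at zero.

```python
from dataclasses import dataclass


@dataclass
class StageMetrics:
    """Hypothetical subset of the stage fields referenced in the diff."""
    input_bytes: int = 0
    input_records: int = 0
    output_bytes: int = 0
    output_records: int = 0


def columns_to_show(m: StageMetrics) -> dict:
    """Mirror the updated check: show a column if bytes OR records are non-zero."""
    return {
        "showInputData": m.input_bytes > 0 or m.input_records > 0,
        "showOutputData": m.output_bytes > 0 or m.output_records > 0,
    }


# Records were read, but no bytes were reported for them.
records_only = columns_to_show(StageMetrics(input_records=1000))
# Nothing was read or written.
empty = columns_to_show(StageMetrics())
```

With the earlier bytes-only check, the `records_only` case would have hidden the input column even though records were processed.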
[GitHub] [spark] github-actions[bot] commented on pull request #35342: [SPARK-38043][SQL] Refactor FileBasedDataSourceSuite and add DataSourceSuite for each data source
github-actions[bot] commented on PR #35342: URL: https://github.com/apache/spark/pull/35342#issuecomment-1128264145 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
[GitHub] [spark] github-actions[bot] commented on pull request #34359: [SPARK-36986][SQL] Improving schema filtering flexibility
github-actions[bot] commented on PR #34359: URL: https://github.com/apache/spark/pull/34359#issuecomment-1128264166 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!