[jira] [Assigned] (SPARK-26370) Fix resolution of higher-order function for the same identifier.
[ https://issues.apache.org/jira/browse/SPARK-26370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26370:

Assignee: (was: Apache Spark)

> Fix resolution of higher-order function for the same identifier.
>
> Key: SPARK-26370
> URL: https://issues.apache.org/jira/browse/SPARK-26370
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Takuya Ueshin
> Priority: Major
>
> When using a higher-order function whose lambda variable has the same name as
> an existing column, in {{Filter}} or anything else that uses
> {{Analyzer.resolveExpressionBottomUp}} during resolution, e.g.:
> {code}
> val df = Seq(
>   (Seq(1, 9, 8, 7), 1, 2),
>   (Seq(5, 9, 7), 2, 2),
>   (Seq.empty, 3, 2),
>   (null, 4, 2)
> ).toDF("i", "x", "d")
> checkAnswer(df.filter("exists(i, x -> x % d == 0)"),
>   Seq(Row(Seq(1, 9, 8, 7), 1, 2)))
> checkAnswer(df.select("x").filter("exists(i, x -> x % d == 0)"),
>   Seq(Row(1)))
> {code}
> the following exception happens:
> {code:java}
> java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.BoundReference cannot be cast to org.apache.spark.sql.catalyst.expressions.NamedExpression
>   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
>   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:237)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at org.apache.spark.sql.catalyst.expressions.HigherOrderFunction.$anonfun$functionsForEval$1(higherOrderFunctions.scala:147)
>   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:237)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
>   at scala.collection.immutable.List.map(List.scala:298)
>   at org.apache.spark.sql.catalyst.expressions.HigherOrderFunction.functionsForEval(higherOrderFunctions.scala:145)
>   at org.apache.spark.sql.catalyst.expressions.HigherOrderFunction.functionsForEval$(higherOrderFunctions.scala:145)
>   at org.apache.spark.sql.catalyst.expressions.ArrayExists.functionsForEval$lzycompute(higherOrderFunctions.scala:369)
>   at org.apache.spark.sql.catalyst.expressions.ArrayExists.functionsForEval(higherOrderFunctions.scala:369)
>   at org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.functionForEval(higherOrderFunctions.scala:176)
>   at org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.functionForEval$(higherOrderFunctions.scala:176)
>   at org.apache.spark.sql.catalyst.expressions.ArrayExists.functionForEval(higherOrderFunctions.scala:369)
>   at org.apache.spark.sql.catalyst.expressions.ArrayExists.nullSafeEval(higherOrderFunctions.scala:387)
>   at org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.eval(higherOrderFunctions.scala:190)
>   at org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.eval$(higherOrderFunctions.scala:185)
>   at org.apache.spark.sql.catalyst.expressions.ArrayExists.eval(higherOrderFunctions.scala:369)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
>   at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3(basicPhysicalOperators.scala:216)
>   at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3$adapted(basicPhysicalOperators.scala:215)
>   ...
> {code}
> because the {{UnresolvedAttribute}}s in {{LambdaFunction}} are unexpectedly
> resolved by the rule.
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
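The failure above boils down to name shadowing: in `exists(i, x -> x % d == 0)`, the lambda parameter `x` must bind to the lambda's argument, not to the column `x` that bottom-up resolution can see. A plain-Scala analogy of the intended scoping (no Spark involved; all names chosen for illustration):

```scala
object ShadowingSketch {
  def main(args: Array[String]): Unit = {
    val x = 10                  // analogous to the column `x`
    val d = 2                   // analogous to the column `d`
    val i = Seq(1, 9, 8, 7)     // analogous to the array column `i`
    // The lambda's `x` shadows the outer `x`; each element of `i` is bound in turn,
    // while `d` is still captured from the enclosing scope.
    val result = i.exists(x => x % d == 0)
    println(result)             // true, because 8 % 2 == 0
  }
}
```

The bug is that the analyzer rule resolved the lambda's `x` against the plan's output attributes, i.e. it behaved as if the shadowing above did not exist.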
[jira] [Assigned] (SPARK-26370) Fix resolution of higher-order function for the same identifier.
[ https://issues.apache.org/jira/browse/SPARK-26370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26370:

Assignee: Apache Spark

> Fix resolution of higher-order function for the same identifier.
>
> Key: SPARK-26370
> URL: https://issues.apache.org/jira/browse/SPARK-26370
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Takuya Ueshin
> Assignee: Apache Spark
> Priority: Major
[jira] [Commented] (SPARK-26370) Fix resolution of higher-order function for the same identifier.
[ https://issues.apache.org/jira/browse/SPARK-26370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721025#comment-16721025 ] ASF GitHub Bot commented on SPARK-26370:

ueshin opened a new pull request #23320: [SPARK-26370][SQL] Fix resolution of higher-order function for the same identifier. URL: https://github.com/apache/spark/pull/23320

## What changes were proposed in this pull request?

When using a higher-order function whose lambda variable has the same name as an existing column, in `Filter` or anything else that uses `Analyzer.resolveExpressionBottomUp` during resolution, e.g.:

```scala
val df = Seq(
  (Seq(1, 9, 8, 7), 1, 2),
  (Seq(5, 9, 7), 2, 2),
  (Seq.empty, 3, 2),
  (null, 4, 2)
).toDF("i", "x", "d")

checkAnswer(df.filter("exists(i, x -> x % d == 0)"),
  Seq(Row(Seq(1, 9, 8, 7), 1, 2)))
checkAnswer(df.select("x").filter("exists(i, x -> x % d == 0)"),
  Seq(Row(1)))
```

the following exception happens (the full stack trace is quoted in the issue description):

```
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.BoundReference cannot be cast to org.apache.spark.sql.catalyst.expressions.NamedExpression
  ...
```

because the `UnresolvedAttribute`s in `LambdaFunction` are unexpectedly resolved by the rule.

This PR changes the resolution to use a placeholder, `UnresolvedNamedLambdaVariable`, to prevent the unexpected resolution.

## How was this patch tested?

Added a test and modified some tests.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (SPARK-26370) Fix resolution of higher-order function for the same identifier.
Takuya Ueshin created SPARK-26370:
-

Summary: Fix resolution of higher-order function for the same identifier.
Key: SPARK-26370
URL: https://issues.apache.org/jira/browse/SPARK-26370
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.4.0
Reporter: Takuya Ueshin

When using a higher-order function whose lambda variable has the same name as an existing column, in {{Filter}} or anything else that uses {{Analyzer.resolveExpressionBottomUp}} during resolution, a {{java.lang.ClassCastException}} ({{BoundReference}} cannot be cast to {{NamedExpression}}) is thrown, because the {{UnresolvedAttribute}}s in {{LambdaFunction}} are unexpectedly resolved by the rule. The reproduction and full stack trace are quoted in the messages above.
[jira] [Commented] (SPARK-26369) How to limit Spark concurrent tasks number in one job?
[ https://issues.apache.org/jira/browse/SPARK-26369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721010#comment-16721010 ] Fu Chen commented on SPARK-26369:

OK, thanks for your reply. I found a solution in [SPARK-20589|https://issues.apache.org/jira/browse/SPARK-20589]

> How to limit Spark concurrent tasks number in one job?
> --
>
> Key: SPARK-26369
> URL: https://issues.apache.org/jira/browse/SPARK-26369
> Project: Spark
> Issue Type: Question
> Components: Scheduler
> Affects Versions: 2.1.0, 2.2.0, 2.3.2, 2.4.0
> Reporter: Fu Chen
> Priority: Major
>
> Hi all,
> Is it possible to make fair scheduler pools pluggable, so that we can
> implement our own SchedulingAlgorithm? In our case, we want to limit the
> maximum number of tasks of a job that loads data from a MySQL database; if we
> set a larger executor number * cores number, it will trigger an alarm. Or can
> we do this another way?
[jira] [Closed] (SPARK-26369) How to limit Spark concurrent tasks number in one job?
[ https://issues.apache.org/jira/browse/SPARK-26369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fu Chen closed SPARK-26369.
---

> How to limit Spark concurrent tasks number in one job?
> --
>
> Key: SPARK-26369
> URL: https://issues.apache.org/jira/browse/SPARK-26369
> Project: Spark
> Issue Type: Question
> Components: Scheduler
> Affects Versions: 2.1.0, 2.2.0, 2.3.2, 2.4.0
> Reporter: Fu Chen
> Priority: Major
[jira] [Commented] (SPARK-26353) Add typed aggregate functions:max&
[ https://issues.apache.org/jira/browse/SPARK-26353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720997#comment-16720997 ] ASF GitHub Bot commented on SPARK-26353:

10110346 closed pull request #23304: [SPARK-26353][SQL] Add typed aggregate functions: max& URL: https://github.com/apache/spark/pull/23304

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
index b6550bf3e4aac..2d08ea3fce6fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
@@ -99,3 +99,71 @@ class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long
     toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
   }
 }
+
+class TypedMaxDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
+  override def zero: Double = Double.MinValue
+  override def reduce(b: Double, a: IN): Double = if (b > f(a)) b else f(a)
+  override def merge(b1: Double, b2: Double): Double = if (b1 > b2) b1 else b2
+  override def finish(reduction: Double): Double = reduction
+
+  override def bufferEncoder: Encoder[Double] = ExpressionEncoder[Double]()
+  override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
+
+  // Java api support
+  def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])
+
+  def toColumnJava: TypedColumn[IN, java.lang.Double] = {
+    toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
+  }
+}
+
+class TypedMaxLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long] {
+  override def zero: Long = Long.MinValue
+  override def reduce(b: Long, a: IN): Long = if (b > f(a)) b else f(a)
+  override def merge(b1: Long, b2: Long): Long = if (b1 > b2) b1 else b2
+  override def finish(reduction: Long): Long = reduction
+
+  override def bufferEncoder: Encoder[Long] = ExpressionEncoder[Long]()
+  override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]()
+
+  // Java api support
+  def this(f: MapFunction[IN, java.lang.Long]) = this((x: IN) => f.call(x).asInstanceOf[Long])
+
+  def toColumnJava: TypedColumn[IN, java.lang.Long] = {
+    toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
+  }
+}
+
+class TypedMinDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
+  override def zero: Double = Double.MaxValue
+  override def reduce(b: Double, a: IN): Double = if (b < f(a)) b else f(a)
+  override def merge(b1: Double, b2: Double): Double = if (b1 < b2) b1 else b2
+  override def finish(reduction: Double): Double = reduction
+
+  override def bufferEncoder: Encoder[Double] = ExpressionEncoder[Double]()
+  override def outputEncoder: Encoder[Double] = ExpressionEncoder[Double]()
+
+  // Java api support
+  def this(f: MapFunction[IN, java.lang.Double]) = this((x: IN) => f.call(x).asInstanceOf[Double])
+
+  def toColumnJava: TypedColumn[IN, java.lang.Double] = {
+    toColumn.asInstanceOf[TypedColumn[IN, java.lang.Double]]
+  }
+}
+
+class TypedMinLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long] {
+  override def zero: Long = Long.MaxValue
+  override def reduce(b: Long, a: IN): Long = if (b < f(a)) b else f(a)
+  override def merge(b1: Long, b2: Long): Long = if (b1 < b2) b1 else b2
+  override def finish(reduction: Long): Long = reduction
+
+  override def bufferEncoder: Encoder[Long] = ExpressionEncoder[Long]()
+  override def outputEncoder: Encoder[Long] = ExpressionEncoder[Long]()
+
+  // Java api support
+  def this(f: MapFunction[IN, java.lang.Long]) = this((x: IN) => f.call(x).asInstanceOf[Long])
+
+  def toColumnJava: TypedColumn[IN, java.lang.Long] = {
+    toColumn.asInstanceOf[TypedColumn[IN, java.lang.Long]]
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala
index 1cb579c4faa76..6a8336e01d6f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/scalalang/typed.scala
@@ -77,14 +77,31 @@ object typed {
    */
   def sumLong[IN](f: IN => Long): TypedColumn[IN, Long] = new TypedSumLong[IN](f).toColumn
+
+  /**
+   * Max aggregate function for floating point (double) type.
+   */
+  def max[IN](f: IN => Double): TypedColumn[IN, Double] = new TypedMaxDouble[IN](f).toColumn
+
+  /**
+   * Max aggregate function for
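Outside of Spark, the `zero`/`reduce`/`merge`/`finish` contract that the `TypedMaxLong` class in the diff implements can be sketched in plain Scala. The `MaxLongAgg` class below is a hypothetical stand-in for illustration only, not part of the patch: each partition is folded with `reduce` starting from `zero`, the partial results are combined with `merge`, and `finish` maps the final buffer to the output.

```scala
// Hypothetical stand-in for Spark's Aggregator contract (illustration only).
final case class MaxLongAgg[IN](f: IN => Long) {
  val zero: Long = Long.MinValue
  def reduce(b: Long, a: IN): Long = if (b > f(a)) b else f(a)
  def merge(b1: Long, b2: Long): Long = if (b1 > b2) b1 else b2
  def finish(reduction: Long): Long = reduction

  // Fold each "partition" with reduce, then combine the partial maxima with merge.
  def run(partitions: Seq[Seq[IN]]): Long =
    finish(partitions.map(_.foldLeft(zero)(reduce)).reduceOption(merge).getOrElse(zero))
}

object MaxLongAggDemo {
  def main(args: Array[String]): Unit = {
    val agg = MaxLongAgg[Long](identity)
    println(agg.run(Seq(Seq(3L, 7L), Seq(5L, 1L))))  // 7
  }
}
```

Note one property the sketch shares with the patch: because `zero` is `Long.MinValue`, aggregating an empty input yields `Long.MinValue` rather than an error.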
[jira] [Commented] (SPARK-26316) Because of the perf degradation in TPC-DS, we currently partial revert SPARK-21052:Add hash map metrics to join,
[ https://issues.apache.org/jira/browse/SPARK-26316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720978#comment-16720978 ] ASF GitHub Bot commented on SPARK-26316:

JkSelf opened a new pull request #23319: [SPARK-26316][BRANCH-2.3] Revert hash join metrics in spark 21052 that causes performance degradation URL: https://github.com/apache/spark/pull/23319

## What changes were proposed in this pull request?
Revert SPARK-21052 in Spark 2.4 because of the discussion in [PR 23269](https://github.com/apache/spark/pull/23269)

## How was this patch tested?
N/A

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Because of the perf degradation in TPC-DS, we currently partial revert
> SPARK-21052:Add hash map metrics to join,
>
> Key: SPARK-26316
> URL: https://issues.apache.org/jira/browse/SPARK-26316
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0
> Reporter: Ke Jia
> Assignee: Ke Jia
> Priority: Major
> Fix For: 3.0.0
>
> The code at
> [L486|https://github.com/apache/spark/blob/1d3dd58d21400b5652b75af7e7e53aad85a31528/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L486]
> and
> [L487|https://github.com/apache/spark/blob/1d3dd58d21400b5652b75af7e7e53aad85a31528/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L487]
> in SPARK-21052 causes performance degradation in Spark 2.3. The results of
> all queries in TPC-DS at 1TB are in the [TPC-DS
> result|https://docs.google.com/spreadsheets/d/18a5BdOlmm8euTaRodyeWum9yu92mbWWu6JbhGXtr7yE/edit#gid=0]
[jira] [Resolved] (SPARK-26369) How to limit Spark concurrent tasks number in one job?
[ https://issues.apache.org/jira/browse/SPARK-26369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-26369.
--

Resolution: Invalid

Questions should go to the mailing list; I think you could get a better answer there.

> How to limit Spark concurrent tasks number in one job?
> --
>
> Key: SPARK-26369
> URL: https://issues.apache.org/jira/browse/SPARK-26369
> Project: Spark
> Issue Type: Question
> Components: Scheduler
> Affects Versions: 2.1.0, 2.2.0, 2.3.2, 2.4.0
> Reporter: Fu Chen
> Priority: Major
[jira] [Commented] (SPARK-26222) Scan: track file listing time
[ https://issues.apache.org/jira/browse/SPARK-26222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720974#comment-16720974 ] Yuanjian Li commented on SPARK-26222:

Copy that, thanks for your explanation!

> Scan: track file listing time
> -
>
> Key: SPARK-26222
> URL: https://issues.apache.org/jira/browse/SPARK-26222
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Reynold Xin
> Priority: Major
>
> We should track file listing time and add it to the scan node's SQL metric,
> so we have visibility into how much time is spent in file listing. It'd be
> useful to track not just the duration, but also the start and end time, so we
> can construct a timeline.
> This requires a little bit of design to define what file listing time means
> when we are reading from cache vs. not from cache.
[jira] [Commented] (SPARK-26316) Because of the perf degradation in TPC-DS, we currently partial revert SPARK-21052:Add hash map metrics to join,
[ https://issues.apache.org/jira/browse/SPARK-26316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720970#comment-16720970 ] Apache Spark commented on SPARK-26316:

User 'JkSelf' has created a pull request for this issue: https://github.com/apache/spark/pull/23318

> Because of the perf degradation in TPC-DS, we currently partial revert
> SPARK-21052:Add hash map metrics to join,
>
> Key: SPARK-26316
> URL: https://issues.apache.org/jira/browse/SPARK-26316
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0
> Reporter: Ke Jia
> Assignee: Ke Jia
> Priority: Major
> Fix For: 3.0.0
[jira] [Commented] (SPARK-26222) Scan: track file listing time
[ https://issues.apache.org/jira/browse/SPARK-26222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720957#comment-16720957 ] Reynold Xin commented on SPARK-26222:

So I spent some time looking at the code base to understand what's going on and how we should report this. In short, we have two types of tables: (1) tables that require a full file listing in order to resolve the schema (including partition columns), and (2) tables that don't. This means there are three scenarios to think about:

(1) spark.read.parquet("/path/to/table").count() -> in this case an InMemoryFileIndex containing all of the leaf files is created.
(2a) spark.read.table("abcd").count() -> when partitions are not tracked in the catalog, which is basically the same as (1).
(2b) spark.read.table("abcd").count() -> when partitions are tracked in the catalog. In this case a CatalogFileIndex is created, and we should measure the listing time in CatalogFileIndex.filterPartitions.

Also, instead of tracking these as phases, I'd associate the timing with the scan operator in SQL metrics, and I'd report the start and end time rather than just a single duration.

> Scan: track file listing time
> -
>
> Key: SPARK-26222
> URL: https://issues.apache.org/jira/browse/SPARK-26222
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Reynold Xin
> Priority: Major
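A minimal sketch of the start/end bookkeeping proposed in the comment above, so a timeline can be reconstructed rather than only a duration. Everything here is hypothetical illustration; `listLeafFiles` stands in for whatever listing call is being measured and is not a Spark API:

```scala
object ListingTimeSketch {
  // Hypothetical stand-in for an expensive file-listing call.
  def listLeafFiles(path: String): Seq[String] =
    Seq(s"$path/part-00000", s"$path/part-00001")

  def main(args: Array[String]): Unit = {
    val startMs = System.currentTimeMillis()   // record the start timestamp...
    val files = listLeafFiles("/path/to/table")
    val endMs = System.currentTimeMillis()     // ...and the end timestamp
    // Reporting both endpoints (not just endMs - startMs) lets a UI draw the
    // listing as an interval on a timeline, as the comment suggests.
    println(s"listed ${files.size} files between $startMs and $endMs")
  }
}
```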
[jira] [Updated] (SPARK-26362) Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts
[ https://issues.apache.org/jira/browse/SPARK-26362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon updated SPARK-26362:
-

Summary: Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts (was: Deprecate 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts)

> Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark
> contexts
> ---
>
> Key: SPARK-26362
> URL: https://issues.apache.org/jira/browse/SPARK-26362
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.0.0
> Reporter: Hyukjin Kwon
> Priority: Major
>
> Multiple Spark contexts are discouraged, and this has been warned about since
> 4 years ago (see SPARK-4180).
> It can cause arbitrary and mysterious error cases. (Honestly, I didn't even
> know Spark allowed it.)
[jira] [Commented] (SPARK-26368) Make it clear that getOrInferFileFormatSchema doesn't create InMemoryFileIndex
[ https://issues.apache.org/jira/browse/SPARK-26368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720925#comment-16720925 ] ASF GitHub Bot commented on SPARK-26368: asfgit closed pull request #23317: [SPARK-26368][SQL] Make it clear that getOrInferFileFormatSchema doesn't create InMemoryFileIndex URL: https://github.com/apache/spark/pull/23317 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 795a6d0b6b040..fefff68c4ba8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -122,21 +122,14 @@ case class DataSource( * be any further inference in any triggers. * * @param format the file format object for this DataSource - * @param fileIndex optional [[InMemoryFileIndex]] for getting partition schema and file list + * @param getFileIndex [[InMemoryFileIndex]] for getting partition schema and file list * @return A pair of the data schema (excluding partition columns) and the schema of the partition * columns. 
*/ private def getOrInferFileFormatSchema( format: FileFormat, - fileIndex: Option[InMemoryFileIndex] = None): (StructType, StructType) = { -// The operations below are expensive therefore try not to do them if we don't need to, e.g., -// in streaming mode, we have already inferred and registered partition columns, we will -// never have to materialize the lazy val below -lazy val tempFileIndex = fileIndex.getOrElse { - val globbedPaths = -checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false) - createInMemoryFileIndex(globbedPaths) -} + getFileIndex: () => InMemoryFileIndex): (StructType, StructType) = { +lazy val tempFileIndex = getFileIndex() val partitionSchema = if (partitionColumns.isEmpty) { // Try to infer partitioning, because no DataSource in the read path provides the partitioning @@ -236,7 +229,15 @@ case class DataSource( "you may be able to create a static DataFrame on that directory with " + "'spark.read.load(directory)' and infer schema from it.") } -val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format) + +val (dataSchema, partitionSchema) = getOrInferFileFormatSchema(format, () => { + // The operations below are expensive therefore try not to do them if we don't need to, + // e.g., in streaming mode, we have already inferred and registered partition columns, + // we will never have to materialize the lazy val below + val globbedPaths = +checkAndGlobPathIfNecessary(checkEmptyGlobPath = false, checkFilesExist = false) + createInMemoryFileIndex(globbedPaths) +}) SourceInfo( s"FileSource[$path]", StructType(dataSchema ++ partitionSchema), @@ -370,7 +371,7 @@ case class DataSource( } else { val index = createInMemoryFileIndex(globbedPaths) val (resultDataSchema, resultPartitionSchema) = -getOrInferFileFormatSchema(format, Some(index)) +getOrInferFileFormatSchema(format, () => index) (index, resultDataSchema, resultPartitionSchema) } This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make it clear that getOrInferFileFormatSchema doesn't create InMemoryFileIndex > -- > > Key: SPARK-26368 > URL: https://issues.apache.org/jira/browse/SPARK-26368 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > Fix For: 3.0.0 > > > I was looking at the code and it was a bit difficult to see the life cycle of > InMemoryFileIndex passed into getOrInferFileFormatSchema, because once it is > passed in, and another time it was created in getOrInferFileFormatSchema. > It'd be easier to understand the life cycle if we move the creation of it out. >
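The core of the merged change above is replacing an `Option[InMemoryFileIndex]` parameter with a `() => InMemoryFileIndex` thunk bound to a `lazy val`, so the expensive index is built at most once, and only if schema inference actually needs it. A small Python analogue of that lazy-thunk pattern (illustrative only; `make_lazy` and `build_index` are hypothetical names, not Spark code):

```python
def make_lazy(thunk):
    """Python analogue of Scala's `lazy val x = thunk()`: evaluate the
    expensive thunk at most once, and only on first use."""
    cache = {}
    def value():
        if "v" not in cache:
            cache["v"] = thunk()
        return cache["v"]
    return value

calls = []
def build_index():
    # Stands in for createInMemoryFileIndex(globbedPaths): expensive work.
    calls.append(1)
    return ["file-a", "file-b"]

index = make_lazy(build_index)
assert calls == []                       # nothing built until first use
assert index() == ["file-a", "file-b"]   # built on demand
index()
assert len(calls) == 1                   # built exactly once, then cached
```

This is why the PR helps readability: the caller now visibly owns the decision of how the index is created, while the callee only decides whether to force it.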
[jira] [Resolved] (SPARK-26368) Make it clear that getOrInferFileFormatSchema doesn't create InMemoryFileIndex
[ https://issues.apache.org/jira/browse/SPARK-26368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-26368. - Resolution: Fixed Fix Version/s: 3.0.0 > Make it clear that getOrInferFileFormatSchema doesn't create InMemoryFileIndex > -- > > Key: SPARK-26368 > URL: https://issues.apache.org/jira/browse/SPARK-26368 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > Fix For: 3.0.0 > > > I was looking at the code and it was a bit difficult to see the life cycle of > InMemoryFileIndex passed into getOrInferFileFormatSchema, because once it is > passed in, and another time it was created in getOrInferFileFormatSchema. > It'd be easier to understand the life cycle if we move the creation of it out. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-26337) Add benchmark for LongToUnsafeRowMap
[ https://issues.apache.org/jira/browse/SPARK-26337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan resolved SPARK-26337. - Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 23284 [https://github.com/apache/spark/pull/23284] > Add benchmark for LongToUnsafeRowMap > > > Key: SPARK-26337 > URL: https://issues.apache.org/jira/browse/SPARK-26337 > Project: Spark > Issue Type: Test > Components: SQL >Affects Versions: 3.0.0 >Reporter: Liang-Chi Hsieh >Assignee: Liang-Chi Hsieh >Priority: Major > Fix For: 3.0.0 > > > Regarding the performance issue of SPARK-26155, I think it is better to add a > benchmark for LongToUnsafeRowMap, which is the root cause of the performance > regression. It makes it easier to show the performance difference between > different metric implementations in LongToUnsafeRowMap.
[jira] [Assigned] (SPARK-26337) Add benchmark for LongToUnsafeRowMap
[ https://issues.apache.org/jira/browse/SPARK-26337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan reassigned SPARK-26337: --- Assignee: Liang-Chi Hsieh > Add benchmark for LongToUnsafeRowMap > > > Key: SPARK-26337 > URL: https://issues.apache.org/jira/browse/SPARK-26337 > Project: Spark > Issue Type: Test > Components: SQL >Affects Versions: 3.0.0 >Reporter: Liang-Chi Hsieh >Assignee: Liang-Chi Hsieh >Priority: Major > Fix For: 3.0.0 > > > Regarding the performance issue of SPARK-26155, I think it is better to add a > benchmark for LongToUnsafeRowMap which is the root cause of performance > regression. It can be easier to show performance difference between different > metric implementation in LongToUnsafeRowMap. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26364) Clean up import statements in pandas udf tests
[ https://issues.apache.org/jira/browse/SPARK-26364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720858#comment-16720858 ] ASF GitHub Bot commented on SPARK-26364: asfgit closed pull request #23314: [SPARK-26364][PYTHON][TESTING] Clean up imports in test_pandas_udf* URL: https://github.com/apache/spark/pull/23314 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/python/pyspark/sql/tests/test_pandas_udf.py b/python/pyspark/sql/tests/test_pandas_udf.py index c4b5478a7e893..d4d9679649ee9 100644 --- a/python/pyspark/sql/tests/test_pandas_udf.py +++ b/python/pyspark/sql/tests/test_pandas_udf.py @@ -17,12 +17,16 @@ import unittest +from pyspark.sql.functions import udf, pandas_udf, PandasUDFType from pyspark.sql.types import * from pyspark.sql.utils import ParseException +from pyspark.rdd import PythonEvalType from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \ pandas_requirement_message, pyarrow_requirement_message from pyspark.testing.utils import QuietTest +from py4j.protocol import Py4JJavaError + @unittest.skipIf( not have_pandas or not have_pyarrow, @@ -30,9 +34,6 @@ class PandasUDFTests(ReusedSQLTestCase): def test_pandas_udf_basic(self): -from pyspark.rdd import PythonEvalType -from pyspark.sql.functions import pandas_udf, PandasUDFType - udf = pandas_udf(lambda x: x, DoubleType()) self.assertEqual(udf.returnType, DoubleType()) self.assertEqual(udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF) @@ -65,10 +66,6 @@ def test_pandas_udf_basic(self): self.assertEqual(udf.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF) def test_pandas_udf_decorator(self): -from pyspark.rdd import PythonEvalType -from pyspark.sql.functions import pandas_udf, PandasUDFType -from pyspark.sql.types 
import StructType, StructField, DoubleType - @pandas_udf(DoubleType()) def foo(x): return x @@ -114,8 +111,6 @@ def foo(x): self.assertEqual(foo.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF) def test_udf_wrong_arg(self): -from pyspark.sql.functions import pandas_udf, PandasUDFType - with QuietTest(self.sc): with self.assertRaises(ParseException): @pandas_udf('blah') @@ -151,9 +146,6 @@ def foo(k, v, w): return k def test_stopiteration_in_udf(self): -from pyspark.sql.functions import udf, pandas_udf, PandasUDFType -from py4j.protocol import Py4JJavaError - def foo(x): raise StopIteration() diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py index 5383704434c85..18264ead2fd08 100644 --- a/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py +++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py @@ -17,6 +17,9 @@ import unittest +from pyspark.rdd import PythonEvalType +from pyspark.sql.functions import array, explode, col, lit, mean, sum, \ +udf, pandas_udf, PandasUDFType from pyspark.sql.types import * from pyspark.sql.utils import AnalysisException from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas, have_pyarrow, \ @@ -31,7 +34,6 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase): @property def data(self): -from pyspark.sql.functions import array, explode, col, lit return self.spark.range(10).toDF('id') \ .withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \ .withColumn("v", explode(col('vs'))) \ @@ -40,8 +42,6 @@ def data(self): @property def python_plus_one(self): -from pyspark.sql.functions import udf - @udf('double') def plus_one(v): assert isinstance(v, (int, float)) @@ -51,7 +51,6 @@ def plus_one(v): @property def pandas_scalar_plus_two(self): import pandas as pd -from pyspark.sql.functions import pandas_udf, PandasUDFType @pandas_udf('double', PandasUDFType.SCALAR) def plus_two(v): @@ -61,8 +60,6 @@ def plus_two(v): @property 
def pandas_agg_mean_udf(self): -from pyspark.sql.functions import pandas_udf, PandasUDFType - @pandas_udf('double', PandasUDFType.GROUPED_AGG) def avg(v): return v.mean() @@ -70,8 +67,6 @@ def avg(v): @property def pandas_agg_sum_udf(self): -from pyspark.sql.functions import pandas_udf, PandasUDFType - @pandas_udf('double', PandasUDFType.GROUPED_AGG) def sum(v): return v.sum() @@
[jira] [Resolved] (SPARK-26360) Avoid extra validateQuery call in createStreamingWriteSupport
[ https://issues.apache.org/jira/browse/SPARK-26360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-26360. -- Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 23309 [https://github.com/apache/spark/pull/23309] > Avoid extra validateQuery call in createStreamingWriteSupport > - > > Key: SPARK-26360 > URL: https://issues.apache.org/jira/browse/SPARK-26360 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.3.2, 2.4.0 >Reporter: Wu Wenjie >Assignee: Wu Wenjie >Priority: Trivial > Fix For: 3.0.0 > > > When I'm reading structured streaming source code, I find there is a > redundant KafkaWriter.validateQuery() function call in > createStreamingWriteSupport func in class `KafkaSourceProvider`. > {code:scala} > // KafkaSourceProvider.scala > override def createStreamingWriteSupport( > queryId: String, > schema: StructType, > mode: OutputMode, > options: DataSourceOptions): StreamingWriteSupport = { >. > // validate once here > KafkaWriter.validateQuery(schema.toAttributes, producerParams, topic) > // validate twice here > new KafkaStreamingWriteSupport(topic, producerParams, schema) > } > // KafkaStreamingWriteSupport.scala > class KafkaStreamingWriteSupport( > topic: Option[String], > producerParams: ju.Map[String, Object], > schema: StructType) > extends StreamingWriteSupport { > validateQuery(schema.toAttributes, producerParams, topic) > > } > {code} > > I think we just need to remove one of these two. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26350) Allow the user to override the group id of the Kafka's consumer
[ https://issues.apache.org/jira/browse/SPARK-26350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kevin Zhang updated SPARK-26350: Attachment: Permalink.url > Allow the user to override the group id of the Kafka's consumer > --- > > Key: SPARK-26350 > URL: https://issues.apache.org/jira/browse/SPARK-26350 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Shixiong Zhu >Priority: Major > Attachments: Permalink.url > > > Sometimes the group id is used to identify the stream for "security". We > should give a flag that lets you override it. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26360) Avoid extra validateQuery call in createStreamingWriteSupport
[ https://issues.apache.org/jira/browse/SPARK-26360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720859#comment-16720859 ] ASF GitHub Bot commented on SPARK-26360: asfgit closed pull request #23309: [SPARK-26360]remove redundant validateQuery call URL: https://github.com/apache/spark/pull/23309 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 6a0c2088ac3d1..4b8b5c0019b44 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -266,8 +266,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister // We convert the options argument from V2 -> Java map -> scala mutable -> scala immutable. val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap) -KafkaWriter.validateQuery(schema.toAttributes, producerParams, topic) - new KafkaStreamingWriteSupport(topic, producerParams, schema) } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Avoid extra validateQuery call in createStreamingWriteSupport > - > > Key: SPARK-26360 > URL: https://issues.apache.org/jira/browse/SPARK-26360 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.3.2, 2.4.0 >Reporter: Wu Wenjie >Priority: Trivial > Fix For: 3.0.0 > > > When I'm reading structured streaming source code, I find there is a > redundant KafkaWriter.validateQuery() function call in > createStreamingWriteSupport func in class `KafkaSourceProvider`. > {code:scala} > // KafkaSourceProvider.scala > override def createStreamingWriteSupport( > queryId: String, > schema: StructType, > mode: OutputMode, > options: DataSourceOptions): StreamingWriteSupport = { >. > // validate once here > KafkaWriter.validateQuery(schema.toAttributes, producerParams, topic) > // validate twice here > new KafkaStreamingWriteSupport(topic, producerParams, schema) > } > // KafkaStreamingWriteSupport.scala > class KafkaStreamingWriteSupport( > topic: Option[String], > producerParams: ju.Map[String, Object], > schema: StructType) > extends StreamingWriteSupport { > validateQuery(schema.toAttributes, producerParams, topic) > > } > {code} > > I think we just need to remove one of these two. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
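The fix above keeps validation in one place: `KafkaStreamingWriteSupport` already calls `validateQuery` in its constructor, so the caller-side call in `KafkaSourceProvider` is dropped. A toy Python sketch of that validate-once-in-the-constructor idea (all names here are hypothetical, not the actual Kafka connector API):

```python
class WriteSupport:
    """Toy sketch: validation happens once, in the constructor, so call
    sites need not (and should not) validate again."""

    validations = []  # records each validation run, to show it happens once

    def __init__(self, topic, schema):
        self._validate(topic, schema)
        self.topic, self.schema = topic, schema

    def _validate(self, topic, schema):
        if topic is None and "topic" not in schema:
            raise ValueError("a topic must be given or present in the schema")
        WriteSupport.validations.append(topic)

support = WriteSupport("events", ["key", "value"])
assert WriteSupport.validations == ["events"]  # validated exactly once
```

Putting the check in the constructor also guarantees that no code path can construct the writer without validating, which is the stronger invariant of the two call sites.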
[jira] [Commented] (SPARK-26337) Add benchmark for LongToUnsafeRowMap
[ https://issues.apache.org/jira/browse/SPARK-26337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720862#comment-16720862 ] ASF GitHub Bot commented on SPARK-26337: asfgit closed pull request #23284: [SPARK-26337][SQL][TEST] Add benchmark for LongToUnsafeRowMap URL: https://github.com/apache/spark/pull/23284 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/core/benchmarks/HashedRelationMetricsBenchmark-results.txt b/sql/core/benchmarks/HashedRelationMetricsBenchmark-results.txt new file mode 100644 index 0..338244ad542f4 --- /dev/null +++ b/sql/core/benchmarks/HashedRelationMetricsBenchmark-results.txt @@ -0,0 +1,11 @@ + +LongToUnsafeRowMap metrics + + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.13.6 +Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz +LongToUnsafeRowMap metrics: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative + +LongToUnsafeRowMap 234 / 315 2.1 467.3 1.0X + + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala new file mode 100644 index 0..bdf753debe62a --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.spark.SparkConf +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED +import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeProjection} +import org.apache.spark.sql.execution.joins.LongToUnsafeRowMap +import org.apache.spark.sql.types.LongType + +/** + * Benchmark to measure metrics performance at HashedRelation. + * To run this benchmark: + * {{{ + * 1. without sbt: bin/spark-submit --class + * 2. build/sbt "sql/test:runMain " + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain " + * Results will be written to "benchmarks/HashedRelationMetricsBenchmark-results.txt". 
+ * }}} + */ +object HashedRelationMetricsBenchmark extends SqlBasedBenchmark { + + def benchmarkLongToUnsafeRowMapMetrics(numRows: Int): Unit = { +runBenchmark("LongToUnsafeRowMap metrics") { + val benchmark = new Benchmark("LongToUnsafeRowMap metrics", numRows, output = output) + benchmark.addCase("LongToUnsafeRowMap") { iter => +val taskMemoryManager = new TaskMemoryManager( + new StaticMemoryManager( +new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"), +Long.MaxValue, +Long.MaxValue, +1), + 0) +val unsafeProj = UnsafeProjection.create(Seq(BoundReference(0, LongType, false))) + +val keys = Range.Long(0, numRows, 1) +val map = new LongToUnsafeRowMap(taskMemoryManager, 1) +keys.foreach { k => + map.append(k, unsafeProj(InternalRow(k))) +} +map.optimize() + +val threads = (0 to 100).map { _ => + val thread = new Thread { +override def run: Unit = { + val row = unsafeProj(InternalRow(0L)).copy() + keys.foreach { k => +assert(map.getValue(k, row) eq row) +assert(row.getLong(0) == k) + } +}
[jira] [Assigned] (SPARK-26364) Clean up import statements in pandas udf tests
[ https://issues.apache.org/jira/browse/SPARK-26364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon reassigned SPARK-26364: Assignee: Li Jin > Clean up import statements in pandas udf tests > -- > > Key: SPARK-26364 > URL: https://issues.apache.org/jira/browse/SPARK-26364 > Project: Spark > Issue Type: Improvement > Components: Tests >Affects Versions: 2.4.0 >Reporter: Li Jin >Assignee: Li Jin >Priority: Minor > Fix For: 3.0.0 > > > Per discussion [https://github.com/apache/spark/pull/22305/files#r241215618] > we should clean up the import statements in test_pandas_udf* and move them to > the top. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26360) Avoid extra validateQuery call in createStreamingWriteSupport
[ https://issues.apache.org/jira/browse/SPARK-26360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon reassigned SPARK-26360: Assignee: Wu Wenjie > Avoid extra validateQuery call in createStreamingWriteSupport > - > > Key: SPARK-26360 > URL: https://issues.apache.org/jira/browse/SPARK-26360 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.3.2, 2.4.0 >Reporter: Wu Wenjie >Assignee: Wu Wenjie >Priority: Trivial > Fix For: 3.0.0 > > > When I'm reading structured streaming source code, I find there is a > redundant KafkaWriter.validateQuery() function call in > createStreamingWriteSupport func in class `KafkaSourceProvider`. > {code:scala} > // KafkaSourceProvider.scala > override def createStreamingWriteSupport( > queryId: String, > schema: StructType, > mode: OutputMode, > options: DataSourceOptions): StreamingWriteSupport = { >. > // validate once here > KafkaWriter.validateQuery(schema.toAttributes, producerParams, topic) > // validate twice here > new KafkaStreamingWriteSupport(topic, producerParams, schema) > } > // KafkaStreamingWriteSupport.scala > class KafkaStreamingWriteSupport( > topic: Option[String], > producerParams: ju.Map[String, Object], > schema: StructType) > extends StreamingWriteSupport { > validateQuery(schema.toAttributes, producerParams, topic) > > } > {code} > > I think we just need to remove one of these two. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-26364) Clean up import statements in pandas udf tests
[ https://issues.apache.org/jira/browse/SPARK-26364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-26364. -- Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 23314 [https://github.com/apache/spark/pull/23314] > Clean up import statements in pandas udf tests > -- > > Key: SPARK-26364 > URL: https://issues.apache.org/jira/browse/SPARK-26364 > Project: Spark > Issue Type: Improvement > Components: Tests >Affects Versions: 2.4.0 >Reporter: Li Jin >Assignee: Li Jin >Priority: Minor > Fix For: 3.0.0 > > > Per discussion [https://github.com/apache/spark/pull/22305/files#r241215618] > we should clean up the import statements in test_pandas_udf* and move them to > the top. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26367) Remove ReplaceExceptWithFilter from nonExcludableRules
[ https://issues.apache.org/jira/browse/SPARK-26367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720850#comment-16720850 ] ASF GitHub Bot commented on SPARK-26367: gatorsmile closed pull request #23316: [SPARK-26367] [SQL] Remove ReplaceExceptWithFilter from nonExcludableRules URL: https://github.com/apache/spark/pull/23316 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 8d251eeab8484..5087ce211b68f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -199,7 +199,6 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) RewriteDistinctAggregates.ruleName :: ReplaceDeduplicateWithAggregate.ruleName :: ReplaceIntersectWithSemiJoin.ruleName :: - ReplaceExceptWithFilter.ruleName :: ReplaceExceptWithAntiJoin.ruleName :: RewriteExceptAll.ruleName :: RewriteIntersectAll.ruleName :: This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove ReplaceExceptWithFilter from nonExcludableRules > -- > > Key: SPARK-26367 > URL: https://issues.apache.org/jira/browse/SPARK-26367 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Xiao Li >Assignee: Xiao Li >Priority: Major > > ReplaceExceptWithFilter is optional and thus remove it from nonExcludableRules -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26369) How to limit Spark concurrent tasks number in one job?
Fu Chen created SPARK-26369: --- Summary: How to limit Spark concurrent tasks number in one job? Key: SPARK-26369 URL: https://issues.apache.org/jira/browse/SPARK-26369 Project: Spark Issue Type: Question Components: Scheduler Affects Versions: 2.4.0, 2.3.2, 2.2.0, 2.1.0 Reporter: Fu Chen Hi all, is it possible to make fair scheduler pools pluggable, so that we can implement our own SchedulingAlgorithm? In our case, we want to limit the maximum number of concurrent tasks of one job, which loads data from a MySQL database; if we set a larger executor.number * cores.number, it will trigger an alarm. Or can we do this in another way?
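Two hedged notes on the question above. For JDBC sources specifically, the documented `numPartitions` read option caps the number of partitions, and therefore the maximum number of concurrent database connections. More generally, the behavior being asked for resembles a counting semaphore that bounds how many tasks run at once, as in this pure-Python sketch (illustrative only; Spark 2.x's scheduler does not expose such a per-job limit):

```python
import threading

# General pattern behind the request: gate task execution with a counting
# semaphore so that at most `limit` tasks run concurrently.
limit = 4
gate = threading.BoundedSemaphore(limit)
active, peak = [0], [0]
lock = threading.Lock()

def task(_):
    with gate:                      # blocks while `limit` tasks are running
        with lock:
            active[0] += 1
            peak[0] = max(peak[0], active[0])
        # ... the bounded work (e.g. a MySQL read) would go here ...
        with lock:
            active[0] -= 1

threads = [threading.Thread(target=task, args=(i,)) for i in range(32)]
for t in threads:
    t.start()
for t in threads:
    t.join()
assert peak[0] <= limit             # never more than `limit` in flight
```

In Spark terms this is what a per-pool "max running tasks" knob would enforce inside the scheduler; absent that, bounding the number of input partitions (e.g. via the JDBC `numPartitions` option) is the usual workaround.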
[jira] [Issue Comment Deleted] (SPARK-26222) Scan: track file listing time
[ https://issues.apache.org/jira/browse/SPARK-26222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-26222: Comment: was deleted (was: xuanyuanking opened a new pull request #23298: [SPARK-26222][SQL] Track file listing time URL: https://github.com/apache/spark/pull/23298 ## What changes were proposed in this pull request? File listing time in scan node's SQL metrics has done and improved in spark-20136/SPARK-26327. In this pr we use QueryPlanningTracker to track start and end time of file listing. ## How was this patch tested? Add test for DataFrameWriter and Non-physical phase below: - DataFrameReader.load, file listing will be triggered by DataSource.resolveRelation. - Analyze rule like FindDataSourceTable. - Optimization rule like PruneFileSourcePartitions, OptimizeMetadataOnlyQuery. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org ) > Scan: track file listing time > - > > Key: SPARK-26222 > URL: https://issues.apache.org/jira/browse/SPARK-26222 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Priority: Major > > We should track file listing time and add it to the scan node's SQL metric, > so we have visibility how much is spent in file listing. It'd be useful to > track not just duration, but also start and end time so we can construct a > timeline. > This requires a little bit design to define what file listing time means, > when we are reading from cache, vs not cache. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26368) Make it clear that getOrInferFileFormatSchema doesn't create InMemoryFileIndex
Reynold Xin created SPARK-26368: --- Summary: Make it clear that getOrInferFileFormatSchema doesn't create InMemoryFileIndex Key: SPARK-26368 URL: https://issues.apache.org/jira/browse/SPARK-26368 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: Reynold Xin Assignee: Reynold Xin I was looking at the code, and it was a bit difficult to see the life cycle of the InMemoryFileIndex passed into getOrInferFileFormatSchema, because in one call path it is passed in, while in another it is created inside getOrInferFileFormatSchema. It'd be easier to understand the life cycle if we move its creation out.
[jira] [Assigned] (SPARK-26368) Make it clear that getOrInferFileFormatSchema doesn't create InMemoryFileIndex
[ https://issues.apache.org/jira/browse/SPARK-26368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26368: Assignee: Reynold Xin (was: Apache Spark) > Make it clear that getOrInferFileFormatSchema doesn't create InMemoryFileIndex > -- > > Key: SPARK-26368 > URL: https://issues.apache.org/jira/browse/SPARK-26368 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > > I was looking at the code and it was a bit difficult to see the life cycle of > InMemoryFileIndex passed into getOrInferFileFormatSchema, because once it is > passed in, and another time it was created in getOrInferFileFormatSchema. > It'd be easier to understand the life cycle if we move the creation of it out. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26368) Make it clear that getOrInferFileFormatSchema doesn't create InMemoryFileIndex
[ https://issues.apache.org/jira/browse/SPARK-26368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26368: Assignee: Apache Spark (was: Reynold Xin) > Make it clear that getOrInferFileFormatSchema doesn't create InMemoryFileIndex > -- > > Key: SPARK-26368 > URL: https://issues.apache.org/jira/browse/SPARK-26368 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Apache Spark >Priority: Major > > I was looking at the code and it was a bit difficult to see the life cycle of > InMemoryFileIndex passed into getOrInferFileFormatSchema, because once it is > passed in, and another time it was created in getOrInferFileFormatSchema. > It'd be easier to understand the life cycle if we move the creation of it out. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26368) Make it clear that getOrInferFileFormatSchema doesn't create InMemoryFileIndex
[ https://issues.apache.org/jira/browse/SPARK-26368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720809#comment-16720809 ] ASF GitHub Bot commented on SPARK-26368: rxin opened a new pull request #23317: [SPARK-26368][SQL] Make it clear that getOrInferFileFormatSchema doesn't create InMemoryFileIndex URL: https://github.com/apache/spark/pull/23317 ## What changes were proposed in this pull request? I was looking at the code and it was a bit difficult to see the life cycle of InMemoryFileIndex passed into getOrInferFileFormatSchema, because once it is passed in, and another time it was created in getOrInferFileFormatSchema. It'd be easier to understand the life cycle if we move the creation of it out. ## How was this patch tested? This is a simple code move and should be covered by existing tests. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Make it clear that getOrInferFileFormatSchema doesn't create InMemoryFileIndex > -- > > Key: SPARK-26368 > URL: https://issues.apache.org/jira/browse/SPARK-26368 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Assignee: Reynold Xin >Priority: Major > > I was looking at the code and it was a bit difficult to see the life cycle of > InMemoryFileIndex passed into getOrInferFileFormatSchema, because once it is > passed in, and another time it was created in getOrInferFileFormatSchema. > It'd be easier to understand the life cycle if we move the creation of it out. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
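The readability problem described in this ticket is a general one: a function whose collaborator is sometimes passed in and sometimes created internally has an unclear lifecycle. A minimal Python sketch of the proposed move, with illustrative names rather than Spark's actual signatures:

```python
class FileIndex:
    """Stand-in for InMemoryFileIndex (illustrative, not Spark code)."""

    def __init__(self, paths):
        self.paths = list(paths)

# Before: the index may be passed in *or* created inside, so its
# lifecycle is hard to trace from the call site.
def infer_schema_before(paths, index=None):
    if index is None:
        index = FileIndex(paths)  # hidden creation
    return sorted(index.paths)

# After: creation is moved out to the caller; this function never
# creates the index itself, so the lifecycle is explicit.
def infer_schema_after(index):
    return sorted(index.paths)
```

The PR is described as a simple code move with no behavior change, which matches the before/after pair here: both return the same result, only the ownership of the index's creation differs.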
[jira] [Commented] (SPARK-23886) update query.status
[ https://issues.apache.org/jira/browse/SPARK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720719#comment-16720719 ] ASF GitHub Bot commented on SPARK-23886: asfgit closed pull request #23095: [SPARK-23886][SS] Update query status for ContinuousExecution URL: https://github.com/apache/spark/pull/23095 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 2cac86599ef19..f2dda0373c7ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -146,6 +146,12 @@ class MicroBatchExecution( logInfo(s"Query $prettyIdString was stopped") } + /** Begins recording statistics about query progress for a given trigger. */ + override protected def startTrigger(): Unit = { +super.startTrigger() +currentStatus = currentStatus.copy(isTriggerActive = true) + } + /** * Repeatedly attempts to run batches as data arrives. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 392229bcb5f55..a5fbb56e27099 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -114,7 +114,6 @@ trait ProgressReporter extends Logging { logDebug("Starting Trigger Calculation") lastTriggerStartTimestamp = currentTriggerStartTimestamp currentTriggerStartTimestamp = triggerClock.getTimeMillis() -currentStatus = currentStatus.copy(isTriggerActive = true) currentTriggerStartOffsets = null currentTriggerEndOffsets = null currentDurationsMs.clear() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index 4a7df731da67d..adbec0b00f368 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -117,6 +117,8 @@ class ContinuousExecution( // For at least once, we can just ignore those reports and risk duplicates. commitLog.getLatest() match { case Some((latestEpochId, _)) => +updateStatusMessage("Starting new streaming query " + + s"and getting offsets from latest epoch $latestEpochId") val nextOffsets = offsetLog.get(latestEpochId).getOrElse { throw new IllegalStateException( s"Batch $latestEpochId was committed without end epoch offsets!") @@ -128,6 +130,7 @@ class ContinuousExecution( nextOffsets case None => // We are starting this stream for the first time. Offsets are all None. 
+updateStatusMessage("Starting new streaming query") logInfo(s"Starting new streaming query.") currentBatchId = 0 OffsetSeq.fill(continuousSources.map(_ => null): _*) @@ -260,6 +263,7 @@ class ContinuousExecution( epochUpdateThread.setDaemon(true) epochUpdateThread.start() + updateStatusMessage("Running") reportTimeTaken("runContinuous") { SQLExecution.withNewExecutionId( sparkSessionForQuery, lastExecution) { @@ -319,6 +323,8 @@ class ContinuousExecution( * before this is called. */ def commit(epoch: Long): Unit = { +updateStatusMessage(s"Committing epoch $epoch") + assert(continuousSources.length == 1, "only one continuous source supported currently") assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala index a0c9bcc8929eb..ca79e0248c06b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala @@ -28,9 +28,11 @@ import org.apache.spark.annotation.InterfaceStability * Reports information about the instantaneous status of a streaming query. * * @param message A human readable description of
[jira] [Resolved] (SPARK-23886) update query.status
[ https://issues.apache.org/jira/browse/SPARK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin resolved SPARK-23886. Resolution: Fixed Fix Version/s: 3.0.0 Issue resolved by pull request 23095 [https://github.com/apache/spark/pull/23095] > update query.status > --- > > Key: SPARK-23886 > URL: https://issues.apache.org/jira/browse/SPARK-23886 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Assignee: Gabor Somogyi >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-23886) update query.status
[ https://issues.apache.org/jira/browse/SPARK-23886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin reassigned SPARK-23886: -- Assignee: Gabor Somogyi > update query.status > --- > > Key: SPARK-23886 > URL: https://issues.apache.org/jira/browse/SPARK-23886 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.4.0 >Reporter: Jose Torres >Assignee: Gabor Somogyi >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26352) join reordering should not change the order of output attributes
[ https://issues.apache.org/jira/browse/SPARK-26352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kris Mok updated SPARK-26352: - Summary: join reordering should not change the order of output attributes (was: ReorderJoin should not change the order of columns) > join reordering should not change the order of output attributes > > > Key: SPARK-26352 > URL: https://issues.apache.org/jira/browse/SPARK-26352 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0, 2.4.0 >Reporter: Kris Mok >Priority: Major > > The optimizer rule {{org.apache.spark.sql.catalyst.optimizer.ReorderJoin}} > performs join reordering on inner joins. This was introduced by SPARK-12032 > in December 2015. > After it had reordered the joins, though, it didn't check whether or not the > column order (in terms of the {{output}} attribute list) was still the same as > before. Thus, it's possible to have a mismatch between the reordered column > order and the schema that a DataFrame thinks it has. 
> This can be demonstrated with the example: > {code:none} > spark.sql("create table table_a (x int, y int) using parquet") > spark.sql("create table table_b (i int, j int) using parquet") > spark.sql("create table table_c (a int, b int) using parquet") > val df = spark.sql("with df1 as (select * from table_a cross join table_b) > select * from df1 join table_c on a = x and b = i") > {code} > here's what the DataFrame thinks: > {code:none} > scala> df.printSchema > root > |-- x: integer (nullable = true) > |-- y: integer (nullable = true) > |-- i: integer (nullable = true) > |-- j: integer (nullable = true) > |-- a: integer (nullable = true) > |-- b: integer (nullable = true) > {code} > here's what the optimized plan thinks, after join reordering: > {code:none} > scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- > ${a.name}: ${a.dataType.typeName}")) > |-- x: integer > |-- y: integer > |-- a: integer > |-- b: integer > |-- i: integer > |-- j: integer > {code} > If we exclude the {{ReorderJoin}} rule (using Spark 2.4's optimizer rule > exclusion feature), it's back to normal: > {code:none} > scala> spark.conf.set("spark.sql.optimizer.excludedRules", > "org.apache.spark.sql.catalyst.optimizer.ReorderJoin") > scala> val df = spark.sql("with df1 as (select * from table_a cross join > table_b) select * from df1 join table_c on a = x and b = i") > df: org.apache.spark.sql.DataFrame = [x: int, y: int ... 4 more fields] > scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- > ${a.name}: ${a.dataType.typeName}")) > |-- x: integer > |-- y: integer > |-- i: integer > |-- j: integer > |-- a: integer > |-- b: integer > {code} > Note that this column ordering problem leads to data corruption, and can > manifest itself in various symptoms: > * Silently corrupting data, if the reordered columns happen to either have > matching types or have sufficiently-compatible types (e.g. 
all fixed length > primitive types are considered as "sufficiently compatible" in an UnsafeRow), > then only the resulting data is going to be wrong but it might not trigger > any alarms immediately. Or > * Weird Java-level exceptions like {{java.lang.NegativeArraySizeException}}, > or even SIGSEGVs. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19739) SparkHadoopUtil.appendS3AndSparkHadoopConfigurations to propagate full set of AWS env vars
[ https://issues.apache.org/jira/browse/SPARK-19739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720664#comment-16720664 ] Imran Rashid commented on SPARK-19739: -- ok, sounds good, and thanks for the quick response! sounds like a nice improvement :) > SparkHadoopUtil.appendS3AndSparkHadoopConfigurations to propagate full set of > AWS env vars > -- > > Key: SPARK-19739 > URL: https://issues.apache.org/jira/browse/SPARK-19739 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Steve Loughran >Assignee: Genmao Yu >Priority: Minor > Fix For: 2.2.0 > > > {{SparkHadoopUtil.appendS3AndSparkHadoopConfigurations()}} propagates the AWS > user and secret key to the s3n and s3a config options, getting secrets from > the user to the cluster, if set. > AWS also supports session authentication (env var {{AWS_SESSION_TOKEN}}) and > region endpoints ({{AWS_DEFAULT_REGION}}), the latter being critical if you > want to address V4-auth-only endpoints like Frankfurt and Seoul. > These env vars should be picked up and passed down to S3A too. It's only 4+ lines of > code, though impossible to test unless the existing code is refactored to > take an env var map[String, String], allowing a test suite to set the > values in its own map. > side issue: what if only half the env vars are set and users are trying to > understand why auth is failing? It may be good to build up a string > identifying which env vars had their value propagated, and log that @ debug, > while not logging the values, obviously.
[jira] [Commented] (SPARK-19739) SparkHadoopUtil.appendS3AndSparkHadoopConfigurations to propagate full set of AWS env vars
[ https://issues.apache.org/jira/browse/SPARK-19739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720656#comment-16720656 ] Steve Loughran commented on SPARK-19739: no, leave it alone. * In HADOOP-14556 I'm actually adding that temp one ahead of the normal creds in the list, so it will be lifted by default. * That patch actually adds something way more profound: the ability to create Delegation Tokens off S3A endpoints, which will be automatic session credentials, or, with a bit more config, shorter-lived role credentials with restricted access to only the resources you need (a specific s3a bucket, any matching DDB table). With that, you don't need to propagate S3 credentials at all, so your secrets stay on your local system. Yes, Spark works with this. No, I can't do a demo right now, but I'll stick a video up as soon as I can make one (next week) > SparkHadoopUtil.appendS3AndSparkHadoopConfigurations to propagate full set of > AWS env vars > -- > > Key: SPARK-19739 > URL: https://issues.apache.org/jira/browse/SPARK-19739 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Steve Loughran >Assignee: Genmao Yu >Priority: Minor > Fix For: 2.2.0 > > > {{SparkHadoopUtil.appendS3AndSparkHadoopConfigurations()}} propagates the AWS > user and secret key to the s3n and s3a config options, getting secrets from > the user to the cluster, if set. > AWS also supports session authentication (env var {{AWS_SESSION_TOKEN}}) and > region endpoints ({{AWS_DEFAULT_REGION}}), the latter being critical if you > want to address V4-auth-only endpoints like Frankfurt and Seoul. > These env vars should be picked up and passed down to S3A too. It's only 4+ lines of > code, though impossible to test unless the existing code is refactored to > take an env var map[String, String], allowing a test suite to set the > values in its own map. 
> side issue: what if only half the env vars are set and users are trying to > understand why auth is failing? It may be good to build up a string > identifying which env vars had their value propagated, and log that @ debug, > while not logging the values, obviously.
[jira] [Comment Edited] (SPARK-19739) SparkHadoopUtil.appendS3AndSparkHadoopConfigurations to propagate full set of AWS env vars
[ https://issues.apache.org/jira/browse/SPARK-19739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720650#comment-16720650 ] Imran Rashid edited comment on SPARK-19739 at 12/13/18 10:18 PM: - [~ste...@apache.org] I didn't realize when using this at first that I also needed to add the conf {{--conf "spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider"}} to have {{AWS_SESSION_TOKEN}} take any effect. You don't get any useful error msg if you don't add that credentials provider -- just access forbidden. Do you think it's useful to do that automatically as well when {{AWS_SESSION_TOKEN}} is set? was (Author: irashid): [~ste...@apache.org] I didn't realize when using this at first that I also needed to add the conf {{--conf "spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider"}} to have {{AWS_SESSION_TOKEN}} take any effect. You don't get any useful error msg when that happens -- just access forbidden. Do you think it's useful to do that automatically as well when {{AWS_SESSION_TOKEN}} is set? > SparkHadoopUtil.appendS3AndSparkHadoopConfigurations to propagate full set of > AWS env vars > -- > > Key: SPARK-19739 > URL: https://issues.apache.org/jira/browse/SPARK-19739 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Steve Loughran >Assignee: Genmao Yu >Priority: Minor > Fix For: 2.2.0 > > > {{SparkHadoopUtil.appendS3AndSparkHadoopConfigurations()}} propagates the AWS > user and secret key to the s3n and s3a config options, getting secrets from > the user to the cluster, if set. > AWS also supports session authentication (env var {{AWS_SESSION_TOKEN}}) and > region endpoints ({{AWS_DEFAULT_REGION}}), the latter being critical if you > want to address V4-auth-only endpoints like Frankfurt and Seoul. > These env vars should be picked up and passed down to S3A too. 
It's only 4+ lines of > code, though impossible to test unless the existing code is refactored to > take an env var map[String, String], allowing a test suite to set the > values in its own map. > side issue: what if only half the env vars are set and users are trying to > understand why auth is failing? It may be good to build up a string > identifying which env vars had their value propagated, and log that @ debug, > while not logging the values, obviously.
[jira] [Commented] (SPARK-19739) SparkHadoopUtil.appendS3AndSparkHadoopConfigurations to propagate full set of AWS env vars
[ https://issues.apache.org/jira/browse/SPARK-19739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720650#comment-16720650 ] Imran Rashid commented on SPARK-19739: -- [~ste...@apache.org] I didn't realize when using this at first that I also needed to add the conf {{--conf "spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider"}} to have {{AWS_SESSION_TOKEN}} take any effect. You don't get any useful error msg when that happens -- just access forbidden. Do you think it's useful to do that automatically as well when {{AWS_SESSION_TOKEN}} is set? > SparkHadoopUtil.appendS3AndSparkHadoopConfigurations to propagate full set of > AWS env vars > -- > > Key: SPARK-19739 > URL: https://issues.apache.org/jira/browse/SPARK-19739 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Steve Loughran >Assignee: Genmao Yu >Priority: Minor > Fix For: 2.2.0 > > > {{SparkHadoopUtil.appendS3AndSparkHadoopConfigurations()}} propagates the AWS > user and secret key to the s3n and s3a config options, getting secrets from > the user to the cluster, if set. > AWS also supports session authentication (env var {{AWS_SESSION_TOKEN}}) and > region endpoints ({{AWS_DEFAULT_REGION}}), the latter being critical if you > want to address V4-auth-only endpoints like Frankfurt and Seoul. > These env vars should be picked up and passed down to S3A too. It's only 4+ lines of > code, though impossible to test unless the existing code is refactored to > take an env var map[String, String], allowing a test suite to set the > values in its own map. > side issue: what if only half the env vars are set and users are trying to > understand why auth is failing? It may be good to build up a string > identifying which env vars had their value propagated, and log that @ debug, > while not logging the values, obviously.
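The "side issue" in the ticket, reporting which env vars propagated without ever logging their values, can be sketched like this. The variable list matches the ones named in the ticket; the helper itself is hypothetical, not Spark or Hadoop code:

```python
import os

# Env vars named in SPARK-19739 as candidates for propagation.
AWS_ENV_VARS = (
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_SESSION_TOKEN",
    "AWS_DEFAULT_REGION",
)

def describe_aws_env(env=None):
    """Build a debug-level string saying which AWS env vars are set,
    deliberately never including the secret values themselves."""
    env = os.environ if env is None else env
    return ", ".join(
        f"{name}={'set' if env.get(name) else 'unset'}"
        for name in AWS_ENV_VARS
    )
```

Taking the env map as a parameter (defaulting to `os.environ`) is the same testability refactor the ticket asks for: a test suite can pass its own map instead of mutating the process environment.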
[jira] [Commented] (SPARK-26254) Move delegation token providers into a separate project
[ https://issues.apache.org/jira/browse/SPARK-26254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720633#comment-16720633 ] Steve Loughran commented on SPARK-26254: bq. There was concern about using ServiceLoader before, but if the interface being loaded is private to Spark, it's fine with me. HADOOP-15808 covers that. If you have any class which declares a delegation token, but that class doesn't actually load (missing, transitive CNFE, etc.), and the jar containing that META-INF manifest gets onto the classpath of your resource manager, there goes your cluster as soon as the first job is submitted. Traumatic. > Move delegation token providers into a separate project > --- > > Key: SPARK-26254 > URL: https://issues.apache.org/jira/browse/SPARK-26254 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Gabor Somogyi >Priority: Major > > There was a discussion in > [PR#22598|https://github.com/apache/spark/pull/22598] that there are several > provided dependencies inside the core project which shouldn't be there (for ex. > hive and kafka). This jira is to solve this problem.
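The failure mode described in the comment above, one unloadable provider class listed in a META-INF services file taking down the whole discovery mechanism, is typically mitigated by loading each provider defensively and skipping the ones that fail. The real fix is in JVM ServiceLoader handling (HADOOP-15808); the Python sketch below only illustrates the pattern, with hypothetical names:

```python
import importlib

def load_providers(class_names):
    """Load provider classes by dotted name, skipping any whose module
    or class fails to load -- the analogue of guarding against a
    ClassNotFoundException from a stale META-INF services entry."""
    providers = []
    for full_name in class_names:
        module_name, _, cls_name = full_name.rpartition(".")
        try:
            module = importlib.import_module(module_name)
            providers.append(getattr(module, cls_name))
        except (ImportError, AttributeError):
            # Broken entry: skip it instead of failing every submission.
            continue
    return providers
```

With this guard, a jar (or here, a module) that declares a provider it cannot actually supply degrades to a skipped entry rather than a cluster-wide failure at first job submission.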
[jira] [Commented] (SPARK-26359) Spark checkpoint restore fails after query restart
[ https://issues.apache.org/jira/browse/SPARK-26359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720558#comment-16720558 ] Gabor Somogyi commented on SPARK-26359: --- As I see it, the main issue appears because of S3's read-after-write consistency behavior (we've spoken about this issue on another jira). The checkpoint tries to save a file with a so-called renaming strategy, which is necessary to write files atomically. It creates a temp file and then renames it. In this case: * the temp file was created * a move was attempted * an exception happened In such situations even S3Guard is useless. > Spark checkpoint restore fails after query restart > -- > > Key: SPARK-26359 > URL: https://issues.apache.org/jira/browse/SPARK-26359 > Project: Spark > Issue Type: Bug > Components: Spark Submit, Structured Streaming >Affects Versions: 2.4.0 > Environment: Spark 2.4.0 deployed in standalone-client mode > Checkpointing is done to S3 > The Spark application in question is responsible for running 4 different > queries > Queries are written using Structured Streaming > We are using the following algorithm for hopes of better performance: > spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2" # When the > default is 1 >Reporter: Kaspar Tint >Priority: Major > Attachments: driver-redacted, metadata, redacted-offsets, > state-redacted, worker-redacted > > > We had an incident where one of our structured streaming queries could not be > restarted after a usual S3 checkpointing failure. Now to clarify before > everything else - we are aware of the issues with S3 and are working towards > moving to HDFS, but this will take time. S3 will cause queries to fail quite > often during peak hours and we have separate logic to handle this that will > attempt to restart the failed queries if possible. > In this particular case we could not restart one of the failed queries. 
Seems > like between detecting a failure in the query and starting it up again > something went really wrong with Spark and state in checkpoint folder got > corrupted for some reason. > The issue starts with the usual *FileNotFoundException* that happens with S3 > {code:java} > 2018-12-10 21:09:25.785 ERROR MicroBatchExecution: Query feedback [id = > c074233a-2563-40fc-8036-b5e38e2e2c42, runId = > e607eb6e-8431-4269-acab-cc2c1f9f09dd] > terminated with error > java.io.FileNotFoundException: No such file or directory: > s3a://some.domain/spark/checkpoints/49/feedback/offsets/.37 > 348.8227943f-a848-4af5-b5bf-1fea81775b24.tmp > at > org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255) > at > org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149) > at > org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088) > at > org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2715) > at > org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131) > at > org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726) > at > org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699) > at org.apache.hadoop.fs.FileContext.rename(FileContext.java:965) > at > org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:331) > at > org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataL > og.scala:126) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110) > at > 
org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110) > at scala.Option.getOrElse(Option.scala:121) > at > org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:382) > at > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply(MicroBatchExecution.scala:381) > at >
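The "renaming strategy" discussed above, write to a temp file, then rename it into place, can be sketched as below. The rename is atomic on POSIX filesystems and HDFS, but on S3 it is implemented as copy-plus-delete, which is exactly the window where the FileNotFoundException in the stack trace can appear. Function and file names here are illustrative, not Spark's CheckpointFileManager API:

```python
import os
import tempfile

def atomic_write(path, data):
    """Write-to-temp-then-rename commit: readers either see the old
    file or the complete new one, never a partially written file."""
    directory = os.path.dirname(os.path.abspath(path))
    # Create the temp file in the destination directory so the rename
    # stays within one filesystem.
    fd, tmp = tempfile.mkstemp(dir=directory, prefix=".tmp-")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(data)
        os.replace(tmp, path)  # the rename step; non-atomic on S3
    except BaseException:
        if os.path.exists(tmp):
            os.remove(tmp)  # clean up the orphaned temp file
        raise
```

On a real filesystem the failure sequence from the comment (temp file created, move attempted, exception) leaves at most an orphaned temp file; on S3, the non-atomic rename can additionally leave the destination missing or partially visible, which is why S3Guard does not help here.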
[jira] [Assigned] (SPARK-26367) Remove ReplaceExceptWithFilter from nonExcludableRules
[ https://issues.apache.org/jira/browse/SPARK-26367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26367: Assignee: Apache Spark (was: Xiao Li) > Remove ReplaceExceptWithFilter from nonExcludableRules > -- > > Key: SPARK-26367 > URL: https://issues.apache.org/jira/browse/SPARK-26367 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Xiao Li >Assignee: Apache Spark >Priority: Major > > ReplaceExceptWithFilter is optional and thus remove it from nonExcludableRules -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26367) Remove ReplaceExceptWithFilter from nonExcludableRules
[ https://issues.apache.org/jira/browse/SPARK-26367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26367: Assignee: Xiao Li (was: Apache Spark) > Remove ReplaceExceptWithFilter from nonExcludableRules > -- > > Key: SPARK-26367 > URL: https://issues.apache.org/jira/browse/SPARK-26367 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Xiao Li >Assignee: Xiao Li >Priority: Major > > ReplaceExceptWithFilter is optional and thus remove it from nonExcludableRules -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26367) Remove ReplaceExceptWithFilter from nonExcludableRules
[ https://issues.apache.org/jira/browse/SPARK-26367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720514#comment-16720514 ] ASF GitHub Bot commented on SPARK-26367: gatorsmile opened a new pull request #23316: [SPARK-26367] Remove ReplaceExceptWithFilter from nonExcludableRules URL: https://github.com/apache/spark/pull/23316 ## What changes were proposed in this pull request? ReplaceExceptWithFilter is optional and thus remove it from nonExcludableRules ## How was this patch tested? N/A This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove ReplaceExceptWithFilter from nonExcludableRules > -- > > Key: SPARK-26367 > URL: https://issues.apache.org/jira/browse/SPARK-26367 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Xiao Li >Assignee: Xiao Li >Priority: Major > > ReplaceExceptWithFilter is optional and thus remove it from nonExcludableRules -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26367) Remove ReplaceExceptWithFilter from nonExcludableRules
Xiao Li created SPARK-26367: --- Summary: Remove ReplaceExceptWithFilter from nonExcludableRules Key: SPARK-26367 URL: https://issues.apache.org/jira/browse/SPARK-26367 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.4.0 Reporter: Xiao Li Assignee: Xiao Li ReplaceExceptWithFilter is optional and thus remove it from nonExcludableRules -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
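Context for the change above: optimizer rules listed in `nonExcludableRules` cannot be disabled through the `spark.sql.optimizer.excludedRules` setting, so removing `ReplaceExceptWithFilter` from that list makes the rewrite opt-out-able. A hedged sketch of how such an exclusion would be passed at launch (the fully qualified class name is my assumption about where the rule lives; it is not confirmed by this thread):

```shell
# Config fragment (illustrative): disable the ReplaceExceptWithFilter rewrite
# for a single application. excludedRules takes a comma-separated list of
# optimizer rule class names.
spark-submit \
  --conf spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.ReplaceExceptWithFilter \
  my-app.jar
```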
[jira] [Resolved] (SPARK-23504) Flaky test: RateSourceV2Suite.basic microbatch execution
[ https://issues.apache.org/jira/browse/SPARK-23504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcelo Vanzin resolved SPARK-23504. Resolution: Cannot Reproduce Closing this for now based on the above comment. > Flaky test: RateSourceV2Suite.basic microbatch execution > > > Key: SPARK-23504 > URL: https://issues.apache.org/jira/browse/SPARK-23504 > Project: Spark > Issue Type: Bug > Components: SQL, Tests >Affects Versions: 2.4.0 >Reporter: Marcelo Vanzin >Priority: Major > > Seen on an unrelated change: > https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87635/testReport/org.apache.spark.sql.execution.streaming/RateSourceV2Suite/basic_microbatch_execution/ > {noformat} > Error Message > org.scalatest.exceptions.TestFailedException: == Results == !== Correct > Answer - 10 == == Spark Answer - 0 == !struct<_1:timestamp,_2:int> > struct<> ![1969-12-31 16:00:00.0,0] ![1969-12-31 16:00:00.1,1] > ![1969-12-31 16:00:00.2,2] ![1969-12-31 16:00:00.3,3] ![1969-12-31 > 16:00:00.4,4] ![1969-12-31 16:00:00.5,5] ![1969-12-31 16:00:00.6,6] > ![1969-12-31 16:00:00.7,7] ![1969-12-31 16:00:00.8,8] > ![1969-12-31 16:00:00.9,9]== Progress == > AdvanceRateManualClock(1) => CheckLastBatch: [1969-12-31 > 16:00:00.0,0],[1969-12-31 16:00:00.1,1],[1969-12-31 16:00:00.2,2],[1969-12-31 > 16:00:00.3,3],[1969-12-31 16:00:00.4,4],[1969-12-31 16:00:00.5,5],[1969-12-31 > 16:00:00.6,6],[1969-12-31 16:00:00.7,7],[1969-12-31 16:00:00.8,8],[1969-12-31 > 16:00:00.9,9]StopStream > StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@22bc97a,Map(),null) > AdvanceRateManualClock(2)CheckLastBatch: [1969-12-31 > 16:00:01.0,10],[1969-12-31 16:00:01.1,11],[1969-12-31 > 16:00:01.2,12],[1969-12-31 16:00:01.3,13],[1969-12-31 > 16:00:01.4,14],[1969-12-31 16:00:01.5,15],[1969-12-31 > 16:00:01.6,16],[1969-12-31 16:00:01.7,17],[1969-12-31 > 16:00:01.8,18],[1969-12-31 16:00:01.9,19] == Stream == Output Mode: Append > Stream state: > 
{org.apache.spark.sql.execution.streaming.sources.RateStreamMicroBatchReader@75b88292: > {"0":{"value":-1,"runTimeMs":0}}} Thread state: alive Thread stack trace: > sun.misc.Unsafe.park(Native Method) > java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202) > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153) > org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:222) > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:633) > org.apache.spark.SparkContext.runJob(SparkContext.scala:2030) > org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2.scala:84) > > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) > > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) > > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) > org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247) > org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294) > org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272) > 
org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) > org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722) > org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253) > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) > org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252) > org.apache.spark.sql.Dataset.collect(Dataset.scala:2722) > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$18.apply(MicroBatchExecution.scala:493) > > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) > >
[jira] [Assigned] (SPARK-26366) Except with transform regression
[ https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26366: Assignee: Apache Spark > Except with transform regression > > > Key: SPARK-26366 > URL: https://issues.apache.org/jira/browse/SPARK-26366 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.3.2 >Reporter: Dan Osipov >Assignee: Apache Spark >Priority: Major > > There appears to be a regression between Spark 2.2 and 2.3. Below is the code > to reproduce it: > > {code:java} > import org.apache.spark.sql.functions.col > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > val inputDF = spark.sqlContext.createDataFrame( > spark.sparkContext.parallelize(Seq( > Row("0", "john", "smith", "j...@smith.com"), > Row("1", "jane", "doe", "j...@doe.com"), > Row("2", "apache", "spark", "sp...@apache.org"), > Row("3", "foo", "bar", null) > )), > StructType(List( > StructField("id", StringType, nullable=true), > StructField("first_name", StringType, nullable=true), > StructField("last_name", StringType, nullable=true), > StructField("email", StringType, nullable=true) > )) > ) > val exceptDF = inputDF.transform( toProcessDF => > toProcessDF.filter( > ( > col("first_name").isin(Seq("john", "jane"): _*) > and col("last_name").isin(Seq("smith", "doe"): _*) > ) > or col("email").isin(List(): _*) > ) > ) > inputDF.except(exceptDF).show() > {code} > Output with Spark 2.2: > {noformat} > +---+--+-++ > | id|first_name|last_name| email| > +---+--+-++ > | 2| apache| spark|sp...@apache.org| > | 3| foo| bar| null| > +---+--+-++{noformat} > Output with Spark 2.3: > {noformat} > +---+--+-++ > | id|first_name|last_name| email| > +---+--+-++ > | 2| apache| spark|sp...@apache.org| > +---+--+-++{noformat} > Note, changing the last line to > {code:java} > inputDF.except(exceptDF.cache()).show() > {code} > produces identical output for both Spark 2.3 and 2.2 > -- This message was sent by 
Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26366) Except with transform regression
[ https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26366: Assignee: (was: Apache Spark) > Except with transform regression > > > Key: SPARK-26366 > URL: https://issues.apache.org/jira/browse/SPARK-26366 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.3.2 >Reporter: Dan Osipov >Priority: Major > > There appears to be a regression between Spark 2.2 and 2.3. Below is the code > to reproduce it: > > {code:java} > import org.apache.spark.sql.functions.col > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > val inputDF = spark.sqlContext.createDataFrame( > spark.sparkContext.parallelize(Seq( > Row("0", "john", "smith", "j...@smith.com"), > Row("1", "jane", "doe", "j...@doe.com"), > Row("2", "apache", "spark", "sp...@apache.org"), > Row("3", "foo", "bar", null) > )), > StructType(List( > StructField("id", StringType, nullable=true), > StructField("first_name", StringType, nullable=true), > StructField("last_name", StringType, nullable=true), > StructField("email", StringType, nullable=true) > )) > ) > val exceptDF = inputDF.transform( toProcessDF => > toProcessDF.filter( > ( > col("first_name").isin(Seq("john", "jane"): _*) > and col("last_name").isin(Seq("smith", "doe"): _*) > ) > or col("email").isin(List(): _*) > ) > ) > inputDF.except(exceptDF).show() > {code} > Output with Spark 2.2: > {noformat} > +---+--+-++ > | id|first_name|last_name| email| > +---+--+-++ > | 2| apache| spark|sp...@apache.org| > | 3| foo| bar| null| > +---+--+-++{noformat} > Output with Spark 2.3: > {noformat} > +---+--+-++ > | id|first_name|last_name| email| > +---+--+-++ > | 2| apache| spark|sp...@apache.org| > +---+--+-++{noformat} > Note, changing the last line to > {code:java} > inputDF.except(exceptDF.cache()).show() > {code} > produces identical output for both Spark 2.3 and 2.2 > -- This message was sent by Atlassian JIRA 
(v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26366) Except with transform regression
[ https://issues.apache.org/jira/browse/SPARK-26366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720464#comment-16720464 ] ASF GitHub Bot commented on SPARK-26366: mgaido91 opened a new pull request #23315: [SPARK-26366][SQL] ReplaceExceptWithFilter should consider NULL as False URL: https://github.com/apache/spark/pull/23315 ## What changes were proposed in this pull request? In `ReplaceExceptWithFilter` we do not consider the case in which the condition evaluates to NULL. In that case, negating NULL still returns NULL, so the assumption that negating the condition returns all the rows which didn't satisfy it does not hold: rows for which the condition returns NULL are not returned. The PR fixes this by treating the condition as False when it is NULL, so that all the rows which didn't satisfy the original condition are returned. ## How was this patch tested? added UTs > Except with transform regression > > > Key: SPARK-26366 > URL: https://issues.apache.org/jira/browse/SPARK-26366 > Project: Spark > Issue Type: Bug > Components: Spark Core, SQL >Affects Versions: 2.3.2 >Reporter: Dan Osipov >Priority: Major > > There appears to be a regression between Spark 2.2 and 2.3.
Below is the code > to reproduce it: > > {code:java} > import org.apache.spark.sql.functions.col > import org.apache.spark.sql.Row > import org.apache.spark.sql.types._ > val inputDF = spark.sqlContext.createDataFrame( > spark.sparkContext.parallelize(Seq( > Row("0", "john", "smith", "j...@smith.com"), > Row("1", "jane", "doe", "j...@doe.com"), > Row("2", "apache", "spark", "sp...@apache.org"), > Row("3", "foo", "bar", null) > )), > StructType(List( > StructField("id", StringType, nullable=true), > StructField("first_name", StringType, nullable=true), > StructField("last_name", StringType, nullable=true), > StructField("email", StringType, nullable=true) > )) > ) > val exceptDF = inputDF.transform( toProcessDF => > toProcessDF.filter( > ( > col("first_name").isin(Seq("john", "jane"): _*) > and col("last_name").isin(Seq("smith", "doe"): _*) > ) > or col("email").isin(List(): _*) > ) > ) > inputDF.except(exceptDF).show() > {code} > Output with Spark 2.2: > {noformat} > +---+--+-++ > | id|first_name|last_name| email| > +---+--+-++ > | 2| apache| spark|sp...@apache.org| > | 3| foo| bar| null| > +---+--+-++{noformat} > Output with Spark 2.3: > {noformat} > +---+--+-++ > | id|first_name|last_name| email| > +---+--+-++ > | 2| apache| spark|sp...@apache.org| > +---+--+-++{noformat} > Note, changing the last line to > {code:java} > inputDF.except(exceptDF.cache()).show() > {code} > produces identical output for both Spark 2.3 and 2.2 > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
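The fix described in the pull request above hinges on SQL's three-valued logic: when a filter condition evaluates to NULL, both the condition and its negation are NULL, so the row passes neither `Filter(cond)` nor `Filter(NOT cond)`. A minimal sketch of that logic (plain Python with NULL modeled as `None`; this is an illustration, not Spark code):

```python
def sql_not(v):
    # Three-valued NOT: NOT NULL is still NULL.
    return None if v is None else (not v)

def passes_filter(v):
    # A SQL WHERE clause keeps a row only when the condition is exactly TRUE.
    return v is True

cond = None  # a condition that evaluates to NULL for some row

# The row is dropped by Filter(cond) ...
assert passes_filter(cond) is False
# ... and also by Filter(NOT cond), so naively rewriting
# Except as Filter(NOT cond) silently loses the row.
assert passes_filter(sql_not(cond)) is False

def coalesce_false(v):
    # The fix: treat a NULL condition as False before negating.
    return False if v is None else v

# With the coalesce in place, negation recovers the row.
assert passes_filter(sql_not(coalesce_false(cond))) is True
```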
[jira] [Resolved] (SPARK-26098) Show associated SQL query in Job page
[ https://issues.apache.org/jira/browse/SPARK-26098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-26098. - Resolution: Fixed Assignee: Gengliang Wang Fix Version/s: 3.0.0 > Show associated SQL query in Job page > - > > Key: SPARK-26098 > URL: https://issues.apache.org/jira/browse/SPARK-26098 > Project: Spark > Issue Type: Improvement > Components: Web UI >Affects Versions: 3.0.0 >Reporter: Gengliang Wang >Assignee: Gengliang Wang >Priority: Major > Fix For: 3.0.0 > > > For jobs associated with SQL queries, it would be easier to understand the > context by showing the SQL query in the Job detail page.
[jira] [Assigned] (SPARK-26364) Clean up import statements in pandas udf tests
[ https://issues.apache.org/jira/browse/SPARK-26364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26364: Assignee: Apache Spark > Clean up import statements in pandas udf tests > -- > > Key: SPARK-26364 > URL: https://issues.apache.org/jira/browse/SPARK-26364 > Project: Spark > Issue Type: Improvement > Components: Tests >Affects Versions: 2.4.0 >Reporter: Li Jin >Assignee: Apache Spark >Priority: Minor > > Per discussion [https://github.com/apache/spark/pull/22305/files#r241215618] > we should clean up the import statements in test_pandas_udf* and move them to > the top. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26364) Clean up import statements in pandas udf tests
[ https://issues.apache.org/jira/browse/SPARK-26364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26364: Assignee: (was: Apache Spark) > Clean up import statements in pandas udf tests > -- > > Key: SPARK-26364 > URL: https://issues.apache.org/jira/browse/SPARK-26364 > Project: Spark > Issue Type: Improvement > Components: Tests >Affects Versions: 2.4.0 >Reporter: Li Jin >Priority: Minor > > Per discussion [https://github.com/apache/spark/pull/22305/files#r241215618] > we should clean up the import statements in test_pandas_udf* and move them to > the top. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26364) Clean up import statements in pandas udf tests
[ https://issues.apache.org/jira/browse/SPARK-26364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720407#comment-16720407 ] ASF GitHub Bot commented on SPARK-26364: icexelloss opened a new pull request #23314: [SPARK-26364][PYTHON][TESTING] Clean up imports in test_pandas_udf* URL: https://github.com/apache/spark/pull/23314 ## What changes were proposed in this pull request? Clean up unconditional import statements and move them to the top. Conditional imports (pandas, numpy, pyarrow) are left as-is. ## How was this patch tested? Existing tests. > Clean up import statements in pandas udf tests > -- > > Key: SPARK-26364 > URL: https://issues.apache.org/jira/browse/SPARK-26364 > Project: Spark > Issue Type: Improvement > Components: Tests >Affects Versions: 2.4.0 >Reporter: Li Jin >Priority: Minor > > Per discussion [https://github.com/apache/spark/pull/22305/files#r241215618] > we should clean up the import statements in test_pandas_udf* and move them to > the top.
[jira] [Commented] (SPARK-26098) Show associated SQL query in Job page
[ https://issues.apache.org/jira/browse/SPARK-26098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720408#comment-16720408 ] ASF GitHub Bot commented on SPARK-26098: asfgit closed pull request #23068: [SPARK-26098][WebUI] Show associated SQL query in Job page URL: https://github.com/apache/spark/pull/23068 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index bd3f58b6182c0..262ff6547faa5 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -70,6 +70,8 @@ private[spark] class AppStatusListener( private val liveTasks = new HashMap[Long, LiveTask]() private val liveRDDs = new HashMap[Int, LiveRDD]() private val pools = new HashMap[String, SchedulerPool]() + + private val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id" // Keep the active executor count as a separate variable to avoid having to do synchronization // around liveExecutors. 
@volatile private var activeExecutorCount = 0 @@ -318,6 +320,8 @@ private[spark] class AppStatusListener( val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)") val jobGroup = Option(event.properties) .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) } +val sqlExecutionId = Option(event.properties) + .flatMap(p => Option(p.getProperty(SQL_EXECUTION_ID_KEY)).map(_.toLong)) val job = new LiveJob( event.jobId, @@ -325,7 +329,8 @@ private[spark] class AppStatusListener( if (event.time > 0) Some(new Date(event.time)) else None, event.stageIds, jobGroup, - numTasks) + numTasks, + sqlExecutionId) liveJobs.put(event.jobId, job) liveUpdate(job, now) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index b35781cb36e81..312bcccb1cca1 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -56,6 +56,13 @@ private[spark] class AppStatusStore( store.read(classOf[JobDataWrapper], jobId).info } + // Returns job data and associated SQL execution ID of certain Job ID. + // If there is no related SQL execution, the SQL execution ID part will be None. 
+ def jobWithAssociatedSql(jobId: Int): (v1.JobData, Option[Long]) = { +val data = store.read(classOf[JobDataWrapper], jobId) +(data.info, data.sqlExecutionId) + } + def executorList(activeOnly: Boolean): Seq[v1.ExecutorSummary] = { val base = store.view(classOf[ExecutorSummaryWrapper]) val filtered = if (activeOnly) { diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 47e45a66ecccb..7f7b83a54d794 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -64,7 +64,8 @@ private class LiveJob( val submissionTime: Option[Date], val stageIds: Seq[Int], jobGroup: Option[String], -numTasks: Int) extends LiveEntity { +numTasks: Int, +sqlExecutionId: Option[Long]) extends LiveEntity { var activeTasks = 0 var completedTasks = 0 @@ -108,7 +109,7 @@ private class LiveJob( skippedStages.size, failedStages, killedSummary) -new JobDataWrapper(info, skippedStages) +new JobDataWrapper(info, skippedStages, sqlExecutionId) } } diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index ef19e86f3135f..eea47b3b17098 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -68,7 +68,8 @@ private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) { */ private[spark] class JobDataWrapper( val info: JobData, -val skippedStages: Set[Int]) { +val skippedStages: Set[Int], +val sqlExecutionId: Option[Long]) { @JsonIgnore @KVIndex private def id: Int = info.jobId diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index 55444a2c0c9ab..b58a6ca447edf 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -189,7 +189,7 @@ private[ui]
[jira] [Assigned] (SPARK-26315) auto cast threshold from Integer to Float in approxSimilarityJoin of BucketedRandomProjectionLSHModel
[ https://issues.apache.org/jira/browse/SPARK-26315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26315: Assignee: Apache Spark > auto cast threshold from Integer to Float in approxSimilarityJoin of > BucketedRandomProjectionLSHModel > - > > Key: SPARK-26315 > URL: https://issues.apache.org/jira/browse/SPARK-26315 > Project: Spark > Issue Type: Improvement > Components: MLlib, PySpark >Affects Versions: 2.3.2 >Reporter: Song Ci >Assignee: Apache Spark >Priority: Major > > when I was using > {code:java} > // code placeholder > BucketedRandomProjectionLSHModel.approxSimilarityJoin(dt_features, > dt_features, distCol="EuclideanDistance", threshold=20.) > {code} > I was confused when this method reported an exception that some Java method > with the (dataset, dataset, integer, string) signature could not be found. I think > that if I give an integer, the Python method of PySpark should auto-cast > it to float if needed.
[jira] [Commented] (SPARK-26315) auto cast threshold from Integer to Float in approxSimilarityJoin of BucketedRandomProjectionLSHModel
[ https://issues.apache.org/jira/browse/SPARK-26315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720369#comment-16720369 ] ASF GitHub Bot commented on SPARK-26315: jerryjch opened a new pull request #23313: [SPARK-26315][PYSPARK] auto cast threshold from Integer to Float in approxSimilarityJoin of BucketedRandomProjectionLSHModel URL: https://github.com/apache/spark/pull/23313 ## What changes were proposed in this pull request? If the input parameter 'threshold' to the function approxSimilarityJoin is not a float, we would get an exception. The fix is to convert the 'threshold' into a float before calling the Java implementation method. ## How was this patch tested? Added a new test case. Without this fix, the test will throw an exception as reported in the JIRA. With the fix, the test passes. Please review http://spark.apache.org/contributing.html before opening a pull request. > auto cast threshold from Integer to Float in approxSimilarityJoin of > BucketedRandomProjectionLSHModel > - > > Key: SPARK-26315 > URL: https://issues.apache.org/jira/browse/SPARK-26315 > Project: Spark > Issue Type: Improvement > Components: MLlib, PySpark >Affects Versions: 2.3.2 >Reporter: Song Ci >Priority: Major > > when I was using > {code:java} > // code placeholder > BucketedRandomProjectionLSHModel.approxSimilarityJoin(dt_features, > dt_features, distCol="EuclideanDistance", threshold=20.) > {code} > I was confused when this method reported an exception that some Java method > with the (dataset, dataset, integer, string) signature could not be found. I think > that if I give an integer, the Python method of PySpark should auto-cast > it to float if needed.
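The coercion described in the pull request above can be pictured with a small stand-in wrapper (hypothetical code, not the actual PySpark source; the JVM call is omitted). Py4J resolves Java overloads by exact signature, so an `int` threshold fails to match a `double` parameter unless it is converted first:

```python
def approx_similarity_join(dataset_a, dataset_b, threshold, dist_col="distCol"):
    # Coerce up front so an integer threshold still matches the Java
    # (Dataset, Dataset, double, String) signature on the JVM side.
    threshold = float(threshold)
    # ... the real method would now call into the JVM model; omitted here ...
    return threshold

# An integer threshold is accepted transparently after the fix:
assert approx_similarity_join(None, None, 20) == 20.0
assert isinstance(approx_similarity_join(None, None, 20), float)
```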
[jira] [Created] (SPARK-26365) spark-submit for k8s cluster doesn't propagate exit code
Oscar Bonilla created SPARK-26365: - Summary: spark-submit for k8s cluster doesn't propagate exit code Key: SPARK-26365 URL: https://issues.apache.org/jira/browse/SPARK-26365 Project: Spark Issue Type: Bug Components: Kubernetes, Spark Submit Affects Versions: 2.3.2 Reporter: Oscar Bonilla When launching apps using spark-submit in a Kubernetes cluster, if the Spark application fails (returns exit code = 1, for example), spark-submit still exits gracefully and returns exit code = 0. This is problematic, since there is no way to know whether there has been a problem with the Spark application.
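Until spark-submit itself surfaces the driver's status, one workaround is a wrapper that asks the Kubernetes API for the driver container's terminated exit code and propagates that instead. A rough shell sketch; `get_driver_exit_code` is a stand-in for something like `kubectl get pod "$1" -o jsonpath='{.status.containerStatuses[0].state.terminated.exitCode}'`, and the pod name is a placeholder:

```shell
#!/bin/sh
# Stand-in for querying Kubernetes for the driver container's exit code;
# here it simulates a driver that failed with exit code 1.
get_driver_exit_code() {
    echo "1"
}

# spark-submit returned 0 even though the driver failed (per this report),
# so inspect the driver pod and surface its exit code to the caller.
code=$(get_driver_exit_code "my-app-driver")
echo "driver exit code: $code"
# exit "$code"   # uncomment to propagate the failure to the calling process
```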
[jira] [Assigned] (SPARK-26315) auto cast threshold from Integer to Float in approxSimilarityJoin of BucketedRandomProjectionLSHModel
[ https://issues.apache.org/jira/browse/SPARK-26315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26315: Assignee: (was: Apache Spark) > auto cast threshold from Integer to Float in approxSimilarityJoin of > BucketedRandomProjectionLSHModel > - > > Key: SPARK-26315 > URL: https://issues.apache.org/jira/browse/SPARK-26315 > Project: Spark > Issue Type: Improvement > Components: MLlib, PySpark >Affects Versions: 2.3.2 >Reporter: Song Ci >Priority: Major > > when I was using > {code:java} > // code placeholder > BucketedRandomProjectionLSHModel.approxSimilarityJoin(dt_features, > dt_features, distCol="EuclideanDistance", threshold=20.) > {code} > I was confused when this method reported an exception that some Java method > with the (dataset, dataset, integer, string) signature could not be found. I think > that if I give an integer, the Python method of PySpark should auto-cast > it to float if needed.
[jira] [Updated] (SPARK-26255) Custom error/exception is not thrown for the SQL tab when UI filters are added in spark-sql launch
[ https://issues.apache.org/jira/browse/SPARK-26255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chakravarthi updated SPARK-26255: - Attachment: command.png > Custom error/exception is not thrown for the SQL tab when UI filters are > added in spark-sql launch > -- > > Key: SPARK-26255 > URL: https://issues.apache.org/jira/browse/SPARK-26255 > Project: Spark > Issue Type: Bug > Components: SQL, Web UI >Affects Versions: 2.3.2 > Environment: 【Test Environment】: > Server OS :-SUSE > No. of Cluster Node:-3 > Spark Version:- 2.3.2 > Hadoop Version:-3.1 >Reporter: Sushanta Sen >Priority: Major > Attachments: command.png, logs_before_fix.png, ui_befofre_fix.png > > > 【Detailed description】:Custom error is not thrown for the SQL tab when UI > filters are added in spark-sql launch > 【Precondition】: > 1.Cluster is up and running【Test step】: > 1. Launch spark sql as below: > [spark-sql --master yarn --conf > spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter > --conf > spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.params="type=simple"] > 2. Go to Yarn application list UI link > 3. Launch the application master for the Spark-SQL app ID > 4. It will display an error > 5. Append /executors, /stages, /jobs, /environment, /SQL > 【Expect Output】:An error should be displayed "An error has occurred. Please > check for all the TABS > 【Actual Output】:The error message is displayed for all the tabs except SQL > tab . -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26366) Except with transform regression
Dan Osipov created SPARK-26366:
--
Summary: Except with transform regression
Key: SPARK-26366
URL: https://issues.apache.org/jira/browse/SPARK-26366
Project: Spark
Issue Type: Bug
Components: Spark Core, SQL
Affects Versions: 2.3.2
Reporter: Dan Osipov

There appears to be a regression between Spark 2.2 and 2.3. Below is the code to reproduce it:

{code:java}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val inputDF = spark.sqlContext.createDataFrame(
  spark.sparkContext.parallelize(Seq(
    Row("0", "john", "smith", "j...@smith.com"),
    Row("1", "jane", "doe", "j...@doe.com"),
    Row("2", "apache", "spark", "sp...@apache.org"),
    Row("3", "foo", "bar", null)
  )),
  StructType(List(
    StructField("id", StringType, nullable=true),
    StructField("first_name", StringType, nullable=true),
    StructField("last_name", StringType, nullable=true),
    StructField("email", StringType, nullable=true)
  ))
)

val exceptDF = inputDF.transform( toProcessDF =>
  toProcessDF.filter(
    (
      col("first_name").isin(Seq("john", "jane"): _*)
      and col("last_name").isin(Seq("smith", "doe"): _*)
    )
    or col("email").isin(List(): _*)
  )
)

inputDF.except(exceptDF).show()
{code}

Output with Spark 2.2:
{noformat}
+---+----------+---------+----------------+
| id|first_name|last_name|           email|
+---+----------+---------+----------------+
|  2|    apache|    spark|sp...@apache.org|
|  3|       foo|      bar|            null|
+---+----------+---------+----------------+{noformat}

Output with Spark 2.3:
{noformat}
+---+----------+---------+----------------+
| id|first_name|last_name|           email|
+---+----------+---------+----------------+
|  2|    apache|    spark|sp...@apache.org|
+---+----------+---------+----------------+{noformat}

Note, changing the last line to
{code:java}
inputDF.except(exceptDF.cache()).show()
{code}
produces identical output for both Spark 2.3 and 2.2
[jira] [Assigned] (SPARK-26255) Custom error/exception is not thrown for the SQL tab when UI filters are added in spark-sql launch
[ https://issues.apache.org/jira/browse/SPARK-26255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26255: Assignee: (was: Apache Spark) > Custom error/exception is not thrown for the SQL tab when UI filters are > added in spark-sql launch > -- > > Key: SPARK-26255 > URL: https://issues.apache.org/jira/browse/SPARK-26255 > Project: Spark > Issue Type: Bug > Components: SQL, Web UI >Affects Versions: 2.3.2 > Environment: 【Test Environment】: > Server OS :-SUSE > No. of Cluster Node:-3 > Spark Version:- 2.3.2 > Hadoop Version:-3.1 >Reporter: Sushanta Sen >Priority: Major > > 【Detailed description】:Custom error is not thrown for the SQL tab when UI > filters are added in spark-sql launch > 【Precondition】: > 1.Cluster is up and running【Test step】: > 1. Launch spark sql as below: > [spark-sql --master yarn --conf > spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter > --conf > spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.params="type=simple"] > 2. Go to Yarn application list UI link > 3. Launch the application master for the Spark-SQL app ID > 4. It will display an error > 5. Append /executors, /stages, /jobs, /environment, /SQL > 【Expect Output】:An error should be displayed "An error has occurred. Please > check for all the TABS > 【Actual Output】:The error message is displayed for all the tabs except SQL > tab . -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-26255) Custom error/exception is not thrown for the SQL tab when UI filters are added in spark-sql launch
[ https://issues.apache.org/jira/browse/SPARK-26255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720353#comment-16720353 ] Chakravarthi edited comment on SPARK-26255 at 12/13/18 4:16 PM: [~hyukjin.kwon] I have attached the snapshots was (Author: chakravarthi): [~hyukjin.kwon]I have attached the snapshots > Custom error/exception is not thrown for the SQL tab when UI filters are > added in spark-sql launch > -- > > Key: SPARK-26255 > URL: https://issues.apache.org/jira/browse/SPARK-26255 > Project: Spark > Issue Type: Bug > Components: SQL, Web UI >Affects Versions: 2.3.2 > Environment: 【Test Environment】: > Server OS :-SUSE > No. of Cluster Node:-3 > Spark Version:- 2.3.2 > Hadoop Version:-3.1 >Reporter: Sushanta Sen >Priority: Major > Attachments: command.png, logs_before_fix.png, ui_befofre_fix.png > > > 【Detailed description】:Custom error is not thrown for the SQL tab when UI > filters are added in spark-sql launch > 【Precondition】: > 1.Cluster is up and running【Test step】: > 1. Launch spark sql as below: > [spark-sql --master yarn --conf > spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter > --conf > spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.params="type=simple"] > 2. Go to Yarn application list UI link > 3. Launch the application master for the Spark-SQL app ID > 4. It will display an error > 5. Append /executors, /stages, /jobs, /environment, /SQL > 【Expect Output】:An error should be displayed "An error has occurred. Please > check for all the TABS > 【Actual Output】:The error message is displayed for all the tabs except SQL > tab . -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26255) Custom error/exception is not thrown for the SQL tab when UI filters are added in spark-sql launch
[ https://issues.apache.org/jira/browse/SPARK-26255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720353#comment-16720353 ] Chakravarthi commented on SPARK-26255: -- [~hyukjin.kwon]I have attached the snapshots > Custom error/exception is not thrown for the SQL tab when UI filters are > added in spark-sql launch > -- > > Key: SPARK-26255 > URL: https://issues.apache.org/jira/browse/SPARK-26255 > Project: Spark > Issue Type: Bug > Components: SQL, Web UI >Affects Versions: 2.3.2 > Environment: 【Test Environment】: > Server OS :-SUSE > No. of Cluster Node:-3 > Spark Version:- 2.3.2 > Hadoop Version:-3.1 >Reporter: Sushanta Sen >Priority: Major > Attachments: command.png, logs_before_fix.png, ui_befofre_fix.png > > > 【Detailed description】:Custom error is not thrown for the SQL tab when UI > filters are added in spark-sql launch > 【Precondition】: > 1.Cluster is up and running【Test step】: > 1. Launch spark sql as below: > [spark-sql --master yarn --conf > spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter > --conf > spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.params="type=simple"] > 2. Go to Yarn application list UI link > 3. Launch the application master for the Spark-SQL app ID > 4. It will display an error > 5. Append /executors, /stages, /jobs, /environment, /SQL > 【Expect Output】:An error should be displayed "An error has occurred. Please > check for all the TABS > 【Actual Output】:The error message is displayed for all the tabs except SQL > tab . -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26255) Custom error/exception is not thrown for the SQL tab when UI filters are added in spark-sql launch
[ https://issues.apache.org/jira/browse/SPARK-26255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chakravarthi updated SPARK-26255: - Attachment: logs_before_fix.png > Custom error/exception is not thrown for the SQL tab when UI filters are > added in spark-sql launch > -- > > Key: SPARK-26255 > URL: https://issues.apache.org/jira/browse/SPARK-26255 > Project: Spark > Issue Type: Bug > Components: SQL, Web UI >Affects Versions: 2.3.2 > Environment: 【Test Environment】: > Server OS :-SUSE > No. of Cluster Node:-3 > Spark Version:- 2.3.2 > Hadoop Version:-3.1 >Reporter: Sushanta Sen >Priority: Major > Attachments: logs_before_fix.png, ui_befofre_fix.png > > > 【Detailed description】:Custom error is not thrown for the SQL tab when UI > filters are added in spark-sql launch > 【Precondition】: > 1.Cluster is up and running【Test step】: > 1. Launch spark sql as below: > [spark-sql --master yarn --conf > spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter > --conf > spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.params="type=simple"] > 2. Go to Yarn application list UI link > 3. Launch the application master for the Spark-SQL app ID > 4. It will display an error > 5. Append /executors, /stages, /jobs, /environment, /SQL > 【Expect Output】:An error should be displayed "An error has occurred. Please > check for all the TABS > 【Actual Output】:The error message is displayed for all the tabs except SQL > tab . -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26255) Custom error/exception is not thrown for the SQL tab when UI filters are added in spark-sql launch
[ https://issues.apache.org/jira/browse/SPARK-26255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chakravarthi updated SPARK-26255: - Attachment: ui_befofre_fix.png > Custom error/exception is not thrown for the SQL tab when UI filters are > added in spark-sql launch > -- > > Key: SPARK-26255 > URL: https://issues.apache.org/jira/browse/SPARK-26255 > Project: Spark > Issue Type: Bug > Components: SQL, Web UI >Affects Versions: 2.3.2 > Environment: 【Test Environment】: > Server OS :-SUSE > No. of Cluster Node:-3 > Spark Version:- 2.3.2 > Hadoop Version:-3.1 >Reporter: Sushanta Sen >Priority: Major > Attachments: logs_before_fix.png, ui_befofre_fix.png > > > 【Detailed description】:Custom error is not thrown for the SQL tab when UI > filters are added in spark-sql launch > 【Precondition】: > 1.Cluster is up and running【Test step】: > 1. Launch spark sql as below: > [spark-sql --master yarn --conf > spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter > --conf > spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.params="type=simple"] > 2. Go to Yarn application list UI link > 3. Launch the application master for the Spark-SQL app ID > 4. It will display an error > 5. Append /executors, /stages, /jobs, /environment, /SQL > 【Expect Output】:An error should be displayed "An error has occurred. Please > check for all the TABS > 【Actual Output】:The error message is displayed for all the tabs except SQL > tab . -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26255) Custom error/exception is not thrown for the SQL tab when UI filters are added in spark-sql launch
[ https://issues.apache.org/jira/browse/SPARK-26255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26255: Assignee: Apache Spark > Custom error/exception is not thrown for the SQL tab when UI filters are > added in spark-sql launch > -- > > Key: SPARK-26255 > URL: https://issues.apache.org/jira/browse/SPARK-26255 > Project: Spark > Issue Type: Bug > Components: SQL, Web UI >Affects Versions: 2.3.2 > Environment: 【Test Environment】: > Server OS :-SUSE > No. of Cluster Node:-3 > Spark Version:- 2.3.2 > Hadoop Version:-3.1 >Reporter: Sushanta Sen >Assignee: Apache Spark >Priority: Major > > 【Detailed description】:Custom error is not thrown for the SQL tab when UI > filters are added in spark-sql launch > 【Precondition】: > 1.Cluster is up and running【Test step】: > 1. Launch spark sql as below: > [spark-sql --master yarn --conf > spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter > --conf > spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.params="type=simple"] > 2. Go to Yarn application list UI link > 3. Launch the application master for the Spark-SQL app ID > 4. It will display an error > 5. Append /executors, /stages, /jobs, /environment, /SQL > 【Expect Output】:An error should be displayed "An error has occurred. Please > check for all the TABS > 【Actual Output】:The error message is displayed for all the tabs except SQL > tab . -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26255) Custom error/exception is not thrown for the SQL tab when UI filters are added in spark-sql launch
[ https://issues.apache.org/jira/browse/SPARK-26255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720340#comment-16720340 ] ASF GitHub Bot commented on SPARK-26255: chakravarthiT opened a new pull request #23312: [SPARK-26255]Custom error/exception is not thrown for the SQL tab when UI filters are added in spark-sql launch URL: https://github.com/apache/spark/pull/23312 ## What changes were proposed in this pull request? User specified filters are not applied to SQL tab in yarn mode, as it is overridden by the yarn AmIp filter. So we need to append user provided filters (spark.ui.filters) with yarn filter. ## How was this patch tested? 【Test step】: 1) Launch spark sql with authentication filter as below: 2) spark-sql --master yarn --conf spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter --conf spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.params="type=simple" 3) Go to Yarn application list UI link 4) Launch the application master for the Spark-SQL app ID and access all the tabs by appending tab name. 5) It will display an error for all tabs including SQL tab.(before able to access SQL tab,as Authentication filter is not applied for SQL tab) 6) Also can be verified with info logs,that Authentication filter applied to SQL tab.(before it is not applied). I have attached the behaviour below in following order.. 1) Command used 2) Before fix (logs and UI) 3) After fix (logs and UI) **1) COMMAND USED**: launching spark-sql with authentication filter. ![image](https://user-images.githubusercontent.com/45845595/49947295-e7e97400-ff16-11e8-8c9a-10659487ddee.png) **2) BEFORE FIX:** **UI result:** able to access SQL tab. ![image](https://user-images.githubusercontent.com/45845595/49948398-62b38e80-ff19-11e8-95dc-e74f9e3c2ba7.png) **logs**: authentication filter not applied to SQL tab. 
![image](https://user-images.githubusercontent.com/45845595/49947343-ff286180-ff16-11e8-9de0-3f8db140bc32.png) **3) AFTER FIX:** **UI result**: Not able to access SQL tab. ![image](https://user-images.githubusercontent.com/45845595/49947360-0d767d80-ff17-11e8-9e9e-a95311949164.png) **in logs**: Both yarn filter and Authentication filter applied to SQL tab. ![image](https://user-images.githubusercontent.com/45845595/49947377-1a936c80-ff17-11e8-9f44-700eb3dc0ded.png) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Custom error/exception is not thrown for the SQL tab when UI filters are > added in spark-sql launch > -- > > Key: SPARK-26255 > URL: https://issues.apache.org/jira/browse/SPARK-26255 > Project: Spark > Issue Type: Bug > Components: SQL, Web UI >Affects Versions: 2.3.2 > Environment: 【Test Environment】: > Server OS :-SUSE > No. of Cluster Node:-3 > Spark Version:- 2.3.2 > Hadoop Version:-3.1 >Reporter: Sushanta Sen >Priority: Major > > 【Detailed description】:Custom error is not thrown for the SQL tab when UI > filters are added in spark-sql launch > 【Precondition】: > 1.Cluster is up and running【Test step】: > 1. Launch spark sql as below: > [spark-sql --master yarn --conf > spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter > --conf > spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.params="type=simple"] > 2. Go to Yarn application list UI link > 3. Launch the application master for the Spark-SQL app ID > 4. It will display an error > 5. Append /executors, /stages, /jobs, /environment, /SQL > 【Expect Output】:An error should be displayed "An error has occurred. Please > check for all the TABS > 【Actual Output】:The error message is displayed for all the tabs except SQL > tab . 
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-26222) Scan: track file listing time
[ https://issues.apache.org/jira/browse/SPARK-26222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720088#comment-16720088 ] Yuanjian Li edited comment on SPARK-26222 at 12/13/18 3:43 PM: --- Hi Reynold, the first PR track all file listing time into a independent phase, which happened in analyze, optimize and DataFrameReader/Writer, is this match your thoughts? Or you want to display all the 'real'(real means currently the scan node sql metrics is read from cache) file listing file time spending in FileSourceScanExec node? was (Author: xuanyuan): Hi Reynold, the first PR track all file listing time into a independent phase, which happened in analyze, optimize and DataFrameReader/Writer, is this match your thoughts? > Scan: track file listing time > - > > Key: SPARK-26222 > URL: https://issues.apache.org/jira/browse/SPARK-26222 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.4.0 >Reporter: Reynold Xin >Priority: Major > > We should track file listing time and add it to the scan node's SQL metric, > so we have visibility how much is spent in file listing. It'd be useful to > track not just duration, but also start and end time so we can construct a > timeline. > This requires a little bit design to define what file listing time means, > when we are reading from cache, vs not cache. > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26364) Clean up import statements in pandas udf tests
Li Jin created SPARK-26364: -- Summary: Clean up import statements in pandas udf tests Key: SPARK-26364 URL: https://issues.apache.org/jira/browse/SPARK-26364 Project: Spark Issue Type: Improvement Components: Tests Affects Versions: 2.4.0 Reporter: Li Jin Per discussion [https://github.com/apache/spark/pull/22305/files#r241215618] we should clean up the import statements in test_pandas_udf* and move them to the top. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26362) Deprecate 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts
[ https://issues.apache.org/jira/browse/SPARK-26362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26362: Assignee: (was: Apache Spark) > Deprecate 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts > -- > > Key: SPARK-26362 > URL: https://issues.apache.org/jira/browse/SPARK-26362 > Project: Spark > Issue Type: Improvement > Components: Spark Core > Affects Versions: 3.0.0 > Reporter: Hyukjin Kwon > Priority: Major > > Multiple Spark contexts are discouraged, and there has been a warning about them for 4 years (see SPARK-4180). > It could cause arbitrary and mysterious error cases. (Honestly, I didn't even know Spark allows it). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-26362) Deprecate 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts
[ https://issues.apache.org/jira/browse/SPARK-26362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26362: Assignee: Apache Spark > Deprecate 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts > -- > > Key: SPARK-26362 > URL: https://issues.apache.org/jira/browse/SPARK-26362 > Project: Spark > Issue Type: Improvement > Components: Spark Core > Affects Versions: 3.0.0 > Reporter: Hyukjin Kwon > Assignee: Apache Spark > Priority: Major > > Multiple Spark contexts are discouraged, and there has been a warning about them for 4 years (see SPARK-4180). > It could cause arbitrary and mysterious error cases. (Honestly, I didn't even know Spark allows it). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26362) Deprecate 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts
[ https://issues.apache.org/jira/browse/SPARK-26362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720318#comment-16720318 ] ASF GitHub Bot commented on SPARK-26362: HyukjinKwon opened a new pull request #23311: [SPARK-26362][CORE] Deprecate 'spark.driver.allowMultipleContexts' to discourage multiple creation of SparkContexts URL: https://github.com/apache/spark/pull/23311 ## What changes were proposed in this pull request? Multiple SparkContexts are discouraged, and there has been a warning about them for the last 4 years (see SPARK-4180). They can cause arbitrary and mysterious errors. Honestly, I didn't even know Spark still allows it, which looks never officially supported - see SPARK-2243. I believe now is a good time to deprecate this configuration. ## How was this patch tested? Manually tested:
```bash
$ ./bin/spark-shell --conf=spark.driver.allowMultipleContexts=true
...
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/12/13 23:36:48 WARN SparkContext: 'spark.driver.allowMultipleContexts' is deprecated as of Spark 3.0.0, and creation of multiple SparkContexts will be disallowed afterward.
...
```
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Deprecate 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts > -- > > Key: SPARK-26362 > URL: https://issues.apache.org/jira/browse/SPARK-26362 > Project: Spark > Issue Type: Improvement > Components: Spark Core > Affects Versions: 3.0.0 > Reporter: Hyukjin Kwon > Priority: Major > > Multiple Spark contexts are discouraged, and there has been a warning about them for 4 years (see SPARK-4180).
> It could cause arbitrary and mysterious error cases. (Honestly, I didn't even > know Spark allows it). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-26254) Move delegation token providers into a separate project
[ https://issues.apache.org/jira/browse/SPARK-26254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720241#comment-16720241 ] Gabor Somogyi edited comment on SPARK-26254 at 12/13/18 3:29 PM: - {quote}There was concern about using ServiceLoader before, but if the interface being loaded is private to Spark, it's fine with me. {quote} org.apache.spark.deploy.security.HadoopDelegationTokenProvider fulfils it. {quote}Keep HDFS and HBase in core {quote} clear and same idea. {quote}move the Kafka one to some Kafka package {quote} you mean module isn't it? * If we move inside core then the kafka deps remain * If we move to kafka-sql then DStreams will not reach it My suggestion is to create a module something like kafka-token-provider and kafka-sql (+ later DStreams) can depend on that. {quote}the Hive one to the Hive module {quote} clear and same idea (sql/hive). was (Author: gsomogyi): {quote}There was concern about using ServiceLoader before, but if the interface being loaded is private to Spark, it's fine with me. {quote} org.apache.spark.deploy.security.HadoopDelegationTokenProvider fulfils it. {quote}Keep HDFS and HBase in core {quote} clear and same idea. {quote}move the Kafka one to some Kafka package {quote} you mean module isn't it? * If we move inside core then the kafka deps remain * If we move to kafka-sql then DStreams will not reach it My suggestion is to create a module something like kafka-token-provider and kafka-sql (+ later DStreams) can depend on that. {quote}the Hive one to the Hive module {quote} clear and same idea. For example hive-token-provider which extracts the ugly dependencies from core. 
> Move delegation token providers into a separate project > --- > > Key: SPARK-26254 > URL: https://issues.apache.org/jira/browse/SPARK-26254 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 3.0.0 >Reporter: Gabor Somogyi >Priority: Major > > There was a discussion in > [PR#22598|https://github.com/apache/spark/pull/22598] that there are several > provided dependencies inside core project which shouldn't be there (for ex. > hive and kafka). This jira is to solve this problem. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-26361) ReuseExchange is not available in case of ColumnPruning
miaojianlong created SPARK-26361: Summary: ReuseExchange is not available in case of ColumnPruning Key: SPARK-26361 URL: https://issues.apache.org/jira/browse/SPARK-26361 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.2.0 Reporter: miaojianlong ReuseExchange is not available when ColumnPruning has been applied. See this code:
{code:java}
val df = spark.createDataFrame(Seq((1, 1, 1))).toDF("a", "b", "c")
val groupDf = df.groupBy("a").agg("b"->"max", "c"->"min")
// val df1 = groupDf.select("a", "max(b)")
val df1 = groupDf.drop("max(b)")
val df2 = groupDf.withColumn("d", lit(1))
df1.join(df2, "a").explain(true){code}
{code:java}
== Analyzed Logical Plan ==
a: int, min(c): int, max(b): int, min(c): int, d: int
Project [a#6, min(c)#16, max(b)#29, min(c)#30, d#24]
+- Join Inner, (a#6 = a#34)
   :- Project [a#6, min(c)#16]
   :  +- Aggregate [a#6], [a#6, max(b#7) AS max(b)#15, min(c#8) AS min(c)#16]
   :     +- Project [_1#0 AS a#6, _2#1 AS b#7, _3#2 AS c#8]
   :        +- LocalRelation [_1#0, _2#1, _3#2]
   +- Project [a#34, max(b)#29, min(c)#30, 1 AS d#24]
      +- Aggregate [a#34], [a#34, max(b#35) AS max(b)#29, min(c#36) AS min(c)#30]
         +- Project [_1#0 AS a#34, _2#1 AS b#35, _3#2 AS c#36]
            +- LocalRelation [_1#0, _2#1, _3#2]

== Optimized Logical Plan ==
Project [a#6, min(c)#16, max(b)#29, min(c)#30, d#24]
+- Join Inner, (a#6 = a#34)
   :- Aggregate [a#6], [a#6, min(c#8) AS min(c)#16]
   :  +- LocalRelation [a#6, c#8]
   +- Aggregate [a#34], [a#34, max(b#35) AS max(b)#29, min(c#36) AS min(c)#30, 1 AS d#24]
      +- LocalRelation [a#34, b#35, c#36]

== Physical Plan ==
*(4) Project [a#6, min(c)#16, max(b)#29, min(c)#30, d#24]
+- *(4) BroadcastHashJoin [a#6], [a#34], Inner, BuildLeft
   :- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   :  +- *(2) HashAggregate(keys=[a#6], functions=[min(c#8)], output=[a#6, min(c)#16])
   :     +- Exchange hashpartitioning(a#6, 200)
   :        +- *(1) HashAggregate(keys=[a#6], functions=[partial_min(c#8)], output=[a#6, min#44])
   :           +- LocalTableScan [a#6, c#8]
   +- *(4) HashAggregate(keys=[a#34], functions=[max(b#35), min(c#36)], output=[a#34, max(b)#29, min(c)#30, d#24])
      +- Exchange hashpartitioning(a#34, 200)
         +- *(3) HashAggregate(keys=[a#34], functions=[partial_max(b#35), partial_min(c#36)], output=[a#34, max#47, min#48])
            +- LocalTableScan [a#34, b#35, c#36]
{code}
Looking at the code, ReuseExchange first checks that the schemas are consistent. In some scenarios a DataFrame goes through a series of complicated operations before a join, and then it is very likely that the schemas will not match. I think we need to consider handling this situation. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
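The miss can be illustrated with a toy model (plain Scala, hypothetical names, not Spark internals): ReuseExchange only substitutes an already-built exchange whose output schema is identical, and after ColumnPruning the left aggregate produces [a, min(c)] while the right produces [a, max(b), min(c), d], so the schema check fails even though both sides scan the same data.

```scala
// Toy model of the schema-equality gate that blocks exchange reuse here.
object ReuseModel {
  final case class Exchange(schema: Vector[String])

  // Mirrors the first condition exchange reuse applies: identical output schemas.
  def canReuse(candidate: Exchange, existing: Exchange): Boolean =
    candidate.schema == existing.schema

  // After ColumnPruning, the two aggregates over the same relation differ:
  val prunedSide = Exchange(Vector("a", "min(c)"))              // max(b) was pruned away
  val fullSide   = Exchange(Vector("a", "max(b)", "min(c)", "d"))
  val identical  = Exchange(Vector("a", "min(c)"))              // reuse would fire here
}
```

So fixing this would require reuse to fire on a schema subset (plus a projection), not just on exact equality.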
[jira] [Resolved] (SPARK-26313) move read related methods from Table to read related mix-in traits
[ https://issues.apache.org/jira/browse/SPARK-26313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved SPARK-26313. -- Resolution: Fixed Fixed in https://github.com/apache/spark/pull/23266 > move read related methods from Table to read related mix-in traits > -- > > Key: SPARK-26313 > URL: https://issues.apache.org/jira/browse/SPARK-26313 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26313) move read related methods from Table to read related mix-in traits
[ https://issues.apache.org/jira/browse/SPARK-26313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon updated SPARK-26313: - Fix Version/s: 3.0.0 > move read related methods from Table to read related mix-in traits > -- > > Key: SPARK-26313 > URL: https://issues.apache.org/jira/browse/SPARK-26313 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.0.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26362) Deprecate 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts
[ https://issues.apache.org/jira/browse/SPARK-26362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon updated SPARK-26362: - Summary: Deprecate 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts (was: Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts) > Deprecate 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts > -- > > Key: SPARK-26362 > URL: https://issues.apache.org/jira/browse/SPARK-26362 > Project: Spark > Issue Type: Improvement > Components: Spark Core > Affects Versions: 3.0.0 > Reporter: Hyukjin Kwon > Priority: Major > > Multiple Spark contexts are discouraged, and there has been a warning about them for 4 years (see SPARK-4180). > It could cause arbitrary and mysterious error cases. (Honestly, I didn't even know Spark allows it). -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26340) Ensure cores per executor is greater than cpu per task
[ https://issues.apache.org/jira/browse/SPARK-26340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720239#comment-16720239 ] ASF GitHub Bot commented on SPARK-26340: srowen closed pull request #23290: [SPARK-26340][Core] Ensure cores per executor is greater than cpu per task URL: https://github.com/apache/spark/pull/23290 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 21c5cbc04d813..8d135d3e083d7 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -605,6 +605,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
       }
     }
 
+    if (contains("spark.executor.cores") && contains("spark.task.cpus")) {
+      val executorCores = getInt("spark.executor.cores", 1)
+      val taskCpus = getInt("spark.task.cpus", 1)
+
+      if (executorCores < taskCpus) {
+        throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.")
+      }
+    }
+
     val encryptionEnabled = get(NETWORK_ENCRYPTION_ENABLED) || get(SASL_ENCRYPTION_ENABLED)
     require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
       s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index df274d949bae3..7cb03deae1391 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -138,6 +138,13 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     assert(sc.appName === "My other app")
   }
 
+  test("creating SparkContext with cpus per tasks bigger than cores per executors") {
+    val conf = new SparkConf(false)
+      .set("spark.executor.cores", "1")
+      .set("spark.task.cpus", "2")
+    intercept[SparkException] { sc = new SparkContext(conf) }
+  }
+
   test("nested property names") {
     // This wasn't supported by some external conf parsing libraries
     System.setProperty("spark.test.a", "a")
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Ensure cores per executor is greater than cpu per task > -- > > Key: SPARK-26340 > URL: https://issues.apache.org/jira/browse/SPARK-26340 > Project: Spark > Issue Type: Improvement > Components: Spark Core > Affects Versions: 2.2.2, 2.3.2 > Reporter: Nicolas Fraison > Assignee: Nicolas Fraison > Priority: Minor > Fix For: 3.0.0 > > > No check is performed to ensure spark.task.cpus is lower than spark.executor.cores, which can leave jobs unable to assign tasks without any understandable error. > The check is only performed in the case of dynamic allocation usage in ExecutorAllocationManager. > Adding the check ensures that an error is surfaced to the driver. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
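The merged check boils down to a simple invariant that can be sketched in standalone Scala (hypothetical object name; the real patch lives in SparkConf's validation and throws SparkException, replaced here by `require`'s IllegalArgumentException to keep the sketch self-contained):

```scala
// A single task may not require more CPUs than one executor provides;
// otherwise no executor can ever schedule a task and the job hangs silently.
object CoresCheck {
  def validate(executorCores: Int, taskCpus: Int): Unit =
    require(executorCores >= taskCpus,
      s"spark.executor.cores ($executorCores) must not be less than spark.task.cpus ($taskCpus).")
}
```

Failing fast at configuration time turns an otherwise mysterious "no tasks are being scheduled" hang into an immediate, explicit error.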
[jira] [Assigned] (SPARK-26363) Remove redundant field `executorLogs` in TaskData
[ https://issues.apache.org/jira/browse/SPARK-26363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26363: Assignee: (was: Apache Spark) > Remove redundant field `executorLogs` in TaskData > - > > Key: SPARK-26363 > URL: https://issues.apache.org/jira/browse/SPARK-26363 > Project: Spark > Issue Type: Improvement > Components: Web UI > Affects Versions: 3.0.0 > Reporter: Gengliang Wang > Priority: Major > > In https://github.com/apache/spark/pull/21688, a new field `executorLogs` was added to `TaskData` in `api.scala`: > 1. The field does not belong in `TaskData` (judging from the wording). > 2. It is redundant with `ExecutorSummary`. > 3. For each row in the task table, the executor log value is looked up in the KV store every time, which can be avoided for better performance at large scale. > This PR proposes to reuse the executor details from the "/allexecutors" request, so that we get a cleaner API data structure and avoid redundant KV store queries. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-26363) Avoid duplicated KV store lookups for task table
[ https://issues.apache.org/jira/browse/SPARK-26363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gengliang Wang updated SPARK-26363: --- Summary: Avoid duplicated KV store lookups for task table (was: Remove redundant field `executorLogs` in TaskData)
> Avoid duplicated KV store lookups for task table
> -
>
> Key: SPARK-26363
> URL: https://issues.apache.org/jira/browse/SPARK-26363
> Project: Spark
> Issue Type: Improvement
> Components: Web UI
> Affects Versions: 3.0.0
> Reporter: Gengliang Wang
> Priority: Major
>
> In https://github.com/apache/spark/pull/21688, a new field `executorLogs` was added to `TaskData` in `api.scala`:
> 1. The field does not belong in `TaskData` (judging by the wording).
> 2. It is redundant with `ExecutorSummary`.
> 3. For each row in the task table, the executor log value is looked up in the KV store every time, which can be avoided for better performance at large scale.
> This PR proposes to reuse the executor details from the "/allexecutors" request, so that we have a cleaner API data structure and avoid redundant KV store queries.
[jira] [Created] (SPARK-26363) Remove redundant field `executorLogs` in TaskData
Gengliang Wang created SPARK-26363: -- Summary: Remove redundant field `executorLogs` in TaskData
Key: SPARK-26363
URL: https://issues.apache.org/jira/browse/SPARK-26363
Project: Spark
Issue Type: Improvement
Components: Web UI
Affects Versions: 3.0.0
Reporter: Gengliang Wang
In https://github.com/apache/spark/pull/21688, a new field `executorLogs` was added to `TaskData` in `api.scala`:
1. The field does not belong in `TaskData` (judging by the wording).
2. It is redundant with `ExecutorSummary`.
3. For each row in the task table, the executor log value is looked up in the KV store every time, which can be avoided for better performance at large scale.
This PR proposes to reuse the executor details from the "/allexecutors" request, so that we have a cleaner API data structure and avoid redundant KV store queries.
[jira] [Assigned] (SPARK-26363) Remove redundant field `executorLogs` in TaskData
[ https://issues.apache.org/jira/browse/SPARK-26363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26363: Assignee: Apache Spark
> Remove redundant field `executorLogs` in TaskData
> -
>
> Key: SPARK-26363
> URL: https://issues.apache.org/jira/browse/SPARK-26363
> Project: Spark
> Issue Type: Improvement
> Components: Web UI
> Affects Versions: 3.0.0
> Reporter: Gengliang Wang
> Assignee: Apache Spark
> Priority: Major
>
> In https://github.com/apache/spark/pull/21688, a new field `executorLogs` was added to `TaskData` in `api.scala`:
> 1. The field does not belong in `TaskData` (judging by the wording).
> 2. It is redundant with `ExecutorSummary`.
> 3. For each row in the task table, the executor log value is looked up in the KV store every time, which can be avoided for better performance at large scale.
> This PR proposes to reuse the executor details from the "/allexecutors" request, so that we have a cleaner API data structure and avoid redundant KV store queries.
[jira] [Commented] (SPARK-26363) Remove redundant field `executorLogs` in TaskData
[ https://issues.apache.org/jira/browse/SPARK-26363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720259#comment-16720259 ] ASF GitHub Bot commented on SPARK-26363: gengliangwang opened a new pull request #23310: [SPARK-26363][WebUI] Remove redundant field `executorLogs` in TaskData URL: https://github.com/apache/spark/pull/23310
## What changes were proposed in this pull request?
In https://github.com/apache/spark/pull/21688, a new field `executorLogs` was added to `TaskData` in `api.scala`:
1. The field does not belong in `TaskData` (judging by the wording).
2. It is redundant with `ExecutorSummary`.
3. For each row in the task table, the executor log value is looked up in the KV store every time, which can be avoided for better performance.
![image](https://user-images.githubusercontent.com/1097932/49946230-841c7680-ff29-11e8-8b83-d8f7553bfe5e.png)
This PR proposes to reuse the executor details from the "/allexecutors" request, so that we have a cleaner API data structure and avoid redundant KV store queries. (Before https://github.com/apache/spark/pull/21688, the stage page used a hash map to avoid duplicated executor log lookups. But I think reusing the result of "/allexecutors" is better.)
## How was this patch tested?
Manual check
> Remove redundant field `executorLogs` in TaskData
> -
>
> Key: SPARK-26363
> URL: https://issues.apache.org/jira/browse/SPARK-26363
> Project: Spark
> Issue Type: Improvement
> Components: Web UI
> Affects Versions: 3.0.0
> Reporter: Gengliang Wang
> Priority: Major
>
> In https://github.com/apache/spark/pull/21688, a new field `executorLogs` was added to `TaskData` in `api.scala`:
> 1. The field does not belong in `TaskData` (judging by the wording).
> 2. It is redundant with `ExecutorSummary`.
> 3. For each row in the task table, the executor log value is looked up in the KV store every time, which can be avoided for better performance at large scale.
> This PR proposes to reuse the executor details from the "/allexecutors" request, so that we have a cleaner API data structure and avoid redundant KV store queries.
[jira] [Commented] (SPARK-26305) Breakthrough the memory limitation of broadcast join
[ https://issues.apache.org/jira/browse/SPARK-26305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720257#comment-16720257 ] Lantao Jin commented on SPARK-26305: Added a design doc. It is not totally complete yet.
> Breakthrough the memory limitation of broadcast join
> -
>
> Key: SPARK-26305
> URL: https://issues.apache.org/jira/browse/SPARK-26305
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Lantao Jin
> Priority: Major
>
> If a join between a big table and a small one faces a data-skew issue, we usually use a broadcast hint in SQL to resolve it. However, the current broadcast join has many limitations. The primary restriction is memory: the small table being broadcast must fit entirely in memory on the driver/executor side. Although it spills to disk when memory is insufficient, it still causes OOM when the small table is only relatively small, not absolutely small. In our company, we have many real big-data SQL analysis jobs that join and shuffle hundreds of terabytes. For example, the large table may be 100TB while the small one, though many times smaller, is still 10GB. In this case the broadcast join cannot finish, since the small table is larger than expected; and if the join is skewed, the sort-merge join always fails.
> Hive has a skew-join hint that triggers a two-stage task to handle the skewed keys and normal keys separately. I guess Databricks Runtime has a similar implementation. However, the skew-join hint requires SQL users to know the data in their tables intimately: they must know which key is skewed in a join. That is very hard to know, since the data changes day by day and the join key is not fixed across queries. Users end up setting a huge partition number and trying their luck.
> So, do we have a simple, crude and efficient way to resolve this?
> Back to the limitation: if the broadcast table did not need to fit in memory, in other words if the driver/executor stored the broadcast table on disk only, the problem mentioned above would be resolved.
> A new hint like BROADCAST_DISK, or an additional parameter on the original BROADCAST hint, will be introduced to cover this case. The original broadcast behavior won't be changed.
> I will offer a design doc if you share this feeling.
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
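The ticket only names the hint; it does not specify syntax. A hypothetical usage sketch, assuming a SparkSession named `spark` and the proposed (non-existent) hint name `BROADCAST_DISK` from the ticket:

```scala
// Hypothetical only: BROADCAST_DISK is the name proposed in SPARK-26305,
// not an existing Spark SQL hint. The tables `big` and `small` are
// assumed to be registered views.
val result = spark.sql(
  """SELECT /*+ BROADCAST_DISK(small) */ big.key, big.value, small.label
    |FROM big
    |JOIN small ON big.key = small.key""".stripMargin)
```

The design intent, as described, is that the hinted table is written to and served from disk on the driver/executors instead of being pinned in memory, so a 10GB "small" table no longer OOMs the broadcast path.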
[jira] [Created] (SPARK-26362) Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts
Hyukjin Kwon created SPARK-26362: Summary: Remove 'spark.driver.allowMultipleContexts' to disallow multiple Spark contexts
Key: SPARK-26362
URL: https://issues.apache.org/jira/browse/SPARK-26362
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 3.0.0
Reporter: Hyukjin Kwon
Multiple Spark contexts are discouraged, and Spark has warned against them for four years (see SPARK-4180). They can cause arbitrary and mysterious errors. (Honestly, I didn't even know Spark allowed it.)
[jira] [Commented] (SPARK-26254) Move delegation token providers into a separate project
[ https://issues.apache.org/jira/browse/SPARK-26254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720241#comment-16720241 ] Gabor Somogyi commented on SPARK-26254: ---
{quote}There was concern about using ServiceLoader before, but if the interface being loaded is private to Spark, it's fine with me.{quote}
org.apache.spark.deploy.security.HadoopDelegationTokenProvider fulfils that.
{quote}Keep HDFS and HBase in core{quote}
Clear, same idea.
{quote}move the Kafka one to some Kafka package{quote}
You mean module, don't you?
* If we move it inside core, then the Kafka deps remain
* If we move it to kafka-sql, then DStreams will not reach it
My suggestion is to create a module, something like kafka-token-provider, that kafka-sql (and later DStreams) can depend on.
{quote}the Hive one to the Hive module{quote}
Clear, same idea. For example a hive-token-provider module, which extracts the ugly dependencies from core.
> Move delegation token providers into a separate project
> ---
>
> Key: SPARK-26254
> URL: https://issues.apache.org/jira/browse/SPARK-26254
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 3.0.0
> Reporter: Gabor Somogyi
> Priority: Major
>
> There was a discussion in [PR#22598|https://github.com/apache/spark/pull/22598] about several provided dependencies inside the core project that shouldn't be there (e.g. hive and kafka). This JIRA is to solve that problem.
[jira] [Updated] (SPARK-26360) Avoid extra validateQuery call in createStreamingWriteSupport
[ https://issues.apache.org/jira/browse/SPARK-26360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wu Wenjie updated SPARK-26360: -- Description:
While reading the structured streaming source code, I found a redundant KafkaWriter.validateQuery() call in the createStreamingWriteSupport function of class `KafkaSourceProvider`.
{code:scala}
// KafkaSourceProvider.scala
override def createStreamingWriteSupport(
    queryId: String,
    schema: StructType,
    mode: OutputMode,
    options: DataSourceOptions): StreamingWriteSupport = {
  ...
  // validate once here
  KafkaWriter.validateQuery(schema.toAttributes, producerParams, topic)
  // validate twice here
  new KafkaStreamingWriteSupport(topic, producerParams, schema)
}

// KafkaStreamingWriteSupport.scala
class KafkaStreamingWriteSupport(
    topic: Option[String],
    producerParams: ju.Map[String, Object],
    schema: StructType)
  extends StreamingWriteSupport {

  validateQuery(schema.toAttributes, producerParams, topic)
  ...
}
{code}
I think we just need to remove one of these two.
was:
While reading the structured streaming source code, I found an extra KafkaWriter.validateQuery() call in the createStreamingWriteSupport function of class KafkaSourceProvider.
(The "was:" text quoted the same code block as above.)
I think we just need to remove one of these two.
> Avoid extra validateQuery call in createStreamingWriteSupport
> -
>
> Key: SPARK-26360
> URL: https://issues.apache.org/jira/browse/SPARK-26360
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.3.2, 2.4.0
> Reporter: Wu Wenjie
> Priority: Trivial
>
> While reading the structured streaming source code, I found a redundant KafkaWriter.validateQuery() call in the createStreamingWriteSupport function of class `KafkaSourceProvider`.
> {code:scala}
> // KafkaSourceProvider.scala
> override def createStreamingWriteSupport(
>     queryId: String,
>     schema: StructType,
>     mode: OutputMode,
>     options: DataSourceOptions): StreamingWriteSupport = {
>   ...
>   // validate once here
>   KafkaWriter.validateQuery(schema.toAttributes, producerParams, topic)
>   // validate twice here
>   new KafkaStreamingWriteSupport(topic, producerParams, schema)
> }
>
> // KafkaStreamingWriteSupport.scala
> class KafkaStreamingWriteSupport(
>     topic: Option[String],
>     producerParams: ju.Map[String, Object],
>     schema: StructType)
>   extends StreamingWriteSupport {
>
>   validateQuery(schema.toAttributes, producerParams, topic)
>   ...
> }
> {code}
> I think we just need to remove one of these two.
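The fix is simply to drop one of the two calls. Keeping the one in the KafkaStreamingWriteSupport constructor means every instantiation stays validated, regardless of which code path constructs it. A sketch of the slimmed-down factory method, assuming the surrounding declarations quoted in the issue (this is my reading of the proposed change, not the merged patch):

```scala
// KafkaSourceProvider.scala (sketch): the explicit
// KafkaWriter.validateQuery call is removed, because the
// KafkaStreamingWriteSupport constructor already validates the query.
override def createStreamingWriteSupport(
    queryId: String,
    schema: StructType,
    mode: OutputMode,
    options: DataSourceOptions): StreamingWriteSupport = {
  // ... option parsing for topic / producerParams as before ...
  new KafkaStreamingWriteSupport(topic, producerParams, schema)
}
```

Removing the constructor-side call instead would also work, but would leave any other construction site of KafkaStreamingWriteSupport unvalidated.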
[jira] [Resolved] (SPARK-26340) Ensure cores per executor is greater than cpu per task
[ https://issues.apache.org/jira/browse/SPARK-26340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-26340. --- Resolution: Fixed
Fix Version/s: 3.0.0
Issue resolved by pull request 23290
[https://github.com/apache/spark/pull/23290]
> Ensure cores per executor is greater than cpu per task
> --
>
> Key: SPARK-26340
> URL: https://issues.apache.org/jira/browse/SPARK-26340
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.2.2, 2.3.2
> Reporter: Nicolas Fraison
> Assignee: Nicolas Fraison
> Priority: Minor
> Fix For: 3.0.0
>
> No check is performed to ensure spark.task.cpus is lower than spark.executor.cores, which can lead to jobs being unable to assign tasks without any understandable error.
> The check is only performed when dynamic allocation is used, in ExecutorAllocationManager.
> Adding the check in TaskSchedulerImpl ensures that an error is reported to the driver.
[jira] [Assigned] (SPARK-26340) Ensure cores per executor is greater than cpu per task
[ https://issues.apache.org/jira/browse/SPARK-26340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen reassigned SPARK-26340: - Assignee: Nicolas Fraison
> Ensure cores per executor is greater than cpu per task
> --
>
> Key: SPARK-26340
> URL: https://issues.apache.org/jira/browse/SPARK-26340
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.2.2, 2.3.2
> Reporter: Nicolas Fraison
> Assignee: Nicolas Fraison
> Priority: Minor
> Fix For: 3.0.0
>
> No check is performed to ensure spark.task.cpus is lower than spark.executor.cores, which can lead to jobs being unable to assign tasks without any understandable error.
> The check is only performed when dynamic allocation is used, in ExecutorAllocationManager.
> Adding the check in TaskSchedulerImpl ensures that an error is reported to the driver.
[jira] [Updated] (SPARK-26360) Avoid extra validateQuery call in createStreamingWriteSupport
[ https://issues.apache.org/jira/browse/SPARK-26360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wu Wenjie updated SPARK-26360: -- Shepherd: Wenchen Fan
> Avoid extra validateQuery call in createStreamingWriteSupport
> -
>
> Key: SPARK-26360
> URL: https://issues.apache.org/jira/browse/SPARK-26360
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.3.2, 2.4.0
> Reporter: Wu Wenjie
> Priority: Trivial
>
> While reading the structured streaming source code, I found an extra KafkaWriter.validateQuery() call in the createStreamingWriteSupport function of class KafkaSourceProvider.
> {code:scala}
> // KafkaSourceProvider.scala
> override def createStreamingWriteSupport(
>     queryId: String,
>     schema: StructType,
>     mode: OutputMode,
>     options: DataSourceOptions): StreamingWriteSupport = {
>   ...
>   // validate once here
>   KafkaWriter.validateQuery(schema.toAttributes, producerParams, topic)
>   // validate twice here
>   new KafkaStreamingWriteSupport(topic, producerParams, schema)
> }
>
> // KafkaStreamingWriteSupport.scala
> class KafkaStreamingWriteSupport(
>     topic: Option[String],
>     producerParams: ju.Map[String, Object],
>     schema: StructType)
>   extends StreamingWriteSupport {
>
>   validateQuery(schema.toAttributes, producerParams, topic)
>   ...
> }
> {code}
> I think we just need to remove one of these two.
[jira] [Updated] (SPARK-26360) Avoid extra validateQuery call in createStreamingWriteSupport
[ https://issues.apache.org/jira/browse/SPARK-26360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wu Wenjie updated SPARK-26360: -- Shepherd: Sean Owen (was: Wenchen Fan)
> Avoid extra validateQuery call in createStreamingWriteSupport
> -
>
> Key: SPARK-26360
> URL: https://issues.apache.org/jira/browse/SPARK-26360
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.3.2, 2.4.0
> Reporter: Wu Wenjie
> Priority: Trivial
>
> While reading the structured streaming source code, I found an extra KafkaWriter.validateQuery() call in the createStreamingWriteSupport function of class KafkaSourceProvider.
> {code:scala}
> // KafkaSourceProvider.scala
> override def createStreamingWriteSupport(
>     queryId: String,
>     schema: StructType,
>     mode: OutputMode,
>     options: DataSourceOptions): StreamingWriteSupport = {
>   ...
>   // validate once here
>   KafkaWriter.validateQuery(schema.toAttributes, producerParams, topic)
>   // validate twice here
>   new KafkaStreamingWriteSupport(topic, producerParams, schema)
> }
>
> // KafkaStreamingWriteSupport.scala
> class KafkaStreamingWriteSupport(
>     topic: Option[String],
>     producerParams: ju.Map[String, Object],
>     schema: StructType)
>   extends StreamingWriteSupport {
>
>   validateQuery(schema.toAttributes, producerParams, topic)
>   ...
> }
> {code}
> I think we just need to remove one of these two.
[jira] [Commented] (SPARK-26360) Avoid extra validateQuery call in createStreamingWriteSupport
[ https://issues.apache.org/jira/browse/SPARK-26360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720200#comment-16720200 ] ASF GitHub Bot commented on SPARK-26360: JasonWayne opened a new pull request #23309: [SPARK-26360] Remove extra validateQuery call URL: https://github.com/apache/spark/pull/23309
## What changes were proposed in this pull request?
Remove a redundant `KafkaWriter.validateQuery` call in `KafkaSourceProvider`.
## How was this patch tested?
This just removes duplicate code, so I only built and ran the unit tests.
> Avoid extra validateQuery call in createStreamingWriteSupport
> -
>
> Key: SPARK-26360
> URL: https://issues.apache.org/jira/browse/SPARK-26360
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.3.2, 2.4.0
> Reporter: Wu Wenjie
> Priority: Trivial
>
> While reading the structured streaming source code, I found an extra KafkaWriter.validateQuery() call in the createStreamingWriteSupport function of class KafkaSourceProvider.
> {code:scala}
> // KafkaSourceProvider.scala
> override def createStreamingWriteSupport(
>     queryId: String,
>     schema: StructType,
>     mode: OutputMode,
>     options: DataSourceOptions): StreamingWriteSupport = {
>   ...
>   // validate once here
>   KafkaWriter.validateQuery(schema.toAttributes, producerParams, topic)
>   // validate twice here
>   new KafkaStreamingWriteSupport(topic, producerParams, schema)
> }
>
> // KafkaStreamingWriteSupport.scala
> class KafkaStreamingWriteSupport(
>     topic: Option[String],
>     producerParams: ju.Map[String, Object],
>     schema: StructType)
>   extends StreamingWriteSupport {
>
>   validateQuery(schema.toAttributes, producerParams, topic)
>   ...
> }
> {code}
> I think we just need to remove one of these two.
[jira] [Updated] (SPARK-26359) Spark checkpoint restore fails after query restart
[ https://issues.apache.org/jira/browse/SPARK-26359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaspar Tint updated SPARK-26359: Environment:
Spark 2.4.0 deployed in standalone-client mode
Checkpointing is done to S3
The Spark application in question is responsible for running 4 different queries
Queries are written using Structured Streaming
We are using the following algorithm in hopes of better performance: spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2" # when the default is 1
was:
Spark 2.4.0 deployed in standalone-client mode
Checkpointing is done to S3
The Spark application in question is responsible for running 4 different queries
Queries are written using Structured Streaming
> Spark checkpoint restore fails after query restart
> --
>
> Key: SPARK-26359
> URL: https://issues.apache.org/jira/browse/SPARK-26359
> Project: Spark
> Issue Type: Bug
> Components: Spark Submit, Structured Streaming
> Affects Versions: 2.4.0
> Environment: Spark 2.4.0 deployed in standalone-client mode
> Checkpointing is done to S3
> The Spark application in question is responsible for running 4 different queries
> Queries are written using Structured Streaming
> We are using the following algorithm in hopes of better performance: spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2" # when the default is 1
> Reporter: Kaspar Tint
> Priority: Major
> Attachments: driver-redacted, metadata, redacted-offsets, state-redacted, worker-redacted
>
> We had an incident where one of our structured streaming queries could not be restarted after a usual S3 checkpointing failure. To clarify before everything else: we are aware of the issues with S3 and are working towards moving to HDFS, but this will take time. S3 causes queries to fail quite often during peak hours, and we have separate logic to handle this that attempts to restart the failed queries if possible.
> In this particular case we could not restart one of the failed queries. It seems that between detecting a failure in the query and starting it up again something went really wrong with Spark, and the state in the checkpoint folder got corrupted for some reason.
> The issue starts with the usual *FileNotFoundException* that happens with S3:
> {code:java}
> 2018-12-10 21:09:25.785 ERROR MicroBatchExecution: Query feedback [id = c074233a-2563-40fc-8036-b5e38e2e2c42, runId = e607eb6e-8431-4269-acab-cc2c1f9f09dd] terminated with error
> java.io.FileNotFoundException: No such file or directory: s3a://some.domain/spark/checkpoints/49/feedback/offsets/.37348.8227943f-a848-4af5-b5bf-1fea81775b24.tmp
> at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2255)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
> at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2715)
> at org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
> at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
> at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
> at org.apache.hadoop.fs.FileContext.rename(FileContext.java:965)
> at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:331)
> at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
> at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataLog.scala:126)
> at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112)
> at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
> at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcZ$sp$3.apply$mcV$sp(MicroBatchExecution.scala:382)
> at
[jira] [Assigned] (SPARK-26360) Avoid extra validateQuery call in createStreamingWriteSupport
[ https://issues.apache.org/jira/browse/SPARK-26360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26360: Assignee: Apache Spark
> Avoid extra validateQuery call in createStreamingWriteSupport
> -
>
> Key: SPARK-26360
> URL: https://issues.apache.org/jira/browse/SPARK-26360
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.3.2, 2.4.0
> Reporter: Wu Wenjie
> Assignee: Apache Spark
> Priority: Trivial
>
> While reading the structured streaming source code, I found an extra KafkaWriter.validateQuery() call in the createStreamingWriteSupport function of class KafkaSourceProvider.
> {code:scala}
> // KafkaSourceProvider.scala
> override def createStreamingWriteSupport(
>     queryId: String,
>     schema: StructType,
>     mode: OutputMode,
>     options: DataSourceOptions): StreamingWriteSupport = {
>   ...
>   // validate once here
>   KafkaWriter.validateQuery(schema.toAttributes, producerParams, topic)
>   // validate twice here
>   new KafkaStreamingWriteSupport(topic, producerParams, schema)
> }
>
> // KafkaStreamingWriteSupport.scala
> class KafkaStreamingWriteSupport(
>     topic: Option[String],
>     producerParams: ju.Map[String, Object],
>     schema: StructType)
>   extends StreamingWriteSupport {
>
>   validateQuery(schema.toAttributes, producerParams, topic)
>   ...
> }
> {code}
> I think we just need to remove one of these two.
[jira] [Assigned] (SPARK-26360) Avoid extra validateQuery call in createStreamingWriteSupport
[ https://issues.apache.org/jira/browse/SPARK-26360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-26360: Assignee: (was: Apache Spark)
> Avoid extra validateQuery call in createStreamingWriteSupport
> -
>
> Key: SPARK-26360
> URL: https://issues.apache.org/jira/browse/SPARK-26360
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.3.2, 2.4.0
> Reporter: Wu Wenjie
> Priority: Trivial
>
> While reading the structured streaming source code, I found an extra KafkaWriter.validateQuery() call in the createStreamingWriteSupport function of class KafkaSourceProvider.
> {code:scala}
> // KafkaSourceProvider.scala
> override def createStreamingWriteSupport(
>     queryId: String,
>     schema: StructType,
>     mode: OutputMode,
>     options: DataSourceOptions): StreamingWriteSupport = {
>   ...
>   // validate once here
>   KafkaWriter.validateQuery(schema.toAttributes, producerParams, topic)
>   // validate twice here
>   new KafkaStreamingWriteSupport(topic, producerParams, schema)
> }
>
> // KafkaStreamingWriteSupport.scala
> class KafkaStreamingWriteSupport(
>     topic: Option[String],
>     producerParams: ju.Map[String, Object],
>     schema: StructType)
>   extends StreamingWriteSupport {
>
>   validateQuery(schema.toAttributes, producerParams, topic)
>   ...
> }
> {code}
> I think we just need to remove one of these two.
[jira] [Resolved] (SPARK-26335) Add a property for Dataset#show not to care about wide characters when padding them
[ https://issues.apache.org/jira/browse/SPARK-26335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Keiji Yoshida resolved SPARK-26335. --- Resolution: Won't Fix
> Add a property for Dataset#show not to care about wide characters when padding them
> ---
>
> Key: SPARK-26335
> URL: https://issues.apache.org/jira/browse/SPARK-26335
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Keiji Yoshida
> Priority: Major
> Attachments: Screen Shot 2018-12-11 at 17.53.54.png
>
> h2. Issue
> https://issues.apache.org/jira/browse/SPARK-25108 makes Dataset#show care about wide characters when padding them. That is useful for humans reading the result of Dataset#show. On the other hand, it makes it impossible for programs to parse the result of Dataset#show, because each cell's length can differ from its header's length. My company develops and manages a Jupyter/Apache Zeppelin-like visualization tool named "OASIS" ([https://databricks.com/session/oasis-collaborative-data-analysis-platform-using-apache-spark]). In this application, the result of Dataset#show on a Scala or Python process is parsed to visualize it as an HTML table. (A screenshot of OASIS is attached to this ticket as the file "Screen Shot 2018-12-11 at 17.53.54.png".)
> h2. Solution
> Add a property for Dataset#show not to care about wide characters when padding them.
[jira] [Commented] (SPARK-26335) Add a property for Dataset#show not to care about wide characters when padding them
[ https://issues.apache.org/jira/browse/SPARK-26335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720201#comment-16720201 ] Keiji Yoshida commented on SPARK-26335: --- https://github.com/apache/spark/pull/23307#issuecomment-446978389 > Add a property for Dataset#show not to care about wide characters when > padding them > --- > > Key: SPARK-26335 > URL: https://issues.apache.org/jira/browse/SPARK-26335 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0 >Reporter: Keiji Yoshida >Priority: Major > Attachments: Screen Shot 2018-12-11 at 17.53.54.png > > > h2. Issue > https://issues.apache.org/jira/browse/SPARK-25108 makes Dataset#show care > about wide characters when padding them. That is useful for humans to read a > result of Dataset#show. On the other hand, that makes it impossible for > programs to parse a result of Dataset#show because each cell's length can be > different from its header's length. My company develops and manages a > Jupyter/Apache Zeppelin-like visualization tool named "OASIS" > ([https://databricks.com/session/oasis-collaborative-data-analysis-platform-using-apache-spark]). > In this application, the result of Dataset#show from a Scala or Python process > is parsed and visualized as an HTML table. (A screenshot of OASIS has > been attached to this ticket as a file named "Screen Shot 2018-12-11 at > 17.53.54.png".) > h2. Solution > Add a property for Dataset#show not to care about wide characters when > padding them. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
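The parsing problem the reporter describes (a fixed-offset parser assumes every row has the same character length as the header row) can be illustrated with a small self-contained sketch. The table strings below are hand-written imitations of Dataset#show output, not real Spark results.

```scala
// Sketch of why full-width-aware padding breaks offset-based parsing of
// Dataset#show output: a full-width character fills two display columns
// with a single Char, so a padded cell containing one holds fewer Chars
// than its header. Tables are hand-written imitations of show() output.
object ShowParseSketch {
  // Character length of each header/data row (lines starting with '|').
  def rowLengths(table: String): Seq[Int] =
    table.split("\n").toSeq.filter(_.startsWith("|")).map(_.length)

  val asciiOnly = Seq(
    "+----+-----+",
    "|word|count|",
    "+----+-----+",
    "|  ab|    2|",
    "+----+-----+").mkString("\n")

  // "测" is one Char but two display columns, so its padded row is one
  // Char shorter than the header even though it lines up visually.
  val withFullWidth = Seq(
    "+----+-----+",
    "|word|count|",
    "+----+-----+",
    "|  测|    2|",
    "+----+-----+").mkString("\n")
}
```

Here `rowLengths(asciiOnly)` is uniform, while `rowLengths(withFullWidth)` is not — exactly the invariant a fixed-offset parser such as the one in OASIS relies on.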
[jira] [Commented] (SPARK-26335) Add a property for Dataset#show not to care about wide characters when padding them
[ https://issues.apache.org/jira/browse/SPARK-26335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16720199#comment-16720199 ] ASF GitHub Bot commented on SPARK-26335: kjmrknsn closed pull request #23307: [SPARK-26335][SQL] Add a property for Dataset#show not to care about wide characters when padding them URL: https://github.com/apache/spark/pull/23307 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index b4ea1ee950217..49c721873377b 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2822,6 +2822,19 @@ private[spark] object Utils extends Logging { if (str == null) 0 else str.length + fullWidthRegex.findAllIn(str).size } + /** + * Return a width of a given string. + * + * @param str a string + * @param halfWidth If it is set to true, the number of half widths of a given string will be + * returned. + * Otherwise, the number of characters of a given string will be returned. 
+ * @return a width of a given string + */ + def stringWidth(str: String, halfWidth: Boolean): Int = { +if (str == null) 0 else if (halfWidth) stringHalfWidth(str) else str.length + } + def sanitizeDirName(str: String): String = { str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase(Locale.ROOT) } diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index b2ff1cce3eb0b..ea6c72d553543 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -1193,6 +1193,44 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { // scalastyle:on nonascii } + test("stringWidth") { +// scalastyle:off nonascii +assert(Utils.stringWidth(null, false) == 0) +assert(Utils.stringWidth(null, true) == 0) +assert(Utils.stringWidth("", false) == 0) +assert(Utils.stringWidth("", true) == 0) +assert(Utils.stringWidth("ab c", false) == 4) +assert(Utils.stringWidth("ab c", true) == 4) +assert(Utils.stringWidth("1098", false) == 4) +assert(Utils.stringWidth("1098", true) == 4) +assert(Utils.stringWidth("mø", false) == 2) +assert(Utils.stringWidth("mø", true) == 2) +assert(Utils.stringWidth("γύρ", false) == 3) +assert(Utils.stringWidth("γύρ", true) == 3) +assert(Utils.stringWidth("pê", false) == 2) +assert(Utils.stringWidth("pê", true) == 2) +assert(Utils.stringWidth("ー", false) == 1) +assert(Utils.stringWidth("ー", true) == 2) +assert(Utils.stringWidth("测", false) == 1) +assert(Utils.stringWidth("测", true) == 2) +assert(Utils.stringWidth("か", false) == 1) +assert(Utils.stringWidth("か", true) == 2) +assert(Utils.stringWidth("걸", false) == 1) +assert(Utils.stringWidth("걸", true) == 2) +assert(Utils.stringWidth("à", false) == 1) +assert(Utils.stringWidth("à", true) == 1) +assert(Utils.stringWidth("焼", false) == 1) +assert(Utils.stringWidth("焼", true) == 2) +assert(Utils.stringWidth("羍む", false) == 
2) +assert(Utils.stringWidth("羍む", true) == 4) +assert(Utils.stringWidth("뺭ᾘ", false) == 2) +assert(Utils.stringWidth("뺭ᾘ", true) == 3) +assert(Utils.stringWidth("\u0967\u0968\u0969", false) == 3) +assert(Utils.stringWidth("\u0967\u0968\u0969", true) == 3) +// scalastyle:on nonascii + } + + test("trimExceptCRLF standalone") { val crlfSet = Set("\r", "\n") val nonPrintableButCRLF = (0 to 32).map(_.toChar.toString).toSet -- crlfSet diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 86e068bf632bd..3b4351560c061 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1635,6 +1635,18 @@ object SQLConf { "java.time.* packages are used for the same purpose.") .booleanConf .createWithDefault(false) + + val DATASET_SHOW_HANDLE_FULL_WIDTH_CHARACTERS = +buildConf("spark.sql.dataset.show.handleFullWidthCharacters") + .doc("If it is set to true, a width of a full width character will be calculated as two " + +"half widths. That makes it easy for humans to view a result of " + +"`org.apache.spark.sql.Dataset#show`. On the other hand, that makes it impossible for " + +"programs
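The width logic in the diff above can be condensed into a self-contained sketch. The character ranges below are a reduced approximation of Spark's `fullWidthRegex` (defined elsewhere in Utils.scala and not shown in the diff), kept short for illustration; they are an assumption, not the actual pattern, and surrogate pairs are ignored.

```scala
// Simplified sketch of the stringWidth logic from the PR diff: a full-width
// character (CJK ideographs, Hangul, full-width forms, ...) counts as two
// half-widths. The ranges are a reduced approximation of Spark's
// fullWidthRegex, not the real pattern.
object StringWidthSketch {
  private val fullWidth =
    "[\u1100-\u115F\u2E80-\uA4CF\uAC00-\uD7A3\uF900-\uFAFF\uFF00-\uFF60\uFFE0-\uFFE6]".r

  // Character count plus one extra for every full-width character.
  def stringHalfWidth(str: String): Int =
    if (str == null) 0 else str.length + fullWidth.findAllIn(str).size

  // The flag the PR adds: when halfWidth is false, fall back to a plain
  // character count (the pre-SPARK-25108 behaviour).
  def stringWidth(str: String, halfWidth: Boolean): Int =
    if (str == null) 0
    else if (halfWidth) stringHalfWidth(str)
    else str.length
}
```

For example, `stringWidth("测", true)` is 2 while `stringWidth("测", false)` is 1, matching the expectations in the UtilsSuite test above.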
[jira] [Updated] (SPARK-26360) Avoid extra validateQuery call in createStreamingWriteSupport
[ https://issues.apache.org/jira/browse/SPARK-26360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wu Wenjie updated SPARK-26360: -- Description: When I'm reading structured streaming source code, I find there is an extra KafkaWriter.validateQuery() function call in createStreamingWriteSupport func in class KafkaSourceProvider. {code:scala} // KafkaSourceProvider.scala override def createStreamingWriteSupport( queryId: String, schema: StructType, mode: OutputMode, options: DataSourceOptions): StreamingWriteSupport = { . // validate once here KafkaWriter.validateQuery(schema.toAttributes, producerParams, topic) // validate twice here new KafkaStreamingWriteSupport(topic, producerParams, schema) } // KafkaStreamingWriteSupport.scala class KafkaStreamingWriteSupport( topic: Option[String], producerParams: ju.Map[String, Object], schema: StructType) extends StreamingWriteSupport { validateQuery(schema.toAttributes, producerParams, topic) } {code} I think we just need to remove one of these two. was: When I'm reading structured streaming source code, I find there is an extra KafkaWriter.validateQuery() function call in createStreamingWriteSupport func in class KafkaSourceProvider. {code:scala} // KafkaSourceProvider.scala override def createStreamingWriteSupport( queryId: String, schema: StructType, mode: OutputMode, options: DataSourceOptions): StreamingWriteSupport = { . // validate once here KafkaWriter.validateQuery(schema.toAttributes, producerParams, topic) // validate twice here new KafkaStreamingWriteSupport(topic, producerParams, schema) } // KafkaStreamingWriteSupport.scala class KafkaStreamingWriteSupport( topic: Option[String], producerParams: ju.Map[String, Object], schema: StructType) extends StreamingWriteSupport { validateQuery(schema.toAttributes, producerParams, topic) } {code} I think we just need to remove it. 
> Avoid extra validateQuery call in createStreamingWriteSupport > - > > Key: SPARK-26360 > URL: https://issues.apache.org/jira/browse/SPARK-26360 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.3.2, 2.4.0 >Reporter: Wu Wenjie >Priority: Trivial > > When I'm reading structured streaming source code, I find there is an extra > KafkaWriter.validateQuery() function call in createStreamingWriteSupport func > in class > KafkaSourceProvider. > {code:scala} > // KafkaSourceProvider.scala > override def createStreamingWriteSupport( > queryId: String, > schema: StructType, > mode: OutputMode, > options: DataSourceOptions): StreamingWriteSupport = { >. > // validate once here > KafkaWriter.validateQuery(schema.toAttributes, producerParams, topic) > // validate twice here > new KafkaStreamingWriteSupport(topic, producerParams, schema) > } > // KafkaStreamingWriteSupport.scala > class KafkaStreamingWriteSupport( > topic: Option[String], > producerParams: ju.Map[String, Object], > schema: StructType) > extends StreamingWriteSupport { > validateQuery(schema.toAttributes, producerParams, topic) > > } > {code} > > I think we just need to remove one of these two. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org