[ https://issues.apache.org/jira/browse/SPARK-24019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444202#comment-16444202 ]
Barry Becker edited comment on SPARK-24019 at 4/19/18 3:07 PM: --------------------------------------------------------------- Lowering to minor because I found a way to specify the derivative window function without getting the above error. The main fix was to remove rangeBetween from the window spec. Here is what I now use and it seems to give the result I am looking for without error: {code:java} val window = Window.partitionBy("category").orderBy("sequence_num") // Consider the three sequential series points (Xlag, Ylag), (X, Y), (Xlead, Ylead). // This defines the derivative as (Ylead - Ylag) / (Xlead - Xlag) // If the lead or lag points are null, then we fall back on using the middle point. val yLead = coalesce(lead("value", 1).over(window), col("value")) val yLag = coalesce(lag("value", 1).over(window), col("value")) val xLead = coalesce(lead("sequence_num", 1).over(window), col("sequence_num")) val xLag = coalesce(lag("sequence_num", 1).over(window), col("sequence_num")) val derivative: Column = (yLead - yLag) / (xLead - xLag) val resultDf = simpleDf.withColumn("derivative", derivative) resultDf.show() assertResult(strip("""1, b, 100.0, -30.0 |2, b, 70.0, -20.0 |3, b, 60.0, -10.0 |1, a, 2.1, 0.2999999999999998 |2, a, 2.4, 0.8 |3, a, 3.7, 0.6000000000000001 |4, a, 3.6, -0.10000000000000009""") ) { resultDf.collect().map(row => row.mkString(", ")).mkString("\n") }{code} was (Author: barrybecker4): Lowering to minor because I found a way to specify the deriviative window function without getting the above error. The main fix was to remove rangeBetween from the window spec. Here is what I now use and it seems to give the result I am looking for without error: {code:java} val window = Window.partitionBy("category").orderBy("sequence_num") // Consider the three sequential series points (Xlag, Ylag), (X, Y), (Xlead, Ylead). // This defines the derivative as (Ylead - Ylag) / (Xlead - Xlag) // If the lead or lag points are null, then we fall back on using the middle point. val yLead = coalesce(lead("value", 1).over(window), col("value")) val yLag = coalesce(lag("value", 1).over(window), col("value")) val xLead = coalesce(lead("sequence_num", 1).over(window), col("sequence_num")) val xLag = coalesce(lag("sequence_num", 1).over(window), col("sequence_num")) val derivative: Column = (yLead - yLag) / (xLead - xLag) val resultDf = simpleDf.withColumn("derivative", derivative) resultDf.show() assertResult(strip("""1, b, 100.0, -30.0 |2, b, 70.0, -20.0 |3, b, 60.0, -10.0 |1, a, 2.1, 0.2999999999999998 |2, a, 2.4, 0.8 |3, a, 3.7, 0.6000000000000001 |4, a, 3.6, -0.10000000000000009""") ) { resultDf.collect().map(row => row.mkString(", ")).mkString("\n") }{code} > AnalysisException for Window function expression to compute derivative > ---------------------------------------------------------------------- > > Key: SPARK-24019 > URL: https://issues.apache.org/jira/browse/SPARK-24019 > Project: Spark > Issue Type: Bug > Components: Spark Core > Affects Versions: 2.1.1 > Environment: Ubuntu, spark 2.1.1, standalone. > Reporter: Barry Becker > Priority: Minor > > I am using spark 2.1.1 currently. > I created an expression to compute the derivative of some series data using a > window function. > I have a simple reproducible case of the error. > I'm only filing this bug because the error message says "Please file a bug > report with this error message, stack trace, and the query." > Here they are: > {code:java} > ((coalesce(lead(value#9, 1, null) windowspecdefinition(category#8, > sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), > value#9) - coalesce(lag(value#9, 1, null) windowspecdefinition(category#8, > sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING), > value#9)) / cast((coalesce(lead(sequence_num#7, 1, null) > windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN > 1 FOLLOWING AND 1 FOLLOWING), sequence_num#7) - coalesce(lag(sequence_num#7, > 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, > ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING), sequence_num#7)) as double)) > windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, RANGE > BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS derivative#14 has multiple > Window Specifications (ArrayBuffer(windowspecdefinition(category#8, > sequence_num#7 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT > ROW), windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS > BETWEEN 1 FOLLOWING AND 1 FOLLOWING), windowspecdefinition(category#8, > sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING))). > Please file a bug report with this error message, stack trace, and the query.; > org.apache.spark.sql.AnalysisException: ((coalesce(lead(value#9, 1, null) > windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN > 1 FOLLOWING AND 1 FOLLOWING), value#9) - coalesce(lag(value#9, 1, null) > windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN > 1 PRECEDING AND 1 PRECEDING), value#9)) / cast((coalesce(lead(sequence_num#7, > 1, null) windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, > ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), sequence_num#7) - > coalesce(lag(sequence_num#7, 1, null) windowspecdefinition(category#8, > sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING), > sequence_num#7)) as double)) windowspecdefinition(category#8, sequence_num#7 > ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS > derivative#14 has multiple Window Specifications > (ArrayBuffer(windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, > RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), > windowspecdefinition(category#8, sequence_num#7 ASC NULLS FIRST, ROWS BETWEEN > 1 FOLLOWING AND 1 FOLLOWING), windowspecdefinition(category#8, sequence_num#7 > ASC NULLS FIRST, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING))). > Please file a bug report with this error message, stack trace, and the query.; > at > org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39) > at > org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$78.apply(Analyzer.scala:1772){code} > And here is a simple unit test that can be used to reproduce the problem: > {code:java} > import com.mineset.spark.testsupport.SparkTestCase.SPARK_SESSION > import org.apache.spark.sql.Column > import org.apache.spark.sql.expressions.Window > import org.apache.spark.sql.functions._ > import org.scalatest.FunSuite > import com.mineset.spark.testsupport.SparkTestCase._ > /** > * Test to see that window functions work as expected on spark. > * @author Barry Becker > */ > class WindowFunctionSuite extends FunSuite { > val simpleDf = createSimpleData() > test("Window function for finding derivatives for 2 series") { > val window = > Window.partitionBy("category").orderBy("sequence_num")//.rangeBetween(-1, 1) > // Consider the three sequential series points (Xlag, Ylag), (X, Y), (Xlead, > Ylead). > // This defines the derivative as (Ylead - Ylag) / (Xlead - Xlag) > // If the lead or lag points are null, then we fall back on using the middle > point. > val yLead = coalesce(lead("value", 1).over(window), col("value")) > val yLag = coalesce(lag("value", 1).over(window), col("value")) > val xLead = coalesce(lead("sequence_num", 1).over(window), > col("sequence_num")) > val xLag = coalesce(lag("sequence_num", 1).over(window), col("sequence_num")) > val derivative: Column = (yLead - yLag) / (xLead - xLag) > val resultDf = simpleDf.withColumn("derivative", derivative.over(window)) > resultDf.show() > assertResult("???") { > resultDf.collect().map(row => row.mkString(", ")).mkString("\n") > } > } > def createSimpleData() = { > val data = Seq( > (1, "a", 2.1), > (2, "a", 2.4), > (1, "b", 100.0), > (3, "a", 3.7), > (2, "b", 70.0), > (4, "a", 3.6), > (3, "b", 60.0)) > SPARK_SESSION.sqlContext.createDataFrame(data).toDF("sequence_num", > "category", "value") > } > }{code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org