[ 
https://issues.apache.org/jira/browse/SPARK-23406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362068#comment-16362068
 ] 

Apache Spark commented on SPARK-23406:
--------------------------------------

User 'tdas' has created a pull request for this issue:
https://github.com/apache/spark/pull/20598

> Enable stream-stream self joins 
> --------------------------------
>
>                 Key: SPARK-23406
>                 URL: https://issues.apache.org/jira/browse/SPARK-23406
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Tathagata Das
>            Assignee: Tathagata Das
>            Priority: Major
>
> Currently stream-stream self join throws the following error
> {code}
> val df = spark.readStream.format("rate").option("numRowsPerSecond", 
> "1").option("numPartitions", "1").load()
> display(df.withColumn("key", $"value" / 10).join(df.withColumn("key", 
> $"value" / 5), "key"))
> {code}
> error:
> {code}
> Failure when resolving conflicting references in Join:
> 'Join UsingJoin(Inner,List(key))
> :- Project [timestamp#850, value#851L, (cast(value#851L as double) / cast(10 
> as double)) AS key#855]
> : +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@7f1d2a68,rate,List(),None,List(),None,Map(numPartitions
>  -> 1, numRowsPerSecond -> 1),None), rate, [timestamp#850, value#851L]
> +- Project [timestamp#850, value#851L, (cast(value#851L as double) / cast(5 
> as double)) AS key#860]
>  +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@7f1d2a68,rate,List(),None,List(),None,Map(numPartitions
>  -> 1, numRowsPerSecond -> 1),None), rate, [timestamp#850, value#851L]
> Conflicting attributes: timestamp#850,value#851L
> ;;
> 'Join UsingJoin(Inner,List(key))
> :- Project [timestamp#850, value#851L, (cast(value#851L as double) / cast(10 
> as double)) AS key#855]
> : +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@7f1d2a68,rate,List(),None,List(),None,Map(numPartitions
>  -> 1, numRowsPerSecond -> 1),None), rate, [timestamp#850, value#851L]
> +- Project [timestamp#850, value#851L, (cast(value#851L as double) / cast(5 
> as double)) AS key#860]
>  +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@7f1d2a68,rate,List(),None,List(),None,Map(numPartitions
>  -> 1, numRowsPerSecond -> 1),None), rate, [timestamp#850, value#851L]
> at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
>  at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:101)
>  at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:378)
>  at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:98)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:148)
>  at 
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:98)
>  at 
> org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:101)
>  at 
> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:71)
>  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
>  at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3063)
>  at org.apache.spark.sql.Dataset.join(Dataset.scala:787)
>  at org.apache.spark.sql.Dataset.join(Dataset.scala:756)
>  at org.apache.spark.sql.Dataset.join(Dataset.scala:731)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to