Hey, thanks for testing out stream-stream joins and reporting this issue. I am going to take a look at this.
TD

On Tue, Feb 20, 2018 at 8:20 PM, kant kodali <kanth...@gmail.com> wrote:

> If I change it to the code below, it works. However, I don't believe it is
> the solution I am looking for. I want to be able to do it in raw SQL, and
> moreover, if a user gives a big chained raw Spark SQL join query, I am not
> even sure how to make copies of the dataframe to achieve the self-join. Is
> there any other way here?
>
> import org.apache.spark.sql.streaming.Trigger
>
> val jdf = spark.readStream.format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "join_test")
>   .option("startingOffsets", "earliest")
>   .load()
> val jdf1 = spark.readStream.format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "join_test")
>   .option("startingOffsets", "earliest")
>   .load()
>
> jdf.createOrReplaceTempView("table")
> jdf1.createOrReplaceTempView("table1")
>
> val resultdf = spark.sql("select * from table inner join table1 on table.offset = table1.offset")
>
> resultdf.writeStream.outputMode("append").format("console")
>   .option("truncate", false)
>   .trigger(Trigger.ProcessingTime(1000))
>   .start()
>
> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali <kanth...@gmail.com> wrote:
>
>> If I change it to this
>>
>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I have the following code:
>>>
>>> import org.apache.spark.sql.streaming.Trigger
>>>
>>> val jdf = spark.readStream.format("kafka")
>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>   .option("subscribe", "join_test")
>>>   .option("startingOffsets", "earliest")
>>>   .load()
>>>
>>> jdf.createOrReplaceTempView("table")
>>>
>>> val resultdf = spark.sql("select * from table as x inner join table as y on x.offset = y.offset")
>>>
>>> resultdf.writeStream.outputMode("update").format("console")
>>>   .option("truncate", false)
>>>   .trigger(Trigger.ProcessingTime(1000))
>>>   .start()
>>>
>>> and I get the following exception:
>>>
>>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given
>>> input columns: [x.value, x.offset, x.key, x.timestampType, x.topic,
>>> x.timestamp, x.partition]; line 1 pos 50;
>>> 'Project [*]
>>> +- 'Join Inner, ('x.offset = 'y.offset)
>>>    :- SubqueryAlias x
>>>    :  +- SubqueryAlias table
>>>    :     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>    +- SubqueryAlias y
>>>       +- SubqueryAlias table
>>>          +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>
>>> Any idea what's wrong here?
>>>
>>> Thanks!
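
A minimal, self-contained sketch of the two-stream workaround quoted above, assuming the same localhost:9092 broker and join_test topic from the thread, and the spark-sql-kafka connector on the classpath. The readTopic helper is illustrative (not from the original posts); calling it once per alias is one way to make the "copies of the dataframe" a bigger chained raw SQL query would need:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.appName("StreamSelfJoinWorkaround").getOrCreate()

// Illustrative helper: each call builds an independent streaming relation
// over the same topic, sidestepping the self-join analysis failure above.
def readTopic() = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "join_test")
  .option("startingOffsets", "earliest")
  .load()

// Register each copy under a distinct view name so raw SQL can refer to
// both sides of the join.
readTopic().createOrReplaceTempView("t1")
readTopic().createOrReplaceTempView("t2")

val resultdf = spark.sql("select * from t1 inner join t2 on t1.offset = t2.offset")

resultdf.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(1000))
  .start()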