Sorry, I meant Spark 2.4 in my previous email.

On Tue, Mar 6, 2018 at 9:15 PM, kant kodali <kanth...@gmail.com> wrote:
> Hi TD,
>
> I agree, I think we are better off either with a full fix or no fix. I am
> ok with the complete fix being available in master or some branch. I guess
> the solution for me is to just build from source.
>
> On a similar note, I am not finding any JIRA tickets related to full outer
> joins and update mode for, say, Spark 2.3. I wonder how hard it is to
> implement both of these? It turns out that update mode and full outer join
> are very useful and required in my case, therefore I'm just asking.
>
> Thanks!
>
> On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>
>> I thought about it.
>> I am not 100% sure whether this fix should go into 2.3.1.
>>
>> There are two parts to this bug fix to enable self-joins.
>>
>> 1. Enabling deduping of leaf logical nodes by extending MultiInstanceRelation.
>>    - This is safe to backport into the 2.3 branch as it does not touch
>>      production code paths.
>>
>> 2. Fixing attribute rewriting in MicroBatchExecution, when the micro-batch
>>    plan is spliced into the streaming plan.
>>    - This touches core production code paths and therefore may not be safe
>>      to backport.
>>
>> Part 1 enables self-joins in all but a small fraction of self-join
>> queries. That small fraction can produce incorrect results, and part 2
>> avoids that.
>>
>> So for 2.3.1, we could enable self-joins by merging only part 1, but that
>> can give wrong results in some cases. I think that is strictly worse than
>> no fix.
>>
>> TD
>>
>> On Thu, Feb 22, 2018 at 2:32 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Hi TD,
>>>
>>> I pulled your commit that is listed on this ticket,
>>> https://issues.apache.org/jira/browse/SPARK-23406. Specifically, I did
>>> the following steps, and self-joins work after I cherry-pick your commit!
>>> Good job!
>>> I was hoping it would be part of 2.3.0, but it looks like it is
>>> targeted for 2.3.1 :(
>>>
>>> git clone https://github.com/apache/spark.git
>>> cd spark
>>> git fetch
>>> git checkout branch-2.3
>>> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
>>> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
>>> ./build/mvn -DskipTests compile
>>> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr \
>>>   -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
>>>
>>> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>
>>>> Hey,
>>>>
>>>> Thanks for testing out stream-stream joins and reporting this issue. I
>>>> am going to take a look at this.
>>>>
>>>> TD
>>>>
>>>> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>>> If I change it to the code below, it works. However, I don't believe it
>>>>> is the solution I am looking for. I want to be able to do it in raw SQL,
>>>>> and moreover, if a user gives a big chained raw Spark SQL join query, I
>>>>> am not even sure how to make copies of the dataframe to achieve the
>>>>> self-join. Is there any other way here?
>>>>>
>>>>> import org.apache.spark.sql.streaming.Trigger
>>>>>
>>>>> val jdf = spark.readStream.format("kafka")
>>>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>>>   .option("subscribe", "join_test")
>>>>>   .option("startingOffsets", "earliest").load()
>>>>> val jdf1 = spark.readStream.format("kafka")
>>>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>>>   .option("subscribe", "join_test")
>>>>>   .option("startingOffsets", "earliest").load()
>>>>>
>>>>> jdf.createOrReplaceTempView("table")
>>>>> jdf1.createOrReplaceTempView("table1")
>>>>>
>>>>> val resultdf = spark.sql("select * from table inner join table1 on table.offset = table1.offset")
>>>>>
>>>>> resultdf.writeStream.outputMode("append").format("console")
>>>>>   .option("truncate", false)
>>>>>   .trigger(Trigger.ProcessingTime(1000)).start()
>>>>>
>>>>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>
>>>>>> If I change it to this
>>>>>>
>>>>>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I have the following code:
>>>>>>>
>>>>>>> import org.apache.spark.sql.streaming.Trigger
>>>>>>>
>>>>>>> val jdf = spark.readStream.format("kafka")
>>>>>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>>>>>   .option("subscribe", "join_test")
>>>>>>>   .option("startingOffsets", "earliest").load()
>>>>>>>
>>>>>>> jdf.createOrReplaceTempView("table")
>>>>>>>
>>>>>>> val resultdf = spark.sql("select * from table as x inner join table as y on x.offset = y.offset")
>>>>>>>
>>>>>>> resultdf.writeStream.outputMode("update").format("console")
>>>>>>>   .option("truncate", false)
>>>>>>>   .trigger(Trigger.ProcessingTime(1000)).start()
>>>>>>>
>>>>>>> and I get the following exception:
>>>>>>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`'
>>>>>>> given input columns: [x.value, x.offset, x.key, x.timestampType,
>>>>>>> x.topic, x.timestamp, x.partition]; line 1 pos 50;
>>>>>>> 'Project [*]
>>>>>>> +- 'Join Inner, ('x.offset = 'y.offset)
>>>>>>>    :- SubqueryAlias x
>>>>>>>    :  +- SubqueryAlias table
>>>>>>>    :     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>>>>>    +- SubqueryAlias y
>>>>>>>       +- SubqueryAlias table
>>>>>>>          +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
>>>>>>>
>>>>>>> Any idea what's wrong here?
>>>>>>>
>>>>>>> Thanks!
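[Editor's note] The cherry-pick steps quoted earlier in the thread can be verified with a `git log` check after the pick. Below is a minimal, generic sketch of that flow on a throwaway repository — the repo, file, and commit messages are made up for illustration and are not the real Spark checkout; in a real branch-2.3 checkout you would grep the log for commit 658d9d9d785a30857bf35d164e6cbbd9799d6959 instead.

```shell
# Illustrative only: a throwaway repo standing in for the real Spark checkout.
set -e
workdir=$(mktemp -d)
cd "$workdir"
git init -q repo
cd repo
git config user.email "dev@example.com"
git config user.name "dev"

# A base commit shared by both branches.
echo "base" > file.txt
git add file.txt
git commit -qm "base"
git branch branch-2.3              # release branch forks here

# The fix lands on the default branch first.
echo "fixed" > file.txt
git commit -qam "SPARK-23406: enable stream-stream self-joins"
fix_sha=$(git rev-parse HEAD)

# Back-port it, then confirm it is now in the release branch's history.
git checkout -q branch-2.3
git cherry-pick -x "$fix_sha" > /dev/null
git log --oneline | grep "SPARK-23406"
```

The final `git log --oneline | grep ...` is the same check you would run in the real branch-2.3 checkout after the cherry-pick, before kicking off the Maven build.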