Great! Even this doesn’t work:

    val dfAllEvents = sparkSession.table("oldEvents").union(sparkSession.table("newEvents"))

Will this be addressed in 2.2 as well?

From: Michael Armbrust <mich...@databricks.com>
Date: Friday, July 7, 2017 at 5:42 PM
To: "Lalwani, Jayesh" <jayesh.lalw...@capitalone.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>, #MM - Heartbeat <mm-heartb...@capitalone.com>
Subject: Re: Union of 2 streaming data frames

Ah, looks like you are hitting SPARK-20441 <https://issues.apache.org/jira/browse/SPARK-20441>. Should be fixed in 2.2.

On Fri, Jul 7, 2017 at 2:37 PM, Lalwani, Jayesh <jayesh.lalw...@capitalone.com> wrote:

I wrote a small code sample to verify this. It looks like union using Spark SQL doesn’t work, while calling union on the DataFrame works.

https://gist.github.com/GaalDornick/8920577ca92842f44d7bfd3a277c7545

I’m on 2.1.0. I get the exception below. If I change

    val dfAllEvents = sparkSession.sql("select * from oldEvents union select * from newEvents")

to

    val dfAllEvents = dfNewEvents.union(dfOldEvents)

it works fine.

17/07/07 17:33:34 ERROR StreamExecution: Query [id = 3bae26a1-7ee3-45ab-a98d-9346eaf03d08, runId = 063af01f-9878-452e-aa30-7c21e2ef4c18] terminated with error
org.apache.spark.sql.AnalysisException: resolved attribute(s) acctId#29 missing from eventType#2,acctId#0,eventId#37L,acctId#36,eventType#38,eventId#1L in operator !Join Inner, (acctId#0 = acctId#29);;
Distinct
+- Union
   :- Project [acctId#0, eventId#1L, eventType#2]
   :  +- SubqueryAlias oldevents, `oldEvents`
   :     +- Project [acctId#0, eventId#1L, eventType#2]
   :        +- !Join Inner, (acctId#0 = acctId#29)
   :           :- SubqueryAlias alloldevents, `allOldEvents`
   :           :  +- Relation[acctId#0,eventId#1L,eventType#2] json
   :           +- SubqueryAlias newevents, `newEvents`
   :              +- Relation[acctId#36,eventId#37L,eventType#38] json
   +- Project [acctId#29, eventId#30L, eventType#31]
      +- SubqueryAlias newevents, `newEvents`
         +- Relation[acctId#29,eventId#30L,eventType#31] json

    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:337)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:68)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:60)
    at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:60)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$3.apply(StreamExecution.scala:496)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$3.apply(StreamExecution.scala:488)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:488)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)

From: Michael Armbrust <mich...@databricks.com>
Date: Friday, July 7, 2017 at 2:30 PM
To: "Lalwani, Jayesh" <jayesh.lalw...@capitalone.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: Union of 2 streaming data frames

df.union(df2) should be supported when both DataFrames are created from a streaming source. What error are you seeing?

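A minimal sketch of the DataFrame-level union discussed in this thread, assuming two JSON file streams with the acctId, eventId and eventType columns that appear in the error output. The object name, the input directories and the field types are assumptions made for illustration, not taken from the linked gist. Note that Dataset.union keeps duplicate rows (like SQL UNION ALL), whereas the SQL statement quoted above uses UNION, which is why its plan contains a Distinct node.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructType}

object StreamingUnionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("streaming-union-sketch")
      .master("local[2]")
      .getOrCreate()

    // Column names come from the plan in the thread; the types are assumed.
    val schema = new StructType()
      .add("acctId", StringType)
      .add("eventId", LongType)
      .add("eventType", StringType)

    // Hypothetical input directories; streaming file sources need an explicit schema.
    val dfOldEvents = spark.readStream.schema(schema).json("/tmp/oldEvents")
    val dfNewEvents = spark.readStream.schema(schema).json("/tmp/newEvents")

    // Union at the DataFrame level instead of through a SQL statement.
    val dfAllEvents = dfNewEvents.union(dfOldEvents)

    // Print each micro-batch to the console until the query is stopped.
    val query = dfAllEvents.writeStream
      .format("console")
      .outputMode("append")
      .start()
    query.awaitTermination()
  }
}
```
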
On Fri, Jul 7, 2017 at 11:27 AM, Lalwani, Jayesh <jayesh.lalw...@capitalone.com> wrote:

In Structured Streaming, is there a way to union 2 streaming data frames? Are there any plans to support union of 2 streaming data frames soon? I can understand the inherent complexity in joining 2 streaming data frames. But a union is just concatenating 2 microbatches, isn't it?

The problem that we are trying to solve is that we have a Kafka stream that is receiving events. Each event is associated with an account ID. We have a data store that stores historical events for hundreds of millions of accounts. For the events coming in on the input stream, we want to add in all the historical events from the data store and give the combined set to a model.

Initially, the way we were planning to do this was:

a) Read from Kafka into a streaming data frame. Call this inputDF.
b) In a mapPartitions call, get all the unique accounts in the partition. Look up all the historical events for those unique accounts and return them. Let’s call this historicalDF.
c) Union inputDF with historicalDF. Call this allDF.
d) Call mapPartitions on allDF and give the records to the model.

Of course, this doesn’t work because both inputDF and historicalDF are streaming data frames. What we ended up doing is that in step b) we output the input records together with the historical records, so the operation that does the lookup does the union too. This works, but seems like a hacky way of doing things. It works for now because the data from the data store doesn’t require any transformation or aggregation. If it did, we would like to do that using Spark SQL, whereas this solution forces us to do any transformation of historical data in Scala.

Is there a Sparky way of doing this?

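The workaround described above, where the lookup step also emits the input records, could be sketched roughly as the per-partition function below. This is an illustration under assumptions, not the code from the thread: the Event case class (its field types in particular), the EnrichSketch object and the HistoryLookup function type are hypothetical stand-ins for the real event schema and data store client.

```scala
// Hypothetical event shape; column names follow the thread, types are assumed.
final case class Event(acctId: String, eventId: Long, eventType: String)

object EnrichSketch {
  // Hypothetical stand-in for a lookup against the historical data store.
  type HistoryLookup = String => Seq[Event]

  // For one partition: find the unique accounts, fetch their historical
  // events, and return historical plus incoming events in a single iterator.
  def enrichPartition(lookup: HistoryLookup)(input: Iterator[Event]): Iterator[Event] = {
    val incoming = input.toVector                   // materialize the partition once
    val accounts = incoming.map(_.acctId).distinct  // unique accounts in this partition
    val historical = accounts.flatMap(lookup)       // one lookup per account
    (historical ++ incoming).iterator
  }
}
```

Assuming a Dataset[Event] named inputDS, an Encoder for Event in scope (for example from spark.implicits._) and a serializable lookup function, this could be applied as inputDS.mapPartitions(EnrichSketch.enrichPartition(lookup) _). Materializing the partition keeps the sketch simple, but it holds the whole partition in memory.
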
________________________________ The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.