Ah, looks like you are hitting SPARK-20441 <https://issues.apache.org/jira/browse/SPARK-20441>. Should be fixed in 2.2.
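For anyone who can't move to 2.2 yet, here is a minimal sketch of the workaround described in Jayesh's report below: do the union through the DataFrame API instead of a SQL UNION. This is only an illustration; the column types and the file-based sources are assumptions made for the sketch, and the view/variable names follow the gist.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object UnionWorkaround {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("union-workaround").getOrCreate()

    // Schema following the plan in the report: acctId, eventId, eventType
    // (acctId assumed to be a string here).
    val eventSchema = StructType(Seq(
      StructField("acctId", StringType),
      StructField("eventId", LongType),
      StructField("eventType", StringType)))

    // Two streaming sources with the same schema (placeholders for the real ones).
    val dfOldEvents = spark.readStream.schema(eventSchema).json("/data/oldEvents")
    val dfNewEvents = spark.readStream.schema(eventSchema).json("/data/newEvents")

    // Fails on 2.1.0 with the AnalysisException below (SPARK-20441):
    //   dfOldEvents.createOrReplaceTempView("oldEvents")
    //   dfNewEvents.createOrReplaceTempView("newEvents")
    //   val dfAllEvents = spark.sql(
    //     "select * from oldEvents union select * from newEvents")

    // Works: union through the DataFrame API.
    val dfAllEvents = dfNewEvents.union(dfOldEvents)

    dfAllEvents.writeStream.format("console").start().awaitTermination()
  }
}

One caveat: Dataset.union has UNION ALL semantics, so unlike the SQL query above it will not deduplicate rows.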
On Fri, Jul 7, 2017 at 2:37 PM, Lalwani, Jayesh <jayesh.lalw...@capitalone.com> wrote:

> I created a small sample code to verify this. It looks like union using
> Spark SQL doesn't work, while calling union on the DataFrame works:
> https://gist.github.com/GaalDornick/8920577ca92842f44d7bfd3a277c7545. I'm
> on 2.1.0.
>
> I get the following exception. If I change
> val dfAllEvents = sparkSession.sql("select * from oldEvents union select * from newEvents")
> to
> val dfAllEvents = dfNewEvents.union(dfOldEvents)
> it works fine.
>
> 17/07/07 17:33:34 ERROR StreamExecution: Query [id = 3bae26a1-7ee3-45ab-a98d-9346eaf03d08, runId = 063af01f-9878-452e-aa30-7c21e2ef4c18] terminated with error
> org.apache.spark.sql.AnalysisException: resolved attribute(s) acctId#29 missing from eventType#2,acctId#0,eventId#37L,acctId#36,eventType#38,eventId#1L in operator !Join Inner, (acctId#0 = acctId#29);;
> Distinct
> +- Union
>    :- Project [acctId#0, eventId#1L, eventType#2]
>    :  +- SubqueryAlias oldevents, `oldEvents`
>    :     +- Project [acctId#0, eventId#1L, eventType#2]
>    :        +- !Join Inner, (acctId#0 = acctId#29)
>    :           :- SubqueryAlias alloldevents, `allOldEvents`
>    :           :  +- Relation[acctId#0,eventId#1L,eventType#2] json
>    :           +- SubqueryAlias newevents, `newEvents`
>    :              +- Relation[acctId#36,eventId#37L,eventType#38] json
>    +- Project [acctId#29, eventId#30L, eventType#31]
>       +- SubqueryAlias newevents, `newEvents`
>          +- Relation[acctId#29,eventId#30L,eventType#31] json
>
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:337)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
>   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)
>   at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
>   at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:68)
>   at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67)
>   at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:60)
>   at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:60)
>   at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
>   at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$3.apply(StreamExecution.scala:496)
>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$3.apply(StreamExecution.scala:488)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:488)
>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
>   at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)
>
> From: Michael Armbrust <mich...@databricks.com>
> Date: Friday, July 7, 2017 at 2:30 PM
> To: "Lalwani, Jayesh" <jayesh.lalw...@capitalone.com>
> Cc: "user@spark.apache.org" <user@spark.apache.org>
> Subject: Re: Union of 2 streaming data frames
>
> df.union(df2) should be supported when both DataFrames are created from a
> streaming source. What error are you seeing?
>
> On Fri, Jul 7, 2017 at 11:27 AM, Lalwani, Jayesh <jayesh.lalw...@capitalone.com> wrote:
>
> In structured streaming, is there a way to union 2 streaming data frames?
> Are there any plans to support union of 2 streaming data frames soon? I can
> understand the inherent complexity in joining 2 streaming data frames, but
> union is just concatenating 2 microbatches, isn't it?
>
> The problem we are trying to solve is that we have a Kafka stream that is
> receiving events. Each event is associated with an account ID. We have a
> data store that holds historical events for hundreds of millions of
> accounts. For the events coming in on the input stream, we want to add in
> all the historical events for those accounts from the data store and give
> the result to a model.
>
> Initially, the way we were planning to do this was:
> a) Read from Kafka into a streaming dataframe. Call this inputDF.
> b) In a mapPartitions call, get all the unique accounts in the partition,
>    look up all the historical events for those unique accounts, and return
>    them. Call this historicalDF.
> c) Union inputDF with historicalDF. Call this allDF.
> d) Call mapPartitions on allDF and give the records to the model.
>
> Of course, this doesn't work because both inputDF and historicalDF are
> streaming data frames.
>
> What we ended up doing is that in step b) we output the input records
> along with the historical records, which works but feels like a hacky way
> of doing things: the operation that does the lookup does the union too.
> This works for now because the data from the data store doesn't require
> any transformation or aggregation. But if it did, we would like to do that
> using Spark SQL, whereas this solution forces us to do any transformation
> of historical data in Scala.
>
> Is there a Sparky way of doing this?
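To make the thread self-contained, here is a minimal sketch of the per-partition enrichment described above (the version that "works but feels hacky"), assuming Spark 2.1 with the spark-sql-kafka-0-10 package, a hypothetical lookupHistory helper for the historical data store, and a comma-separated value format on the Kafka topic; all of those are placeholders, not details from the original code.

import org.apache.spark.sql.{Dataset, SparkSession}

// Event layout taken from the plan above: acctId, eventId, eventType.
case class Event(acctId: String, eventId: Long, eventType: String)

object EnrichWithHistory {

  // Hypothetical data-store client; replace with the real lookup.
  def lookupHistory(acctIds: Set[String]): Seq[Event] = Seq.empty

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("enrich-with-history").getOrCreate()
    import spark.implicits._

    // a) Read from Kafka into a streaming Dataset. The comma-separated value
    //    format is an assumption made for this sketch.
    val inputDF: Dataset[Event] = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
      .map { line =>
        val Array(acctId, eventId, eventType) = line.split(",")
        Event(acctId, eventId.toLong, eventType)
      }

    // b) + c) collapsed into one step, as in the mail: each partition emits its
    //    input events plus the historical events for the accounts it saw,
    //    because a separate streaming historicalDF cannot be unioned on 2.1.
    //    Note this buffers the whole partition in memory before the lookup.
    val allDF: Dataset[Event] = inputDF.mapPartitions { events =>
      val batch = events.toSeq
      val history = lookupHistory(batch.map(_.acctId).toSet)
      (batch ++ history).iterator
    }

    // d) Hand every record to the model; the console sink stands in for the
    //    real scoring step.
    allDF.writeStream.format("console").start().awaitTermination()
  }
}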