Re: Apache Spark - Exception on adding column to Structured Streaming DataFrame

2018-02-05 Thread M Singh
Hi TD:
Just wondering if you have any insight for me or need more info.
Thanks 


Re: Apache Spark - Exception on adding column to Structured Streaming DataFrame

2018-02-01 Thread M Singh
Hi TD:
Here is the updated code with the explain output and the full stack trace.
Please let me know what the issue could be, and what to look for in the explain
output.
Updated code:
import scala.collection.immutable
import org.apache.spark.sql.functions._
import org.joda.time._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming._
import org.apache.log4j._

object StreamingTest {
  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession
      .builder.
      config("spark.sql.streaming.checkpointLocation", "./checkpointes").
      appName("StreamingTest").master("local[4]")

    val spark = sparkBuilder.getOrCreate()

    val schema = StructType(
      Array(
        StructField("id", StringType, false),
        StructField("visit", StringType, false)
      ))
    var dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./data/")
    var dataframe2 = dataframeInput.select("*")
    dataframe2 = dataframe2.withColumn("cts", current_timestamp().cast("long"))
    dataframe2.explain(true)
    val query = dataframe2.writeStream.option("truncate", "false").format("console").start
    query.awaitTermination()
  }
}
Explain output:
== Parsed Logical Plan ==
Project [id#0, visit#1, cast(current_timestamp() as bigint) AS cts#6L]
+- AnalysisBarrier
      +- Project [id#0, visit#1]
         +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3c74aa0d,csv,List(),Some(StructType(StructField(id,StringType,false), StructField(visit,StringType,false))),List(),None,Map(sep ->  , path -> ./data/),None), FileSource[./data/], [id#0, visit#1]

== Analyzed Logical Plan ==
id: string, visit: string, cts: bigint
Project [id#0, visit#1, cast(current_timestamp() as bigint) AS cts#6L]
+- Project [id#0, visit#1]
   +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3c74aa0d,csv,List(),Some(StructType(StructField(id,StringType,false), StructField(visit,StringType,false))),List(),None,Map(sep ->  , path -> ./data/),None), FileSource[./data/], [id#0, visit#1]

== Optimized Logical Plan ==
Project [id#0, visit#1, 1517499591 AS cts#6L]
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3c74aa0d,csv,List(),Some(StructType(StructField(id,StringType,false), StructField(visit,StringType,false))),List(),None,Map(sep ->  , path -> ./data/),None), FileSource[./data/], [id#0, visit#1]

== Physical Plan ==
*(1) Project [id#0, visit#1, 1517499591 AS cts#6L]
+- StreamingRelation FileSource[./data/], [id#0, visit#1]
Here is the exception:
18/02/01 07:39:52 ERROR MicroBatchExecution: Query [id = a0e573f0-e93b-48d9-989c-1aaa73539b58, runId = b5c618cb-30c7-4eff-8f09-ea1d064878ae] terminated with error
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts
  at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
  at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
  at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:157)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:448)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:134)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:122)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:122)
  at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:122)
  at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
  at 

Re: Apache Spark - Exception on adding column to Structured Streaming DataFrame

2018-01-31 Thread Tathagata Das
Could you give the full stack trace of the exception?

Also, can you do `dataframe2.explain(true)` and show us the plan output?
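
For reference, a minimal sketch (not from the original thread) of one way to collect both pieces of information: it assumes the streaming DataFrame is the `dataframe2` built in the snippets elsewhere in this thread, calls explain(true) before the query starts, and catches the StreamingQueryException that awaitTermination() rethrows so the full stack trace can be printed.

// Sketch only: assumes `spark` and the streaming DataFrame `dataframe2` from the code in this thread.
import org.apache.spark.sql.streaming.StreamingQueryException

// Print the parsed, analyzed, optimized and physical plans before starting the query.
dataframe2.explain(true)

val query = dataframe2.writeStream.format("console").start()
try {
  query.awaitTermination()
} catch {
  case e: StreamingQueryException =>
    // awaitTermination() rethrows the failure that terminated the query;
    // printing it here surfaces the full stack trace, including the cause.
    e.printStackTrace()
}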



On Wed, Jan 31, 2018 at 3:35 PM, M Singh wrote:

> Hi Folks:
>
> I have to add a column to a structured *streaming* dataframe, but when I
> do that (using select or withColumn) I get an exception.  I can add a
> column to a structured *non-streaming* dataframe. I could not find any
> documentation on how to do this in the following doc:
> [https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html]
>
> I am using spark 2.4.0-SNAPSHOT
>
> Please let me know what I could be missing.
>
> Thanks for your help.
>
> (I am also attaching the source code for the structured streaming,
> structured non-streaming classes and input file with this email)
>
> 
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call
> to dataType on unresolved object, tree: 'cts
> at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(
> unresolved.scala:105)
> at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(
> StructType.scala:435)
> 
>
> Here is the input file (in the ./data directory) - note tokens are
> separated by '\t'
>
> 1 v1
> 2 v1
> 2 v2
> 3 v3
> 3 v1
>
> Here is the code with dataframe (*non-streaming*) which works:
>
> import scala.collection.immutable
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
>
> object StructuredTest {
>   def main(args:Array[String]) : Unit = {
> val sparkBuilder = SparkSession
>   .builder.
>   appName("StreamingTest").master("local[4]")
>
> val spark = sparkBuilder.getOrCreate()
>
> val schema = StructType(
> Array(
>   StructField("id", StringType, false),
>   StructField("visit", StringType, false)
>   ))
> var dataframe = spark.read.option("sep","\t").schema(schema).csv(
> "./data/")
> var dataframe2 = dataframe.select(expr("*"), current_timestamp().as(
> "cts"))
> dataframe2.show(false)
> spark.stop()
>
>   }
> }
>
> Output of the above code is:
>
> +---+-----+-----------------------+
> |id |visit|cts                    |
> +---+-----+-----------------------+
> |1  |v1   |2018-01-31 15:07:00.758|
> |2  |v1   |2018-01-31 15:07:00.758|
> |2  |v2   |2018-01-31 15:07:00.758|
> |3  |v3   |2018-01-31 15:07:00.758|
> |3  |v1   |2018-01-31 15:07:00.758|
> +---+-----+-----------------------+
>
>
> Here is the code with *structured streaming* which throws the exception:
>
> import scala.collection.immutable
> import org.apache.spark.sql.functions._
> import org.joda.time._
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.streaming._
> import org.apache.log4j._
>
> object StreamingTest {
>   def main(args:Array[String]) : Unit = {
> val sparkBuilder = SparkSession
>   .builder.
>   config("spark.sql.streaming.checkpointLocation", "./checkpointes").
>   appName("StreamingTest").master("local[4]")
>
> val spark = sparkBuilder.getOrCreate()
>
> val schema = StructType(
> Array(
>   StructField("id", StringType, false),
>   StructField("visit", StringType, false)
>   ))
> var dataframeInput = spark.readStream.option("sep","\t"
> ).schema(schema).csv("./data/")
> var dataframe2 = dataframeInput.select("*")
> dataframe2 = dataframe2.withColumn("cts", current_timestamp())
> val query = dataframe2.writeStream.option("trucate","false").format("
> console").start
> query.awaitTermination()
>   }
> }
>
> Here is the exception:
>
> 18/01/31 15:10:25 ERROR MicroBatchExecution: Query [id =
> 0fe655de-9096-4d69-b6a5-c593400d2eba, runId = 
> 2394a402-dd52-49b4-854e-cb46684bf4d8]
> terminated with error
> *org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call
> to dataType on unresolved object, tree: 'cts*
> at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(
> unresolved.scala:105)
> at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(
> StructType.scala:435)
> at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(
> StructType.scala:435)
>
> I've also used snippets (shown below) from
> (https://docs.databricks.com/spark/latest/structured-streaming/examples.html)
> but still get the same exception:
>
> Here is the code:
>
> import scala.collection.immutable
> import org.apache.spark.sql.functions._
> import org.joda.time._
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.streaming._
> import org.apache.log4j._
>
> object StreamingTest {
>   def main(args:Array[String]) : Unit = {
> val sparkBuilder = SparkSession
>   .builder.
>   config("spark.sql.streaming.checkpointLocation", "./checkpointes").
>   appName("StreamingTest").master("local[4]")
>
> val 

Apache Spark - Exception on adding column to Structured Streaming DataFrame

2018-01-31 Thread M Singh
Hi Folks:
I have to add a column to a structured streaming dataframe, but when I do that
(using select or withColumn) I get an exception.  I can add a column to a
structured non-streaming dataframe. I could not find any documentation on how
to do this in the following doc:
[https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html]
I am using spark 2.4.0-SNAPSHOT
Please let me know what I could be missing.

Thanks for your help.
(I am also attaching the source code for the structured streaming, structured 
non-streaming classes and input file with this email)
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts
  at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
  at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
Here is the input file (in the ./data directory) - note tokens are separated by '\t'
1 v1
2 v1
2 v2
3 v3
3 v1
Here is the code with dataframe (non-streaming) which works:
import scala.collection.immutable
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object StructuredTest {
  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession
      .builder.
      appName("StreamingTest").master("local[4]")

    val spark = sparkBuilder.getOrCreate()

    val schema = StructType(
      Array(
        StructField("id", StringType, false),
        StructField("visit", StringType, false)
      ))
    var dataframe = spark.read.option("sep", "\t").schema(schema).csv("./data/")
    var dataframe2 = dataframe.select(expr("*"), current_timestamp().as("cts"))
    dataframe2.show(false)
    spark.stop()
  }
}
Output of the above code is:
+---+-----+-----------------------+
|id |visit|cts                    |
+---+-----+-----------------------+
|1  |v1   |2018-01-31 15:07:00.758|
|2  |v1   |2018-01-31 15:07:00.758|
|2  |v2   |2018-01-31 15:07:00.758|
|3  |v3   |2018-01-31 15:07:00.758|
|3  |v1   |2018-01-31 15:07:00.758|
+---+-----+-----------------------+

Here is the code with structured streaming which throws the exception:
import scala.collection.immutable
import org.apache.spark.sql.functions._
import org.joda.time._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming._
import org.apache.log4j._

object StreamingTest {
  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession
      .builder.
      config("spark.sql.streaming.checkpointLocation", "./checkpointes").
      appName("StreamingTest").master("local[4]")

    val spark = sparkBuilder.getOrCreate()

    val schema = StructType(
      Array(
        StructField("id", StringType, false),
        StructField("visit", StringType, false)
      ))
    var dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./data/")
    var dataframe2 = dataframeInput.select("*")
    dataframe2 = dataframe2.withColumn("cts", current_timestamp())
    val query = dataframe2.writeStream.option("truncate", "false").format("console").start
    query.awaitTermination()
  }
}
Here is the exception:
18/01/31 15:10:25 ERROR MicroBatchExecution: Query [id = 0fe655de-9096-4d69-b6a5-c593400d2eba, runId = 2394a402-dd52-49b4-854e-cb46684bf4d8] terminated with error
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts
  at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
  at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
  at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
I've also used snippets (shown below) from
(https://docs.databricks.com/spark/latest/structured-streaming/examples.html)
but still get the same exception:
Here is the code:
import scala.collection.immutable
import org.apache.spark.sql.functions._
import org.joda.time._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming._
import org.apache.log4j._

object StreamingTest {
  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession
      .builder.
      config("spark.sql.streaming.checkpointLocation", "./checkpointes").
      appName("StreamingTest").master("local[4]")

    val spark = sparkBuilder.getOrCreate()

    val schema = StructType(
      Array(
        StructField("id", StringType, false),
        StructField("visit", StringType, false)
      ))
    var dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./data/")
    var dataframe2 = dataframeInput.select(
      current_timestamp().cast("timestamp").alias("timestamp"),
      expr("*"))
    val query = dataframe2.writeStream.option("truncate", "false").format("console").start
    query.awaitTermination()
  }
}