Hi,
I have two input datasets.
The first input dataset looks like this:
year,make,model,comment,blank
"2012","Tesla","S","No comment",
1997,Ford,E350,"Go get one now they are going fast",
2015,Chevy,Volt
The second input dataset:
TagId,condition
1997_cars,year = 1997 and model = 'E350'
2012_cars,year = 2012 and model = 'S'
2015_cars,year = 2015 and model = 'Volt'
My requirement is to read the first dataset and, based on the filtering
conditions in the second dataset, tag each row of the first dataset by
adding a new column, TagId.
The expected output should look like this:
year,make,model,comment,blank,TagId
"2012","Tesla","S","No comment",2012_cars
1997,Ford,E350,"Go get one now they are going fast",1997_cars
2015,Chevy,Volt,,,2015_cars
Here is what I tried:
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

val sqlContext = new SQLContext(sc)
val carsSchema = StructType(Seq(
  StructField("year", IntegerType, true),
  StructField("make", StringType, true),
  StructField("model", StringType, true),
  StructField("comment", StringType, true),
  StructField("blank", StringType, true)))

val carTagsSchema = StructType(Seq(
  StructField("TagId", StringType, true),
  StructField("condition", StringType, true)))

val dfcars = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").schema(carsSchema).load("/TestDivya/Spark/cars.csv")
val dftags = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").schema(carTagsSchema).load("/TestDivya/Spark/CarTags.csv")

val Amendeddf = dfcars.withColumn("TagId", dfcars("blank"))
val cdtnval = dftags.select("condition")
val df2 = dfcars.filter(cdtnval)
<console>:35: error: overloaded method value filter with alternatives:
  (conditionExpr: String)org.apache.spark.sql.DataFrame <and>
  (condition: org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.sql.DataFrame)
       val df2=dfcars.filter(cdtnval)
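The error says that `filter` accepts either a SQL expression `String` or a `Column`, but `cdtnval` is a whole `DataFrame`. Because each condition in the tag table is already a SQL string, one option is to collect the (small) tag table to the driver and call `filter` once per condition, labelling the matching rows with `lit`. This is a minimal sketch, not a tested recipe: it assumes Spark 2.x or later (`SparkSession`, `Dataset.union`; on 1.x the method is called `unionAll`), a non-empty tag table small enough to `collect`, and in-memory sample rows (without the `blank` column) in place of the CSV files. `FilterPerTag` and `tagByFilter` are hypothetical names.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

object FilterPerTag { // hypothetical name
  // Hypothetical helper: runs filter(conditionExpr: String) once per tag rule,
  // labels the matching rows with that rule's TagId and unions the results.
  // Rows matching no rule are dropped; rows matching several rules repeat.
  def tagByFilter(cars: DataFrame, tags: DataFrame): DataFrame = {
    // Assumes the tag table is small and non-empty, so collecting it is safe
    // and reduce() has at least one element to work with.
    val rules = tags.select("TagId", "condition").collect()
      .map(r => (r.getString(0), r.getString(1)))
    rules
      .map { case (tagId, condition) =>
        cars.filter(condition).withColumn("TagId", lit(tagId))
      }
      .reduce(_ union _)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("filter-per-tag").getOrCreate()
    import spark.implicits._

    // In-memory stand-ins for cars.csv and CarTags.csv.
    val cars = Seq(
      (2012, "Tesla", "S", "No comment"),
      (1997, "Ford", "E350", "Go get one now they are going fast"),
      (2015, "Chevy", "Volt", null: String)
    ).toDF("year", "make", "model", "comment")

    val tags = Seq(
      ("1997_cars", "year = 1997 and model = 'E350'"),
      ("2012_cars", "year = 2012 and model = 'S'"),
      ("2015_cars", "year = 2015 and model = 'Volt'")
    ).toDF("TagId", "condition")

    tagByFilter(cars, tags).show(false)
    spark.stop()
  }
}
```

Note the trade-off: this scans the cars data once per tag, drops rows that match no condition, and emits a row once for every condition it matches, so it only reproduces the expected output when the conditions are mutually exclusive and cover every row.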
Another approach:
val col = dftags.col("TagId")
val finaldf = dfcars.withColumn("TagId", col)
org.apache.spark.sql.AnalysisException: resolved attribute(s) TagId#5 missing from comment#3,blank#4,model#2,make#1,year#0 in operator !Project [year#0,make#1,model#2,comment#3,blank#4,TagId#5 AS TagId#8];

finaldf.write.format("com.databricks.spark.csv").option("header", "true").save("/TestDivya/Spark/carswithtags.csv")
I would really appreciate it if somebody could give me pointers on how to pass
the filter conditions (from the second DataFrame) to the filter function of
the first DataFrame, or suggest another solution.
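One alternative that keeps exactly one `TagId` per input row is to fold the collected conditions into a single nested `CASE WHEN` column, parsing each condition string with `functions.expr` and chaining them with `when(...).otherwise(...)`. This also sidesteps the `AnalysisException`, because the new column is built only from `dfcars`'s own columns plus literals rather than from a column of `dftags`. A minimal sketch, assuming Spark 2.x or later (`SparkSession`), a tag table small enough to collect to the driver, and in-memory sample rows (without the `blank` column) instead of the CSV files; `TagWithCaseWhen`, `tagColumn` and `tagCars` are hypothetical names.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{expr, lit, when}

object TagWithCaseWhen { // hypothetical name
  // Hypothetical helper: folds (TagId, condition) pairs into one nested
  // CASE WHEN column. Rules are tried in order, the first match wins, and
  // rows that match no rule get a null TagId.
  def tagColumn(rules: Seq[(String, String)]): Column =
    rules.foldRight(lit(null).cast("string")) { case ((tagId, condition), fallback) =>
      when(expr(condition), lit(tagId)).otherwise(fallback)
    }

  // Hypothetical helper: collects the tag rules (assumed small) and adds
  // the TagId column to every row of the cars DataFrame in a single pass.
  def tagCars(cars: DataFrame, tags: DataFrame): DataFrame = {
    val rules = tags.select("TagId", "condition").collect()
      .map(r => (r.getString(0), r.getString(1))).toSeq
    cars.withColumn("TagId", tagColumn(rules))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("tag-cars").getOrCreate()
    import spark.implicits._

    // In-memory stand-ins for cars.csv and CarTags.csv.
    val cars = Seq(
      (2012, "Tesla", "S", "No comment"),
      (1997, "Ford", "E350", "Go get one now they are going fast"),
      (2015, "Chevy", "Volt", null: String)
    ).toDF("year", "make", "model", "comment")

    val tags = Seq(
      ("1997_cars", "year = 1997 and model = 'E350'"),
      ("2012_cars", "year = 2012 and model = 'S'"),
      ("2015_cars", "year = 2015 and model = 'Volt'")
    ).toDF("TagId", "condition")

    tagCars(cars, tags).show(false)
    spark.stop()
  }
}
```

Since the whole rule set becomes one column expression, the cars data is scanned only once. On Spark 1.5/1.6 the same `when`, `expr` and `lit` helpers should be usable with the existing `SQLContext`-based `dfcars` and `dftags`, though that is untested here.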
My apologies for such a naive question; I am new to Scala and Spark.
Thanks