RE: pass one dataframe column value to another dataframe filter expression + Spark 1.5 + scala

2016-02-05 Thread Lohith Samaga M
Hi,
If you can format the condition file as a CSV file with the same key columns as the main file (here, year and model), then you can simply join the two dataframes and select only the required columns.
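
A rough, untested sketch of that approach (the reshaped tags file and its literal year/model columns below are my assumption about how you would rewrite the condition file, not your current file):

// Hypothetical reshaped tags file, e.g.:
//   TagId,year,model
//   1997_cars,1997,E350
//   2012_cars,2012,S
//   2015_cars,2015,Volt
import org.apache.spark.sql.types._

val tagsSchema = StructType(Seq(
  StructField("TagId", StringType, true),
  StructField("year", IntegerType, true),
  StructField("model", StringType, true)))

val dftags2 = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true").schema(tagsSchema)
  .load("/TestDivya/Spark/CarTags.csv")

// Left outer join keeps cars that match no tag; then keep only
// the original columns plus TagId.
val tagged = dfcars
  .join(dftags2,
    dfcars("year") === dftags2("year") && dfcars("model") === dftags2("model"),
    "left_outer")
  .select(dfcars("year"), dfcars("make"), dfcars("model"),
    dfcars("comment"), dfcars("blank"), dftags2("TagId"))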

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga

From: Divya Gehlot [mailto:divya.htco...@gmail.com]
Sent: Friday, February 05, 2016 13.12
To: user @spark
Subject: pass one dataframe column value to another dataframe filter expression 
+ Spark 1.5 + scala

Hi,
I have two input datasets.
The first input dataset looks like this:

year,make,model,comment,blank
"2012","Tesla","S","No comment",
1997,Ford,E350,"Go get one now they are going fast",
2015,Chevy,Volt

The second input dataset:

TagId,condition
1997_cars,year = 1997 and model = 'E350'
2012_cars,year=2012 and model ='S'
2015_cars ,year=2015 and model = 'Volt'

My requirement is to read the first dataset and, based on the filter conditions
in the second dataset, tag the rows of the first dataset by introducing a new
TagId column, so the expected output should look like:

year,make,model,comment,blank,TagId
"2012","Tesla","S","No comment",2012_cars
1997,Ford,E350,"Go get one now they are going fast",1997_cars
2015,Chevy,Volt, ,2015_cars

I tried this:

val sqlContext = new SQLContext(sc)

val carsSchema = StructType(Seq(
  StructField("year", IntegerType, true),
  StructField("make", StringType, true),
  StructField("model", StringType, true),
  StructField("comment", StringType, true),
  StructField("blank", StringType, true)))

val carTagsSchema = StructType(Seq(
  StructField("TagId", StringType, true),
  StructField("condition", StringType, true)))

val dfcars = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .schema(carsSchema)
  .load("/TestDivya/Spark/cars.csv")
val dftags = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .schema(carTagsSchema)
  .load("/TestDivya/Spark/CarTags.csv")

val Amendeddf = dfcars.withColumn("TagId", dfcars("blank"))
val cdtnval = dftags.select("condition")
val df2 = dfcars.filter(cdtnval)
:35: error: overloaded method value filter with alternatives:
  (conditionExpr: String)org.apache.spark.sql.DataFrame 
  (condition: org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.sql.DataFrame)
   val df2=dfcars.filter(cdtnval)

Another approach:

val col = dftags.col("TagId")
val finaldf = dfcars.withColumn("TagId", col)
org.apache.spark.sql.AnalysisException: resolved attribute(s) TagId#5 
missing from comment#3,blank#4,model#2,make#1,year#0 in operator !Project 
[year#0,make#1,model#2,comment#3,blank#4,TagId#5 AS TagId#8];

finaldf.write.format("com.databricks.spark.csv")
  .option("header", "true")
  .save("/TestDivya/Spark/carswithtags.csv")


I would really appreciate it if somebody could give me pointers on how to pass
the filter condition (from the second dataframe) to the filter function of the
first dataframe, or suggest another solution.
My apologies for such a naive question; I am new to Scala and Spark.

Thanks


Re: pass one dataframe column value to another dataframe filter expression + Spark 1.5 + scala

2016-02-05 Thread Ali Tajeldin
I think the tricky part here is that the join condition is encoded in the 
second data frame and not a direct value.

Assuming the second data frame (the tags) is small enough, you can collect it
(read it into memory) and then construct a "when" expression chain covering each
of the possible tags. Then all you need to do is select on data frame 1 (df1), or
use "withColumn" to add the column, and provide the constructed "when" chain as
the new column value.
From a very high level, looking at your example data below, you would construct
the following expression from df2 (of course, in the real case you would build
the expression programmatically from the collected df2 data rather than
hardcoding it):

val e = when(expr("year = 1997 and model = 'E350'"), "1997_cars").
...
   when(expr("year = 2015 and model = 'Volt'"), "2015_cars").
   otherwise("unknown")

Then you just need to add the new column to your input as:
df1.withColumn("tag", e)
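
A rough, untested sketch of that programmatic construction (assuming the tags
dataframe, called df2 here, is small enough to collect, with TagId as its first
column and condition as its second):

import org.apache.spark.sql.functions.{expr, lit, when}

// Pull the small tags table onto the driver and fold it into a
// single nested when/otherwise expression.
val tagRows = df2.collect()
val tagExpr = tagRows.foldLeft(lit("unknown")) { (acc, row) =>
  when(expr(row.getString(1)), row.getString(0)).otherwise(acc)
}
val result = df1.withColumn("tag", tagExpr)

Since the conditions in your example are mutually exclusive, the order in which
the rows are folded does not matter.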

I caveat this by saying that I have not tried the above (especially using
"expr" to evaluate partial SQL expressions), but it should work according to the
docs. Sometimes half the battle is just finding the right API: "when"/"otherwise"
are documented under the "Column" class, and "withColumn"/"collect" are
documented under "DataFrame".

--
Ali

 
On Feb 5, 2016, at 1:56 AM, Lohith Samaga M wrote:

> [quoted message trimmed]