Re: Spark SQL DataFrame: Nullable column and filtering
Dear all,

after some fiddling I have arrived at this solution:

  /**
   * Customized left outer join on common column.
   */
  def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {
    val joinedDF = leftDF.as('left).join(rightDF.as('right), leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")

    import joinedDF.sqlContext.implicits._
    val leftColumns = leftDF.columns
      .map((cn: String) => $"left.$cn")
    val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName))
      .map((cn: String) => $"right.$cn")

    joinedDF.select(leftColumns ++ rightColumns: _*)
  }

Comments welcome!

Alternatives I tried:

- Not working: If at least the alias "right" for rightDF is present, one could try joinedDF.drop("right." + columnName), but this does not work (no column is dropped). Unfortunately, drop does not support arguments of type Column / ColumnName.
  *@Michael: Should I create a feature request in Jira for drop supporting Columns?*
- Working: Without using aliases via as(...), but using column renaming instead. Rename the right DataFrame's column with
    rightDF.withColumnRenamed(commonColumnName, "right_" + commonColumnName)
  and then use the join criterion
    leftDF(commonColumnName) === rightDF("right_" + commonColumnName)
  In my opinion this is not so neat. Opinions?

Things I observed:

- Column handling does not seem consistent: select() supports aliases, while drop(...) only supports strings.
- DataFrame.apply() and DataFrame.col do not support aliases either.
- Thus, at the moment, the only way to handle ambiguous column names is via select. Can someone please confirm this?
- Alias information is not displayed by DataFrame.printSchema (or at least I did not find a way to show it).

Cheers,
Martin

2015-07-31 22:51 GMT+02:00 Martin Senne <martin.se...@googlemail.com>:
Dear Michael, dear all, a minimal example is listed below. [...]
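The alias-based solution above boils down to building a list of qualified "alias.column" names: all left columns, plus all right columns except the shared key. That step can be factored out and tested on its own. The following is a minimal plain-Scala sketch, assuming the aliases "left" and "right" from the message above. The helper name `qualifiedSelectList` is hypothetical and not part of Spark.

```scala
object QualifiedSelect {
  /** Hypothetical helper: builds "alias.column" names for every left column,
    * plus every right column except the shared join key. */
  def qualifiedSelectList(
      leftColumns: Seq[String],
      rightColumns: Seq[String],
      commonColumnName: String,
      leftAlias: String = "left",
      rightAlias: String = "right"): Seq[String] = {
    val fromLeft  = leftColumns.map(cn => s"$leftAlias.$cn")
    val fromRight = rightColumns.filterNot(_ == commonColumnName).map(cn => s"$rightAlias.$cn")
    fromLeft ++ fromRight
  }

  def main(args: Array[String]): Unit = {
    // Column names of recordDF (x, a) and mappingDF (x, y) from this thread.
    val names = qualifiedSelectList(Seq("x", "a"), Seq("x", "y"), "x")
    println(names.mkString(", ")) // left.x, left.a, right.y
  }
}
```

With Spark, the list could then be applied as `joinedDF.select(names.map(n => col(n)): _*)`, using `col` from `org.apache.spark.sql.functions`. Separately, later Spark releases (1.6 onward, if I recall correctly) provide `leftDF.join(rightDF, Seq("x"), "leftouter")`, which joins on a shared column name and keeps a single copy of it.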
Re: Spark SQL DataFrame: Nullable column and filtering
Dear Michael, dear all,

a minimal example is listed below.

After some further analysis I found that the problem is related to the *leftOuterJoinWithRemovalOfEqualColumn* method, because I use columns of the left and right DataFrames when doing the select on the joined table.

  /**
   * Customized left outer join on common column.
   */
  def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {
    val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
    val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map(cn => rightDF(cn))

    leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
      .select(leftColumns ++ rightColumns: _*)
  }

As the column y of the right table has nullable = false, this is also transferred to the y column of the joined table, because I use rightDF("y"). Thus, I need to use columns of the joined table for the select.

*Question now: The joined table has column names x, a, x, y. How do I discard the second x column?*

All my approaches failed (assuming here that joinedDF is the joined DataFrame):

- Using joinedDF.drop("x") discards both x columns.
- Using joinedDF("x") does not work, as it is ambiguous.
- Using rightDF.as("aliasname") to distinguish column x (from the left DataFrame) from column x (from the right DataFrame) did not work out either, as I found no way to use select($"aliasname.x") really programmatically.

Could someone sketch the code? Any help welcome, thanks

Martin

  import org.apache.spark.sql.types.{StructField, StructType}
  import org.apache.spark.{SparkContext, SparkConf}
  import org.apache.spark.sql.{DataFrame, SQLContext}

  object OtherEntities {
    case class Record(x: Int, a: String)
    case class Mapping(x: Int, y: Int)

    val records = Seq(Record(1, "hello"), Record(2, "bob"))
    val mappings = Seq(Mapping(2, 5))
  }

  object MinimalShowcase {
    /**
     * Customized left outer join on common column.
     */
    def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {
      val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
      val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map(cn => rightDF(cn))

      leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
        .select(leftColumns ++ rightColumns: _*)
    }

    /**
     * Sets whether a column is nullable.
     * @param df source DataFrame
     * @param cn is the column name to change
     * @param nullable is the flag to set, such that the column is either nullable or not
     */
    def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {
      val schema = df.schema
      val newSchema = StructType(schema.map {
        case StructField(c, t, _, m) if c.equals(cn) => StructField(c, t, nullable = nullable, m)
        case y: StructField => y
      })
      df.sqlContext.createDataFrame(df.rdd, newSchema)
    }

    def main(args: Array[String]): Unit = {
      val conf = new SparkConf()
        .setAppName("Minimal")
        .setMaster("local[*]")
      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)
      // used to implicitly convert an RDD to a DataFrame.
      import sqlContext.implicits._

      val recordDF = sc.parallelize(OtherEntities.records, 4).toDF()
      val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
      val mappingWithNullDF = setNullableStateOfColumn(mappingDF, "y", true)

      val joinedDF = recordDF.join(mappingDF, recordDF("x") === mappingDF("x"), "leftouter")
      println("joinedDF:")
      joinedDF.show
      joinedDF.printSchema
      joinedDF.filter(joinedDF("y").isNotNull).show

      // joinedDF:
      // +-+-----+----+----+
      // |x|    a|   x|   y|
      // +-+-----+----+----+
      // |1|hello|null|null|
      // |2|  bob|   2|   5|
      // +-+-----+----+----+
      //
      // root
      //  |-- x: integer (nullable = false)
      //  |-- a: string (nullable = true)
      //  |-- x: integer (nullable = true)
      //  |-- y: integer (nullable = true)
      //
      // +-+---+-+-+
      // |x|  a|x|y|
      // +-+---+-+-+
      // |2|bob|2|5|
      // +-+---+-+-+

      val extrajoinedDF = leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
      println("extrajoinedDF:")
      extrajoinedDF.show
      extrajoinedDF.printSchema
      extrajoinedDF.filter(extrajoinedDF("y").isNotNull).show

      // extrajoinedDF:
      // +-+-----+----+
      // |x|    a|   y|
      // +-+-----+----+
      // |1|hello|null|
      // |2|  bob|   5|
      // +-+-----+----+
      //
      // root
      //  |-- x: integer (nullable = false)
      //  |-- a: string (nullable = true)
      //  |-- y: integer (nullable = false)
      //
      // +-+-----+----+
      // |x|    a|   y|
      // +-+-----+----+
      // |1|hello|null|
      // |2|  bob|   5|
      // +-+-----+----+

      val joined2DF = recordDF.join(mappingWithNullDF, recordDF("x") === mappingWithNullDF("x"), "leftouter")
      println("joined2DF:")
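The schema printed for joinedDF above follows a general rule for outer joins. Every column from the side that may have no match (here the right side) must be nullable in the output, whatever its nullability was in the input. Below is a toy model of that rule in plain Scala. The `Field` case class and the `leftOuterJoinSchema` function are hypothetical simplifications standing in for Spark's `StructField` and its internal planning logic; they are not Spark's actual code.

```scala
object OuterJoinSchema {
  // Hypothetical, simplified stand-in for a schema field (not Spark's StructField).
  final case class Field(name: String, dataType: String, nullable: Boolean)

  /** Output schema of a left outer join: left fields keep their nullability,
    * right fields are forced to nullable = true, because unmatched left rows
    * receive nulls in every right-side column. */
  def leftOuterJoinSchema(left: Seq[Field], right: Seq[Field]): Seq[Field] =
    left ++ right.map(_.copy(nullable = true))

  def main(args: Array[String]): Unit = {
    val recordSchema  = Seq(Field("x", "integer", nullable = false), Field("a", "string", nullable = true))
    val mappingSchema = Seq(Field("x", "integer", nullable = false), Field("y", "integer", nullable = false))
    // Prints the same four lines as joinedDF.printSchema in the example above.
    leftOuterJoinSchema(recordSchema, mappingSchema).foreach { f =>
      println(s"${f.name}: ${f.dataType} (nullable = ${f.nullable})")
    }
  }
}
```

Selecting `rightDF("y")` after the join, as `leftOuterJoinWithRemovalOfEqualColumn` does, appears to bypass this rule and carry the pre-join `nullable = false` into the result. That matches what the extrajoinedDF schema shows.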
Re: Spark SQL DataFrame: Nullable column and filtering
Perhaps I'm missing what you are trying to accomplish, but if you'd like to avoid the null values, do an inner join instead of an outer join.

Additionally, I'm confused about how the result of joinedDF.filter(joinedDF("y").isNotNull).show can still contain null values in the column y. This doesn't really have anything to do with nullable, which is only a hint to the system so that we can avoid null checking when we know that there are no null values. If you provide the full code, I can try and see if this is a bug.

On Thu, Jul 30, 2015 at 11:53 AM, Martin Senne <martin.se...@googlemail.com> wrote:
[...]
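Michael's point that `nullable` is only a hint can be illustrated with a toy model of an optimizer that trusts it. If a column is declared non-nullable, a not-null predicate on it can be assumed always true and the filter skipped. As far as I know, Catalyst's null-propagation optimizations include a rule of this kind. The code below, however, is a hypothetical simplification (the names `Column` and `filterNotNull` are illustrative), not Spark's implementation. It shows how a wrong `nullable = false` lets null rows slip through an `isNotNull` filter.

```scala
object NullableHint {
  // Hypothetical column: declared nullability plus actual values (None plays the role of SQL NULL).
  final case class Column(name: String, nullable: Boolean, values: Seq[Option[Int]])

  /** Keeps the non-null values. If the column is declared non-nullable, the
    * not-null predicate is assumed to be always true and the filter is skipped;
    * this is only correct when the declaration is correct. */
  def filterNotNull(col: Column): Seq[Option[Int]] =
    if (!col.nullable) col.values
    else col.values.filter(_.isDefined)

  def main(args: Array[String]): Unit = {
    // y after the left outer join: null for x = 1, 5 for x = 2.
    val ys = Seq(None, Some(5))
    println(filterNotNull(Column("y", nullable = true, ys)))  // List(Some(5))
    println(filterNotNull(Column("y", nullable = false, ys))) // List(None, Some(5)): wrong hint, null survives
  }
}
```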
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark SQL DataFrame: Nullable column and filtering
Dear Michael, dear all,

distinguishing the records that have a match in mapping from those that don't is the crucial point.

  Record(x: Int, a: String)
  Mapping(x: Int, y: Int)

Thus

  Record(1, "hello")
  Record(2, "bob")
  Mapping(2, 5)

yield (2, "bob", 5) on an inner join. BUT I'm also interested in (1, "hello", null), as there is no counterpart in mapping (this is the left outer join part).

I need to distinguish 1 and 2 because of later inserts (case 1, "hello") or updates (case 2, "bob").

Cheers and thanks,
Martin

On 30.07.2015 22:58, Michael Armbrust <mich...@databricks.com> wrote:
[...]
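The distinction Martin describes (rows with a mapping become updates, rows without one become inserts) is exactly what a left outer join with an optional right side encodes. Here is a plain-Scala sketch on the example data from this thread. It assumes that `x` is unique in `mappings`, and the names `leftOuterJoin`, `toUpdate` and `toInsert` are illustrative, not from the original code.

```scala
object InsertOrUpdate {
  final case class Record(x: Int, a: String)
  final case class Mapping(x: Int, y: Int)

  /** Left outer join on x: every record is kept, and y is None when no mapping
    * matches. Assumes at most one mapping per x. */
  def leftOuterJoin(records: Seq[Record], mappings: Seq[Mapping]): Seq[(Int, String, Option[Int])] = {
    val yByX: Map[Int, Int] = mappings.map(m => m.x -> m.y).toMap
    records.map(r => (r.x, r.a, yByX.get(r.x)))
  }

  def main(args: Array[String]): Unit = {
    val records  = Seq(Record(1, "hello"), Record(2, "bob"))
    val mappings = Seq(Mapping(2, 5))
    val joined = leftOuterJoin(records, mappings)
    // Matched rows (y present) are updates; unmatched rows (y missing) are inserts.
    val (toUpdate, toInsert) = joined.partition(_._3.isDefined)
    println(s"updates: $toUpdate") // updates: List((2,bob,Some(5)))
    println(s"inserts: $toInsert") // inserts: List((1,hello,None))
  }
}
```

In DataFrame terms, the same split would be `filter(col("y").isNotNull)` versus `filter(col("y").isNull)` on the joined result. Both depend on y being correctly marked as nullable after the outer join.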
Re: Spark SQL DataFrame: Nullable column and filtering
Dear Michael, dear all,

motivation:

  object OtherEntities {
    case class Record(x: Int, a: String)
    case class Mapping(x: Int, y: Int)

    val records = Seq(Record(1, "hello"), Record(2, "bob"))
    val mappings = Seq(Mapping(2, 5))
  }

Now I want to perform a *left outer join* on records and mappings with the join criterion recordDF("x") === mappingDF("x"). The shorthand for this is *leftOuterJoinWithRemovalOfEqualColumn*:

  val sqlContext = new SQLContext(sc)
  // used to implicitly convert an RDD to a DataFrame.
  import sqlContext.implicits._

  val recordDF = sc.parallelize(OtherEntities.records, 4).toDF()
  val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()

  val joinedDF = leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")

  joinedDF.filter(joinedDF("y").isNotNull).show

Currently, the output is

  +-+-----+----+
  |x|    a|   y|
  +-+-----+----+
  |1|hello|null|
  |2|  bob|   5|
  +-+-----+----+

instead of

  +-+---+-+
  |x|  a|y|
  +-+---+-+
  |2|bob|5|
  +-+---+-+

The latter output can be achieved by changing nullable = false to nullable = true, as described in my first post.

*Thus, I need this schema modification in order to make outer joins work.*

Cheers and thanks,
Martin

2015-07-30 20:23 GMT+02:00 Michael Armbrust <mich...@databricks.com>:
[...]
Re: Spark SQL DataFrame: Nullable column and filtering
We don't yet update nullability information based on predicates, as we don't actually leverage this information in many places yet. Why do you want to update the schema?

On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 <martin.se...@googlemail.com> wrote:

Hi all,

1. *Columns in DataFrames can be nullable or not nullable. Having a nullable column of Doubles, I can use the following Scala code to filter all non-null rows:*

  val df = ... // some code that creates a DataFrame
  df.filter(df("columnname").isNotNull)

  +-+-----+----+
  |x|    a|   y|
  +-+-----+----+
  |1|hello|null|
  |2|  bob|   5|
  +-+-----+----+

  root
   |-- x: integer (nullable = false)
   |-- a: string (nullable = true)
   |-- y: integer (nullable = true)

And with the filter expression:

  +-+---+-+
  |x|  a|y|
  +-+---+-+
  |2|bob|5|
  +-+---+-+

Unfortunately, while this holds for a nullable column (according to df.printSchema), it does not hold for a column that is not nullable:

  +-+-----+----+
  |x|    a|   y|
  +-+-----+----+
  |1|hello|null|
  |2|  bob|   5|
  +-+-----+----+

  root
   |-- x: integer (nullable = false)
   |-- a: string (nullable = true)
   |-- y: integer (nullable = false)

  +-+-----+----+
  |x|    a|   y|
  +-+-----+----+
  |1|hello|null|
  |2|  bob|   5|
  +-+-----+----+

so the output is not affected by the filter. Is this intended?

2. *What is the cheapest way (in terms of performance) to turn a non-nullable column into a nullable column? I came up with this:*

  /**
   * Sets whether a column is nullable.
   * @param df source DataFrame
   * @param cn is the column name to change
   * @param nullable is the flag to set, such that the column is either nullable or not
   */
  def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {
    val schema = df.schema
    val newSchema = StructType(schema.map {
      case StructField(c, t, _, m) if c.equals(cn) => StructField(c, t, nullable = nullable, m)
      case y: StructField => y
    })
    df.sqlContext.createDataFrame(df.rdd, newSchema)
  }

Is there a cheaper solution?

3. *Any comments?*

Cheers and thx in advance,
Martin

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
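Spark's `StructField` is a case class, so the pattern match in `setNullableStateOfColumn` could also be written with `copy`. The sketch below uses a hypothetical local `Field` class so that it runs without Spark. With Spark, the same `map` would be applied to `df.schema`, and the result wrapped in `StructType(...)` and passed to `createDataFrame(df.rdd, newSchema)` as in the helper above. This changes only how the schema rewrite is expressed, not the cost of re-creating the DataFrame from `df.rdd`. The name `setNullableIn` is illustrative.

```scala
object SetNullable {
  // Hypothetical stand-in for Spark's StructField (name, type, nullable).
  final case class Field(name: String, dataType: String, nullable: Boolean)

  /** Returns a copy of the schema in which column `cn` has the given
    * nullability; all other fields are left unchanged. */
  def setNullableIn(schema: Seq[Field], cn: String, nullable: Boolean): Seq[Field] =
    schema.map(f => if (f.name == cn) f.copy(nullable = nullable) else f)

  def main(args: Array[String]): Unit = {
    val mappingSchema = Seq(Field("x", "integer", nullable = false), Field("y", "integer", nullable = false))
    setNullableIn(mappingSchema, "y", nullable = true).foreach(println)
    // Field(x,integer,false)
    // Field(y,integer,true)
  }
}
```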