Re: Spark SQL DataFrame: Nullable column and filtering

2015-08-01 Thread Martin Senne
Dear all,

after some fiddling I have arrived at this solution:

/**
 * Customized left outer join on common column.
 */
def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame,
    commonColumnName: String): DataFrame = {
  val joinedDF = leftDF.as('left).join(rightDF.as('right),
    leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")

  import joinedDF.sqlContext.implicits._
  val leftColumns = leftDF.columns
    .map((cn: String) => $"left.$cn")
  val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName))
    .map((cn: String) => $"right.$cn")

  joinedDF.select(leftColumns ++ rightColumns: _*)
}
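
For illustration, a minimal usage sketch (assuming the recordDF and
mappingDF from the minimal example further down):

val joinedDF = leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
joinedDF.printSchema                            // only one x column remains
joinedDF.filter(joinedDF("y").isNotNull).show   // filter on the surviving y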

Comments welcome

Alternatives I tried:

   - Not Working: If at least the "right" alias for rightDF is present, one
   could try

   joinedDF.drop("right." + commonColumnName)

   but this does not work (no column is dropped), as drop unfortunately
   does not support arguments of type Column / ColumnName. *@Michael:
   Should I create a feature request in JIRA for drop supporting Columns?*

   - Working: Without using aliases via as(...), but using column
   renaming instead. Call

   rightDF.withColumnRenamed(commonColumnName, "right_" + commonColumnName)

   to rename the right DataFrame column, and then use the join criterion

   leftDF(commonColumnName) === rightDF("right_" + commonColumnName)

   In my opinion not so neat. Opinions? (A sketch follows after this list.)
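
   As promised, a minimal sketch of the renaming variant (the helper name
   is mine, otherwise the same parameters as above):

   def leftOuterJoinViaRename(leftDF: DataFrame, rightDF: DataFrame,
       commonColumnName: String): DataFrame = {
     val renamed = "right_" + commonColumnName
     val rightRenamedDF = rightDF.withColumnRenamed(commonColumnName, renamed)
     leftDF.join(rightRenamedDF,
         leftDF(commonColumnName) === rightRenamedDF(renamed), "leftouter")
       .drop(renamed)  // the renamed column is unambiguous, so drop by name works
   }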


Things I observed:

   - Column handling does not seem consistent:
      - select() supports aliases, while drop( ... ) only supports strings.
      - DataFrame.apply( ... ) and DataFrame.col also do not support aliases.
      - Thus the only way to handle ambiguous column names at the moment
        is via select. Can someone please confirm this?
      - Alias information is not displayed via DataFrame.printSchema (or
        at least I did not find a way to show it).
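
To make the first point concrete, a short sketch (assuming leftDF and
rightDF both have a column x, and joinedDF is built with the aliases
left / right as above):

joinedDF.drop("x")           // drops *both* x columns
joinedDF("x")                // fails: the reference x is ambiguous
joinedDF.select($"left.x")   // works: the alias disambiguates
                             // (needs joinedDF.sqlContext.implicits._ in scope)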

Cheers,

Martin

2015-07-31 22:51 GMT+02:00 Martin Senne martin.se...@googlemail.com:

 Dear Michael, dear all,

 a minimal example is listed below.

 After some further analysis I could figure out, that the problem is
 related to the *leftOuterJoinWithRemovalOfEqualColumn*-Method, as I use
 columns of the left and right dataframes when doing the select on the
 joined table.

   /**
    * Customized left outer join on common column.
    */
   def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF:
       DataFrame, commonColumnName: String): DataFrame = {

     val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
     val rightColumns = rightDF.columns.filterNot(cn =>
       cn.equals(commonColumnName)).map(cn => rightDF(cn))

     leftDF.join(rightDF, leftDF(commonColumnName) ===
         rightDF(commonColumnName), "leftouter")
       .select(leftColumns ++ rightColumns: _*)
   }

 As the column y of the right table has nullable=false, this is also
 transferred to the y column of the joined table, as I use rightDF("y").

 Thus, I need to use columns of the joined table for the select.

 *Question now: The joined table has column names x, a, x, y. How do I 
 discard the second x column?*

 All my approaches failed (assuming here that joinedDF is the joined
 DataFrame):


- Using joinedDF.drop("x") discards both x columns.
- Using joinedDF("x") does not work, as it is ambiguous.
- Also using rightDF.as("aliasname") in order to differentiate the
column x (from the left DataFrame) from x (from the right DataFrame) did
not work out, as I found no way to use select($"aliasname.x") really
programmatically. Could someone sketch the code?

 Any help welcome, thanks


 Martin



 
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.sql.{DataFrame, SQLContext}

 object OtherEntities {

   case class Record( x: Int, a: String)
   case class Mapping( x: Int, y: Int )

   val records = Seq( Record(1, "hello"), Record(2, "bob"))
   val mappings = Seq( Mapping(2, 5) )
 }

 object MinimalShowcase {

   /**
    * Customized left outer join on common column.
    */
   def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF:
       DataFrame, commonColumnName: String): DataFrame = {

     val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
     val rightColumns = rightDF.columns.filterNot(cn =>
       cn.equals(commonColumnName)).map(cn => rightDF(cn))

     leftDF.join(rightDF, leftDF(commonColumnName) ===
         rightDF(commonColumnName), "leftouter")
       .select(leftColumns ++ rightColumns: _*)
   }


   /**
    * Set, if a column is nullable.
    * @param df source DataFrame
    * @param cn is the column name to change
    * @param nullable is the flag to set, such that the column is either
    *                 nullable or not
    */
   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean)
       : DataFrame = {

     val schema = df.schema
     val newSchema = StructType(schema.map {
       case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t,
         nullable = nullable, m)
       case y: StructField => y
     })
     df.sqlContext.createDataFrame( df.rdd, newSchema)
   }

Re: Spark SQL DataFrame: Nullable column and filtering

2015-07-31 Thread Martin Senne
Dear Michael, dear all,

a minimal example is listed below.

After some further analysis I could figure out that the problem is related
to the *leftOuterJoinWithRemovalOfEqualColumn* method, as I use columns of
the left and right DataFrames when doing the select on the joined table.

  /**
   * Customized left outer join on common column.
   */
  def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame,
      rightDF: DataFrame, commonColumnName: String): DataFrame = {

    val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
    val rightColumns = rightDF.columns.filterNot(cn =>
      cn.equals(commonColumnName)).map(cn => rightDF(cn))

    leftDF.join(rightDF, leftDF(commonColumnName) ===
        rightDF(commonColumnName), "leftouter")
      .select(leftColumns ++ rightColumns: _*)
  }

As the column y of the right table has nullable=false, this is also
transferred to the y column of the joined table, as I use rightDF("y").

Thus, I need to use columns of the joined table for the select.

*Question now: The joined table has column names x, a, x, y.
How do I discard the second x column?*

All my approaches failed (assuming here that joinedDF is the joined DataFrame):


   - Using joinedDF.drop("x") discards both x columns.
   - Using joinedDF("x") does not work, as it is ambiguous.
   - Also using rightDF.as("aliasname") in order to differentiate the
   column x (from the left DataFrame) from x (from the right DataFrame)
   did not work out, as I found no way to use select($"aliasname.x")
   really programmatically. Could someone sketch the code?

Any help welcome, thanks


Martin




import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{DataFrame, SQLContext}

object OtherEntities {

  case class Record( x: Int, a: String)
  case class Mapping( x: Int, y: Int )

  val records = Seq( Record(1, "hello"), Record(2, "bob"))
  val mappings = Seq( Mapping(2, 5) )
}

object MinimalShowcase {

  /**
   * Customized left outer join on common column.
   */
  def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame,
      rightDF: DataFrame, commonColumnName: String): DataFrame = {

    val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
    val rightColumns = rightDF.columns.filterNot(cn =>
      cn.equals(commonColumnName)).map(cn => rightDF(cn))

    leftDF.join(rightDF, leftDF(commonColumnName) ===
        rightDF(commonColumnName), "leftouter")
      .select(leftColumns ++ rightColumns: _*)
  }


  /**
   * Set, if a column is nullable.
   * @param df source DataFrame
   * @param cn is the column name to change
   * @param nullable is the flag to set, such that the column is either
   *                 nullable or not
   */
  def setNullableStateOfColumn( df: DataFrame, cn: String,
      nullable: Boolean): DataFrame = {

    val schema = df.schema
    val newSchema = StructType(schema.map {
      case StructField( c, t, _, m) if c.equals(cn) => StructField( c,
        t, nullable = nullable, m)
      case y: StructField => y
    })
    df.sqlContext.createDataFrame( df.rdd, newSchema)
  }


  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("Minimal")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // used to implicitly convert an RDD to a DataFrame.
    import sqlContext.implicits._

    val recordDF = sc.parallelize(OtherEntities.records, 4).toDF()
    val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
    val mappingWithNullDF = setNullableStateOfColumn(mappingDF, "y", true)

    val joinedDF = recordDF.join(mappingDF, recordDF("x") ===
        mappingDF("x"), "leftouter")
    println("joinedDF:")
    joinedDF.show
    joinedDF.printSchema
    joinedDF.filter(joinedDF("y").isNotNull).show

    // joinedDF:
    // +-+-----+----+----+
    // |x|    a|   x|   y|
    // +-+-----+----+----+
    // |1|hello|null|null|
    // |2|  bob|   2|   5|
    // +-+-----+----+----+
    //
    // root
    //  |-- x: integer (nullable = false)
    //  |-- a: string (nullable = true)
    //  |-- x: integer (nullable = true)
    //  |-- y: integer (nullable = true)
    //
    // +-+---+-+-+
    // |x|  a|x|y|
    // +-+---+-+-+
    // |2|bob|2|5|
    // +-+---+-+-+


    val extrajoinedDF =
      leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
    println("extrajoinedDF:")
    extrajoinedDF.show
    extrajoinedDF.printSchema
    extrajoinedDF.filter(extrajoinedDF("y").isNotNull).show

    // extrajoinedDF:
    // +-+-----+----+
    // |x|    a|   y|
    // +-+-----+----+
    // |1|hello|null|
    // |2|  bob|   5|
    // +-+-----+----+
    //
    // root
    //  |-- x: integer (nullable = false)
    //  |-- a: string (nullable = true)
    //  |-- y: integer (nullable = false)
    //
    // +-+-----+----+
    // |x|    a|   y|
    // +-+-----+----+
    // |1|hello|null|
    // |2|  bob|   5|
    // +-+-----+----+



    val joined2DF = recordDF.join(mappingWithNullDF, recordDF("x") ===
        mappingWithNullDF("x"), "leftouter")
    println("joined2DF:")

Re: Spark SQL DataFrame: Nullable column and filtering

2015-07-30 Thread Michael Armbrust
Perhaps I'm missing what you are trying to accomplish, but if you'd like to
avoid the null values do an inner join instead of an outer join.

Additionally, I'm confused about how the result of
joinedDF.filter(joinedDF("y").isNotNull).show still contains null values in
the column y. This doesn't really have anything to do with nullable, which
is only a hint to the system so that we can avoid null checking when we
know that there are no null values. If you provide the full code I can try
and see if this is a bug.
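
For reference, the suggested inner join would be the following one-liner
(a sketch, using the recordDF / mappingDF from the code quoted below):

recordDF.join(mappingDF, recordDF("x") === mappingDF("x"), "inner")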

On Thu, Jul 30, 2015 at 11:53 AM, Martin Senne martin.se...@googlemail.com
wrote:

 Dear Michael, dear all,

 motivation:

 object OtherEntities {

   case class Record( x: Int, a: String)
   case class Mapping( x: Int, y: Int )

   val records = Seq( Record(1, "hello"), Record(2, "bob"))
   val mappings = Seq( Mapping(2, 5) )
 }

 Now I want to perform a *left outer join* on records and mappings (with
 the ON JOIN criterion on columns recordDF("x") === mappingDF("x")); the
 shorthand is in *leftOuterJoinWithRemovalOfEqualColumn*:

 val sqlContext = new SQLContext(sc)
 // used to implicitly convert an RDD to a DataFrame.
 import sqlContext.implicits._

 val recordDF = sc.parallelize(OtherEntities.records, 4).toDF()
 val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()

 val joinedDF = recordDF.leftOuterJoinWithRemovalOfEqualColumn( mappingDF, "x")

 joinedDF.filter(joinedDF("y").isNotNull).show


 Currently, the output is

 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 instead of

 +-+---+-+
 |x|  a|y|
 +-+---+-+
 |2|bob|5|
 +-+---+-+

 The last output can be achieved by the method of changing nullable=false
 to nullable=true described in my first post.

 *Thus, I need this schema modification to make outer joins work.*
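
 Concretely, that means (a sketch, reusing setNullableStateOfColumn from
 my first post below):

 val mappingWithNullDF = setNullableStateOfColumn(mappingDF, "y", nullable = true)
 val joined2DF = recordDF.leftOuterJoinWithRemovalOfEqualColumn( mappingWithNullDF, "x")
 joined2DF.filter(joined2DF("y").isNotNull).show  // now only the matched row remains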

 Cheers and thanks,

 Martin



 2015-07-30 20:23 GMT+02:00 Michael Armbrust mich...@databricks.com:

 We don't yet updated nullability information based on predicates as we
 don't actually leverage this information in many places yet.  Why do you
 want to update the schema?

 On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 
 martin.se...@googlemail.com wrote:

 Hi all,

 1. *Columns in DataFrames can be nullable and not nullable. Having a
 nullable column of Doubles, I can use the following Scala code to filter
 all non-null rows:*

   val df = ...  // some code that creates a DataFrame
   df.filter( df("columnname").isNotNull() )

 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = true)

 And with the filter expression applied:
 +-+---+-+
 |x|  a|y|
 +-+---+-+
 |2|bob|5|
 +-+---+-+


 Unfortunately, while this is true for a nullable column (according to
 df.printSchema), it is not true for a column that is not nullable:


 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = false)

 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 such that the output is not affected by the filter. Is this intended?


 2. *What is the cheapest way (in the sense of performance) to turn a
 non-nullable column into a nullable column?
 I came up with this:*

   /**
    * Set, if a column is nullable.
    * @param df source DataFrame
    * @param cn is the column name to change
    * @param nullable is the flag to set, such that the column is either
    *                 nullable or not
    */
   def setNullableStateOfColumn( df: DataFrame, cn: String,
       nullable: Boolean): DataFrame = {

     val schema = df.schema
     val newSchema = StructType(schema.map {
       case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t,
         nullable = nullable, m)
       case y: StructField => y
     })
     df.sqlContext.createDataFrame( df.rdd, newSchema)
   }

 Is there a cheaper solution?

 3. *Any comments?*

 Cheers and thx in advance,

 Martin






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Spark SQL DataFrame: Nullable column and filtering

2015-07-30 Thread Martin Senne
Dear Michael, dear all,

distinguishing those records that have a match in mapping from those that
don't is the crucial point.

Record(x: Int, a: String)
Mapping(x: Int, y: Int)

Thus

Record(1, "hello")
Record(2, "bob")
Mapping(2, 5)

yield (2, "bob", 5) on an inner join.
BUT I'm also interested in (1, "hello", null), as there is no counterpart
in mapping (this is the left outer join part).

I need to distinguish 1 and 2 because of later inserts (case 1, "hello")
or updates (case 2, "bob").
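
A sketch of that split on the outer-joined result (assuming a joinedDF
whose y column is nullable, as discussed above):

val toInsert = joinedDF.filter(joinedDF("y").isNull)     // case 1: no counterpart in mapping
val toUpdate = joinedDF.filter(joinedDF("y").isNotNull)  // case 2: counterpart exists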

Cheers and thanks,

Martin
On 30.07.2015 at 22:58, Michael Armbrust mich...@databricks.com wrote:

 Perhaps I'm missing what you are trying to accomplish, but if you'd like
to avoid the null values do an inner join instead of an outer join.

 Additionally, I'm confused about how the result
of joinedDF.filter(joinedDF("y").isNotNull).show still contains null values
in the column y. This doesn't really have anything to do with nullable,
which is only a hint to the system so that we can avoid null checking when
we know that there are no null values. If you provide the full code I can
try and see if this is a bug.

 On Thu, Jul 30, 2015 at 11:53 AM, Martin Senne 
martin.se...@googlemail.com wrote:

 Dear Michael, dear all,

 motivation:

 object OtherEntities {

   case class Record( x: Int, a: String)
   case class Mapping( x: Int, y: Int )

   val records = Seq( Record(1, "hello"), Record(2, "bob"))
   val mappings = Seq( Mapping(2, 5) )
 }

 Now I want to perform a left outer join on records and mappings (with
the ON JOIN criterion on columns recordDF("x") === mappingDF("x")); the
shorthand is in leftOuterJoinWithRemovalOfEqualColumn:

 val sqlContext = new SQLContext(sc)
 // used to implicitly convert an RDD to a DataFrame.
 import sqlContext.implicits._

 val recordDF = sc.parallelize(OtherEntities.records, 4).toDF()
 val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()

 val joinedDF = recordDF.leftOuterJoinWithRemovalOfEqualColumn( mappingDF, "x")

 joinedDF.filter(joinedDF("y").isNotNull).show


 Currently, the output is


 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 instead of


 +-+---+-+
 |x|  a|y|
 +-+---+-+
 |2|bob|5|
 +-+---+-+

 The last output can be achieved by the method of changing nullable=false
to nullable=true described in my first post.

 Thus, I need this schema modification to make outer joins work.

 Cheers and thanks,

 Martin



 2015-07-30 20:23 GMT+02:00 Michael Armbrust mich...@databricks.com:

 We don't yet update nullability information based on predicates, as we
don't actually leverage this information in many places yet. Why do you
want to update the schema?

 On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 
martin.se...@googlemail.com wrote:

 Hi all,

 1. *Columns in DataFrames can be nullable and not nullable. Having a
 nullable column of Doubles, I can use the following Scala code to filter
 all non-null rows:*

   val df = ...  // some code that creates a DataFrame
   df.filter( df("columnname").isNotNull() )

 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = true)

 And with the filter expression applied:
 +-+---+-+
 |x|  a|y|
 +-+---+-+
 |2|bob|5|
 +-+---+-+


 Unfortunately, while this is true for a nullable column (according to
df.printSchema), it is not true for a column that is not nullable:


 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = false)

 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 such that the output is not affected by the filter. Is this intended?


 2. *What is the cheapest way (in the sense of performance) to turn a
non-nullable column into a nullable column?
 I came up with this:*

   /**
    * Set, if a column is nullable.
    * @param df source DataFrame
    * @param cn is the column name to change
    * @param nullable is the flag to set, such that the column is either
    *                 nullable or not
    */
   def setNullableStateOfColumn( df: DataFrame, cn: String,
       nullable: Boolean): DataFrame = {

     val schema = df.schema
     val newSchema = StructType(schema.map {
       case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t,
         nullable = nullable, m)
       case y: StructField => y
     })
     df.sqlContext.createDataFrame( df.rdd, newSchema)
   }

 Is there a cheaper solution?

 3. *Any comments?*

 Cheers and thx in advance,

 Martin







Re: Spark SQL DataFrame: Nullable column and filtering

2015-07-30 Thread Martin Senne
Dear Michael, dear all,

motivation:

object OtherEntities {

  case class Record( x: Int, a: String)
  case class Mapping( x: Int, y: Int )

  val records = Seq( Record(1, "hello"), Record(2, "bob"))
  val mappings = Seq( Mapping(2, 5) )
}

Now I want to perform a *left outer join* on records and mappings
(with the ON JOIN criterion on columns recordDF("x") ===
mappingDF("x")); the shorthand is in
*leftOuterJoinWithRemovalOfEqualColumn*:

val sqlContext = new SQLContext(sc)
// used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

val recordDF = sc.parallelize(OtherEntities.records, 4).toDF()
val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()

val joinedDF = recordDF.leftOuterJoinWithRemovalOfEqualColumn( mappingDF, "x")

joinedDF.filter(joinedDF("y").isNotNull).show


Currently, the output is

+-+-----+----+
|x|    a|   y|
+-+-----+----+
|1|hello|null|
|2|  bob|   5|
+-+-----+----+

instead of

+-+---+-+
|x|  a|y|
+-+---+-+
|2|bob|5|
+-+---+-+

The last output can be achieved by the method of changing nullable=false
to nullable=true described in my first post.

*Thus, I need this schema modification to make outer joins work.*

Cheers and thanks,

Martin



2015-07-30 20:23 GMT+02:00 Michael Armbrust mich...@databricks.com:

 We don't yet update nullability information based on predicates, as we
 don't actually leverage this information in many places yet. Why do you
 want to update the schema?

 On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 
 martin.se...@googlemail.com wrote:

 Hi all,

 1. *Columns in DataFrames can be nullable and not nullable. Having a
 nullable column of Doubles, I can use the following Scala code to filter
 all non-null rows:*

   val df = ...  // some code that creates a DataFrame
   df.filter( df("columnname").isNotNull() )

 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = true)

 And with the filter expression applied:
 +-+---+-+
 |x|  a|y|
 +-+---+-+
 |2|bob|5|
 +-+---+-+


 Unfortunately, while this is true for a nullable column (according to
 df.printSchema), it is not true for a column that is not nullable:


 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = false)

 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 such that the output is not affected by the filter. Is this intended?


 2. *What is the cheapest way (in the sense of performance) to turn a
 non-nullable column into a nullable column?
 I came up with this:*

   /**
    * Set, if a column is nullable.
    * @param df source DataFrame
    * @param cn is the column name to change
    * @param nullable is the flag to set, such that the column is either
    *                 nullable or not
    */
   def setNullableStateOfColumn( df: DataFrame, cn: String,
       nullable: Boolean): DataFrame = {

     val schema = df.schema
     val newSchema = StructType(schema.map {
       case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t,
         nullable = nullable, m)
       case y: StructField => y
     })
     df.sqlContext.createDataFrame( df.rdd, newSchema)
   }

 Is there a cheaper solution?

 3. *Any comments?*

 Cheers and thx in advance,

 Martin











Re: Spark SQL DataFrame: Nullable column and filtering

2015-07-30 Thread Michael Armbrust
We don't yet update nullability information based on predicates, as we
don't actually leverage this information in many places yet. Why do you
want to update the schema?

On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 martin.se...@googlemail.com
wrote:

 Hi all,

 1. *Columns in DataFrames can be nullable and not nullable. Having a
 nullable column of Doubles, I can use the following Scala code to filter
 all non-null rows:*

   val df = ...  // some code that creates a DataFrame
   df.filter( df("columnname").isNotNull() )

 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = true)

 And with the filter expression applied:
 +-+---+-+
 |x|  a|y|
 +-+---+-+
 |2|bob|5|
 +-+---+-+


 Unfortunately, while this is true for a nullable column (according to
 df.printSchema), it is not true for a column that is not nullable:


 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 root
  |-- x: integer (nullable = false)
  |-- a: string (nullable = true)
  |-- y: integer (nullable = false)

 +-+-----+----+
 |x|    a|   y|
 +-+-----+----+
 |1|hello|null|
 |2|  bob|   5|
 +-+-----+----+

 such that the output is not affected by the filter. Is this intended?


 2. *What is the cheapest way (in the sense of performance) to turn a
 non-nullable column into a nullable column?
 I came up with this:*

   /**
    * Set, if a column is nullable.
    * @param df source DataFrame
    * @param cn is the column name to change
    * @param nullable is the flag to set, such that the column is either
    *                 nullable or not
    */
   def setNullableStateOfColumn( df: DataFrame, cn: String,
       nullable: Boolean): DataFrame = {

     val schema = df.schema
     val newSchema = StructType(schema.map {
       case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t,
         nullable = nullable, m)
       case y: StructField => y
     })
     df.sqlContext.createDataFrame( df.rdd, newSchema)
   }

 Is there a cheaper solution?

 3. *Any comments?*

 Cheers and thx in advance,

 Martin





