Re: Strange behavior with 'not' and filter pushdown

2017-02-14 Thread Everett Anderson
Wrapping this up -- the fix is in 2.1.0 and has been backported to the 2.0.x
branch as well.
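
For anyone who wants to double-check the backport, a minimal verification
sketch (assuming the `filter_test_table` built by the repro script further
down this thread):

val df = sqlContext.sql(
  "select count(*) from filter_test_table where not(username is not null)")
val plan = df.queryExecution.executedPlan.toString
// Fixed builds still push the negated filter...
assert(plan.contains("Not(IsNotNull(username))"))
// ...but no longer add the contradictory IsNotNull alongside it
assert(!plan.contains("IsNotNull(username), Not(IsNotNull(username))"))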


Re: Strange behavior with 'not' and filter pushdown

2017-02-13 Thread Everett Anderson
Went ahead and opened

https://issues.apache.org/jira/browse/SPARK-19586

though I'd generally expect to just close it as fixed in 2.1.0 and roll on.



Re: Strange behavior with 'not' and filter pushdown

2017-02-11 Thread Everett Anderson
On the plus side, looks like this may be fixed in 2.1.0:

== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)])
      +- *Project
         +- *Filter NOT isnotnull(username#14)
            +- *FileScan parquet [username#14] Batched: true, Format: Parquet,
               Location: InMemoryFileIndex[file:/tmp/test_table], PartitionFilters: [],
               PushedFilters: [Not(IsNotNull(username))], ReadSchema: struct<username:string>


Re: Strange behavior with 'not' and filter pushdown

2017-02-10 Thread Everett Anderson
Bumping this thread.

Translating "where not(username is not null)" into a filter of
[IsNotNull(username), Not(IsNotNull(username))] seems like a rather severe bug.

Spark 1.6.2:

explain select count(*) from parquet_table where not( username is not null)

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#1822L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#1825L])
      +- Project
         +- Filter NOT isnotnull(username#1590)
            +- Scan ParquetRelation[username#1590] InputPaths: ,
               PushedFilters: [Not(IsNotNull(username))]

Spark 2.0.2

explain select count(*) from parquet_table where not( username is not null)

== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)])
      +- *Project
         +- *Filter (isnotnull(username#35) && NOT isnotnull(username#35))
            +- *BatchedScan parquet default.[username#35] Format: ParquetFormat,
               InputPaths: , PartitionFilters: [],
               PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
               ReadSchema: struct<username:string>

Example to generate the above:

// Create some fake data with one null username out of three rows

import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types._

val rowsRDD = sc.parallelize(Seq(
  Row(1, "fred"),
  Row(2, "amy"),
  Row(3, null)))

val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("username", StringType, nullable = true)))

val data = sqlContext.createDataFrame(rowsRDD, schema)

val path = "SOME PATH HERE"

// Round-trip through Parquet so the filter pushdown path is exercised
data.write.mode("overwrite").parquet(path)

val testData = sqlContext.read.parquet(path)

// registerTempTable is deprecated in 2.0+ (createOrReplaceTempView replaces it)
testData.registerTempTable("filter_test_table")


%sql
explain select count(*) from filter_test_table where not( username is not null)
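
The wrong results are visible directly, too, not just in the plan -- with one
null username out of the three rows above, the inverse filter should match
exactly one row (sketch):

sqlContext.sql(
  "select count(*) from filter_test_table where not(username is not null)").show()
// expect 1; Spark 2.0.2 returns 0 because of the contradictory pushdown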


Strange behavior with 'not' and filter pushdown

2017-02-08 Thread Alexi Kostibas
Hi,

I have an application where I’m filtering data with Spark SQL using simple WHERE
clauses. I also want the ability to show the unmatched rows for any filter, and
so am wrapping the previous clause in `NOT()` to get the inverse. Example:

Filter:  username is not null
Inverse filter:  NOT(username is not null)

This worked fine in Spark 1.6. After upgrading to Spark 2.0.2, the inverse 
filter always returns zero results. It looks like this is a problem with how 
the filter is getting pushed down to Parquet. Specifically, the pushdown 
includes both the “is not null” filter, AND “not(is not null)”, which would 
obviously result in zero matches. An example below:

pyspark:
> x = spark.sql('select my_id from my_table where username is not null')
> y = spark.sql('select my_id from my_table where not(username is not null)')
> x.explain()
== Physical Plan ==
*Project [my_id#6L]
+- *Filter isnotnull(username#91)
   +- *BatchedScan parquet default.my_table[my_id#6L,username#91]
      Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
      PartitionFilters: [], PushedFilters: [IsNotNull(username)],
      ReadSchema: struct<my_id:bigint,username:string>
> y.explain()
== Physical Plan ==
*Project [my_id#6L]
+- *Filter (isnotnull(username#91) && NOT isnotnull(username#91))
   +- *BatchedScan parquet default.my_table[my_id#6L,username#91]
      Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
      PartitionFilters: [],
      PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
      ReadSchema: struct<my_id:bigint,username:string>

Presently I’m working around this by using the new functionality of NOT EXISTS 
in Spark 2, but that seems like overkill.
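
Two lighter alternatives (untested sketches, shown in the Scala shell; the
pyspark calls are analogous):

// Rewrite the inverse filter as IS NULL, which pushes down plain IsNull
spark.sql("select my_id from my_table where username is null")

// Or disable Parquet filter pushdown entirely until a fix lands,
// trading scan performance for correct results
spark.conf.set("spark.sql.parquet.filterPushdown", "false")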

Any help appreciated.

Alexi Kostibas
Engineering
Nuna
650 Townsend Street, Suite 425
San Francisco, CA 94103