Re: Spark/Parquet/Statistics question

2017-11-21 Thread Rabin Banerjee
Spark is not adding any statistics (STAT) metadata to Parquet files in version 1.6.x,
so it ends up scanning all files when filtering.

(1 to 30).map(i => (i, i.toString)).toDF("a",
"b").sort("a").coalesce(1).write.format("parquet").saveAsTable("metrics")

./parquet-meta /user/hive/warehouse/metrics/*.parquet

file:
file:/user/hive/warehouse/metrics/part-r-0-6552bc8f-ec05-4ce8-ad8d-dc22bdd2e502.gz.parquet


creator: parquet-mr version 1.6.0

extra:   org.apache.spark.sql.parquet.row.metadata =
{"type":"struct","fields":[{"name":"a","type":"integer","nullable":true,"metadata":{}},{"name":"b","type":"string","nullable":true,"metadata":{}}]}



file schema: spark_schema



a:   OPTIONAL INT32 R:0 D:1

b:   OPTIONAL BINARY O:UTF8 R:0 D:1


row group 1: RC:30 TS:4089139 OFFSET:4



a:INT32 GZIP DO:0 FPO:4 SZ:415087/1200095/2.89 VC:30
ENC:BIT_PACKED,RLE,PLAIN

b:BINARY GZIP DO:0 FPO:415091 SZ:667334/2889044/4.33 VC:30
ENC:BIT_PACKED,RLE,PLAIN
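
As a side note, here is a minimal Scala sketch (not from the original mail) for checking
the footer programmatically instead of via parquet-meta. It assumes a parquet-mr 1.8+
classpath with the org.apache.parquet packages (older 1.6.x jars used the bare
parquet.hadoop prefix), and the file path below is only a placeholder:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import scala.collection.JavaConverters._

// Read one file's footer and print the per-column-chunk statistics.
// An empty Statistics object means the writer recorded no min/max.
val footer = ParquetFileReader.readFooter(
  new Configuration(),
  new Path("/user/hive/warehouse/metrics/some-part-file.parquet"))  // placeholder path

footer.getBlocks.asScala.foreach { block =>
  block.getColumns.asScala.foreach { col =>
    println(s"${col.getPath}: ${col.getStatistics}")
  }
}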

On Tue, Jan 17, 2017 at 9:41 PM, Michael Segel wrote:

> Hi,
> Lexicographically speaking, min/max should work because strings support a
> comparison operator. So anything that supports an ordering test (<, >,
> <=, >=, ==, ...) can also support min and max functions.
>
> I guess the question is whether Spark supports this, and if not, why?
> Yes, it makes sense.
>
>
>
> > On Jan 17, 2017, at 9:17 AM, Jörn Franke  wrote:
> >
> > Hello,
> >
> > I am not sure what you mean by min/max for strings; I do not know whether
> > this makes sense. What the ORC format has is bloom filters for strings,
> > etc. - are you referring to this?
> >
> > In order to apply min/max filters, Spark needs to read the metadata of
> > the file. Whether the filter is applied or not, you can see from the
> > number of bytes read.
> >
> >
> > Best regards
> >
> >> On 17 Jan 2017, at 15:28, djiang  wrote:
> >>
> >> Hi,
> >>
> >> I have been looking into how Spark stores statistics (min/max) in Parquet,
> >> as well as how it uses that information for query optimization.
> >> I have a few questions.
> >> First, the setup: Spark 2.1.0. The following sets up a DataFrame of 1000
> >> rows, with a long column and a string column.
> >> The two outputs are sorted by different columns, though.
> >>
> >> scala> spark.sql("select id, cast(id as string) text from
> >> range(1000)").sort("id").write.parquet("/secret/spark21-sortById")
> >> scala> spark.sql("select id, cast(id as string) text from
> >> range(1000)").sort("Text").write.parquet("/secret/spark21-sortByText")
> >>
> >> I added some code to parquet-tools to print out stats and examine the
> >> generated parquet files:
> >>
> >> hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta
> >> /secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-
> d7a5a06e4425.snappy.parquet
> >> file:
> >> file:/secret/spark21-sortById/part-0-39f7ac12-6038-46ee-
> b5c3-d7a5a06e4425.snappy.parquet
> >> creator: parquet-mr version 1.8.1 (build
> >> 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)
> >> extra:   org.apache.spark.sql.parquet.row.metadata =
> >> {"type":"struct","fields":[{"name":"id","type":"long","
> nullable":false,"metadata":{}},{"name":"text","type":"
> string","nullable":false,"metadata":{}}]}
> >>
> >> file schema: spark_schema
> >> 
> 
> >> id:  REQUIRED INT64 R:0 D:0
> >> text:REQUIRED BINARY O:UTF8 R:0 D:0
> >>
> >> row group 1: RC:5 TS:133 OFFSET:4
> >> 
> 
> >> id:   INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5
> >> ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 4, num_nulls: 0]
> >> text: BINARY SNAPPY DO:0 FPO:75 SZ:53/52/0.98 VC:5
> >> ENC:PLAIN,BIT_PACKED
> >>
> >> hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta
> >> /secret/spark21-sortByText/part-0-3d7eac74-5ca0-44a0-
> b8a6-d67cc38a2bde.snappy.parquet
> >> file:
> >> file:/secret/spark21-sortByText/part-0-3d7eac74-5ca0-44a0-b8a6-
> d67cc38a2bde.snappy.parquet
> >> creator: parquet-mr version 1.8.1 (build
> >> 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)
> >> extra:   org.apache.spark.sql.parquet.row.metadata =
> >> {"type":"struct","fields":[{"name":"id","type":"long","
> nullable":false,"metadata":{}},{"name":"text","type":"
> string","nullable":false,"metadata":{}}]}
> >>
> >> file schema: spark_schema
> >> 
> 
> >> id:  REQUIRED INT64 R:0 D:0
> >> text:REQUIRED BINARY O:UTF8 R:0 D:0
> >>
> >> row group 1: RC:5 TS:140 OFFSET:4
> >> 

Re: Spark/Parquet/Statistics question

2017-01-17 Thread Michael Segel
Hi, 
Lexicographically speaking, min/max should work because strings support a
comparison operator. So anything that supports an ordering test (<, >, <=, >=,
==, ...) can also support min and max functions.
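
A quick illustration in plain Scala (nothing Spark-specific, just to make the point
concrete): the default String ordering is lexicographic, which is exactly what a
min/max statistic over a string column would rely on.

// Scala's default Ordering[String] is lexicographic, so min/max over
// strings is well defined.
val values = Seq("pear", "apple", "banana")
println(values.min)   // apple
println(values.max)   // pear
// The same ordering backs comparisons such as "apple" < "banana".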

I guess the question is whether Spark supports this, and if not, why?
Yes, it makes sense. 



> On Jan 17, 2017, at 9:17 AM, Jörn Franke  wrote:
> 
> Hello,
> 
> I am not sure what you mean by min/max for strings; I do not know whether this
> makes sense. What the ORC format has is bloom filters for strings, etc. - are
> you referring to this?
> 
> In order to apply min/max filters, Spark needs to read the metadata of the
> file. Whether the filter is applied or not, you can see from the number of
> bytes read.
> 
> 
> Best regards
> 
>> On 17 Jan 2017, at 15:28, djiang  wrote:
>> 
>> Hi, 
>> 
>> I have been looking into how Spark stores statistics (min/max) in Parquet,
>> as well as how it uses that information for query optimization.
>> I have a few questions.
>> First, the setup: Spark 2.1.0. The following sets up a DataFrame of 1000 rows,
>> with a long column and a string column.
>> The two outputs are sorted by different columns, though.
>> 
>> scala> spark.sql("select id, cast(id as string) text from
>> range(1000)").sort("id").write.parquet("/secret/spark21-sortById")
>> scala> spark.sql("select id, cast(id as string) text from
>> range(1000)").sort("Text").write.parquet("/secret/spark21-sortByText")
>> 
>> I added some code to parquet-tools to print out stats and examine the
>> generated parquet files:
>> 
>> hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta
>> /secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet
>>  
>> file:   
>> file:/secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet
>>  
>> creator: parquet-mr version 1.8.1 (build
>> 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
>> extra:   org.apache.spark.sql.parquet.row.metadata =
>> {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]}
>>  
>> 
>> file schema: spark_schema 
>> 
>> id:  REQUIRED INT64 R:0 D:0
>> text:REQUIRED BINARY O:UTF8 R:0 D:0
>> 
>> row group 1: RC:5 TS:133 OFFSET:4 
>> 
>> id:   INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5
>> ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 4, num_nulls: 0]
>> text: BINARY SNAPPY DO:0 FPO:75 SZ:53/52/0.98 VC:5
>> ENC:PLAIN,BIT_PACKED
>> 
>> hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta
>> /secret/spark21-sortByText/part-0-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet
>>  
>> file:   
>> file:/secret/spark21-sortByText/part-0-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet
>>  
>> creator: parquet-mr version 1.8.1 (build
>> 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
>> extra:   org.apache.spark.sql.parquet.row.metadata =
>> {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]}
>>  
>> 
>> file schema: spark_schema 
>> 
>> id:  REQUIRED INT64 R:0 D:0
>> text:REQUIRED BINARY O:UTF8 R:0 D:0
>> 
>> row group 1: RC:5 TS:140 OFFSET:4 
>> 
>> id:   INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5
>> ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 101, num_nulls: 0]
>> text: BINARY SNAPPY DO:0 FPO:75 SZ:60/59/0.98 VC:5
>> ENC:PLAIN,BIT_PACKED
>> 
>> So the question is: why does Spark, specifically 2.1.0, only generate min/max
>> for numeric columns, but not for string (BINARY) fields, even when the string
>> field is included in the sort? Maybe I missed a configuration?
>> 
>> The second issue is: how can I confirm Spark is utilizing the min/max?
>> scala> sc.setLogLevel("INFO")
>> scala> spark.sql("select * from parquet.`/secret/spark21-sortById` where
>> id=4").show
>> I got many lines like this:
>> 17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate:
>> and(noteq(id, null), eq(id, 4))
>> 17/01/17 09:23:35 INFO FileScanRDD: Reading File path:
>> file:///secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet,
>> range: 0-558, partition values: [empty row]
>> ...
>> 17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate:
>> and(noteq(id, null), eq(id, 4))
>> 17/01/17 09:23:35 INFO FileScanRDD: Reading File path:
>> file:///secret/spark21-sortById/part-00193-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet,
>> range: 0-574, partition values: [empty row]
>> ...
>> 
>> The issue is that it looks like Spark is scanning every file, even though, from the
>> min/max, 

Re: Spark/Parquet/Statistics question

2017-01-17 Thread Dong Jiang
Hi,

Thanks for the response.
I am referring to a presentation by Ryan Blue. 
http://www.slideshare.net/RyanBlue3/parquet-performance-tuning-the-missing-guide
Specifically, on page 27, you can clearly see min/max being generated for BINARY. I don't
know for sure whether that was generated by Spark or by Pig.
In terms of filtering, is reviewing the number of bytes read the only way to
figure it out? It is not direct evidence of whether file skipping is working. If I
understand correctly, Spark first reads the min/max values to determine which
files can be skipped and then starts a second read to get the actual data, so
judging from the number of bytes read can also be tricky.
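
For what it is worth, a rough sketch of a more direct signal (assuming the Spark 2.1
shell and the /secret/spark21-sortById path from the earlier mails): check the
physical plan. If the predicate appears under PushedFilters it is at least being
handed to the Parquet reader, although the row-group skipping itself happens inside
parquet-mr and is not visible in the plan.

// Filter pushdown is on by default in Spark 2.x, but confirm it:
spark.conf.get("spark.sql.parquet.filterPushdown")   // expect "true"

// The FileScan parquet node of the physical plan should list something like
// PushedFilters: [IsNotNull(id), EqualTo(id,4)].
spark.read.parquet("/secret/spark21-sortById")
  .filter("id = 4")
  .explain()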

Thanks,

On 1/17/17, 10:17 AM, "Jörn Franke"  wrote:

Hello,

I am not sure what you mean by min/max for strings; I do not know whether this
makes sense. What the ORC format has is bloom filters for strings, etc. - are
you referring to this?

In order to apply min/max filters, Spark needs to read the metadata of the
file. Whether the filter is applied or not, you can see from the number of
bytes read.


Best regards

> On 17 Jan 2017, at 15:28, djiang  wrote:
> 
> Hi, 
> 
> I have been looking into how Spark stores statistics (min/max) in Parquet,
> as well as how it uses that information for query optimization.
> I have a few questions.
> First, the setup: Spark 2.1.0. The following sets up a DataFrame of 1000 rows,
> with a long column and a string column.
> The two outputs are sorted by different columns, though.
> 
> scala> spark.sql("select id, cast(id as string) text from
> range(1000)").sort("id").write.parquet("/secret/spark21-sortById")
> scala> spark.sql("select id, cast(id as string) text from
> range(1000)").sort("Text").write.parquet("/secret/spark21-sortByText")
> 
> I added some code to parquet-tools to print out stats and examine the
> generated parquet files:
> 
> hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta
> 
/secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet
 
> file:   
> 
file:/secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet
 
> creator: parquet-mr version 1.8.1 (build
> 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
> extra:   org.apache.spark.sql.parquet.row.metadata =
> 
{"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]}
 
> 
> file schema: spark_schema 
> 

> id:  REQUIRED INT64 R:0 D:0
> text:REQUIRED BINARY O:UTF8 R:0 D:0
> 
> row group 1: RC:5 TS:133 OFFSET:4 
> 

> id:   INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5
> ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 4, num_nulls: 0]
> text: BINARY SNAPPY DO:0 FPO:75 SZ:53/52/0.98 VC:5
> ENC:PLAIN,BIT_PACKED
> 
> hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta
> 
/secret/spark21-sortByText/part-0-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet
 
> file:   
> 
file:/secret/spark21-sortByText/part-0-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet
 
> creator: parquet-mr version 1.8.1 (build
> 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
> extra:   org.apache.spark.sql.parquet.row.metadata =
> 
{"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]}
 
> 
> file schema: spark_schema 
> 

> id:  REQUIRED INT64 R:0 D:0
> text:REQUIRED BINARY O:UTF8 R:0 D:0
> 
> row group 1: RC:5 TS:140 OFFSET:4 
> 

> id:   INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5
> ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 101, num_nulls: 0]
> text: BINARY SNAPPY DO:0 FPO:75 SZ:60/59/0.98 VC:5
> ENC:PLAIN,BIT_PACKED
> 
> So the question is: why does Spark, specifically 2.1.0, only generate min/max
> for numeric columns, but not for string (BINARY) fields, even when the string
> field is included in the sort? Maybe I missed a configuration?
> 
> The second issue is: how can I confirm Spark is utilizing the min/max?
> scala> sc.setLogLevel("INFO")
> scala> spark.sql("select * from parquet.`/secret/spark21-sortById` where
> id=4").show
> I got many lines like this:
> 17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate:
> and(noteq(id, null), eq(id, 4))
> 17/01/17 09:23:35 INFO FileScanRDD: Reading 

Re: Spark/Parquet/Statistics question

2017-01-17 Thread Jörn Franke
Hello,

I am not sure what you mean by min/max for strings; I do not know whether this
makes sense. What the ORC format has is bloom filters for strings, etc. - are
you referring to this?

In order to apply min/max filters, Spark needs to read the metadata of the
file. Whether the filter is applied or not, you can see from the number of
bytes read.
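
A rough sketch of measuring that from the shell (not part of the original mail; it
assumes the Spark 2.x listener and task-metrics APIs): sum the input bytes reported
by finished tasks, then compare a filtered scan against a full scan of the same data.

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

var bytesRead = 0L  // updated from the listener-bus thread; good enough for eyeballing
spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null for failed tasks, so guard the access
    if (taskEnd.taskMetrics != null) {
      bytesRead += taskEnd.taskMetrics.inputMetrics.bytesRead
    }
  }
})

bytesRead = 0L
spark.read.parquet("/secret/spark21-sortById").filter("id = 4").count()
println(s"bytes read with filter: $bytesRead")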


Best regards

> On 17 Jan 2017, at 15:28, djiang  wrote:
> 
> Hi, 
> 
> I have been looking into how Spark stores statistics (min/max) in Parquet,
> as well as how it uses that information for query optimization.
> I have a few questions.
> First, the setup: Spark 2.1.0. The following sets up a DataFrame of 1000 rows,
> with a long column and a string column.
> The two outputs are sorted by different columns, though.
> 
> scala> spark.sql("select id, cast(id as string) text from
> range(1000)").sort("id").write.parquet("/secret/spark21-sortById")
> scala> spark.sql("select id, cast(id as string) text from
> range(1000)").sort("Text").write.parquet("/secret/spark21-sortByText")
> 
> I added some code to parquet-tools to print out stats and examine the
> generated parquet files:
> 
> hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta
> /secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet
>  
> file:   
> file:/secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet
>  
> creator: parquet-mr version 1.8.1 (build
> 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
> extra:   org.apache.spark.sql.parquet.row.metadata =
> {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]}
>  
> 
> file schema: spark_schema 
> 
> id:  REQUIRED INT64 R:0 D:0
> text:REQUIRED BINARY O:UTF8 R:0 D:0
> 
> row group 1: RC:5 TS:133 OFFSET:4 
> 
> id:   INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5
> ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 4, num_nulls: 0]
> text: BINARY SNAPPY DO:0 FPO:75 SZ:53/52/0.98 VC:5
> ENC:PLAIN,BIT_PACKED
> 
> hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta
> /secret/spark21-sortByText/part-0-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet
>  
> file:   
> file:/secret/spark21-sortByText/part-0-3d7eac74-5ca0-44a0-b8a6-d67cc38a2bde.snappy.parquet
>  
> creator: parquet-mr version 1.8.1 (build
> 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
> extra:   org.apache.spark.sql.parquet.row.metadata =
> {"type":"struct","fields":[{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"text","type":"string","nullable":false,"metadata":{}}]}
>  
> 
> file schema: spark_schema 
> 
> id:  REQUIRED INT64 R:0 D:0
> text:REQUIRED BINARY O:UTF8 R:0 D:0
> 
> row group 1: RC:5 TS:140 OFFSET:4 
> 
> id:   INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5
> ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 101, num_nulls: 0]
> text: BINARY SNAPPY DO:0 FPO:75 SZ:60/59/0.98 VC:5
> ENC:PLAIN,BIT_PACKED
> 
> So the question is: why does Spark, specifically 2.1.0, only generate min/max
> for numeric columns, but not for string (BINARY) fields, even when the string
> field is included in the sort? Maybe I missed a configuration?
> 
> The second issue is: how can I confirm Spark is utilizing the min/max?
> scala> sc.setLogLevel("INFO")
> scala> spark.sql("select * from parquet.`/secret/spark21-sortById` where
> id=4").show
> I got many lines like this:
> 17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate:
> and(noteq(id, null), eq(id, 4))
> 17/01/17 09:23:35 INFO FileScanRDD: Reading File path:
> file:///secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet,
> range: 0-558, partition values: [empty row]
> ...
> 17/01/17 09:23:35 INFO FilterCompat: Filtering using predicate:
> and(noteq(id, null), eq(id, 4))
> 17/01/17 09:23:35 INFO FileScanRDD: Reading File path:
> file:///secret/spark21-sortById/part-00193-39f7ac12-6038-46ee-b5c3-d7a5a06e4425.snappy.parquet,
> range: 0-574, partition values: [empty row]
> ...
> 
> The issue is that it looks like Spark is scanning every file, even though, from
> the min/max, Spark should be able to determine that only part-0 has the relevant
> data. Or maybe I am reading it wrong and Spark is actually skipping the files?
> Maybe Spark can only use partition values for data skipping?
> 
> Thanks,
> 
> Dong
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Parquet-Statistics-question-tp28312.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
>
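
Regarding the last question above (whether only partition values drive data
skipping): partition pruning, at least, is easy to observe. A rough sketch, again
assuming the Spark 2.1 shell; /secret/spark21-byBucket is only an illustrative path.

// Write the same data partitioned by a derived column, then filter on it.
spark.sql("select id, cast(id as string) text, id % 10 as bucket from range(1000)")
  .write.partitionBy("bucket").parquet("/secret/spark21-byBucket")

// The FileScan parquet node should now show a PartitionFilters entry on
// bucket, and only the matching bucket=3 directory is read.
spark.read.parquet("/secret/spark21-byBucket")
  .filter("bucket = 3")
  .explain()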