[jira] [Updated] (SPARK-17593) list files on s3 very slow
[ https://issues.apache.org/jira/browse/SPARK-17593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gaurav Shah updated SPARK-17593:

Description:

Let's say we have the following partitioned data:

{code}
events_v3
-- event_date=2015-01-01
     event_hour=0
     -- verb=follow
          part1.parquet.gz
     event_hour=1
     -- verb=click
          part1.parquet.gz
-- event_date=2015-01-02
     event_hour=5
     -- verb=follow
          part1.parquet.gz
     event_hour=10
     -- verb=click
          part1.parquet.gz
{code}

To read (or write) partitioned Parquet data, Spark calls `ListingFileCatalog.listLeafFiles`, which recursively lists all files and folders. In this case, if we had 300 dates, it would create 300 jobs, each trying to get the file list from one date directory. This process takes about 10 minutes to finish (with 2 executors), whereas a Ruby script that lists all files recursively in the same folder takes about 1 minute, on the same machine with just 1 thread. I am confused as to why listing files takes so much longer in Spark.

Spark code:

{code:scala}
val sparkSession = org.apache.spark.sql.SparkSession.builder
  .config("spark.sql.hive.metastorePartitionPruning", true)
  .config("spark.sql.parquet.filterPushdown", true)
  .config("spark.sql.hive.verifyPartitionPath", false)
  .config("spark.sql.hive.convertMetastoreParquet.mergeSchema", false)
  .config("parquet.enable.summary-metadata", false)
  .config("spark.sql.sources.partitionDiscovery.enabled", false)
  .getOrCreate()

val df = sparkSession.read.option("mergeSchema", "false").format("parquet").load("s3n://bucket_name/events_v3")
df.createOrReplaceTempView("temp_events")
sparkSession.sql(
  """
    |select verb,count(*) from temp_events where event_date = "2016-08-05" group by verb
  """.stripMargin).show()
{code}

Ruby code:

{code:ruby}
gem 'aws-sdk', '~> 2'
require 'aws-sdk'

client = Aws::S3::Client.new(:region=>'us-west-1')
next_continuation_token = nil
total = 0
loop do
  a = client.list_objects_v2({
    bucket: "bucket", # required
    max_keys: 1000,
    prefix: "events_v3/",
    continuation_token: next_continuation_token,
    fetch_owner: false,
  })
  puts a.contents.last.key
  total += a.contents.size
  next_continuation_token = a.next_continuation_token
  break unless a.is_truncated
end
puts "total"
puts total
{code}

I tried looking into the following bug: https://issues.apache.org/jira/browse/HADOOP-12810 but Hadoop 2.7.3 doesn't solve that problem.

Stack Overflow reference: http://stackoverflow.com/questions/39525288/spark-parquet-write-gets-slow-as-partitions-grow

was:

Let's say we have the following partitioned data:

{code}
events_v3
-- event_date=2015-01-01
   -- event_hour=0
      -- part1.parquet.gz
-- event_date=2015-01-02
   -- event_hour=5
      -- part1.parquet.gz
{code}

To read (or write) partitioned Parquet data, Spark calls `ListingFileCatalog.listLeafFiles`, which recursively lists all files and folders. In this case, if we had 300 dates, it would create 300 jobs, each trying to get the file list from one date directory. This process takes about 10 minutes to finish (with 2 executors), whereas a Ruby script that lists all files recursively in the same folder takes about 1 minute, on the same machine with just 1 thread. I am confused as to why listing files takes so much longer in Spark.

Spark code:

{code:scala}
val sparkSession = org.apache.spark.sql.SparkSession.builder
  .config("spark.sql.hive.metastorePartitionPruning", true)
  .config("spark.sql.parquet.filterPushdown", true)
  .config("spark.sql.hive.verifyPartitionPath", false)
  .config("spark.sql.hive.convertMetastoreParquet.mergeSchema", false)
  .config("parquet.enable.summary-metadata", false)
  .config("spark.sql.sources.partitionDiscovery.enabled", false)
  .getOrCreate()

val df = sparkSession.read.option("mergeSchema", "false").format("parquet").load("s3n://bucket_name/events_v3")
df.createOrReplaceTempView("temp_events")
sparkSession.sql(
  """
    |select verb,count(*) from temp_events where event_date = "2016-08-05" group by verb
  """.stripMargin).show()
{code}

Ruby code:

{code:ruby}
gem 'aws-sdk', '~> 2'
require 'aws-sdk'

client = Aws::S3::Client.new(:region=>'us-west-1')
next_continuation_token = nil
total = 0
loop do
  a = client.list_objects_v2({
    bucket: "bucket", # required
    max_keys: 1000,
    prefix: "events_v3/",
    continuation_token: next_continuation_token,
    fetch_owner: false,
  })
  puts a.contents.last.key
  total += a.contents.size
  next_continuation_token = a.next_continuation_token
  break unless a.is_truncated
end
puts "total"
puts total
{code}

I tried looking into the following bug: https://issues.apache.org/jira/browse/HADOOP-12810 but Hadoop 2.7.3 doesn't solve that problem.

Stack Overflow reference:
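
The description above contrasts a directory-by-directory recursive listing with a single paged listing under one prefix. The Ruby sketch below only illustrates how the number of list requests can diverge between those two strategies. It is a simulation over an in-memory key list, not the actual Spark or Hadoop code path, and it does not model S3 latency. The helper names (`build_keys`, `flat_request_count`, `recursive_request_count`) and the `day%03d` date labels are hypothetical and chosen for this illustration.

```ruby
# Hypothetical in-memory stand-in for a bucket: a sorted list of object keys
# laid out like the partitioned events_v3 example.
def build_keys(dates:, hours:, verbs:)
  keys = []
  dates.times do |d|
    hours.times do |h|
      verbs.each do |v|
        keys << format("events_v3/event_date=day%03d/event_hour=%d/verb=%s/part1.parquet.gz", d, h, v)
      end
    end
  end
  keys.sort
end

# Flat listing: every key under the prefix is returned in pages of page_size,
# so the request count depends only on the total number of keys.
def flat_request_count(keys, prefix, page_size: 1000)
  matching = keys.count { |k| k.start_with?(prefix) }
  [(matching.to_f / page_size).ceil, 1].max
end

# Directory-style listing: at least one request per "directory", then one
# recursive descent per child directory found in it.
def recursive_request_count(keys, prefix, page_size: 1000)
  children = keys.select { |k| k.start_with?(prefix) }
                 .map { |k| k[prefix.length..-1].split("/", 2) }
  entries = children.map(&:first).uniq
  requests = [(entries.size.to_f / page_size).ceil, 1].max
  # An entry with a remainder after the first "/" is a sub-directory.
  subdirs = children.select { |parts| parts.size == 2 }.map(&:first).uniq
  subdirs.each do |dir|
    requests += recursive_request_count(keys, "#{prefix}#{dir}/", page_size: page_size)
  end
  requests
end

keys = build_keys(dates: 3, hours: 2, verbs: %w[follow click])
puts "flat requests:      #{flat_request_count(keys, 'events_v3/')}"
puts "recursive requests: #{recursive_request_count(keys, 'events_v3/')}"
```

In this model the flat count grows with the number of objects divided by the page size, while the recursive count grows with the number of directories. That is one plausible contributor to the gap reported here, though the sketch does not establish it as the cause.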
[jira] [Updated] (SPARK-17593) list files on s3 very slow
[ https://issues.apache.org/jira/browse/SPARK-17593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gaurav Shah updated SPARK-17593:

Description:

Let's say we have the following partitioned data:

{code}
events_v3
-- event_date=2015-01-01
   -- event_hour=0
      -- part1.parquet.gz
-- event_date=2015-01-02
   -- event_hour=5
      -- part1.parquet.gz
{code}

To read (or write) partitioned Parquet data, Spark calls `ListingFileCatalog.listLeafFiles`, which recursively lists all files and folders. In this case, if we had 300 dates, it would create 300 jobs, each trying to get the file list from one date directory. This process takes about 10 minutes to finish (with 2 executors), whereas a Ruby script that lists all files recursively in the same folder takes about 1 minute, on the same machine with just 1 thread. I am confused as to why listing files takes so much longer in Spark.

Spark code:

{code:scala}
val sparkSession = org.apache.spark.sql.SparkSession.builder
  .config("spark.sql.hive.metastorePartitionPruning", true)
  .config("spark.sql.parquet.filterPushdown", true)
  .config("spark.sql.hive.verifyPartitionPath", false)
  .config("spark.sql.hive.convertMetastoreParquet.mergeSchema", false)
  .config("parquet.enable.summary-metadata", false)
  .config("spark.sql.sources.partitionDiscovery.enabled", false)
  .getOrCreate()

val df = sparkSession.read.option("mergeSchema", "false").format("parquet").load("s3n://bucket_name/events_v3")
df.createOrReplaceTempView("temp_events")
sparkSession.sql(
  """
    |select verb,count(*) from temp_events where event_date = "2016-08-05" group by verb
  """.stripMargin).show()
{code}

Ruby code:

{code:ruby}
gem 'aws-sdk', '~> 2'
require 'aws-sdk'

client = Aws::S3::Client.new(:region=>'us-west-1')
next_continuation_token = nil
total = 0
loop do
  a = client.list_objects_v2({
    bucket: "bucket", # required
    max_keys: 1000,
    prefix: "events_v3/",
    continuation_token: next_continuation_token,
    fetch_owner: false,
  })
  puts a.contents.last.key
  total += a.contents.size
  next_continuation_token = a.next_continuation_token
  break unless a.is_truncated
end
puts "total"
puts total
{code}

I tried looking into the following bug: https://issues.apache.org/jira/browse/HADOOP-12810 but Hadoop 2.7.3 doesn't solve that problem.

Stack Overflow reference: http://stackoverflow.com/questions/39525288/spark-parquet-write-gets-slow-as-partitions-grow

was:

Let's say we have the following partitioned data:

{code}
events_v3
-- event_date=2015-01-01
   -- event_hour=2015-01-1
      -- part1.parquet.gz
-- event_date=2015-01-02
   -- event_hour=5
      -- part1.parquet.gz
{code}

To read (or write) partitioned Parquet data, Spark calls `ListingFileCatalog.listLeafFiles`, which recursively lists all files and folders. In this case, if we had 300 dates, it would create 300 jobs, each trying to get the file list from one date directory. This process takes about 10 minutes to finish (with 2 executors), whereas a Ruby script that lists all files recursively in the same folder takes about 1 minute, on the same machine with just 1 thread. I am confused as to why listing files takes so much longer in Spark.

Spark code:

{code:scala}
val sparkSession = org.apache.spark.sql.SparkSession.builder
  .config("spark.sql.hive.metastorePartitionPruning", true)
  .config("spark.sql.parquet.filterPushdown", true)
  .config("spark.sql.hive.verifyPartitionPath", false)
  .config("spark.sql.hive.convertMetastoreParquet.mergeSchema", false)
  .config("parquet.enable.summary-metadata", false)
  .config("spark.sql.sources.partitionDiscovery.enabled", false)
  .getOrCreate()

val df = sparkSession.read.option("mergeSchema", "false").format("parquet").load("s3n://bucket_name/events_v3")
df.createOrReplaceTempView("temp_events")
sparkSession.sql(
  """
    |select verb,count(*) from temp_events where event_date = "2016-08-05" group by verb
  """.stripMargin).show()
{code}

Ruby code:

{code:ruby}
gem 'aws-sdk', '~> 2'
require 'aws-sdk'

client = Aws::S3::Client.new(:region=>'us-west-1')
next_continuation_token = nil
total = 0
loop do
  a = client.list_objects_v2({
    bucket: "bucket", # required
    max_keys: 1000,
    prefix: "events_v3/",
    continuation_token: next_continuation_token,
    fetch_owner: false,
  })
  puts a.contents.last.key
  total += a.contents.size
  next_continuation_token = a.next_continuation_token
  break unless a.is_truncated
end
puts "total"
puts total
{code}

I tried looking into the following bug: https://issues.apache.org/jira/browse/HADOOP-12810 but Hadoop 2.7.3 doesn't solve that problem.

Stack Overflow reference: http://stackoverflow.com/questions/39525288/spark-parquet-write-gets-slow-as-partitions-grow

> list files on s3 very slow
> --
>
> Key: SPARK-17593
> URL: