[jira] [Updated] (SPARK-17593) list files on s3 very slow

2016-09-19 Thread Gaurav Shah (JIRA)

 [ https://issues.apache.org/jira/browse/SPARK-17593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gaurav Shah updated SPARK-17593:

Description: 
Let's say we have the following partitioned data:
{code}
events_v3
  -- event_date=2015-01-01
     -- event_hour=0
        -- verb=follow
           part1.parquet.gz
     -- event_hour=1
        -- verb=click
           part1.parquet.gz
  -- event_date=2015-01-02
     -- event_hour=5
        -- verb=follow
           part1.parquet.gz
     -- event_hour=10
        -- verb=click
           part1.parquet.gz
{code}
To read (or write) partitioned Parquet data, Spark calls `ListingFileCatalog.listLeafFiles`, which recursively lists all files and folders under the table path.
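
To make the access pattern concrete, here is a minimal sketch (my simplification, not Spark's actual source) of that recursion on top of the Hadoop FileSystem API; the point is that every partition directory costs at least one S3 LIST round trip:
{code:scala}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Simplified stand-in for ListingFileCatalog.listLeafFiles: one
// listStatus round trip per directory, recursing through every level
// of the partition tree (event_date -> event_hour -> verb).
def listLeafFiles(fs: FileSystem, dir: Path): Seq[FileStatus] = {
  val statuses = fs.listStatus(dir).toSeq // one S3 LIST request per directory
  statuses.flatMap { status =>
    if (status.isDirectory) listLeafFiles(fs, status.getPath)
    else Seq(status)
  }
}
{code}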

In this case, with 300 dates we end up with 300 jobs, each trying to fetch the file list for one date directory. This process takes about 10 minutes to finish (with 2 executors). By contrast, a Ruby script that lists all files in the same folder recursively takes about 1 minute on the same machine, with just 1 thread.
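
A rough count shows why per-directory listing is so much more expensive than a flat prefix scan (the hour and verb fan-outs below are assumed numbers, purely for illustration):
{code:scala}
// Illustrative arithmetic only; hoursPerDate and verbsPerHour are assumptions.
val dates        = 300
val hoursPerDate = 24
val verbsPerHour = 2

// Walking the tree costs one LIST request per directory:
val treeListCalls = 1 + dates + dates * hoursPerDate +
  dates * hoursPerDate * verbsPerHour                   // = 21901

// A flat list_objects_v2 scan pages through the leaf files 1000 at a time:
val leafFiles     = dates * hoursPerDate * verbsPerHour // = 14400
val flatListCalls = math.ceil(leafFiles / 1000.0).toInt // = 15

println(s"tree walk: $treeListCalls LIST calls, flat scan: $flatListCalls")
{code}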

I am confused as to why listing the files takes so much extra time.
Spark code:
{code:scala}
// Configs chosen to minimise schema merging and extra partition handling.
val sparkSession = org.apache.spark.sql.SparkSession.builder
  .config("spark.sql.hive.metastorePartitionPruning", true)
  .config("spark.sql.parquet.filterPushdown", true)
  .config("spark.sql.hive.verifyPartitionPath", false)
  .config("spark.sql.hive.convertMetastoreParquet.mergeSchema", false)
  .config("parquet.enable.summary-metadata", false)
  .config("spark.sql.sources.partitionDiscovery.enabled", false)
  .getOrCreate()

// Loading the table triggers the recursive file listing described above.
val df = sparkSession.read
  .option("mergeSchema", "false")
  .format("parquet")
  .load("s3n://bucket_name/events_v3")
df.createOrReplaceTempView("temp_events")

sparkSession.sql(
  """
    |select verb, count(*) from temp_events
    |where event_date = "2016-08-05"
    |group by verb
  """.stripMargin).show()
{code}

Ruby code:
{code:ruby}
gem 'aws-sdk', '~> 2'
require 'aws-sdk'

client = Aws::S3::Client.new(region: 'us-west-1')
next_continuation_token = nil
total = 0

loop do
  a = client.list_objects_v2({
    bucket: "bucket", # required
    max_keys: 1000,
    prefix: "events_v3/",
    continuation_token: next_continuation_token,
    fetch_owner: false,
  })
  puts a.contents.last.key # progress: last key of this page
  total += a.contents.size
  next_continuation_token = a.next_continuation_token
  break unless a.is_truncated
end

puts "total"
puts total
{code}
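
For a like-for-like comparison from the JVM, a sketch of the same flat prefix scan using the AWS Java SDK v1 (assumed to be on the classpath; bucket and prefix as in the Ruby script) would look like:
{code:scala}
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.ListObjectsV2Request

// Flat prefix scan mirroring the Ruby script: pages of up to 1000 keys,
// no per-directory recursion.
val s3 = AmazonS3ClientBuilder.standard().withRegion("us-west-1").build()
val req = new ListObjectsV2Request()
  .withBucketName("bucket")
  .withPrefix("events_v3/")
  .withMaxKeys(1000)

var total = 0
var truncated = true
while (truncated) {
  val result = s3.listObjectsV2(req)
  total += result.getObjectSummaries.size
  req.setContinuationToken(result.getNextContinuationToken)
  truncated = result.isTruncated
}
println(s"total: $total")
{code}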

I tried looking into the following bug:
https://issues.apache.org/jira/browse/HADOOP-12810
but Hadoop 2.7.3 doesn't solve that problem.
Stack Overflow reference:
http://stackoverflow.com/questions/39525288/spark-parquet-write-gets-slow-as-partitions-grow
