Re: possible issues with listing objects in the HadoopFSrelation

2015-08-12 Thread Cheng Lian

Hi Gil,

Sorry for the late reply and thanks for raising this question. The file 
listing logic in HadoopFsRelation is intentionally made different from 
Hadoop FileInputFormat. Here are the reasons:


1. Efficiency: when computing RDD partitions,
FileInputFormat.listStatus() is called on the driver side in a
sequential manner, and can be slow for S3 directories with lots of
sub-directories, e.g. partitioned tables with thousands of partitions
or more. This is partly because file metadata operations can be very
slow on S3. HadoopFsRelation relies on this file listing step to do
partition discovery, so we've made a distributed parallel version in
Spark 1.5: we first list input paths on the driver side in a sequential
breadth-first manner, and once the number of directories to be listed
exceeds a threshold (32 by default), we launch a Spark job to do the
file listing. With this mechanism, we've observed a performance boost
of 2 orders of magnitude when reading partitioned tables with
thousands of distinct partitions located on S3.


2. Semantic differences: the default hiddenFileFilter doesn't apply in
every case. For example, the Parquet summary files _metadata and
_common_metadata play crucial roles in schema discovery and schema
merging, so we don't want to exclude them when listing files. They are
filtered out later, when the actual data is read. That said, we
probably should allow users to pass in user-defined path filters.
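
To make point 1 concrete, here is a rough sketch of the listing
strategy described there. It is an illustration only, not the actual
HadoopFsRelation code: the class name ListingSketch, the in-memory map
standing in for the file system, and the Java parallel stream standing
in for the Spark job are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch only; not the HadoopFsRelation implementation.
class ListingSketch {
    // Hypothetical in-memory "file system": directory path -> child paths.
    // A path that is not a key of the map is treated as a file.
    final Map<String, List<String>> tree;
    // Switch to parallel listing once more directories than this are pending
    // (the thread mentions 32 as the default).
    final int threshold;

    ListingSketch(Map<String, List<String>> tree, int threshold) {
        this.tree = tree;
        this.threshold = threshold;
    }

    // Recursively lists every file under one directory (work for one task).
    List<String> listRecursively(String dir) {
        List<String> files = new ArrayList<>();
        for (String child : tree.getOrDefault(dir, Collections.emptyList())) {
            if (tree.containsKey(child)) {
                files.addAll(listRecursively(child));
            } else {
                files.add(child);
            }
        }
        return files;
    }

    // Lists all files under the given root directories.
    List<String> listLeafFiles(List<String> roots) {
        List<String> files = new ArrayList<>();
        List<String> level = new ArrayList<>(roots);
        // Sequential breadth-first listing, one directory level at a time.
        while (!level.isEmpty() && level.size() <= threshold) {
            List<String> next = new ArrayList<>();
            for (String dir : level) {
                for (String child : tree.getOrDefault(dir, Collections.emptyList())) {
                    if (tree.containsKey(child)) {
                        next.add(child);
                    } else {
                        files.add(child);
                    }
                }
            }
            level = next;
        }
        // Too many directories pending: list the rest in parallel.
        // The parallel stream is a stand-in for launching a Spark job.
        files.addAll(level.parallelStream()
                .flatMap(dir -> listRecursively(dir).stream())
                .collect(Collectors.toList()));
        Collections.sort(files);
        return files;
    }
}
```

Both paths return the same set of files; the threshold only decides
whether the pending directories are listed one by one or handed off
to parallel workers.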


Cheng

On 8/10/15 7:55 PM, Gil Vernik wrote:

Just some thoughts, hope I didn't miss something obvious.

HadoopFSRelation calls the FileSystem class directly to list files in
the path.
It looks like it implements basically the same logic as the
FileInputFormat.listStatus method (located in
hadoop-mapreduce-client-core).


The point is that HadoopRDD (or similar) calls the getSplits method,
which calls FileInputFormat.listStatus, while HadoopFSRelation calls
FileSystem directly; both are trying to list objects.


There might be various issues with this. For example,
https://issues.apache.org/jira/browse/SPARK-7868 makes sure that
_temporary is not returned in the result, but the listing in
FileInputFormat contains more logic: it uses a hidden-file PathFilter
like this:


  private static final PathFilter hiddenFileFilter = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

In addition, a custom FileOutputCommitter may use a name other than
_temporary.


All of this may lead to HadoopFSRelation and HadoopRDD producing
different listings for the same data source.
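
As a small illustration of how the two listings can diverge, the sketch
below applies two filters to the same set of names. The first predicate
follows the hiddenFileFilter rule quoted above. The second, which only
skips _temporary, is a deliberately simplified assumption and not the
actual HadoopFSRelation logic; the class name FilterSketch and the
sample file names are made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Illustrative sketch only; operates on plain names, not Hadoop Path objects.
class FilterSketch {
    // Same rule as the hiddenFileFilter quoted above.
    static final Predicate<String> HIDDEN_FILE_FILTER =
            name -> !name.startsWith("_") && !name.startsWith(".");

    // Hypothetical simplified filter that only drops _temporary.
    static final Predicate<String> SKIP_TEMPORARY_ONLY =
            name -> !name.equals("_temporary");

    // Returns the names accepted by the given filter, in input order.
    static List<String> list(List<String> names, Predicate<String> filter) {
        return names.stream().filter(filter).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names =
                Arrays.asList("part-00000", "_metadata", "_temporary", ".crc");
        System.out.println(list(names, HIDDEN_FILE_FILTER));
        System.out.println(list(names, SKIP_TEMPORARY_ONLY));
    }
}
```

With these sample names, the first filter keeps only part-00000, while
the second also keeps _metadata and .crc, so the two listings differ
for the same directory.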


My question is: what is the roadmap for this listing in
HadoopFSRelation? Will it implement exactly the same logic as
FileInputFormat.listStatus, or maybe one day HadoopFSRelation will
call FileInputFormat.listStatus and provide a custom PathFilter or
MultiPathFilter? This way there would be a single piece of code that
lists objects.


Thanks,
Gil.






Re: possible issues with listing objects in the HadoopFSrelation

2015-08-12 Thread Gil Vernik
Hi Cheng,

Thanks a lot for responding.
I still don't fully follow the efficiency point, and I would be very
thankful if you could expand on it a little.
As I see it, both HadoopFSRelation and FileInputFormat.listStatus perform
listings, and eventually both call the FileSystem.listStatus method.
FileInputFormat.listStatus does not need to be sequential; it can also
create many threads to list objects from the same data source:
  int numThreads = job.getInt(
      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
      org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);

But I guess that calling FileSystem directly, without going through
FileInputFormat, may be faster just because there is less code to
execute?

I want to make sure I understand you correctly. So HadoopFSRelation will
create partitions by itself, perform the listing, etc., and it will also
do so in parallel in 1.5.0. But the code inside Spark that uses HadoopRDD
will still rely on the partitions provided by FileInputFormat?
For example, if I have 1 objects inside some bucket and I access this
via sparkContext.hadoopFile, then FileInputFormat will provide all the
partitions and splits, but if I access the same bucket from some code
that relies on HadoopFSRelation, then the partitions will be created by
HadoopFSRelation?

Thanks
Gil.


 


