Re: Strategies for reading large numbers of files

2014-11-19 Thread soojin
Hi Landon,

I tried this, but it didn't work for me. I get a Task not serializable
exception:

Caused by: java.io.NotSerializableException:
org.apache.hadoop.conf.Configuration

How do you make the org.apache.hadoop.conf.Configuration (hadoopConfiguration)
available to tasks?
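
One common workaround (a minimal sketch only, assuming Spark 1.x; the readFiles
helper and the paths are illustrative) is to avoid closing over the driver-side
Configuration: either construct a new Configuration() inside each task, or
broadcast the driver's configuration wrapped in
org.apache.spark.SerializableWritable, which works because Configuration is a
Hadoop Writable:

  import java.net.URI

  import org.apache.hadoop.fs.{FileSystem, Path}
  import org.apache.spark.{SerializableWritable, SparkContext}

  import scala.io.Source

  def readFiles(sc: SparkContext, paths: Seq[String]) = {
    // Configuration is not Serializable, but it is a Writable, so it can be
    // shipped to executors wrapped in SerializableWritable and broadcast once.
    val confBroadcast =
      sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))

    sc.parallelize(paths, paths.size).flatMap { path =>
      val conf = confBroadcast.value.value // unwrap on the executor
      val fs = FileSystem.get(new URI(path), conf)
      val in = fs.open(new Path(path))
      try Source.fromInputStream(in).getLines().toList
      finally in.close()
    }
  }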






Re: Strategies for reading large numbers of files

2014-10-21 Thread Landon Kuhn
Thanks to folks here for the suggestions. I ended up settling on what seems
to be a simple and scalable approach. I am no longer using
sparkContext.textFile with wildcards (it is too slow when working with a
large number of files). Instead, I have implemented directory traversal as
a Spark job, which enables it to parallelize across the cluster.

First, a couple of functions: one to traverse directories, and another to
read the lines in a file:

  import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
  import scala.io.Source

  // Recursively list every file under the given path.
  def list_file_names(path: String): Seq[String] = {
    val fs = FileSystem.get(new java.net.URI(path), hadoopConfiguration)
    def f(path: Path): Seq[String] = {
      Option(fs.listStatus(path)).getOrElse(Array[FileStatus]()).flatMap {
        case fileStatus if fileStatus.isDir ⇒ f(fileStatus.getPath)
        case fileStatus ⇒ Seq(fileStatus.getPath.toString)
      }
    }
    f(new Path(path))
  }

  // Read all lines of a single file.
  def read_log_file(path: String): Seq[String] = {
    val fs = FileSystem.get(new java.net.URI(path), hadoopConfiguration)
    val file = fs.open(new Path(path))
    val source = Source.fromInputStream(file)
    try source.getLines.toList finally source.close()
  }

Next, I generate a list of "root" paths to scan:

  val paths =
    for {
      record_type ← record_types
      year ← years
      month ← months
      day ← days
      hour ← hours
    } yield s"s3n://s3-bucket-name/$record_type/$year/$month/$day/$hour/"

(In this case, I generate one path per hour per record type.)

Finally, using Spark, I can build an RDD with the contents of every file in
the path list:

  val rdd: RDD[String] =
    sparkContext.
      parallelize(paths, paths.size).
      flatMap(list_file_names).
      flatMap(read_log_file)

I am posting this info here with the hope that it will be useful to
somebody in the future.

L


On Tue, Oct 7, 2014 at 12:58 AM, deenar.toraskar 
wrote:

> Hi Landon
>
> I had a problem very similar to yours, where we had to process around 5
> million relatively small files on NFS. After trying various options, we did
> something similar to what Matei suggested.
>
> 1) Take the original path and find the subdirectories under that path, then
> parallelize the resulting list. You can configure the depth you want to go
> down to before sending the paths across the cluster.
>
>   def getFileList(srcDir: File, depth: Int): List[File] = {
>     val list = new ListBuffer[File]()
>     if (srcDir.isDirectory()) {
>       srcDir.listFiles().foreach { file =>
>         if (file.isFile()) {
>           list += file
>         } else if (depth > 0) {
>           list ++= getFileList(file, depth - 1)
>         } else if (depth < 0) {
>           list ++= getFileList(file, depth)
>         } else {
>           list += file
>         }
>       }
>     } else {
>       list += srcDir
>     }
>     list.toList
>   }
>
>
>
>
>
>


-- 
*Landon Kuhn*, *Software Architect*, Janrain, Inc. 
E: lan...@janrain.com | M: 971-645-5501 | F: 888-267-9025


Re: Strategies for reading large numbers of files

2014-10-07 Thread deenar.toraskar
Hi Landon 

I had a problem very similar to yours, where we had to process around 5
million relatively small files on NFS. After trying various options, we did
something similar to what Matei suggested.

1) Take the original path and find the subdirectories under that path, then
parallelize the resulting list. You can configure the depth you want to go
down to before sending the paths across the cluster.

  import java.io.File
  import scala.collection.mutable.ListBuffer

  // Collect files under srcDir, descending at most depth directory levels.
  // A negative depth means unlimited recursion; at depth 0 a directory is
  // added as-is rather than descended into.
  def getFileList(srcDir: File, depth: Int): List[File] = {
    val list = new ListBuffer[File]()
    if (srcDir.isDirectory()) {
      srcDir.listFiles().foreach { file =>
        if (file.isFile()) {
          list += file
        } else if (depth > 0) {
          list ++= getFileList(file, depth - 1)
        } else if (depth < 0) {
          list ++= getFileList(file, depth)
        } else {
          list += file
        }
      }
    } else {
      list += srcDir
    }
    list.toList
  }
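
2) A sketch of the natural second step, parallelizing that list and reading
each file inside a task. Assumptions: sc is an existing SparkContext, the NFS
root path is illustrative, the files are plain text, and for brevity each task
only expands one level below the paths it receives:

  // Driver side: descend only a couple of levels, so the parallelized list
  // holds directories (plus any shallow files) rather than millions of paths.
  val roots = getFileList(new File("/nfs/data"), 2).map(_.getAbsolutePath)

  val records = sc.parallelize(roots, roots.size).flatMap { rootPath =>
    val root = new File(rootPath)
    // Executor side: expand each directory locally and read its files.
    val files = if (root.isFile) Array(root) else root.listFiles().filter(_.isFile)
    files.flatMap { f =>
      val source = scala.io.Source.fromFile(f)
      try source.getLines().toList finally source.close()
    }
  }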








Re: Strategies for reading large numbers of files

2014-10-06 Thread Matei Zaharia
The problem is that listing the metadata for all these files in S3 takes a long
time. Something you can try is the following: split your files into several
non-overlapping paths (e.g. s3n://bucket/purchase/2014/01,
s3n://bucket/purchase/2014/02, etc.), then do sc.parallelize over a list of such
paths, and in each task use a single-node S3 library to list the contents of
that directory only and read them. You can use Hadoop's FileSystem class, for
example (FileSystem.open("s3n://...") or something like that). That way more
nodes will be querying the metadata in parallel.

Matei
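
A minimal sketch of this per-prefix approach (sc is an existing SparkContext,
the bucket and prefix names are made up, and the default Hadoop configuration
on the executors is assumed to carry the s3n credentials):

  import java.net.URI

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  import scala.io.Source

  // Illustrative, non-overlapping prefixes (here, one per month).
  val prefixes = (1 to 12).map(m => f"s3n://bucket/purchase/2014/$m%02d/")

  val lines = sc.parallelize(prefixes, prefixes.size).flatMap { prefix =>
    // Each task lists and reads only its own prefix, so the S3 metadata calls
    // run in parallel across the cluster instead of on the driver.
    val fs = FileSystem.get(new URI(prefix), new Configuration())
    val files = fs.listStatus(new Path(prefix)).filterNot(_.isDir).map(_.getPath)
    files.flatMap { p =>
      val in = fs.open(p)
      try Source.fromInputStream(in).getLines().toList
      finally in.close()
    }
  }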

On Oct 6, 2014, at 12:59 PM, Nicholas Chammas  
wrote:

> Unfortunately not. Again, I wonder if adding support targeted at this "small 
> files problem" would make sense for Spark core, as it is a common problem in 
> our space.
> 
> Right now, I don't know of any other options.
> 
> Nick
> 
> 
> On Mon, Oct 6, 2014 at 2:24 PM, Landon Kuhn  wrote:
> Nicholas, thanks for the tip. Your suggestion certainly seemed like the right 
> approach, but after a few days of fiddling I've come to the conclusion that 
> s3distcp will not work for my use case. It is unable to flatten directory 
> hierarchies, which I need because my source directories contain 
> hour/minute/second parts.
> 
> See https://forums.aws.amazon.com/message.jspa?messageID=478960. It seems 
> that s3distcp can only combine files in the same path.
> 
> Thanks again. That gave me a lot to go on. Any further suggestions?
> 
> L
> 
> 
> On Thu, Oct 2, 2014 at 4:15 PM, Nicholas Chammas  
> wrote:
> I believe this is known as the "Hadoop Small Files Problem", and it affects 
> Spark as well. The best approach I've seen to merging small files like this 
> is by using s3distcp, as suggested here, as a pre-processing step.
> 
> It would be great if Spark could somehow handle this common situation out of 
> the box, but for now I don't think it does.
> 
> Nick
> 
> On Thu, Oct 2, 2014 at 7:10 PM, Landon Kuhn  wrote:
> Hello, I'm trying to use Spark to process a large number of files in S3. I'm 
> running into an issue that I believe is related to the high number of files, 
> and the resources required to build the listing within the driver program. If 
> anyone in the Spark community can provide insight or guidance, it would be 
> greatly appreciated.
> 
> The task at hand is to read ~100 million files stored in S3, and repartition 
> the data into a sensible number of files (perhaps 1,000). The files are 
> organized in a directory structure like so:
> 
> 
> s3://bucket/event_type/year/month/day/hour/minute/second/customer_id/file_name
> 
> (Note that each file is very small, containing 1-10 records each. 
> Unfortunately this is an artifact of the upstream systems that put data in 
> S3.)
> 
> My Spark program is simple, and works when I target a relatively specific 
> subdirectory. For example:
> 
> 
> sparkContext.textFile("s3n://bucket/purchase/2014/01/01/00/*/*/*/*").coalesce(...).write(...)
> 
> This targets 1 hour's worth of purchase records, containing about 10,000 
> files. The driver program blocks (I assume it is making S3 calls to traverse 
> the directories), and during this time no activity is visible in the driver 
> UI. After about a minute, the stages and tasks allocate in the UI, and then 
> everything progresses and completes within a few minutes.
> 
> I need to process all the data (several years' worth). Something like:
> 
>   
> sparkContext.textFile("s3n://bucket/*/*/*/*/*/*/*/*/*").coalesce(...).write(...)
> 
> This blocks "forever" (I have only run the program for as long as overnight). 
> The stages and tasks never appear in the UI. I assume Spark is building the 
> file listing, which will either take too long and/or cause the driver to 
> eventually run out of memory.
> 
> I would appreciate any comments or suggestions. I'm happy to provide more 
> information if that would be helpful.
> 
> Thanks
> 
> Landon
> 
> 
> 
> 
> 
> -- 
> Landon Kuhn, Software Architect, Janrain, Inc.
> E: lan...@janrain.com | M: 971-645-5501 | F: 888-267-9025
> 



Re: Strategies for reading large numbers of files

2014-10-06 Thread Nicholas Chammas
Unfortunately not. Again, I wonder if adding support targeted at this
"small files problem" would make sense for Spark core, as it is a common
problem in our space.

Right now, I don't know of any other options.

Nick


On Mon, Oct 6, 2014 at 2:24 PM, Landon Kuhn  wrote:

> Nicholas, thanks for the tip. Your suggestion certainly seemed like the
> right approach, but after a few days of fiddling I've come to the
> conclusion that s3distcp will not work for my use case. It is unable to
> flatten directory hierarchies, which I need because my source directories
> contain hour/minute/second parts.
>
> See https://forums.aws.amazon.com/message.jspa?messageID=478960. It seems
> that s3distcp can only combine files in the same path.
>
> Thanks again. That gave me a lot to go on. Any further suggestions?
>
> L
>
>
> On Thu, Oct 2, 2014 at 4:15 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> I believe this is known as the "Hadoop Small Files Problem", and it
>> affects Spark as well. The best approach I've seen to merging small files
>> like this is by using s3distcp, as suggested here, as a pre-processing step.
>>
>> It would be great if Spark could somehow handle this common situation out
>> of the box, but for now I don't think it does.
>>
>> Nick
>>
>> On Thu, Oct 2, 2014 at 7:10 PM, Landon Kuhn  wrote:
>>
>>> Hello, I'm trying to use Spark to process a large number of files in S3.
>>> I'm running into an issue that I believe is related to the high number of
>>> files, and the resources required to build the listing within the driver
>>> program. If anyone in the Spark community can provide insight or guidance,
>>> it would be greatly appreciated.
>>>
>>> The task at hand is to read ~100 million files stored in S3, and
>>> repartition the data into a sensible number of files (perhaps 1,000). The
>>> files are organized in a directory structure like so:
>>>
>>>
>>> s3://bucket/event_type/year/month/day/hour/minute/second/customer_id/file_name
>>>
>>> (Note that each file is very small, containing 1-10 records each.
>>> Unfortunately this is an artifact of the upstream systems that put data in
>>> S3.)
>>>
>>> My Spark program is simple, and works when I target a relatively
>>> specific subdirectory. For example:
>>>
>>>
>>> sparkContext.textFile("s3n://bucket/purchase/2014/01/01/00/*/*/*/*").coalesce(...).write(...)
>>>
>>> This targets 1 hour's worth of purchase records, containing about 10,000
>>> files. The driver program blocks (I assume it is making S3 calls to
>>> traverse the directories), and during this time no activity is visible in
>>> the driver UI. After about a minute, the stages and tasks allocate in the
>>> UI, and then everything progresses and completes within a few minutes.
>>>
>>> I need to process all the data (several years' worth). Something like:
>>>
>>>
>>> sparkContext.textFile("s3n://bucket/*/*/*/*/*/*/*/*/*").coalesce(...).write(...)
>>>
>>> This blocks "forever" (I have only run the program for as long as
>>> overnight). The stages and tasks never appear in the UI. I assume Spark is
>>> building the file listing, which will either take too long and/or cause the
>>> driver to eventually run out of memory.
>>>
>>> I would appreciate any comments or suggestions. I'm happy to provide
>>> more information if that would be helpful.
>>>
>>> Thanks
>>>
>>> Landon
>>>
>>>
>>
>
>
> --
> *Landon Kuhn*, *Software Architect*, Janrain, Inc. 
> E: lan...@janrain.com | M: 971-645-5501 | F: 888-267-9025
>


Re: Strategies for reading large numbers of files

2014-10-06 Thread Landon Kuhn
Nicholas, thanks for the tip. Your suggestion certainly seemed like the
right approach, but after a few days of fiddling I've come to the
conclusion that s3distcp will not work for my use case. It is unable to
flatten directory hierarchies, which I need because my source directories
contain hour/minute/second parts.

See https://forums.aws.amazon.com/message.jspa?messageID=478960. It seems
that s3distcp can only combine files in the same path.

Thanks again. That gave me a lot to go on. Any further suggestions?

L


On Thu, Oct 2, 2014 at 4:15 PM, Nicholas Chammas  wrote:

> I believe this is known as the "Hadoop Small Files Problem", and it
> affects Spark as well. The best approach I've seen to merging small files
> like this is by using s3distcp, as suggested here, as a pre-processing step.
>
> It would be great if Spark could somehow handle this common situation out
> of the box, but for now I don't think it does.
>
> Nick
>
> On Thu, Oct 2, 2014 at 7:10 PM, Landon Kuhn  wrote:
>
>> Hello, I'm trying to use Spark to process a large number of files in S3.
>> I'm running into an issue that I believe is related to the high number of
>> files, and the resources required to build the listing within the driver
>> program. If anyone in the Spark community can provide insight or guidance,
>> it would be greatly appreciated.
>>
>> The task at hand is to read ~100 million files stored in S3, and
>> repartition the data into a sensible number of files (perhaps 1,000). The
>> files are organized in a directory structure like so:
>>
>>
>> s3://bucket/event_type/year/month/day/hour/minute/second/customer_id/file_name
>>
>> (Note that each file is very small, containing 1-10 records each.
>> Unfortunately this is an artifact of the upstream systems that put data in
>> S3.)
>>
>> My Spark program is simple, and works when I target a relatively specific
>> subdirectory. For example:
>>
>>
>> sparkContext.textFile("s3n://bucket/purchase/2014/01/01/00/*/*/*/*").coalesce(...).write(...)
>>
>> This targets 1 hour's worth of purchase records, containing about 10,000
>> files. The driver program blocks (I assume it is making S3 calls to
>> traverse the directories), and during this time no activity is visible in
>> the driver UI. After about a minute, the stages and tasks allocate in the
>> UI, and then everything progresses and completes within a few minutes.
>>
>> I need to process all the data (several years' worth). Something like:
>>
>>
>> sparkContext.textFile("s3n://bucket/*/*/*/*/*/*/*/*/*").coalesce(...).write(...)
>>
>> This blocks "forever" (I have only run the program for as long as
>> overnight). The stages and tasks never appear in the UI. I assume Spark is
>> building the file listing, which will either take too long and/or cause the
>> driver to eventually run out of memory.
>>
>> I would appreciate any comments or suggestions. I'm happy to provide more
>> information if that would be helpful.
>>
>> Thanks
>>
>> Landon
>>
>>
>


-- 
*Landon Kuhn*, *Software Architect*, Janrain, Inc. 
E: lan...@janrain.com | M: 971-645-5501 | F: 888-267-9025


Re: Strategies for reading large numbers of files

2014-10-02 Thread Nicholas Chammas
I believe this is known as the "Hadoop Small Files Problem", and it affects
Spark as well. The best approach I've seen to merging small files like this
is by using s3distcp, as suggested here, as a pre-processing step.

It would be great if Spark could somehow handle this common situation out
of the box, but for now I don't think it does.

Nick

On Thu, Oct 2, 2014 at 7:10 PM, Landon Kuhn  wrote:

> Hello, I'm trying to use Spark to process a large number of files in S3.
> I'm running into an issue that I believe is related to the high number of
> files, and the resources required to build the listing within the driver
> program. If anyone in the Spark community can provide insight or guidance,
> it would be greatly appreciated.
>
> The task at hand is to read ~100 million files stored in S3, and
> repartition the data into a sensible number of files (perhaps 1,000). The
> files are organized in a directory structure like so:
>
>
> s3://bucket/event_type/year/month/day/hour/minute/second/customer_id/file_name
>
> (Note that each file is very small, containing 1-10 records each.
> Unfortunately this is an artifact of the upstream systems that put data in
> S3.)
>
> My Spark program is simple, and works when I target a relatively specific
> subdirectory. For example:
>
>
> sparkContext.textFile("s3n://bucket/purchase/2014/01/01/00/*/*/*/*").coalesce(...).write(...)
>
> This targets 1 hour's worth of purchase records, containing about 10,000
> files. The driver program blocks (I assume it is making S3 calls to
> traverse the directories), and during this time no activity is visible in
> the driver UI. After about a minute, the stages and tasks allocate in the
> UI, and then everything progresses and completes within a few minutes.
>
> I need to process all the data (several years' worth). Something like:
>
>
> sparkContext.textFile("s3n://bucket/*/*/*/*/*/*/*/*/*").coalesce(...).write(...)
>
> This blocks "forever" (I have only run the program for as long as
> overnight). The stages and tasks never appear in the UI. I assume Spark is
> building the file listing, which will either take too long and/or cause the
> driver to eventually run out of memory.
>
> I would appreciate any comments or suggestions. I'm happy to provide more
> information if that would be helpful.
>
> Thanks
>
> Landon
>
>