Re: processing s3n:// files in parallel

2014-04-30 Thread foundart
Thanks, Andrew.  As it turns out, the tasks were being processed in parallel,
in separate threads on the same node.  Mapping over the parallel collection of
Hadoop files was enough to trigger that; I just didn't spot it right away
because I expected the work to be spread across nodes rather than across cores
on a single node.

val matches = hadoopFiles.par.map((hadoopFile) ...
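
For reference, a rough sketch of that pattern, using the placeholder helpers
from the original post further down the thread. Mapping over a Scala parallel
collection runs each element on its own driver-side thread, so each Spark
action is submitted as a separate job against the shared SparkContext:

    // Rough sketch; findMatches/recordMatches are the placeholder helpers
    // from the original post below.
    val matches = hadoopFiles.par.map { hadoopFile =>
      findMatches(hadoopFile)      // lazily builds an RDD per file
    }
    matches.map { matchRDD =>
      recordMatches(matchRDD)      // the action inside runs as its own job
    }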


Re: processing s3n:// files in parallel

2014-04-28 Thread Nicholas Chammas
Oh snap! I didn’t know that!

Confirmed that both the wildcard syntax and the comma-separated syntax work
in PySpark. For example:

sc.textFile('s3n://file1,s3n://file2').count()

Art,

Would this approach work for you? It would let you load your 3 files into a
single RDD, which your workers could then all work on in parallel.

Nick

On Mon, Apr 28, 2014 at 9:09 PM, Matei Zaharia wrote:

> Actually wildcards work too, e.g. s3n://bucket/file1*, and I believe so do
> comma-separated lists (e.g. s3n://file1,s3n://file2). These are all
> inherited from FileInputFormat in Hadoop.
>
> Matei


Re: processing s3n:// files in parallel

2014-04-28 Thread Matei Zaharia
Actually wildcards work too, e.g. s3n://bucket/file1*, and I believe so do 
comma-separated lists (e.g. s3n://file1,s3n://file2). These are all inherited 
from FileInputFormat in Hadoop.
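
For example, a quick sketch of both forms (the bucket and path names here are
just placeholders):

    // Glob: every matching object becomes part of one RDD
    val byGlob = sc.textFile("s3n://my-bucket/logs/2014-04-*.txt")

    // Comma-separated list of paths, also handled by FileInputFormat
    val byList = sc.textFile("s3n://my-bucket/a.txt,s3n://my-bucket/b.txt")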

Matei

On Apr 28, 2014, at 6:05 PM, Andrew Ash wrote:

> This is already possible with the sc.textFile("/path/to/filedir/*") call. 
> Does that work for you?
> 
> Sent from my mobile phone



Re: processing s3n:// files in parallel

2014-04-28 Thread Andrew Ash
This is already possible with the sc.textFile("/path/to/filedir/*") call.
Does that work for you?

Sent from my mobile phone
On Apr 29, 2014 2:46 AM, "Nicholas Chammas" wrote:

> It would be useful to have some way to open multiple files at once into a
> single RDD (e.g. sc.textFile(iterable_over_uris)). Logically, it would be
> equivalent to opening a single file which is made by concatenating the
> various files together. This would only be useful, of course, if the source
> files were all in the same format.
>
> Nick


Re: processing s3n:// files in parallel

2014-04-28 Thread Nicholas Chammas
It would be useful to have some way to open multiple files at once into a
single RDD (e.g. sc.textFile(iterable_over_uris)). Logically, it would be
equivalent to opening a single file which is made by concatenating the
various files together. This would only be useful, of course, if the source
files were all in the same format.
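
In the meantime, a rough way to approximate this is to join the URIs yourself,
since textFile already accepts a comma-separated list of paths (as noted
elsewhere in this thread); the paths below are placeholders:

    // Sketch: build one RDD from several URIs by joining them with commas,
    // which Hadoop's FileInputFormat accepts.
    val uris = Seq("s3n://bucket/a.txt", "s3n://bucket/b.txt", "s3n://bucket/c.txt")
    val combined = sc.textFile(uris.mkString(","))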

Nick


On Mon, Apr 28, 2014 at 7:40 PM, Andrew Ash wrote:

> The way you've written it there, I would expect that to be serial runs.
>  The reason is, you're looping over matches with a driver-level map, which
> is serialized.  Then calling foreachWith on the RDDs executes the action in
> a blocking way, so you don't get a result back until the cluster finishes.
>
> You can have multiple jobs running at the same time though by sharing a
> SparkContext among threads.  Rather than run all the foreachWith()s in
> serial on a single thread in the driver, try running each in its own thread.


Re: processing s3n:// files in parallel

2014-04-28 Thread Andrew Ash
The way you've written it there, I would expect that to be serial runs.
 The reason is, you're looping over matches with a driver-level map, which
is serialized.  Then calling foreachWith on the RDDs executes the action in
a blocking way, so you don't get a result back until the cluster finishes.

You can have multiple jobs running at the same time though by sharing a
SparkContext among threads.  Rather than run all the foreachWith()s in
serial on a single thread in the driver, try running each in its own thread.
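
A minimal sketch of that approach, with the helper and variable names taken
from the pseudocode in the original post (next message), so treat them as
placeholders; each file's job is submitted from its own thread while all the
threads share one SparkContext:

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    // Sketch: one thread per file, all sharing the same SparkContext.
    val pool = Executors.newFixedThreadPool(hadoopFiles.size)
    implicit val ec = ExecutionContext.fromExecutorService(pool)

    val jobs = hadoopFiles.map { hadoopFile =>
      Future {
        // the action inside recordMatches submits a job for this file
        recordMatches(findMatches(hadoopFile))
      }
    }

    jobs.foreach(Await.result(_, Duration.Inf))  // block until every job finishes
    pool.shutdown()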


processing s3n:// files in parallel

2014-04-28 Thread Art Peel
I’m trying to process 3 S3 files in parallel, but they always get processed
serially.
Am I going about this the wrong way?  Full details below.

Regards,
Art



I’m trying to do the following on a Spark cluster with 3 slave nodes.

Given a list of N URLS for files in S3 (with s3n:// urls),

For each file:
1. search for some items
2. record the findings from step 1 in an external data store.

I’d like to process the 3 URLs in parallel on 3 different slave nodes, but
instead, they are getting processed serially on one node.

I’ve tried various versions of my code to no avail. I’m also creating the
SparkContext with "spark.scheduler.mode", "FAIR".

Have I made a fundamental mistake?

I’m running Spark 0.9.1 and my code looks roughly like this:

def processFiles(sc: SparkContext) {

  val urls = List("s3n://foo/file.txt", "s3n://bar/file.txt",
    "s3n://baz/file.txt")

  val hadoopFiles = urls.map(url => {
    sc.hadoopFile(url, classOf[FooInputFormat], classOf[LongWritable],
      classOf[WritableFooRecord])
  })

  val matches = hadoopFiles.par.map((hadoopFile) => {
    findMatches(hadoopFile)
  })

  matches.map((matchRDD) => {
    recordMatches(matchRDD)
  })
}

def findMatches(hadoopFile: RDD): RDD = {
  hadoopFile.map( record => caseClassResultFromSearch(record) )
}

def recordMatches(matchRDD: RDD) {

  matchRDD.foreachWith(_ => {
    makeRestClient(config)
    // I get 3 jobs referring to the line number of the next line, but the
    // jobs run serially on one node.
  })((matchRecord, client) => {
    client.storeResult(matchRecord)
  })
}