Actually wildcards work too, e.g. s3n://bucket/file1*, and I believe so do 
comma-separated lists (e.g. s3n://bucket/file1,s3n://bucket/file2). These are all inherited 
from FileInputFormat in Hadoop.
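
For example, something along these lines should work (the bucket and file names are just placeholders):

    // Glob over matching objects under a prefix
    val globbed = sc.textFile("s3n://bucket/file1*")

    // Comma-separated list of paths, also handled by FileInputFormat
    val listed = sc.textFile("s3n://bucket/file1.txt,s3n://bucket/file2.txt")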

Matei

On Apr 28, 2014, at 6:05 PM, Andrew Ash <and...@andrewash.com> wrote:

> This is already possible with the sc.textFile("/path/to/filedir/*") call. 
> Does that work for you?
> 
> Sent from my mobile phone
> 
> On Apr 29, 2014 2:46 AM, "Nicholas Chammas" <nicholas.cham...@gmail.com> 
> wrote:
> It would be useful to have some way to open multiple files at once into a 
> single RDD (e.g. sc.textFile(iterable_over_uris)). Logically, it would be 
> equivalent to opening a single file which is made by concatenating the 
> various files together. This would only be useful, of course, if the source 
> files were all in the same format.
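> 
> In other words, something with the same effect as this rough sketch, where uris is a hypothetical Seq[String] of s3n:// paths:
> 
>     // Join the paths into a single comma-separated string for one textFile call
>     val combined = sc.textFile(uris.mkString(","))
> 
>     // Or union the per-file RDDs into one RDD
>     val unioned = sc.union(uris.map(uri => sc.textFile(uri)))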
> 
> Nick
> 
> 
> 
> On Mon, Apr 28, 2014 at 7:40 PM, Andrew Ash <and...@andrewash.com> wrote:
> The way you've written it there, I would expect that to be serial runs.  The 
> reason is that you're looping over matches with a driver-level map, which runs 
> serially.  Then calling foreachWith on the RDDs executes the action in a 
> blocking way, so you don't get a result back until the cluster finishes.
> 
> You can have multiple jobs running at the same time though by sharing a 
> SparkContext among threads.  Rather than run all the foreachWith()s in serial 
> on a single thread in the driver, try running each in its own thread.
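> 
> A minimal sketch of that approach with Scala futures, reusing the hadoopFiles, findMatches, and 
> recordMatches names from the code further down the thread (the global execution context is just 
> for illustration):
> 
>     import scala.concurrent.{Await, Future}
>     import scala.concurrent.ExecutionContext.Implicits.global
>     import scala.concurrent.duration.Duration
> 
>     // One future per input file; each one submits its own job on the shared SparkContext
>     val jobs = hadoopFiles.map { hadoopFile =>
>       Future { recordMatches(findMatches(hadoopFile)) }
>     }
> 
>     // Block the driver until every job has finished
>     jobs.foreach(job => Await.result(job, Duration.Inf))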
> 
> 
> 
> 
> On Tue, Apr 29, 2014 at 1:35 AM, Art Peel <found...@gmail.com> wrote:
> 
> I’m trying to process 3 S3 files in parallel, but they always get processed 
> serially.
> Am I going about this the wrong way?  Full details below.
> 
> Regards,
> Art
> 
> 
> 
> I’m trying to do the following on a Spark cluster with 3 slave nodes.
> 
> Given a list of N URLs for files in S3 (with s3n:// URLs),
> 
> For each file:
>       1. search for some items
>       2. record the findings from step 1 in an external data store.
> 
> I’d like to process the 3 URLs in parallel on 3 different slave nodes, but 
> instead, they are getting processed serially on one node.
> 
> I’ve tried various versions of my code to no avail. I’m also creating the 
> SparkContext with "spark.scheduler.mode" set to "FAIR".
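> 
> For reference, that configuration looks roughly like this (the app name is a placeholder):
> 
>     import org.apache.spark.{SparkConf, SparkContext}
> 
>     // Build the context with FAIR scheduling enabled
>     val conf = new SparkConf()
>       .setAppName("s3-file-processing")
>       .set("spark.scheduler.mode", "FAIR")
>     val sc = new SparkContext(conf)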
> 
> Have I made a fundamental mistake? 
> 
> I’m running Spark 0.9.1 and my code looks roughly like this:
> 
> def processFiles(sc: SparkContext) {
> 
>     val urls = List("s3n://foo/file.txt", "s3n://bar/file.txt", 
> "s3n://baz/file.txt")
> 
>     val hadoopFiles = urls.map(url => {
>       sc.hadoopFile(url, classOf[FooInputFormat], classOf[LongWritable], 
> classOf[WritableFooRecord])
>     })
> 
>     val matches = hadoopFiles.par.map((hadoopFile) => {
> 
>       findMatches(hadoopFile)
>     })
> 
> 
>     matches.map((matchRDD) => {
>       recordMatches(matchRDD)
>     })
>   }
> 
> 
>   // FooMatch is a placeholder for the case class returned by caseClassResultFromSearch
>   def findMatches(hadoopFile: RDD[(LongWritable, WritableFooRecord)]): RDD[FooMatch] = {
> 
>     hadoopFile.map( record => caseClassResultFromSearch(record) )
> 
>   }
> 
>   def recordMatches(matchRDD: RDD[FooMatch]) {
> 
>     
>     matchRDD.foreachWith(_ => {
> 
>       makeRestClient(config)
> 
>     // I get 3 jobs referring to the line number of the next line, but the 
> jobs run serially on one node.
>     })((matchRecord, client) => {
> 
>       client.storeResult(matchRecord)
> 
>     })
> 
> 
>   }