Actually wildcards work too, e.g. s3n://bucket/file1*, and I believe comma-separated lists do as well (e.g. s3n://bucket/file1,s3n://bucket/file2). These are all inherited from FileInputFormat in Hadoop.

Matei
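A quick illustration of both forms (a minimal sketch; the bucket and file names are made up, and the comma-separated form relies on FileInputFormat accepting comma-separated paths, as Matei notes above):

    // Both calls go through Hadoop's FileInputFormat, which expands globs
    // and should also accept comma-separated lists of paths.
    val byGlob = sc.textFile("s3n://bucket/file1*")
    val byList = sc.textFile("s3n://bucket/file1,s3n://bucket/file2")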
On Apr 28, 2014, at 6:05 PM, Andrew Ash <and...@andrewash.com> wrote:

> This is already possible with the sc.textFile("/path/to/filedir/*") call.
> Does that work for you?
>
> Sent from my mobile phone
>
> On Apr 29, 2014 2:46 AM, "Nicholas Chammas" <nicholas.cham...@gmail.com> wrote:
>
> It would be useful to have some way to open multiple files at once into a
> single RDD (e.g. sc.textFile(iterable_over_uris)). Logically, it would be
> equivalent to opening a single file made by concatenating the various files
> together. This would only be useful, of course, if the source files were all
> in the same format.
>
> Nick
>
> On Mon, Apr 28, 2014 at 7:40 PM, Andrew Ash <and...@andrewash.com> wrote:
>
> The way you've written it there, I would expect those to be serial runs. The
> reason is that you're looping over the matches with a driver-level map, which
> runs serially. Calling foreachWith on each RDD then executes the action in a
> blocking way, so you don't get a result back until the cluster finishes that
> job.
>
> You can have multiple jobs running at the same time, though, by sharing a
> SparkContext among threads. Rather than running all the foreachWith()s
> serially on a single thread in the driver, try running each in its own thread.
>
> On Tue, Apr 29, 2014 at 1:35 AM, Art Peel <found...@gmail.com> wrote:
>
> I’m trying to process 3 S3 files in parallel, but they always get processed
> serially. Am I going about this the wrong way? Full details below.
>
> Regards,
> Art
>
> I’m trying to do the following on a Spark cluster with 3 slave nodes.
>
> Given a list of N URLs for files in S3 (with s3n:// URLs), for each file:
>
> 1. search for some items
> 2. record the findings from step 1 in an external data store.
>
> I’d like to process the 3 URLs in parallel on 3 different slave nodes, but
> instead they are getting processed serially on one node.
>
> I’ve tried various versions of my code to no avail. I’m also creating the
> SparkContext with "spark.scheduler.mode" set to "FAIR".
>
> Have I made a fundamental mistake?
>
> I’m running Spark 0.9.1 and my code looks roughly like this:
>
> def processFiles(sc: SparkContext) {
>
>   val urls = List("s3n://foo/file.txt", "s3n://bar/file.txt",
>     "s3n://baz/file.txt")
>
>   val hadoopFiles = urls.map(url => {
>     sc.hadoopFile(url, classOf[FooInputFormat], classOf[LongWritable],
>       classOf[WritableFooRecord])
>   })
>
>   val matches = hadoopFiles.par.map(hadoopFile => {
>     findMatches(hadoopFile)
>   })
>
>   matches.map(matchRDD => {
>     recordMatches(matchRDD)
>   })
> }
>
> def findMatches(hadoopFile: RDD): RDD = {
>   hadoopFile.map(record => caseClassResultFromSearch(record))
> }
>
> def recordMatches(matchRDD: RDD) {
>
>   matchRDD.foreachWith(_ => {
>     makeRestClient(config)
>   // I get 3 jobs referring to the line number of the next line, but the
>   // jobs run serially on one node.
>   })((matchRecord, client) => {
>     client.storeResult(matchRecord)
>   })
> }
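To make Andrew's suggestion concrete, here is a minimal sketch (reusing the urls list and the findMatches/recordMatches helpers from Art's code above): each job is submitted from its own driver thread, so the blocking actions overlap instead of running back to back.

    // Sketch: one driver thread per URL, all sharing the same SparkContext.
    // SparkContext is safe to use from multiple threads for job submission;
    // each foreachWith blocks only its own thread, so the jobs can run at
    // the same time.
    val threads = urls.map { url =>
      new Thread(new Runnable {
        def run() {
          val hadoopFile = sc.hadoopFile(url, classOf[FooInputFormat],
            classOf[LongWritable], classOf[WritableFooRecord])
          recordMatches(findMatches(hadoopFile))
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())  // wait for all jobs to finish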
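And for Nick's iterable-of-URIs request, a single combined RDD can already be built by hand; a sketch under the assumption that all files share the same text format (the URIs here are made up):

    // Sketch: combine many files into one RDD, either by unioning per-file
    // RDDs or by joining the paths into one comma-separated string (the
    // form Matei mentions at the top of the thread).
    val uris = Seq("s3n://bucket/a.txt", "s3n://bucket/b.txt")
    val combined = sc.union(uris.map(uri => sc.textFile(uri)))
    val combinedAlt = sc.textFile(uris.mkString(","))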