Oh huh, that is a little surprising. I'm pretty sure we had this working with 0.11, and our code for running the job looked exactly the same as what you shared.
Anyway, glad you have it working now!

On Fri, Jan 8, 2016 at 4:05 PM, Yan Yang <[email protected]> wrote:

> Turns out upgrading Crunch from 0.11.0 to 0.13.0 solves the problem.
>
> On Mon, Jan 4, 2016 at 5:40 PM, Yan Yang <[email protected]> wrote:
>
>> Hi Jeff
>>
>> I think the blank configuration may be the issue. Our ExecutorClass
>> implements Tool and we use
>>
>> ToolRunner.run(new Configuration(), new ExecutorClass(), args)
>>
>> to run the Crunch job, which has worked fine with MRPipeline all along.
>> What is the correct way of inheriting the configuration here?
>>
>> Thanks
>> Yan
>>
>> On Mon, Jan 4, 2016 at 2:27 PM, Jeff Quinn <[email protected]> wrote:
>>
>>> Interesting, how are you submitting your job? Are you using spark-submit
>>> with the "yarn-master" Spark master? Is your main class extending
>>> CrunchTool? My thinking is that somehow the default configurations are
>>> not being inherited, and maybe you are working with a totally blank
>>> Configuration object.
>>>
>>> On Mon, Jan 4, 2016 at 2:19 PM, Yan Yang <[email protected]> wrote:
>>>
>>>> Jeff,
>>>>
>>>> Thanks for the suggestion. After I switched the URL to s3, an almost
>>>> identical exception is now encountered:
>>>>
>>>> java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access
>>>> Key must be specified as the username or password (respectively) of a s3
>>>> URL, or by setting the fs.s3.awsAccessKeyId or
>>>> fs.s3.awsSecretAccessKey properties (respectively).
>>>>
>>>> On Mon, Jan 4, 2016 at 12:46 PM, Jeff Quinn <[email protected]> wrote:
>>>>
>>>>> Ah ok, I would try it with "s3://", and I think it should work as
>>>>> expected, assuming the machine role you are using for EMR has the
>>>>> proper permissions for writing to the bucket.
>>>>>
>>>>> You should not need to set fs.s3n.awsSecretAccessKey/fs.s3n.awsAccessKeyId
>>>>> or any other properties; the EMR service should be taking care of that
>>>>> for you.
>>>>>
>>>>> On Mon, Jan 4, 2016 at 12:22 PM, Yan Yang <[email protected]> wrote:
>>>>>
>>>>>> Hi Jeff,
>>>>>>
>>>>>> We are using s3n://bucket/path
>>>>>>
>>>>>> Thanks
>>>>>> Yan
>>>>>>
>>>>>> On Mon, Jan 4, 2016 at 12:19 PM, Jeff Quinn <[email protected]> wrote:
>>>>>>
>>>>>>> Hey Yan,
>>>>>>>
>>>>>>> Just a hunch, but from that stack trace it looks like you might be
>>>>>>> using the outdated S3 Hadoop filesystem. Is the URL you are trying
>>>>>>> to write to of the form s3://bucket/path or s3n://bucket/path?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> Jeff
>>>>>>>
>>>>>>> On Mon, Jan 4, 2016 at 12:15 PM, Yan Yang <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi
>>>>>>>>
>>>>>>>> I have tried to set up a SparkPipeline to run within AWS EMR.
>>>>>>>>
>>>>>>>> The code is as below:
>>>>>>>>
>>>>>>>> SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
>>>>>>>> JavaSparkContext jsc = new JavaSparkContext(sparkConf);
>>>>>>>> SparkPipeline pipeline = new SparkPipeline(jsc, "spark-app");
>>>>>>>>
>>>>>>>> PCollection<Input> input = pipeline.read(From.avroFile(inputPaths, Input.class));
>>>>>>>> PCollection<Output> output = process(input);
>>>>>>>> pipeline.write(output, To.avroFile(outputPath));
>>>>>>>>
>>>>>>>> The read works, and a simple Spark write, such as calling
>>>>>>>> saveAsTextFile() on an RDD object, also works.
>>>>>>>>
>>>>>>>> However, a write using pipeline.write() hits the exception below.
>>>>>>>> I have tried to set fs.s3n.awsAccessKeyId and fs.s3n.awsSecretAccessKey
>>>>>>>> in sparkConf, with the same result:
>>>>>>>>
>>>>>>>> java.lang.IllegalArgumentException: AWS Access Key ID and Secret
>>>>>>>> Access Key must be specified as the username or password
>>>>>>>> (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId
>>>>>>>> or fs.s3n.awsSecretAccessKey properties (respectively).
>>>>>>>>     at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
>>>>>>>>     at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:80)
>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>>>>>>>>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>>>>>>     at org.apache.hadoop.fs.s3native.$Proxy9.initialize(Unknown Source)
>>>>>>>>     at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:326)
>>>>>>>>     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2644)
>>>>>>>>     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:90)
>>>>>>>>     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2678)
>>>>>>>>     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2660)
>>>>>>>>     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:374)
>>>>>>>>     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>>>>>>>>     at org.apache.avro.mapred.FsInput.<init>(FsInput.java:37)
>>>>>>>>     at org.apache.crunch.types.avro.AvroRecordReader.initialize(AvroRecordReader.java:54)
>>>>>>>>     at org.apache.crunch.impl.mr.run.CrunchRecordReader.initialize(CrunchRecordReader.java:150)
>>>>>>>>     at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:153)
>>>>>>>>     at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:124)
>>>>>>>>     at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:65)
>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>>>>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>>>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Yan
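For reference, the fs.s3n.* keys in the trace above are read from the Hadoop Configuration that the record readers and writers receive, so setting them on the SparkConf alone generally does not reach them unless the keys carry the "spark.hadoop." prefix. The following is a minimal, untested sketch of two ways to get the properties onto that Configuration; the class name and the key values are placeholders, and on EMR the instance role normally makes explicit keys unnecessary, as noted earlier in the thread.

import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class S3nCredentialsSketch {
  public static void main(String[] args) {
    // Option 1: the "spark.hadoop." prefix asks Spark to copy the property
    // into the Hadoop Configuration it builds for the executors.
    // The key values are placeholders, not real credentials.
    SparkConf sparkConf = new SparkConf()
        .setAppName("JavaSparkPi")
        .set("spark.hadoop.fs.s3n.awsAccessKeyId", "<access-key-id>")
        .set("spark.hadoop.fs.s3n.awsSecretAccessKey", "<secret-access-key>");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    Pipeline pipeline = new SparkPipeline(jsc, "spark-app");

    // Option 2: set the properties on the Configuration the Crunch pipeline
    // itself carries; this is the object its sources and targets consult.
    pipeline.getConfiguration().set("fs.s3n.awsAccessKeyId", "<access-key-id>");
    pipeline.getConfiguration().set("fs.s3n.awsSecretAccessKey", "<secret-access-key>");

    // ... pipeline.read(...), process(...), pipeline.write(...) as in the original mail ...
    pipeline.done();
  }
}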

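On the configuration-inheritance question, a minimal sketch of the standard Configured/Tool pattern, assuming a driver class named ExecutorClass as in the thread (the pipeline-construction step is left as a comment since the thread does not show it). The point is that run() keeps using getConf(), which ToolRunner has already populated with any -D or -conf overrides, instead of constructing a second Configuration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ExecutorClass extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    // getConf() is the Configuration handed to ToolRunner.run(...), with the
    // generic options already applied. Reuse it when building the pipeline
    // rather than creating a fresh Configuration here.
    Configuration conf = getConf();
    // ... build and run the Crunch pipeline with 'conf' ...
    return 0;
  }

  public static void main(String[] args) throws Exception {
    // new Configuration() loads core-site.xml etc. from the classpath, so on an
    // EMR node it is not blank; the important part is that run() reuses it.
    int exitCode = ToolRunner.run(new Configuration(), new ExecutorClass(), args);
    System.exit(exitCode);
  }
}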