Hi Jeff,

I think the blank Configuration may be the issue. Our ExecutorClass implements Tool, and we use

ToolRunner.run(new Configuration(), new ExecutorClass(), args)

to run the Crunch job, which has always worked fine with MRPipeline. What is the correct way of inheriting the configuration here?

Thanks
Yan
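A minimal sketch of the usual Tool/ToolRunner wiring, where ToolRunner injects the configuration and run() picks it back up via getConf() instead of constructing another Configuration inside the job, then hands it to the Crunch pipeline. The pipeline.setConfiguration() hand-off and the exit-code handling are illustrative assumptions (they rely on the Crunch Pipeline interface exposing setConfiguration and done() in your version); ExecutorClass and "spark-app" are just the names already used in this thread:

import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ExecutorClass extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    // ToolRunner calls setConf() before run(), so getConf() already carries the
    // cluster site files plus any -D / -conf options passed on the command line.
    Configuration conf = getConf();

    JavaSparkContext jsc = new JavaSparkContext(new SparkConf().setAppName("spark-app"));
    SparkPipeline pipeline = new SparkPipeline(jsc, "spark-app");
    pipeline.setConfiguration(conf);  // reuse the inherited Hadoop config rather than a fresh one

    // ... pipeline.read(...), process(...), pipeline.write(...) as in the original snippet ...

    return pipeline.done().succeeded() ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new ExecutorClass(), args));
  }
}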
On Mon, Jan 4, 2016 at 2:27 PM, Jeff Quinn <[email protected]> wrote:

> Interesting, how are you submitting your job? Are you using spark-submit
> with the "yarn-master" spark master? Is your main class extending
> CrunchTool? My thinking is that somehow the default configurations are not
> being inherited, and maybe you are working with a totally blank
> Configuration object.
>
> On Mon, Jan 4, 2016 at 2:19 PM, Yan Yang <[email protected]> wrote:
>
>> Jeff,
>>
>> Thanks for the suggestion. After I switched the URL to s3, an almost
>> identical exception is encountered:
>>
>> java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key
>> must be specified as the username or password (respectively) of a s3 URL, or
>> by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey
>> properties (respectively).
>>
>> On Mon, Jan 4, 2016 at 12:46 PM, Jeff Quinn <[email protected]> wrote:
>>
>>> Ah ok, I would try it with "s3://", and I think it should work as
>>> expected, assuming the machine role you are using for EMR has the proper
>>> permissions for writing to the bucket.
>>>
>>> You should not need to set fs.s3n.awsSecretAccessKey/fs.s3n.awsAccessKeyId
>>> or any other properties; the EMR service should be taking care of that for you.
>>>
>>> On Mon, Jan 4, 2016 at 12:22 PM, Yan Yang <[email protected]> wrote:
>>>
>>>> Hi Jeff,
>>>>
>>>> We are using s3n://bucket/path
>>>>
>>>> Thanks
>>>> Yan
>>>>
>>>> On Mon, Jan 4, 2016 at 12:19 PM, Jeff Quinn <[email protected]> wrote:
>>>>
>>>>> Hey Yan,
>>>>>
>>>>> Just a hunch, but from that stack trace it looks like you might be using
>>>>> the outdated s3-hadoop filesystem. Is the URL you are trying to write to
>>>>> of the form s3://bucket/path or s3n://bucket/path?
>>>>>
>>>>> Thanks!
>>>>>
>>>>> Jeff
>>>>>
>>>>> On Mon, Jan 4, 2016 at 12:15 PM, Yan Yang <[email protected]> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> I have tried to set up a SparkPipeline to run within AWS EMR.
>>>>>>
>>>>>> The code is as below:
>>>>>>
>>>>>> SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
>>>>>> JavaSparkContext jsc = new JavaSparkContext(sparkConf);
>>>>>> SparkPipeline pipeline = new SparkPipeline(jsc, "spark-app");
>>>>>>
>>>>>> PCollection<Input> input = pipeline.read(From.avroFile(inputPaths, Input.class));
>>>>>> PCollection<Output> output = process(input);
>>>>>> pipeline.write(output, To.avroFile(outputPath));
>>>>>>
>>>>>> The read works, and a simple Spark write such as calling
>>>>>> saveAsTextFile() on an RDD object also works.
>>>>>>
>>>>>> However, a write using pipeline.write() hits the exception below. I have
>>>>>> tried to set fs.s3n.awsAccessKeyId and fs.s3n.awsSecretAccessKey in
>>>>>> sparkConf with the same result:
>>>>>>
>>>>>> java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access
>>>>>> Key must be specified as the username or password (respectively) of a
>>>>>> s3n URL, or by setting the fs.s3n.awsAccessKeyId or
>>>>>> fs.s3n.awsSecretAccessKey properties (respectively).
>>>>>> at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
>>>>>> at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:80)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>>>>>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>>>>>> at org.apache.hadoop.fs.s3native.$Proxy9.initialize(Unknown Source)
>>>>>> at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:326)
>>>>>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2644)
>>>>>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:90)
>>>>>> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2678)
>>>>>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2660)
>>>>>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:374)
>>>>>> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>>>>>> at org.apache.avro.mapred.FsInput.<init>(FsInput.java:37)
>>>>>> at org.apache.crunch.types.avro.AvroRecordReader.initialize(AvroRecordReader.java:54)
>>>>>> at org.apache.crunch.impl.mr.run.CrunchRecordReader.initialize(CrunchRecordReader.java:150)
>>>>>> at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:153)
>>>>>> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:124)
>>>>>> at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:65)
>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>>>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>>>>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>>>>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>>>>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>>>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> Thanks
>>>>>> Yan
>>>>>
>>>>>
>>>>
>>>
>>
>
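On the fs.s3n.* keys mentioned in the original message: plain SparkConf entries are not automatically visible to the Hadoop FileSystem layer, so if the credentials did need to be set explicitly (per Jeff, they should not be needed on EMR when the instance role has S3 access), they would normally be pushed into the Hadoop Configuration. A hedged sketch of the Spark-side mechanism only; accessKey/secretKey are placeholders, and whether the Crunch Spark runtime picks up this particular Configuration depends on how the pipeline's own Configuration is built:

import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

String accessKey = "PLACEHOLDER";  // placeholder; prefer the EMR instance role
String secretKey = "PLACEHOLDER";  // placeholder; prefer the EMR instance role

// Option 1: keys prefixed with "spark.hadoop." are copied by Spark into the
// Hadoop Configuration it hands to input/output formats.
SparkConf sparkConf = new SparkConf()
    .setAppName("JavaSparkPi")
    .set("spark.hadoop.fs.s3n.awsAccessKeyId", accessKey)
    .set("spark.hadoop.fs.s3n.awsSecretAccessKey", secretKey);
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

// Option 2: set them on the context's Hadoop configuration directly.
jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", accessKey);
jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", secretKey);

SparkPipeline pipeline = new SparkPipeline(jsc, "spark-app");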
