Ensure that com.amazonaws.services.s3.AmazonS3ClientBuilder is on your runtime classpath, which includes both your application jar and any jars attached to the executors. Note that "Could not initialize class" (as opposed to "class not found") usually means the class was found but its static initializer failed — often due to a dependency version conflict (e.g. Jackson) in the assembled jar.
2017-07-20 6:12 GMT+08:00 Noppanit Charassinvichai <noppani...@gmail.com>: > I have this spark job which is using S3 client in mapPartition. And I get > this error > > Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most > recent failure: Lost task 0.3 in stage 3.0 (TID 74, > ip-10-90-78-177.ec2.internal, executor 11): java.lang.NoClassDefFoundError: > Could not initialize class com.amazonaws.services.s3.AmazonS3ClientBuilder > +details > Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most > recent failure: Lost task 0.3 in stage 3.0 (TID 74, > ip-10-90-78-177.ec2.internal, executor 11): java.lang.NoClassDefFoundError: > Could not initialize class com.amazonaws.services.s3.AmazonS3ClientBuilder > at SparrowOrc$$anonfun$1.apply(sparrowOrc.scala:49) > at SparrowOrc$$anonfun$1.apply(sparrowOrc.scala:46) > at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$ > anonfun$apply$23.apply(RDD.scala:796) > at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$ > anonfun$apply$23.apply(RDD.scala:796) > at org.apache.spark.rdd.MapPartitionsRDD.compute( > MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.rdd.MapPartitionsRDD.compute( > MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.rdd.MapPartitionsRDD.compute( > MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at org.apache.spark.executor.Executor$TaskRunner.run( > Executor.scala:282) > at java.util.concurrent.ThreadPoolExecutor.runWorker( > ThreadPoolExecutor.java:1142) > at 
java.util.concurrent.ThreadPoolExecutor$Worker.run( > ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > This is my code > val jsonRows = sqs.mapPartitions(partitions => { > val s3Client = AmazonS3ClientBuilder.standard().withCredentials(new > DefaultCredentialsProvider).build() > > val txfm = new LogLine2Json > val log = Logger.getLogger("parseLog") > > partitions.flatMap(messages => { > val sqsMsg = Json.parse(messages) > val bucketName = Json.stringify(sqsMsg(" > Records")(0)("s3")("bucket")("name")).replace("\"", "") > val key = > Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", > "") > val obj = s3Client.getObject(new GetObjectRequest(bucketName, key)) > val stream = obj.getObjectContent() > > scala.io.Source.fromInputStream(stream).getLines().map(line => { > try { > txfm.parseLine(line) > } > catch { > case e: Throwable => { > log.info(line); "{}"; > } > } > }).filter(line => line != "{}") > }) > }) > > This is my build.sbt > > name := "sparrow-to-orc" > > version := "0.1" > > scalaVersion := "2.11.8" > > libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0" % > "provided" > libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.0" % > "provided" > libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.1.0" % > "provided" > libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.1.0" % > "provided" > > libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.7.3" % > "provided" > libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.7.3" % > "provided" > libraryDependencies += "com.cn" %% "sparrow-clf-parser" % "1.1-SNAPSHOT" > > libraryDependencies += "com.amazonaws" % "aws-java-sdk" % "1.11.155" > libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.11.155" > libraryDependencies += "com.amazonaws" % "aws-java-sdk-core" % "1.11.155" > > libraryDependencies += "com.github.seratch" %% "awscala" % "0.6.+" > libraryDependencies += 
"com.typesafe.play" %% "play-json" % "2.6.0" > dependencyOverrides ++= Set("com.fasterxml.jackson.core" % > "jackson-databind" % "2.6.0") > > > > assemblyMergeStrategy in assembly := { > case PathList("org","aopalliance", xs @ _*) => MergeStrategy.last > case PathList("javax", "inject", xs @ _*) => MergeStrategy.last > case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last > case PathList("javax", "activation", xs @ _*) => MergeStrategy.last > case PathList("org", "apache", xs @ _*) => MergeStrategy.last > case PathList("com", "google", xs @ _*) => MergeStrategy.last > case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last > case PathList("com", "codahale", xs @ _*) => MergeStrategy.last > case PathList("com", "yammer", xs @ _*) => MergeStrategy.last > case PathList("com", "amazonaws", xs @ _*) => MergeStrategy.last > case PathList("com", "typesafe", xs @ _*) => MergeStrategy.last > case "about.html" => MergeStrategy.rename > case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last > case "META-INF/mailcap" => MergeStrategy.last > case "META-INF/mimetypes.default" => MergeStrategy.last > case "plugin.properties" => MergeStrategy.last > case "log4j.properties" => MergeStrategy.last > case "overview.html" => MergeStrategy.last > case x => > val oldStrategy = (assemblyMergeStrategy in assembly).value > oldStrategy(x) > } > >