I'm super new to Spark, and I'm writing this job to parse nginx logs into the ORC file format so they can be read from Presto. We wrote LogLine2Json, which parses a single line of an nginx log into JSON, and that part works fine.
    val sqs = streamContext.receiverStream(new SQSReceiver("elb")
      //.credentials("key", "secret")
      .at(Regions.US_EAST_1)
      .withTimeout(5))

    val jsonRows = sqs.mapPartitions(partitions => {
      val sqlSession = SparkSession
        .builder()
        .getOrCreate()

      val s3Client = new AmazonS3Client(new BasicCredentialsProvider(sys.env("AWS_ACCESS_KEY_ID"), sys.env("AWS_SECRET_ACCESS_KEY")))
      val txfm = new LogLine2Json
      val log = Logger.getLogger("parseLog")

      partitions.map(messages => {
        // each SQS message is an S3 event notification; pull the bucket and key out of it
        val sqsMsg = Json.parse(messages)
        System.out.println(sqsMsg)
        val bucketName = Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "")
        val key = Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "")
        System.out.println(bucketName)
        System.out.println(key)

        // download the log file from S3 and parse it line by line
        val obj = s3Client.getObject(new GetObjectRequest(bucketName, key))
        val stream = obj.getObjectContent()
        scala.io.Source.fromInputStream(stream).getLines().map(line => {
          try {
            val l = txfm.parseLine(line)
            sqlSession.read.schema(schema.schema).json(l)
          } catch {
            case e: Throwable => { log.info(line); "" }
          }
        }).filter(line => line != "{}")
      })
    })

    jsonRows.foreachRDD(r => {
      // write each micro-batch out as ORC so Presto can read it
      r.write.mode("append").format("orc").option("compression", "zlib").save("/tmp/spark/presto")
    })

    streamContext.start()
    streamContext.awaitTermination()
  }

The job is supposed to read S3 keys from SQS messages, fetch each file from S3, parse every line, and save the results as ORC. However, I can't get this to compile. It complains that r doesn't have a write method on this line:

    r.write.mode("append").format("orc").option("compression", "zlib").save("/tmp/spark/presto")

Please help. Thanks a lot.
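
Is something like this the right direction? It's only a rough sketch under my own assumptions: it presumes jsonRows is a DStream[String] of the JSON lines produced by LogLine2Json (i.e. the mapPartitions above returns the parsed strings rather than DataFrames), and it reuses my schema object and output path from the code above. The idea is to only build a DataFrame inside foreachRDD, because write/mode/format/save live on DataFrames and Datasets, not on RDDs:

    import org.apache.spark.sql.SparkSession

    // Sketch only: assumes jsonRows is a DStream[String] of JSON log lines.
    jsonRows.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // foreachRDD runs on the driver, so it should be safe to grab the session here
        val spark = SparkSession.builder().getOrCreate()
        import spark.implicits._

        // Turn the RDD of JSON strings into a DataFrame using my schema, then write it
        // out through the DataFrame's write API (plain RDDs don't have .write, which is
        // why the original line fails to compile).
        val df = spark.read.schema(schema.schema).json(rdd.toDS())

        df.write
          .mode("append")
          .format("orc")
          .option("compression", "zlib")
          .save("/tmp/spark/presto")
      }
    }

I'm assuming rdd.toDS() needs spark.implicits._ in scope; if I'm off base here, please correct me.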