I'm very new to Spark and am writing a job that parses nginx logs into the
ORC file format so they can be read from Presto. We wrote LogLine2Json, which
parses one line of nginx log into JSON, and that part works fine.

    val sqs = streamContext.receiverStream(new SQSReceiver("elb")
      //.credentials("key", "secret")
      .at(Regions.US_EAST_1)
      .withTimeout(5))

    val jsonRows = sqs.mapPartitions(partitions => {
      val sqlSession = SparkSession
        .builder()
        .getOrCreate()

      val s3Client = new AmazonS3Client(new BasicCredentialsProvider(
        sys.env("AWS_ACCESS_KEY_ID"), sys.env("AWS_SECRET_ACCESS_KEY")))

      val txfm = new LogLine2Json
      val log = Logger.getLogger("parseLog")

      partitions.map(messages => {
        val sqsMsg = Json.parse(messages)
        System.out.println(sqsMsg)

        val bucketName =
          Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "")
        val key =
          Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "")
        System.out.println(bucketName)
        System.out.println(key)
        val obj = s3Client.getObject(new GetObjectRequest(bucketName, key))
        val stream = obj.getObjectContent()
        scala.io.Source.fromInputStream(stream).getLines().map(line => {
            try{
              val l = txfm.parseLine(line)
              sqlSession.read.schema(schema.schema).json(l)

            }
            catch {
              case e: Throwable => {log.info(line); "";}
            }
          }).filter(line => line != "{}")
      })
    })


    jsonRows.foreachRDD(r => {
      r.write.mode("append").format("orc").option("compression","zlib").save("/tmp/spark/presto")
    })

    streamContext.start()
    streamContext.awaitTermination()
  }

The job is meant to read an S3 key off SQS, fetch that file, parse it, and
save the result as ORC. However, I can't get it to compile: the compiler
complains that r does not have a write method on this line:

    r.write.mode("append").format("orc").option("compression","zlib").save("/tmp/spark/presto")
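
My current guess, which may well be wrong, is that foreachRDD hands me a plain
RDD, while write is only defined on Dataset/DataFrame, so the RDD has to be
turned into a DataFrame on the driver first. A minimal sketch of what I think
that would look like, assuming the stream is reduced to one JSON string per log
line and assuming Spark 2.2 or later (writeOrc, jsonLines and path are
hypothetical names, not something I have working):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: jsonLines is assumed to carry one JSON document per element.
def writeOrc(jsonLines: DStream[String], schema: StructType, path: String): Unit =
  jsonLines.foreachRDD { rdd =>
    // Skip empty micro-batches so no empty output files are written.
    if (!rdd.isEmpty()) {
      // foreachRDD's body runs on the driver, so a SparkSession is usable here.
      val spark = SparkSession.builder()
        .config(rdd.sparkContext.getConf)
        .getOrCreate()
      import spark.implicits._

      // RDD[String] -> Dataset[String] -> DataFrame with the given schema.
      val df = spark.read.schema(schema).json(spark.createDataset(rdd))

      // write is available on the DataFrame, not on the RDD.
      df.write
        .mode("append")
        .format("orc")
        .option("compression", "zlib")
        .save(path)
    }
  }
```

If that is right, it would also mean the sqlSession.read call inside
mapPartitions has to go, since that closure runs on the executors and would
only need to return the parsed JSON strings.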

Please help.
Thanks a lot.
