Thanks Michael. Can you give me an example? I'm new to Spark.
On Mon, 22 Jan 2018 at 12:25 Michael Mansour <michael_mans...@symantec.com> wrote:

> Toy,
>
> I suggest you partition your data according to date and use the
> foreachPartition function, using the partition as the bucket location.
> This would require you to define a custom hash partitioner function, but
> that is not too difficult.
>
> --
> Michael Mansour
> Data Scientist
> Symantec
>
> *From:* Toy <noppani...@gmail.com>
> *Date:* Monday, January 22, 2018 at 8:19 AM
> *To:* "user@spark.apache.org" <user@spark.apache.org>
> *Subject:* [EXT] How do I extract a value in foreachRDD operation
>
> Hi,
>
> We have a Spark application that parses log files and saves them to S3 in
> ORC format. During the foreachRDD operation we need to extract a date
> field to determine the bucket location, since we partition by date.
> Currently we hardcode it to the current date, but we have a requirement
> to determine it for each record.
>
> Here's the current code:
>
> jsonRows.foreachRDD(r => {
>   val parsedFormat = new SimpleDateFormat("yyyy-MM-dd/")
>   val parsedDate = parsedFormat.format(new java.util.Date())
>   val OutputPath = destinationBucket + "/parsed_logs/orc/dt=" + parsedDate
>
>   val jsonDf = sqlSession.read.schema(Schema.schema).json(r)
>
>   val writer = jsonDf.write.mode("append").format("orc").option("compression", "zlib")
>
>   if (environment.equals("local")) {
>     writer.save("/tmp/sparrow")
>   } else {
>     writer.save(OutputPath)
>   }
> })
>
> The column in jsonRow that we want is `_ts`.
>
> Thanks.
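
For what it's worth, here is a minimal sketch of one way to get a per-record date without a custom partitioner: derive a `dt` column from `_ts` and let the DataFrame writer's `partitionBy` lay out the `dt=...` directories, instead of building the path by hand per batch. This is a sketch against the code quoted above, not tested against your job; it assumes `_ts` can be cast to a timestamp, and reuses the names from your snippet (`jsonRows`, `sqlSession`, `Schema.schema`, `destinationBucket`, `environment`).

```scala
import org.apache.spark.sql.functions.{col, date_format}

jsonRows.foreachRDD { r =>
  val jsonDf = sqlDf(r) // shorthand; see below

  // Derive the partition value from each record's _ts instead of the wall-clock date.
  val withDt = jsonDf.withColumn(
    "dt",
    date_format(col("_ts").cast("timestamp"), "yyyy-MM-dd")
  )

  val writer = withDt.write
    .mode("append")
    .format("orc")
    .option("compression", "zlib")
    .partitionBy("dt") // produces .../parsed_logs/orc/dt=2018-01-22/ style subdirectories

  if (environment.equals("local")) writer.save("/tmp/sparrow")
  else writer.save(destinationBucket + "/parsed_logs/orc")
}

// Helper matching the original read, kept separate only for readability.
def sqlDf(r: org.apache.spark.rdd.RDD[String]) =
  sqlSession.read.schema(Schema.schema).json(r)
```

Note `partitionBy` drops the `dt` column from the ORC files themselves (it lives in the directory name), so if downstream readers need it inside the files you would duplicate the column before writing.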