Thanks, Michael.

Can you give me an example? I'm new to Spark.

On Mon, 22 Jan 2018 at 12:25 Michael Mansour <michael_mans...@symantec.com>
wrote:

> Toy,
>
>
>
> I suggest you partition your data by date and use the foreachPartition
> function, using each partition to determine its bucket location.
>
> This would require you to define a custom hash partitioner, but
> that is not too difficult.
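>
> A rough, untested sketch of that idea (assuming an RDD of (date, record)
> pairs; DatePartitioner and the write step are illustrative placeholders,
> not working code from this thread):
>
>     import org.apache.spark.Partitioner
>
>     // Hypothetical partitioner: records with the same date key always
>     // land in the same partition (several dates may share a partition).
>     class DatePartitioner(override val numPartitions: Int) extends Partitioner {
>       override def getPartition(key: Any): Int = {
>         val h = key.hashCode % numPartitions
>         if (h < 0) h + numPartitions else h // keep the index non-negative
>       }
>     }
>
>     // keyedRows: RDD[(String, String)] of (date, jsonRecord) pairs
>     keyedRows
>       .partitionBy(new DatePartitioner(numPartitions = 31))
>       .foreachPartition { records =>
>         records.foreach { case (dt, record) =>
>           // write each record under s"$destinationBucket/parsed_logs/orc/dt=$dt"
>         }
>       }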
>
>
>
> --
>
> Michael Mansour
>
> Data Scientist
>
> Symantec
>
> *From: *Toy <noppani...@gmail.com>
> *Date: *Monday, January 22, 2018 at 8:19 AM
> *To: *"user@spark.apache.org" <user@spark.apache.org>
> *Subject: *[EXT] How do I extract a value in foreachRDD operation
>
>
>
> Hi,
>
>
>
> We have a Spark application that parses log files and saves them to S3 in
> ORC format. During the foreachRDD operation we need to extract a date field
> from each record to determine the bucket location, since we partition by
> date. Currently we just hardcode the current date, but the requirement is
> to determine the date for each record.
>
>
>
> Here's the current code.
>
>
>
>     jsonRows.foreachRDD(r => {
>       // The bucket path is currently derived from the wall-clock date,
>       // not from the records themselves.
>       val parsedFormat = new SimpleDateFormat("yyyy-MM-dd/")
>       val parsedDate = parsedFormat.format(new java.util.Date())
>       val OutputPath = destinationBucket + "/parsed_logs/orc/dt=" + parsedDate
>
>       val jsonDf = sqlSession.read.schema(Schema.schema).json(r)
>
>       val writer = jsonDf.write.mode("append").format("orc")
>         .option("compression", "zlib")
>
>       if (environment.equals("local")) {
>         writer.save("/tmp/sparrow")
>       } else {
>         writer.save(OutputPath)
>       }
>     })
>
>
>
> The column in jsonRows that we want is `_ts`.
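>
> A minimal sketch of one common way to handle this with the DataFrame
> writer (assuming `_ts` casts to a timestamp, and reusing jsonDf and
> destinationBucket from the snippet above; the `dt` column name is
> illustrative):
>
>     import org.apache.spark.sql.functions.{col, date_format}
>
>     // Derive a dt column from each record's _ts, then let partitionBy
>     // create one dt=YYYY-MM-DD directory per distinct date.
>     val withDt = jsonDf.withColumn(
>       "dt", date_format(col("_ts").cast("timestamp"), "yyyy-MM-dd"))
>
>     withDt.write
>       .mode("append")
>       .format("orc")
>       .option("compression", "zlib")
>       .partitionBy("dt")
>       .save(destinationBucket + "/parsed_logs/orc")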
>
>
>
> Thanks.
>
