You can try something like this. In your version curdate is evaluated only once, when the DStream is defined, so the path never changes; moving it inside foreachRDD makes it get re-evaluated for every batch:

import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

eventsDStream.foreachRDD(rdd => {
  // curdate is now computed for every batch, not just once at job start
  val curdate = new DateTime()
  val fmt = DateTimeFormat.forPattern("dd_MM_YYYY")

  rdd.saveAsTextFile("s3n://<bucket_name>/test/events_" + fmt.print(curdate) + "/events")
})
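A minimal variation, assuming the same eventsDStream and bucket path as above: foreachRDD also has an overload that passes in the batch Time, so the folder name can follow the batch time rather than the driver's wall clock, and appending the batch timestamp keeps successive batches on the same day from colliding (saveAsTextFile fails if the target directory already exists):

import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

eventsDStream.foreachRDD((rdd, time) => {
  // `time` is the batch time Spark Streaming supplies for this micro-batch
  val fmt = DateTimeFormat.forPattern("dd_MM_YYYY")
  val day = fmt.print(new DateTime(time.milliseconds))
  // one directory per day, one sub-directory per batch so paths never clash
  rdd.saveAsTextFile("s3n://<bucket_name>/test/events_" + day + "/events_" + time.milliseconds)
})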

Thanks
Best Regards

On Fri, Apr 10, 2015 at 4:22 PM, Anshul Singhle <ans...@betaglide.com>
wrote:

> Hi all,
>
> I'm using spark streaming to log events from kinesis to s3.
> My code is doing something like this -
>
> val curdate = new DateTime()
> val fmt = DateTimeFormat.forPattern("dd_MM_YYYY");
> eventsDStream.saveAsTextFiles("s3n://<bucket_name>/test/events_"+fmt.print(curdate)+"/events","json")
>
> The problem is that when I started my streaming job on 9th April and kept it
> running for more than a day, it was still writing to the 09_04_2015 folder.
>
> On restarting my streaming job, it started writing to the correct directory,
> i.e. 10_04_2015.
>
> Any idea why this is occurring?
>
> Regards,
> Anshul
>
>
