You can get the end time of a window from the TimeWindow object which is passed to the AllWindowFunction. This is basically a window ID / index. I would go for a custom output sink which writes records to files based on their timestamp. IMO, this would be cleaner & easier than implementing the file output into the window function.
2016-02-04 13:49 GMT+01:00 Radu Tudoran <radu.tudo...@huawei.com>: > Hi Radu, > > > > It is indeed interesting to know how each window could be registered > separately - I am not sure it any of the existing mechanisms in Flink > support this. > > I think you need to create your own output sink. It is a bit tricky to > pass the window sequence number (actually I do not think such an index is > kept – but you can create one by yourself). Maybe an easier option is to > manage the writing of the data yourself in the window function or in a > custom created evictor. In the window and in the evictor you have access to > all data and you can create specific files for each window triggered > > > > > > > > *From:* Radu Prodan [mailto:raduprod...@gmail.com] > *Sent:* Thursday, February 04, 2016 11:58 AM > *To:* user@flink.apache.org > *Subject:* Re: Flink writeAsCsv > > > > Hi Marton, > > > > Thanks to your comment I managed to get it worked. At least it outputs the > results. However, what I need is to output each window result seperately. > Now, it outputs the results of parallel working windows (I think) and > appends the new results to them. For example, If I have parallelism of 10, > then I will have at most 10 files and each file will grow in size as > windows continue. > > What I want is, to have seperate file for a window. For example, after > n'th window is computed output it to some file and close the file. > > > > -best > > Radu > > > > On Thu, Feb 4, 2016 at 11:42 AM Márton Balassi <balassi.mar...@gmail.com> > wrote: > > Hey Radu, > > > > As you are using the streaming api I assume that you call env.execute() in > both cases. Is that the case? > > > > Do you see any errors appearing? My first call would be if your data type > is not a tuple type then writeAsCsv does not work by default. > > > > Best, > > > > Marton > > > > On Thu, Feb 4, 2016 at 11:36 AM, Radu Prodan <raduprod...@gmail.com> > wrote: > > Hi all, > > > > I am new to flink. I wrote a simple program and I want it to output as csv > file. > > > > timeWindowAll(Time.of(3, TimeUnit.MINUTES)) > > .apply(newFunction1()) > > .writeAsCsv("file:///user/someuser/Documents/somefile.csv"); > > > > When I change the sink to . print(), it works and outputs some results. > > I want it to output the result of every window. However, it outputs > nothing and the file is not created. Am I missing anything? > > > > -best > > Radu > > > > > > > > > > > >