Re: Flink not writing last few elements to disk

2018-02-05 Thread Piotr Nowojski
Hi,

With `FileProcessingMode.PROCESS_CONTINUOUSLY` the source keeps monitoring the
input path, so the stream never ends.

The simple `writeAsCsv(…)` sink, on the other hand, only flushes the output
file when the stream ends (see `OutputFormatSinkFunction`), so the last
buffered rows never reach the disk.
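For illustration, the half line in the original report is what buffered output looks like when it is never flushed: data reaches the file in fixed-size chunks, and a chunk boundary can fall in the middle of a row. The following is a simplified pure-Scala model that assumes chunked buffering; `ChunkedWriter` and its chunk size are hypothetical and are not Flink's actual `CsvOutputFormat` code.

```scala
// Simplified model (hypothetical, not Flink's CsvOutputFormat): text reaches
// the "file" only in full chunks of `chunkSize` characters; the remainder
// stays in memory until flush() is called.
final class ChunkedWriter(chunkSize: Int) {
  require(chunkSize > 0, "chunkSize must be positive")

  private val pending = new StringBuilder // buffered, not yet "on disk"
  private val onDisk = new StringBuilder  // what `tail output.csv` would show

  def write(s: String): Unit = {
    pending.append(s)
    // Hand over only whole chunks; a chunk boundary may fall mid-line.
    while (pending.length >= chunkSize) {
      onDisk.append(pending.substring(0, chunkSize))
      pending.delete(0, chunkSize)
    }
  }

  // Moves whatever is still buffered to "disk".
  def flush(): Unit = {
    onDisk.append(pending.toString)
    pending.clear()
  }

  def fileContents: String = onDisk.toString
}
```

With `chunkSize = 20` and three 8-character rows (`1,a,100`, `2,b,200`, `3,c,300`), `fileContents` ends in `3,c,` until `flush()` is called, which, per the explanation above, `writeAsCsv(…)` only does once the stream ends.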

You can either use `PROCESS_ONCE` mode or use a more advanced data sink:
- `BucketingSink`
- re-use the `writeAsCsv(…)` code by extending `OutputFormatSinkFunction` and
implementing `CheckpointedFunction` to flush on snapshots (for at-least-once
semantics)
- write your own sink by extending `TwoPhaseCommitSinkFunction` (to support
exactly-once semantics)
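A minimal sketch of the at-least-once option, assuming the Flink 1.x Scala API, tuple or case-class elements, and checkpointing enabled. Instead of extending `OutputFormatSinkFunction`, it swaps in a plain `RichSinkFunction` that mixes in `CheckpointedFunction` and flushes its own `BufferedWriter` in `snapshotState`; the class name `FlushOnCheckpointCsvSink` is hypothetical.

```scala
import java.io.BufferedWriter
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardOpenOption}

import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// Hypothetical sink: writes each element as one comma-separated line (no CSV
// quoting/escaping) and flushes on every checkpoint, so finished rows reach
// disk even though a PROCESS_CONTINUOUSLY stream never ends.
class FlushOnCheckpointCsvSink[T <: Product](path: String)
    extends RichSinkFunction[T] with CheckpointedFunction {

  @transient private var writer: BufferedWriter = _

  override def open(parameters: Configuration): Unit = {
    // Append, so a restart does not truncate rows flushed before the failure;
    // replayed rows may then appear twice (at-least-once).
    writer = Files.newBufferedWriter(Paths.get(path), StandardCharsets.UTF_8,
      StandardOpenOption.CREATE, StandardOpenOption.APPEND)
  }

  override def invoke(value: T): Unit = {
    writer.write(value.productIterator.mkString(","))
    writer.newLine()
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Everything received before this checkpoint is pushed to the file.
    if (writer != null) writer.flush()
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    // No operator state needed: the file itself holds the output.
  }

  override def close(): Unit = {
    if (writer != null) writer.close()
  }
}
```

Flushes only happen on snapshots, so checkpointing must be on (e.g. `senv.enableCheckpointing(10000)`), and rows written after the last checkpoint are still buffered. Every parallel instance appends to the same `path`, so attach it with `.addSink(...)` and `.setParallelism(1)`. After a failure, replayed rows can be written twice and a partially flushed line can remain, which is why exactly-once needs `TwoPhaseCommitSinkFunction`.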

Piotrek

> On 2 Feb 2018, at 18:32, geoff halmo  wrote:



Flink not writing last few elements to disk

2018-02-02 Thread geoff halmo
Hi Flink community:

I am testing Flink, but I can't get the final 18 or so elements written out to disk.

Setup:
Using the NYC yellow taxi data in 2017-09.csv, I sorted the rows by
pickup_datetime in bash. I am working in event time.

Skeleton program:
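import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.windowing.time.Time

// senv is the StreamExecutionEnvironment (as in the Flink Scala shell);
// input_format, input_path, parse and MyProcessAllWindowFunction are defined elsewhere
val ds = senv.readFile(input_format, input_path,
  FileProcessingMode.PROCESS_CONTINUOUSLY, 1000) // re-scan input_path every 1000 ms

ds.flatMap(row => parse(row))
  .assignAscendingTimestamps(_.datetime) // rows are pre-sorted by pickup_datetime
  .timeWindowAll(Time.hours(1))
  .process(new MyProcessAllWindowFunction())
  .writeAsCsv("output.csv")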

Issue:
The last line of the output file is only half written:
tail -n1 output.csv
150655320,2017-09-27T:19:00-4:00[user@computer]

When I use `.print()` instead of `.writeAsCsv(…)`, the last line on the console is:
150682680,2017-09-30T23:00-400[America/New_York],21353