Hi,

All sources emit a Long.MAX_VALUE watermark when they shut down.
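
To illustrate why that final watermark matters, here is a minimal, Flink-free sketch of one-minute tumbling windows driven by a "max timestamp minus lag" watermark. It is not how Flink implements this internally. The class name `WatermarkFlushSketch`, its methods, and the simplification that a watermark is emitted after every element are assumptions made for the illustration; only the three timestamps, the window size, and the 5000 ms lag come from the code quoted below.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, Flink-free sketch of event-time tumbling windows.
class WatermarkFlushSketch {
    static final long WINDOW_SIZE_MS = 60_000L;  // 1-minute tumbling windows, as in the job
    static final long MAX_TIME_LAG_MS = 5_000L;  // same lag as EventWatermark

    // Returns the start timestamps of all windows that fired.
    static List<Long> firedWindowStarts(long[] timestamps, boolean emitFinalWatermark) {
        TreeMap<Long, Integer> openWindows = new TreeMap<>(); // window start -> element count
        List<Long> fired = new ArrayList<>();
        long currentMaxTimestamp = 0L;
        for (long ts : timestamps) {
            long windowStart = ts - (ts % WINDOW_SIZE_MS);
            openWindows.merge(windowStart, 1, Integer::sum);
            currentMaxTimestamp = Math.max(currentMaxTimestamp, ts);
            // Assumption: a watermark is emitted after every element.
            advance(currentMaxTimestamp - MAX_TIME_LAG_MS, openWindows, fired);
        }
        if (emitFinalWatermark) {
            // What a source does on shutdown: emit a Long.MAX_VALUE watermark.
            advance(Long.MAX_VALUE, openWindows, fired);
        }
        return fired;
    }

    // Fires every open window whose last timestamp (end - 1) is <= the watermark.
    private static void advance(long watermark, TreeMap<Long, Integer> openWindows,
                                List<Long> fired) {
        while (!openWindows.isEmpty()
                && openWindows.firstKey() + WINDOW_SIZE_MS - 1 <= watermark) {
            fired.add(openWindows.pollFirstEntry().getKey());
        }
    }

    public static void main(String[] args) {
        long[] timestamps = {
            Instant.parse("2017-05-20T19:34:17.097Z").toEpochMilli(),
            Instant.parse("2017-05-20T20:34:17.097Z").toEpochMilli(),
            Instant.parse("2017-05-20T20:38:17.097Z").toEpochMilli()
        };
        System.out.println("fired without final watermark: "
                + firedWindowStarts(timestamps, false).size());
        System.out.println("fired with final watermark: "
                + firedWindowStarts(timestamps, true).size());
    }
}
```

In this sketch the window holding the last event stays open until the Long.MAX_VALUE watermark arrives, because the lagging watermark never passes the end of that window on its own.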

What is the expected output and what is the output that you actually get?

Best,
Aljoscha

> On 27. May 2017, at 00:01, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi, 
> 
> the problem might be that your source does not send a watermark with 
> timestamp MAX_LONG after the last record has been sent.
> So your operators never compute the last window.
> 
> Best, Fabian
> 
> 2017-05-24 19:00 GMT+02:00 Sendoh <unicorn.bana...@gmail.com>:
> Hi Flink users,
> 
> We have a unit test for event-time window aggregation, but the last event
> is never output: the Flink job finishes before the watermark advances past
> the last window, because no further event arrives.
> 
> Has anyone run into a similar issue and found a solution?
> 
> The code is like:
> DataStream<JsonNode> source = env.fromElements(
>                 TestData.events("2017-05-20T19:34:17.097Z", "997"),
>                 TestData.events("2017-05-20T20:34:17.097Z", "998"),
>                 TestData.events("2017-05-20T20:38:17.097Z", "999"));
> 
> DataStream<JsonNode> testResult = source
>                 .assignTimestampsAndWatermarks(new EventWatermark())
>                 .keyBy(new KeyByID())
>                 .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>                 .trigger(PurgingTrigger.of(EventTimeTrigger103.create()))
>                 .allowedLateness(Time.minutes(Long.MAX_VALUE))
>                 .fold(null, new AggFoldFunction());
> 
>         Iterator<JsonNode> javaObj = DataStreamUtils.collect(testResult);
> 
>         int count = 0;
>         while (javaObj.hasNext()) {
>             JsonNode current = javaObj.next();
>             System.out.println(current);
>             count++;
>         }
>         Assert.assertEquals(3, count);
> 
> The watermark assigner is simply:
> public class EventWatermark implements AssignerWithPeriodicWatermarks<JsonNode> {
> 
>     private final long maxTimeLag = 5000;
> 
>     private long currentMaxTimestamp;
>     public transient static DateTimeFormatter parseFromTimeFormatter =
>             ISODateTimeFormat.dateTimeParser();
> 
>     @Override
>     public long extractTimestamp(JsonNode element, long previousElementTimestamp) {
>         long occurredAtLong;
>         try {
>             occurredAtLong = DateTime.parse(
>                     element.get("metadata").get("occurred_at").asText(),
>                     parseFromTimeFormatter).getMillis();
>         }
>         catch(IllegalArgumentException ie) {
>             throw new IllegalArgumentException(element.asText());
>         }
> 
>         if(occurredAtLong > currentMaxTimestamp){
>             currentMaxTimestamp = occurredAtLong;
>         }
>         return occurredAtLong;
>     }
> 
>     @Override
>     public Watermark getCurrentWatermark() {
>         return new Watermark(currentMaxTimestamp - maxTimeLag);
>     }
> }
> 
> Best,
> 
> Sendoh
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Last-event-in-event-time-window-is-not-output-tp13305.html
> 
