MAX_WATERMARK is emitted by ContinuousFileReaderOperator and StreamSource when they close. I think you'll find this just works, without your having to do anything to make it happen.

David

On Tue, Jul 28, 2020 at 8:07 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

MAX_WATERMARK should be emitted automatically by the WatermarkAssignerOperator.

Piotrek

On Mon, Jul 27, 2020 at 09:16 Flavio Pompermaier <pomperma...@okkam.it> wrote:

Yes, it could. Where should I emit the MAX_WATERMARK, and how do I detect that the input has reached its end?

On Sat, Jul 25, 2020 at 8:08 PM David Anderson <da...@alpinegizmo.com> wrote:

In this use case, couldn't the custom trigger register an event-time timer for MAX_WATERMARK, which would be triggered when the bounded input reaches its end? (A sketch of this idea appears just before the full code listing below.)

David

On Mon, Jul 20, 2020 at 5:47 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi,

I'm afraid there is no out-of-the-box way of doing this. I've created a ticket [1] to write down and document a discussion we had about this issue in the past.

The issue is that currently, untriggered processing-time timers are ignored at end of input, and there seems to be no single way of handling them that is right for every case; it probably needs to be customized.

Maybe you could (see the sketch at the very end of this thread):
1. extend WindowOperator (MyWindowOperator)
2. implement org.apache.flink.streaming.api.operators.BoundedOneInput#endInput in your MyWindowOperator
3. inside MyWindowOperator#endInput invoke internalTimerService.forEachProcessingTimeTimer(...) and:
   a) manually trigger the timers via WindowOperator#onProcessingTime
   b) delete each manually triggered timer

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-18647

On Fri, Jul 17, 2020 at 10:30 Flavio Pompermaier <pomperma...@okkam.it> wrote:

Hi to all,
I was trying to port another of our jobs from the DataSet API to DataStream. The legacy program was basically doing a dataset.mapPartition().reduce(), so I tried to replicate that with:

final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
final DataStream<Row> input = env.fromElements(
    Row.of(1.0),
    Row.of(2.0),
    Row.of(3.0),
    Row.of(5.0),
    Row.of(6.0)).returns(new RowTypeInfo(columnType));
input.map(new SubtaskIndexAssigner(columnType))
    .keyBy(t -> t.f0)
    .window(GlobalWindows.create())
    .trigger(PurgingTrigger.of(new CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
    .process(..);

Unfortunately the program exits before ever reaching the process function (moreover, I need to add another window + trigger after it, before adding the reduce function). Is there a way to do this with the DataStream API, or should I stick with the DataSet API for the moment (until batch is fully supported)? I append all the code required to test the job at the footer.

Best,
Flavio
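To make David's suggestion concrete, here is a minimal, untested sketch of the two changes the custom trigger would need; it is not part of the original code. It assumes the job runs with event or ingestion time so that watermarks flow (the listing below uses TimeCharacteristic.IngestionTime) and that the sources emit MAX_WATERMARK (Long.MAX_VALUE) when the bounded input ends. Everything not shown stays exactly as in CountWithTimeoutTriggerPartition in the listing that follows.

@Override
public TriggerResult onElement(Tuple2<Integer, Row> element, long timestamp,
        GlobalWindow window, TriggerContext ctx) throws Exception {
    // Added: an event-time timer that fires when the watermark reaches
    // Long.MAX_VALUE, i.e. when the bounded input has ended. Re-registering
    // the same timestamp on every element is harmless; timers are deduplicated.
    ctx.registerEventTimeTimer(Long.MAX_VALUE);
    // ... existing count / processing-time logic unchanged ...
    return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx)
        throws Exception {
    // Changed: instead of always returning CONTINUE, flush whatever is still
    // buffered in the window when the end-of-input timer fires.
    return time == Long.MAX_VALUE ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
}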
-----------------------------------------------------------------

package org.apache.flink.stats.sketches;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.io.PrintingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.setParallelism(1);

        final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
        final DataStream<Row> input = env.fromElements(
            Row.of(1.0),
            Row.of(2.0),
            Row.of(3.0),
            Row.of(5.0),
            Row.of(6.0)).returns(new RowTypeInfo(columnType));
        final DataStream<Row> out = input.map(new SubtaskIndexAssigner(columnType))
            .keyBy(t -> t.f0)
            .window(GlobalWindows.create())
            .trigger(PurgingTrigger.of(new CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
            .process(new ProcessWindowFunction<Tuple2<Integer, Row>, Row, Integer, GlobalWindow>() {

                @Override
                public void process(Integer key,
                        ProcessWindowFunction<Tuple2<Integer, Row>, Row, Integer, GlobalWindow>.Context context,
                        Iterable<Tuple2<Integer, Row>> it, Collector<Row> out) throws Exception {
                    for (Tuple2<Integer, Row> tuple : it) {
                        out.collect(Row.of(tuple.f1.getField(0).toString()));
                    }
                }
            }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO));
        out.writeUsingOutputFormat(new PrintingOutputFormat<Row>());
        env.execute();
    }
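    // Why this job prints nothing: GlobalWindows never fires on its own, so
    // everything depends on the trigger. With only five elements the count
    // threshold (100) is never reached, and the processing-time timer is set
    // ~5 seconds into the future while the bounded input is exhausted within
    // milliseconds. At end of input, untriggered processing-time timers are
    // silently dropped (see FLINK-18647), so process() is never invoked.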
    private static final class SubtaskIndexAssigner extends RichMapFunction<Row, Tuple2<Integer, Row>>
            implements ResultTypeQueryable<Tuple2<Integer, Row>> {
        private static final long serialVersionUID = 1L;

        private int myTaskId;
        private TypeInformation<?> columnType;

        public SubtaskIndexAssigner(TypeInformation<?> columnType) {
            this.columnType = columnType;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            this.myTaskId = getRuntimeContext().getIndexOfThisSubtask();
        }

        @Override
        public Tuple2<Integer, Row> map(Row row) throws Exception {
            return Tuple2.of(myTaskId, row);
        }

        @Override
        public TypeInformation<Tuple2<Integer, Row>> getProducedType() {
            return new TupleTypeInfo<Tuple2<Integer, Row>>(BasicTypeInfo.INT_TYPE_INFO,
                new RowTypeInfo(columnType));
        }
    }

    private static class CountWithTimeoutTriggerPartition
            extends Trigger<Tuple2<Integer, Row>, GlobalWindow> {

        private static final long serialVersionUID = 1L;
        private final long maxCount;
        private final long maxTime;

        private final ReducingStateDescriptor<Long> countstateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
        private final ReducingStateDescriptor<Long> timestateDesc =
            new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);

        public CountWithTimeoutTriggerPartition(long maxTime, long maxCount) {
            this.maxCount = maxCount;
            this.maxTime = maxTime;
        }

        public CountWithTimeoutTriggerPartition(Time maxTime, long maxCount) {
            this(maxTime.toMilliseconds(), maxCount);
        }

        @Override
        public TriggerResult onElement(Tuple2<Integer, Row> element, long timestamp,
                GlobalWindow window, TriggerContext ctx) throws Exception {

            ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timestateDesc);

            timestamp = ctx.getCurrentProcessingTime();

            if (fireTimestamp.get() == null) {
                long start = timestamp - (timestamp % maxTime);
                long nextFireTimestamp = start + maxTime;

                ctx.registerProcessingTimeTimer(nextFireTimestamp);

                fireTimestamp.add(nextFireTimestamp);
                return TriggerResult.CONTINUE;
            }
            ReducingState<Long> count = ctx.getPartitionedState(countstateDesc);
            count.add(1L);
            if (count.get() >= maxCount) {
                count.clear();
                fireTimestamp.clear();
                return TriggerResult.FIRE_AND_PURGE;
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx)
                throws Exception {
            ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timestateDesc);
            ReducingState<Long> count = ctx.getPartitionedState(countstateDesc);
            if (fireTimestamp.get().equals(time)) {
                count.clear();
                fireTimestamp.clear();
                fireTimestamp.add(time + maxTime);
                ctx.registerProcessingTimeTimer(time + maxTime);
                return TriggerResult.FIRE_AND_PURGE;
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(@SuppressWarnings("unused") long time,
                @SuppressWarnings("unused") GlobalWindow window,
                @SuppressWarnings("unused") TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }
        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
            ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timestateDesc);
            long timestamp = fireTimestamp.get();
            ctx.deleteProcessingTimeTimer(timestamp);
            fireTimestamp.clear();
            ctx.getPartitionedState(countstateDesc).clear();
        }

        @Override
        public boolean canMerge() {
            return true;
        }

        @Override
        public void onMerge(GlobalWindow window, OnMergeContext ctx) {
            ctx.mergePartitionedState(countstateDesc);
            ctx.mergePartitionedState(timestateDesc);
        }

        class Sum implements ReduceFunction<Long> {
            private static final long serialVersionUID = 1L;

            @Override
            public Long reduce(Long value1, Long value2) throws Exception {
                return value1 + value2;
            }
        }

        class Min implements ReduceFunction<Long> {
            private static final long serialVersionUID = 1L;

            @Override
            public Long reduce(Long value1, Long value2) throws Exception {
                return Math.min(value1, value2);
            }
        }
    }
}
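And to sketch the lower-level workaround Piotr outlined (steps 1 to 3): the outline below is rough and untested, written against Flink 1.11 internals. MyWindowOperator is a made-up name, the constructor (elided) must forward WindowOperator's full argument list, and TimerHeapInternalTimer is an internal class in org.apache.flink.streaming.api.operators, so a real implementation may need to live in that package or inline the body of WindowOperator#onProcessingTime instead of constructing a timer.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;

public class MyWindowOperator<K, IN, ACC, OUT, W extends Window>
        extends WindowOperator<K, IN, ACC, OUT, W> implements BoundedOneInput {

    // Constructor elided: forward WindowOperator's constructor arguments unchanged.

    @Override
    @SuppressWarnings("unchecked")
    public void endInput() throws Exception {
        // Snapshot the untriggered processing-time timers first; firing and
        // deleting them while iterating would mutate the timer queue.
        final List<Tuple3<K, W, Long>> pending = new ArrayList<>();
        internalTimerService.forEachProcessingTimeTimer((window, timestamp) ->
            // forEachProcessingTimeTimer sets the current key before each
            // callback, so getCurrentKey() is the key the timer belongs to.
            pending.add(Tuple3.of((K) getCurrentKey(), window, timestamp)));

        for (Tuple3<K, W, Long> timer : pending) {
            setCurrentKey(timer.f0);
            // Delete the timer so it cannot fire again (step 3b) ...
            internalTimerService.deleteProcessingTimeTimer(timer.f1, timer.f2);
            // ... then fire the window as if the timer had expired (step 3a).
            onProcessingTime(new TimerHeapInternalTimer<>(timer.f2, timer.f0, timer.f1));
        }
    }
}

One semantic consequence of doing this: every window is flushed at end of input even if its 5-second timeout has not elapsed yet, which is usually exactly what a bounded job wants.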