MAX_WATERMARK is emitted by ContinuousFileReaderOperator and StreamSource
when they close.

I think you'll find this just works without your having to do anything to
make it happen.

David

On Tue, Jul 28, 2020 at 8:07 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

> MAX_WATERMARK should be emitted automatically by the
> WatermarkAssignerOperator.
>
> Piotrek
>
> pon., 27 lip 2020 o 09:16 Flavio Pompermaier <pomperma...@okkam.it>
> napisał(a):
>
>> Yes it could..where should I emit the MAX_WATERMARK and how do I detect
>> that the input reached its end?
>>
>> On Sat, Jul 25, 2020 at 8:08 PM David Anderson <da...@alpinegizmo.com>
>> wrote:
>>
>>> In this use case, couldn't the custom trigger register an event time
>>> timer for MAX_WATERMARK, which would be triggered when the bounded input
>>> reaches its end?
>>>
>>> David
>>>
>>> On Mon, Jul 20, 2020 at 5:47 PM Piotr Nowojski <pnowoj...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm afraid that there is not out of the box way of doing this. I've
>>>> created a ticket [1] to write down and document a discussion that we had
>>>> about this issue in the past.
>>>>
>>>> The issue is that currently, untriggered processing time timers are
>>>> ignored on end of input and it seems like there might be no one single
>>>> perfect way to handle it for all of the cases, but it probably needs to be
>>>> customized.
>>>>
>>>> Maybe you could:
>>>> 1. extend `WindowOperator`  (`MyWindowOperator`)
>>>> 2. implement
>>>> `org.apache.flink.streaming.api.operators.BoundedOneInput#endInput` in your
>>>> `MyWindowOperator`
>>>> 3. Inside `MyWindowOperator#endInput`  invoke
>>>> `internalTimerService.forEachProcessingTimeTimer(...)` and:
>>>>   a) manually trigger timers `WindowOperator#onProcessingTime`
>>>>   b) delete manually triggered timer
>>>>
>>>> Piotrek
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-18647
>>>>
>>>> pt., 17 lip 2020 o 10:30 Flavio Pompermaier <pomperma...@okkam.it>
>>>> napisał(a):
>>>>
>>>>> Hi to all,
>>>>> I was trying to port another job we have that use dataset API to
>>>>> datastream.
>>>>> The legacy program was doing basically a
>>>>> dataset.mapPartition().reduce() so I tried to replicate this thing with a
>>>>>
>>>>>  final BasicTypeInfo<Double> columnType =
>>>>> BasicTypeInfo.DOUBLE_TYPE_INFO;
>>>>>   final DataStream<Row> input = env.fromElements(//
>>>>>         Row.of(1.0), //
>>>>>         Row.of(2.0), //
>>>>>         Row.of(3.0), //
>>>>>         Row.of(5.0), //
>>>>>         Row.of(6.0)).returns(new RowTypeInfo(columnType));
>>>>>  inputStream.map(new SubtaskIndexAssigner(columnType))
>>>>>         .keyBy(t -> t.f0)
>>>>>         .window(GlobalWindows.create())
>>>>>
>>>>> .trigger(PurgingTrigger.of(CountWithTimeoutTriggerPartition.of(Time.seconds(5),
>>>>> 100L))).
>>>>>         .process(..)
>>>>>
>>>>> Unfortunately the program exits before reaching the Process function
>>>>> (moreover I need to add another window + trigger after it before adding 
>>>>> the
>>>>> reduce function).
>>>>> Is there a way to do this with the DataStream API or should I still
>>>>> use DataSet API for the moment (when the batch will be fully supported)? I
>>>>> append to the footer all the code required to test the job.
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>> -----------------------------------------------------------------
>>>>>
>>>>> package org.apache.flink.stats.sketches;
>>>>>
>>>>> import org.apache.flink.api.common.functions.ReduceFunction;
>>>>> import org.apache.flink.api.common.functions.RichMapFunction;
>>>>> import org.apache.flink.api.common.state.ReducingState;
>>>>> import org.apache.flink.api.common.state.ReducingStateDescriptor;
>>>>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>>>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>>>> import org.apache.flink.api.common.typeutils.base.LongSerializer;
>>>>> import org.apache.flink.api.java.io.PrintingOutputFormat;
>>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>>> import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
>>>>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>>>>> import org.apache.flink.api.java.typeutils.TupleTypeInfo;
>>>>> import org.apache.flink.configuration.Configuration;
>>>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>>> import
>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>> import
>>>>> org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
>>>>> import
>>>>> org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
>>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>> import
>>>>> org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
>>>>> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
>>>>> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
>>>>> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
>>>>> import org.apache.flink.types.Row;
>>>>> import org.apache.flink.util.Collector;
>>>>>
>>>>> public class Test {
>>>>>   public static void main(String[] args) throws Exception {
>>>>>     StreamExecutionEnvironment env =
>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>>>>     env.setParallelism(1);
>>>>>
>>>>>     final BasicTypeInfo<Double> columnType =
>>>>> BasicTypeInfo.DOUBLE_TYPE_INFO;
>>>>>     final DataStream<Row> input = env.fromElements(//
>>>>>         Row.of(1.0), //
>>>>>         Row.of(2.0), //
>>>>>         Row.of(3.0), //
>>>>>         Row.of(5.0), //
>>>>>         Row.of(6.0)).returns(new RowTypeInfo(columnType));
>>>>>     final DataStream<Row> out = input.map(new
>>>>> SubtaskIndexAssigner(columnType))//
>>>>>         .keyBy(t -> t.f0)//
>>>>>         .window(GlobalWindows.create())
>>>>>         .trigger(PurgingTrigger.of(new
>>>>> CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
>>>>>         .process(new ProcessWindowFunction<Tuple2<Integer, Row>, Row,
>>>>> Integer, GlobalWindow>() {
>>>>>
>>>>>           @Override
>>>>>           public void process(Integer key,
>>>>>               ProcessWindowFunction<Tuple2<Integer, Row>, Row,
>>>>> Integer, GlobalWindow>.Context context,
>>>>>               Iterable<Tuple2<Integer, Row>> it, Collector<Row> out)
>>>>> throws Exception {
>>>>>             for (Tuple2<Integer, Row> tuple : it) {
>>>>>               out.collect(Row.of(tuple.f1.getField(0).toString()));
>>>>>             }
>>>>>
>>>>>           }
>>>>>         }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO));
>>>>>     out.writeUsingOutputFormat(new PrintingOutputFormat<Row>());
>>>>>     env.execute();
>>>>>   }
>>>>>
>>>>>   private static final class SubtaskIndexAssigner extends
>>>>> RichMapFunction<Row, Tuple2<Integer, Row>>
>>>>>       implements ResultTypeQueryable<Tuple2<Integer, Row>> {
>>>>>     private static final long serialVersionUID = 1L;
>>>>>
>>>>>     private int myTaskId;
>>>>>     private TypeInformation<?> columnType;
>>>>>
>>>>>     public SubtaskIndexAssigner(TypeInformation<?> columnType) {
>>>>>       this.columnType = columnType;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void open(Configuration parameters) throws Exception {
>>>>>       this.myTaskId = getRuntimeContext().getIndexOfThisSubtask();
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public Tuple2<Integer, Row> map(Row row) throws Exception {
>>>>>       return Tuple2.of(myTaskId, row);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public TypeInformation<Tuple2<Integer, Row>> getProducedType() {
>>>>>       return new TupleTypeInfo<Tuple2<Integer,
>>>>> Row>>(BasicTypeInfo.INT_TYPE_INFO,
>>>>>           new RowTypeInfo(columnType));
>>>>>     }
>>>>>   }
>>>>>
>>>>>   private static class CountWithTimeoutTriggerPartition
>>>>>       extends Trigger<Tuple2<Integer, Row>, GlobalWindow> {
>>>>>
>>>>>     private static final long serialVersionUID = 1L;
>>>>>     private final long maxCount;
>>>>>     private final long maxTime;
>>>>>
>>>>>     private final ReducingStateDescriptor<Long> countstateDesc =
>>>>>         new ReducingStateDescriptor<>("count", new Sum(),
>>>>> LongSerializer.INSTANCE);
>>>>>     private final ReducingStateDescriptor<Long> timestateDesc =
>>>>>         new ReducingStateDescriptor<>("fire-time", new Min(),
>>>>> LongSerializer.INSTANCE);
>>>>>
>>>>>     public CountWithTimeoutTriggerPartition(long maxTime, long
>>>>> maxCount) {
>>>>>       this.maxCount = maxCount;
>>>>>       this.maxTime = maxTime;
>>>>>     }
>>>>>
>>>>>     public CountWithTimeoutTriggerPartition(Time maxTime, long
>>>>> maxCount) {
>>>>>       this(maxTime.toMilliseconds(), maxCount);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public TriggerResult onElement(Tuple2<Integer, Row> element, long
>>>>> timestamp,
>>>>>         GlobalWindow window,
>>>>>
>>>>> org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
>>>>> ctx)
>>>>>         throws Exception {
>>>>>
>>>>>       ReducingState<Long> fireTimestamp =
>>>>> ctx.getPartitionedState(timestateDesc);
>>>>>
>>>>>       timestamp = ctx.getCurrentProcessingTime();
>>>>>
>>>>>       if (fireTimestamp.get() == null) {
>>>>>         long start = timestamp - (timestamp % maxTime);
>>>>>         long nextFireTimestamp = start + maxTime;
>>>>>
>>>>>         ctx.registerProcessingTimeTimer(nextFireTimestamp);
>>>>>
>>>>>         fireTimestamp.add(nextFireTimestamp);
>>>>>         return TriggerResult.CONTINUE;
>>>>>       }
>>>>>       ReducingState<Long> count =
>>>>> ctx.getPartitionedState(countstateDesc);
>>>>>       count.add(1L);
>>>>>       if (count.get() >= maxCount) {
>>>>>         count.clear();
>>>>>         fireTimestamp.clear();
>>>>>         return TriggerResult.FIRE_AND_PURGE;
>>>>>       }
>>>>>       return TriggerResult.CONTINUE;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public TriggerResult onProcessingTime(long time, GlobalWindow
>>>>> window, TriggerContext ctx)
>>>>>         throws Exception {
>>>>>       ReducingState<Long> fireTimestamp =
>>>>> ctx.getPartitionedState(timestateDesc);
>>>>>       ReducingState<Long> count =
>>>>> ctx.getPartitionedState(countstateDesc);
>>>>>       if (fireTimestamp.get().equals(time)) {
>>>>>         count.clear();
>>>>>         fireTimestamp.clear();
>>>>>         fireTimestamp.add(time + maxTime);
>>>>>         ctx.registerProcessingTimeTimer(time + maxTime);
>>>>>         return TriggerResult.FIRE_AND_PURGE;
>>>>>       }
>>>>>       return TriggerResult.CONTINUE;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public TriggerResult onEventTime(@SuppressWarnings("unused") long
>>>>> time,
>>>>>         @SuppressWarnings("unused") GlobalWindow window,
>>>>>         @SuppressWarnings("unused") TriggerContext ctx) throws
>>>>> Exception {
>>>>>       return TriggerResult.CONTINUE;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void clear(GlobalWindow window, TriggerContext ctx) throws
>>>>> Exception {
>>>>>       ReducingState<Long> fireTimestamp =
>>>>> ctx.getPartitionedState(timestateDesc);
>>>>>       long timestamp = fireTimestamp.get();
>>>>>       ctx.deleteProcessingTimeTimer(timestamp);
>>>>>       fireTimestamp.clear();
>>>>>       ctx.getPartitionedState(countstateDesc).clear();
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public boolean canMerge() {
>>>>>       return true;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void onMerge(GlobalWindow window, OnMergeContext ctx) {
>>>>>       ctx.mergePartitionedState(countstateDesc);
>>>>>       ctx.mergePartitionedState(timestateDesc);
>>>>>     }
>>>>>
>>>>>     class Sum implements ReduceFunction<Long> {
>>>>>       private static final long serialVersionUID = 1L;
>>>>>
>>>>>       @Override
>>>>>       public Long reduce(Long value1, Long value2) throws Exception {
>>>>>         return value1 + value2;
>>>>>       }
>>>>>     }
>>>>>
>>>>>     class Min implements ReduceFunction<Long> {
>>>>>       private static final long serialVersionUID = 1L;
>>>>>
>>>>>       @Override
>>>>>       public Long reduce(Long value1, Long value2) throws Exception {
>>>>>         return Math.min(value1, value2);
>>>>>       }
>>>>>     }
>>>>>   }
>>>>>
>>>>> }
>>>>>
>>>>
>>

Reply via email to