Hi all,

I am trying to port another job of ours from the DataSet API to the DataStream API. The legacy program basically does a dataset.mapPartition().reduce().
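For context, this is a minimal sketch of that shape (the class name and the per-partition sum are only placeholders, the real per-partition aggregation is different):

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

public class LegacyShape {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Double> input = env.fromElements(1.0, 2.0, 3.0, 5.0, 6.0);
        input
                // one partial result per partition (a plain sum is just a stand-in
                // for the real per-partition aggregation)
                .mapPartition(new MapPartitionFunction<Double, Double>() {
                    @Override
                    public void mapPartition(Iterable<Double> values, Collector<Double> out) {
                        double partial = 0.0;
                        for (Double v : values) {
                            partial += v;
                        }
                        out.collect(partial);
                    }
                })
                // global reduce of the per-partition results
                .reduce(new ReduceFunction<Double>() {
                    @Override
                    public Double reduce(Double a, Double b) {
                        return a + b;
                    }
                })
                .print();
    }
}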
I tried to replicate the same pattern with the DataStream API like this:

final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
final DataStream<Row> input = env.fromElements(
        Row.of(1.0),
        Row.of(2.0),
        Row.of(3.0),
        Row.of(5.0),
        Row.of(6.0)).returns(new RowTypeInfo(columnType));
input.map(new SubtaskIndexAssigner(columnType))
        .keyBy(t -> t.f0)
        .window(GlobalWindows.create())
        .trigger(PurgingTrigger.of(new CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
        .process(..);

Unfortunately the program exits before reaching the process function (moreover, I need to add another window + trigger after it, before the final reduce). Is there a way to do this with the DataStream API, or should I stick with the DataSet API for the moment (until batch is fully supported)? I append below all the code required to test the job.

Best,
Flavio

-----------------------------------------------------------------

package org.apache.flink.stats.sketches;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.io.PrintingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class Test {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.setParallelism(1);

        final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
        final DataStream<Row> input = env.fromElements(
                Row.of(1.0),
                Row.of(2.0),
                Row.of(3.0),
                Row.of(5.0),
                Row.of(6.0)).returns(new RowTypeInfo(columnType));

        final DataStream<Row> out = input.map(new SubtaskIndexAssigner(columnType))
                .keyBy(t -> t.f0)
                .window(GlobalWindows.create())
                .trigger(PurgingTrigger.of(new CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
                .process(new ProcessWindowFunction<Tuple2<Integer, Row>, Row, Integer, GlobalWindow>() {

                    @Override
                    public void process(Integer key,
                            ProcessWindowFunction<Tuple2<Integer, Row>, Row, Integer, GlobalWindow>.Context context,
                            Iterable<Tuple2<Integer, Row>> it,
                            Collector<Row> out) throws Exception {
                        for (Tuple2<Integer, Row> tuple : it) {
                            out.collect(Row.of(tuple.f1.getField(0).toString()));
                        }
                    }
                }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO));

        out.writeUsingOutputFormat(new PrintingOutputFormat<Row>());
        env.execute();
    }

    // Tags every Row with the index of the subtask that mapped it, so the stream
    // can then be keyed by subtask index.
    private static final class SubtaskIndexAssigner extends RichMapFunction<Row, Tuple2<Integer, Row>>
            implements ResultTypeQueryable<Tuple2<Integer, Row>> {
        private static final long serialVersionUID = 1L;

        private int myTaskId;
        private TypeInformation<?> columnType;

        public SubtaskIndexAssigner(TypeInformation<?> columnType) {
            this.columnType = columnType;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            this.myTaskId = getRuntimeContext().getIndexOfThisSubtask();
        }

        @Override
        public Tuple2<Integer, Row> map(Row row) throws Exception {
            return Tuple2.of(myTaskId, row);
        }

        @Override
        public TypeInformation<Tuple2<Integer, Row>> getProducedType() {
            return new TupleTypeInfo<Tuple2<Integer, Row>>(BasicTypeInfo.INT_TYPE_INFO,
                    new RowTypeInfo(columnType));
        }
    }

    // Count trigger with a processing-time timeout: fires after maxCount elements
    // or when the registered processing-time timer expires, whichever comes first.
    private static class CountWithTimeoutTriggerPartition extends Trigger<Tuple2<Integer, Row>, GlobalWindow> {
        private static final long serialVersionUID = 1L;

        private final long maxCount;
        private final long maxTime;

        private final ReducingStateDescriptor<Long> countstateDesc =
                new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
        private final ReducingStateDescriptor<Long> timestateDesc =
                new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);

        public CountWithTimeoutTriggerPartition(long maxTime, long maxCount) {
            this.maxCount = maxCount;
            this.maxTime = maxTime;
        }

        public CountWithTimeoutTriggerPartition(Time maxTime, long maxCount) {
            this(maxTime.toMilliseconds(), maxCount);
        }

        @Override
        public TriggerResult onElement(Tuple2<Integer, Row> element, long timestamp, GlobalWindow window,
                TriggerContext ctx) throws Exception {
            ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timestateDesc);
            timestamp = ctx.getCurrentProcessingTime();
            if (fireTimestamp.get() == null) {
                // first element for this key: register the processing-time timeout
                long start = timestamp - (timestamp % maxTime);
                long nextFireTimestamp = start + maxTime;
                ctx.registerProcessingTimeTimer(nextFireTimestamp);
                fireTimestamp.add(nextFireTimestamp);
                return TriggerResult.CONTINUE;
            }
            ReducingState<Long> count = ctx.getPartitionedState(countstateDesc);
            count.add(1L);
            if (count.get() >= maxCount) {
                count.clear();
                fireTimestamp.clear();
                return TriggerResult.FIRE_AND_PURGE;
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
            ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timestateDesc);
            ReducingState<Long> count = ctx.getPartitionedState(countstateDesc);
            if (fireTimestamp.get().equals(time)) {
                count.clear();
                fireTimestamp.clear();
                fireTimestamp.add(time + maxTime);
                ctx.registerProcessingTimeTimer(time + maxTime);
                return TriggerResult.FIRE_AND_PURGE;
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(@SuppressWarnings("unused") long time,
                @SuppressWarnings("unused") GlobalWindow window,
                @SuppressWarnings("unused") TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
            ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timestateDesc);
            long timestamp = fireTimestamp.get();
            ctx.deleteProcessingTimeTimer(timestamp);
            fireTimestamp.clear();
            ctx.getPartitionedState(countstateDesc).clear();
        }
        @Override
        public boolean canMerge() {
            return true;
        }

        @Override
        public void onMerge(GlobalWindow window, OnMergeContext ctx) {
            ctx.mergePartitionedState(countstateDesc);
            ctx.mergePartitionedState(timestateDesc);
        }

        class Sum implements ReduceFunction<Long> {
            private static final long serialVersionUID = 1L;

            @Override
            public Long reduce(Long value1, Long value2) throws Exception {
                return value1 + value2;
            }
        }

        class Min implements ReduceFunction<Long> {
            private static final long serialVersionUID = 1L;

            @Override
            public Long reduce(Long value1, Long value2) throws Exception {
                return Math.min(value1, value2);
            }
        }
    }
}