Hi to all,
I was trying to port another job of ours from the DataSet API to the
DataStream API. The legacy program basically did a
dataset.mapPartition().reduce(), so I tried to replicate it with the following:

 final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
  final DataStream<Row> input = env.fromElements(//
        Row.of(1.0), //
        Row.of(2.0), //
        Row.of(3.0), //
        Row.of(5.0), //
        Row.of(6.0)).returns(new RowTypeInfo(columnType));
 input.map(new SubtaskIndexAssigner(columnType))
        .keyBy(t -> t.f0)
        .window(GlobalWindows.create())

.trigger(PurgingTrigger.of(CountWithTimeoutTriggerPartition.of(Time.seconds(5),
100L)))
        .process(..)

Unfortunately, the program exits before reaching the process function
(moreover, I need to add another window + trigger after it before adding the
reduce function).
Is there a way to do this with the DataStream API, or should I keep using the
DataSet API for the moment (until batch execution is fully supported)? I have
appended all the code required to test the job below.

Best,
Flavio

-----------------------------------------------------------------

package org.apache.flink.stats.sketches;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.io.PrintingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class Test {
  /**
   * Builds and executes the sample job.
   *
   * <p>Five single-column rows are tagged with a subtask index, keyed by that index, collected in
   * a global window guarded by a purging count/timeout trigger, converted to rows holding the
   * string form of their first field, and finally handed to a printing output format.
   *
   * @param args command-line arguments (not used)
   * @throws Exception if job construction or execution fails
   */
  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    env.setParallelism(1);

    // Source: a fixed set of rows with one double column each.
    final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
    final DataStream<Row> input = env
        .fromElements(Row.of(1.0), Row.of(2.0), Row.of(3.0), Row.of(5.0), Row.of(6.0))
        .returns(new RowTypeInfo(columnType));

    // Pair every row with the index of the subtask that handled it.
    final DataStream<Tuple2<Integer, Row>> tagged =
        input.map(new SubtaskIndexAssigner(columnType));

    // Count-or-timeout trigger, wrapped so that every fire also purges the window.
    final Trigger<Tuple2<Integer, Row>, GlobalWindow> windowTrigger =
        PurgingTrigger.of(new CountWithTimeoutTriggerPartition(Time.seconds(5), 100L));

    // Emits, for each buffered element, a row containing the string form of its first field.
    final ProcessWindowFunction<Tuple2<Integer, Row>, Row, Integer, GlobalWindow> stringify =
        new ProcessWindowFunction<Tuple2<Integer, Row>, Row, Integer, GlobalWindow>() {

          @Override
          public void process(
              Integer key,
              Context context,
              Iterable<Tuple2<Integer, Row>> elements,
              Collector<Row> collector) throws Exception {
            for (Tuple2<Integer, Row> element : elements) {
              final Object firstField = element.f1.getField(0);
              collector.collect(Row.of(firstField.toString()));
            }
          }
        };

    final DataStream<Row> out = tagged
        .keyBy(t -> t.f0)
        .window(GlobalWindows.create())
        .trigger(windowTrigger)
        .process(stringify)
        .returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO));

    out.writeUsingOutputFormat(new PrintingOutputFormat<Row>());
    env.execute();
  }

  /**
   * Map function that wraps each incoming row in a tuple whose first field is the index of the
   * parallel subtask executing this function instance.
   *
   * <p>The produced type is declared explicitly through {@link ResultTypeQueryable}, using the
   * column type supplied at construction time for the row part of the tuple.
   */
  private static final class SubtaskIndexAssigner
      extends RichMapFunction<Row, Tuple2<Integer, Row>>
      implements ResultTypeQueryable<Tuple2<Integer, Row>> {
    private static final long serialVersionUID = 1L;

    private int myTaskId;
    private TypeInformation<?> columnType;

    /**
     * @param columnType type information of the single column carried by the mapped rows
     */
    public SubtaskIndexAssigner(TypeInformation<?> columnType) {
      this.columnType = columnType;
    }

    /** Describes the output as a tuple of an integer and a row of {@code columnType}. */
    @Override
    public TypeInformation<Tuple2<Integer, Row>> getProducedType() {
      final TypeInformation<Integer> indexType = BasicTypeInfo.INT_TYPE_INFO;
      final TypeInformation<Row> rowType = new RowTypeInfo(columnType);
      return new TupleTypeInfo<Tuple2<Integer, Row>>(indexType, rowType);
    }

    /** Reads the subtask index from the runtime context once, before any element is mapped. */
    @Override
    public void open(Configuration parameters) throws Exception {
      myTaskId = getRuntimeContext().getIndexOfThisSubtask();
    }

    /** Pairs the given row with the subtask index captured in {@link #open(Configuration)}. */
    @Override
    public Tuple2<Integer, Row> map(Row row) throws Exception {
      return new Tuple2<Integer, Row>(myTaskId, row);
    }
  }

  /**
   * Trigger for {@link GlobalWindow}s that fires and purges either when {@code maxCount}
   * elements have been seen since the last fire, or when a processing-time timer aligned to a
   * multiple of {@code maxTime} milliseconds elapses, whichever happens first.
   *
   * <p>Two pieces of partitioned state are kept: the number of elements seen since the last fire
   * ({@code "count"}) and the timestamp of the currently registered processing-time timer
   * ({@code "fire-time"}).
   *
   * <p>NOTE(review): this trigger relies on processing-time timers. With a bounded source the
   * job may finish before the first timer elapses, in which case buffered elements that did not
   * reach {@code maxCount} are presumably never emitted; confirm against the Flink version in
   * use.
   */
  private static class CountWithTimeoutTriggerPartition
      extends Trigger<Tuple2<Integer, Row>, GlobalWindow> {

    private static final long serialVersionUID = 1L;
    private final long maxCount;
    private final long maxTime;

    private final ReducingStateDescriptor<Long> countstateDesc =
        new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
    private final ReducingStateDescriptor<Long> timestateDesc =
        new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);

    /**
     * @param maxTime timeout in milliseconds; must be positive because it is used as a modulus
     *     when aligning the timer
     * @param maxCount number of elements after which the trigger fires
     * @throws IllegalArgumentException if {@code maxTime} is not positive
     */
    public CountWithTimeoutTriggerPartition(long maxTime, long maxCount) {
      if (maxTime <= 0) {
        throw new IllegalArgumentException("maxTime must be positive but was " + maxTime);
      }
      this.maxCount = maxCount;
      this.maxTime = maxTime;
    }

    /**
     * @param maxTime timeout after which the trigger fires
     * @param maxCount number of elements after which the trigger fires
     */
    public CountWithTimeoutTriggerPartition(Time maxTime, long maxCount) {
      this(maxTime.toMilliseconds(), maxCount);
    }

    /**
     * Static factory equivalent to {@link #CountWithTimeoutTriggerPartition(Time, long)}.
     *
     * @param maxTime timeout after which the trigger fires
     * @param maxCount number of elements after which the trigger fires
     * @return a new trigger instance
     */
    public static CountWithTimeoutTriggerPartition of(Time maxTime, long maxCount) {
      return new CountWithTimeoutTriggerPartition(maxTime, maxCount);
    }

    /**
     * Counts the element and arms the timeout timer if none is pending.
     *
     * @return {@link TriggerResult#FIRE_AND_PURGE} once {@code maxCount} elements have been
     *     counted, {@link TriggerResult#CONTINUE} otherwise
     */
    @Override
    public TriggerResult onElement(
        Tuple2<Integer, Row> element,
        long timestamp,
        GlobalWindow window,
        TriggerContext ctx) throws Exception {

      ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timestateDesc);
      Long pendingFire = fireTimestamp.get();

      if (pendingFire == null) {
        // No timer pending: arm one at the next multiple of maxTime after "now".
        long now = ctx.getCurrentProcessingTime();
        long start = now - (now % maxTime);
        long nextFireTimestamp = start + maxTime;

        ctx.registerProcessingTimeTimer(nextFireTimestamp);
        fireTimestamp.add(nextFireTimestamp);
        pendingFire = nextFireTimestamp;
      }

      // The element that armed the timer is counted as well (it was previously skipped).
      ReducingState<Long> count = ctx.getPartitionedState(countstateDesc);
      count.add(1L);
      if (count.get() >= maxCount) {
        // Drop the pending timer so it cannot fire later against already-cleared state.
        ctx.deleteProcessingTimeTimer(pendingFire);
        count.clear();
        fireTimestamp.clear();
        return TriggerResult.FIRE_AND_PURGE;
      }
      return TriggerResult.CONTINUE;
    }

    /**
     * Fires and purges when the elapsed timer is the one recorded in state, then re-arms the
     * timer {@code maxTime} milliseconds later. Timers that do not match the recorded timestamp
     * (including the case where no timestamp is recorded) are ignored.
     */
    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx)
        throws Exception {
      ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timestateDesc);
      Long pendingFire = fireTimestamp.get();
      // pendingFire is null when the state was cleared after the timer had been registered.
      if (pendingFire == null || pendingFire.longValue() != time) {
        return TriggerResult.CONTINUE;
      }

      ctx.getPartitionedState(countstateDesc).clear();
      fireTimestamp.clear();
      fireTimestamp.add(time + maxTime);
      ctx.registerProcessingTimeTimer(time + maxTime);
      return TriggerResult.FIRE_AND_PURGE;
    }

    /** Event time is not used by this trigger; always continues. */
    @Override
    public TriggerResult onEventTime(@SuppressWarnings("unused") long time,
        @SuppressWarnings("unused") GlobalWindow window,
        @SuppressWarnings("unused") TriggerContext ctx) throws Exception {
      return TriggerResult.CONTINUE;
    }

    /** Deletes the pending timer, if any, and clears both pieces of state. */
    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
      ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timestateDesc);
      Long pendingFire = fireTimestamp.get();
      if (pendingFire != null) {
        ctx.deleteProcessingTimeTimer(pendingFire);
        fireTimestamp.clear();
      }
      ctx.getPartitionedState(countstateDesc).clear();
    }

    @Override
    public boolean canMerge() {
      return true;
    }

    /**
     * Merges the count and fire-time state of the merged windows and re-registers a timer for the
     * merged fire time, mirroring what Flink's own continuous processing-time trigger does.
     */
    @Override
    public void onMerge(GlobalWindow window, OnMergeContext ctx) throws Exception {
      ctx.mergePartitionedState(countstateDesc);
      ctx.mergePartitionedState(timestateDesc);
      Long mergedFire = ctx.getPartitionedState(timestateDesc).get();
      if (mergedFire != null) {
        ctx.registerProcessingTimeTimer(mergedFire);
      }
    }

    /** Adds two counts. Static so that no reference to the enclosing trigger is serialized. */
    private static final class Sum implements ReduceFunction<Long> {
      private static final long serialVersionUID = 1L;

      @Override
      public Long reduce(Long value1, Long value2) throws Exception {
        return value1 + value2;
      }
    }

    /** Keeps the smaller of two timestamps. Static for the same reason as {@link Sum}. */
    private static final class Min implements ReduceFunction<Long> {
      private static final long serialVersionUID = 1L;

      @Override
      public Long reduce(Long value1, Long value2) throws Exception {
        return Math.min(value1, value2);
      }
    }
  }

}

Reply via email to