I wrote this simple test: .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) .trigger(PurgingTrigger.of(CountTrigger.of(5)))
Thinking that if I send 2 elements of data, it would collect them after a minute. But that doesn't seem to be happening. Is my understanding of how windows and triggers work correct? /** * To test this job first in command line make a simple server on a terminal * * nc -l 8889 * * Then start this job at the command line or in IDE. Then in the terminal input each value by typing a line of text. * Each line of text (excluding the new line) will be picked up by this job. */ public class TriggerTestJob { public static void main(String args[]) throws Exception { final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); streamEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); streamEnv.socketTextStream("localhost", 8889) .map(value -> new Tuple2<String, String>("test", value)).returns(new TypeHint<Tuple2<String, String>>(){}) .keyBy((KeySelector<Tuple2<String, String>, String>) value -> value.f0) .window(TumblingProcessingTimeWindows.of(Time.minutes(1))) .trigger(PurgingTrigger.of(CountTrigger.of(5))) .process(new ProcessWindowFunction<Tuple2<String, String>, Tuple2<String, String>, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<Tuple2<String, String>> elements, Collector<Tuple2<String, String>> out) throws Exception { for (Tuple2<String, String> element : elements) { out.collect(element); } } }).name("trigger") .print(); streamEnv.execute("trigger test"); } }