I wrote this simple test:

.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.trigger(PurgingTrigger.of(CountTrigger.of(5)))

Thinking that if I send 2 elements of data, it would collect them after a
minute.
But that doesn't seem to be happening.

Is my understanding of how windows and triggers work correct?

/**
 * To test this job first in command line make a simple server on a terminal
 *
 * nc -l 8889
 *
 * Then start this job at the command line or in IDE. Then in the
terminal input each value by typing a line of text.
 * Each line of text (excluding the new line) will be picked up by this job.
 */
public class TriggerTestJob {

   public static void main(String args[]) throws Exception {
      final StreamExecutionEnvironment streamEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
      
streamEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
      streamEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
      streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

      streamEnv.socketTextStream("localhost", 8889)
            .map(value -> new Tuple2<String, String>("test",
value)).returns(new TypeHint<Tuple2<String, String>>(){})
            .keyBy((KeySelector<Tuple2<String, String>, String>) value
-> value.f0)
            .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
            .trigger(PurgingTrigger.of(CountTrigger.of(5)))
            .process(new ProcessWindowFunction<Tuple2<String, String>,
Tuple2<String, String>, String, TimeWindow>() {
               @Override
               public void process(String key, Context context,
Iterable<Tuple2<String, String>> elements, Collector<Tuple2<String,
String>> out) throws Exception {
                  for (Tuple2<String, String> element : elements) {
                     out.collect(element);
                  }
               }
            }).name("trigger")
            .print();

      streamEnv.execute("trigger test");
   }
}

Reply via email to