Never mind. The code is correct; the input test data was not. All is well.
FWIW, it’s useful while debugging to select the results of the time function itself:

    String query = "SELECT lastTry, LOCALTIMESTAMP, TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) from " + rawTable
        + " WHERE (TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) > 30)";
    logger.debug("Query: " + query);
    Table qTable = tableEnv.sqlQuery(query);

    TupleTypeInfo<Tuple3<Timestamp, Timestamp, Integer>> typeInfoTs = new TupleTypeInfo<>(
        Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP(), Types.INT());
    tableEnv.toAppendStream(qTable, typeInfoTs)
        .process(new ProcessFunction<Tuple3<Timestamp, Timestamp, Integer>, Tuple3<Timestamp, Timestamp, Integer>>() {
            @Override
            public void processElement(Tuple3<Timestamp, Timestamp, Integer> t, Context context,
                    Collector<Tuple3<Timestamp, Timestamp, Integer>> collector) throws Exception {
                logger.debug("QR: " + t);
                collector.collect(t);
            }
        })
        .addSink(new DiscardingSink<>());
    env.execute();

19/12/19 17:50:37 DEBUG manual.KafkaRRConsumerTest: QR: (2019-12-19 16:32:40.58,2019-12-19 17:50:37.955,77)
19/12/19 17:50:46 DEBUG manual.KafkaRRConsumerTest: QR: (2019-12-19 16:42:40.58,2019-12-19 17:50:46.955,68)
19/12/19 17:50:55 DEBUG manual.KafkaRRConsumerTest: QR: (2019-12-19 16:52:40.58,2019-12-19 17:50:55.958,58)

On 2019/12/19 21:41:17, Cindy McMullen <c...@oracle.com> wrote:
> This code runs and returns the correct result on the initial query, but fails
> to trigger as data continues to stream in on Kafka.
> Is there anything obvious I’m missing?
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> tableEnv = StreamTableEnvironment.create(env);
>
> // Consume RedactionResults from Kafka into DataStream
> DataStream<RedactionResult> rrStream =
>     env.addSource(kafkaConsumer, "Kafka source for topic: " + getTopic());
> Table rawTable = tableEnv.fromDataStream(rrStream,
>     "lastTry, pid, tid, status, UserActionTime.proctime");
> rawTable.printSchema();
>
> // This works on initial query, but fails to trigger afterwards.
> String query = "SELECT UserActionTime, lastTry, LOCALTIMESTAMP from " + rawTable +
>     " WHERE (TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) > 1)";
> logger.debug("Query: " + query);
>
> Table qTable = tableEnv.sqlQuery(query);
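
When the suspicion falls on the test data rather than the query, the logged differences can also be cross-checked outside Flink. The sketch below is plain java.time, not Flink code: the class name TimestampDiffCheck and the helper minutesBetween are made up for illustration, and it assumes that TIMESTAMPDIFF(MINUTE, ...) counts whole elapsed minutes, which is consistent with the three QR rows logged above.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper class; not part of Flink or of the original job.
class TimestampDiffCheck {

    // Whole minutes elapsed between two ISO-8601 local date-times.
    // ChronoUnit.between truncates any partial minute.
    static long minutesBetween(String lastTry, String now) {
        return ChronoUnit.MINUTES.between(LocalDateTime.parse(lastTry), LocalDateTime.parse(now));
    }

    public static void main(String[] args) {
        // (lastTry, LOCALTIMESTAMP) pairs copied from the three "QR:" log lines,
        // with the space replaced by 'T' so LocalDateTime.parse accepts them.
        String[][] rows = {
            {"2019-12-19T16:32:40.58", "2019-12-19T17:50:37.955"},
            {"2019-12-19T16:42:40.58", "2019-12-19T17:50:46.955"},
            {"2019-12-19T16:52:40.58", "2019-12-19T17:50:55.958"},
        };
        for (String[] row : rows) {
            long minutes = minutesBetween(row[0], row[1]);
            // Mirrors the WHERE clause threshold (> 30) used in the debug query.
            System.out.println(row[0] + " -> " + row[1] + ": " + minutes
                + " min, passes filter: " + (minutes > 30));
        }
    }
}
```

For these three rows it yields 77, 68 and 58 minutes, the same values as the third column of the log output, so a mismatch here would point at the input timestamps rather than at the query.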