Never mind. The code is correct; the input test data was not. All is well.
FWIW, while debugging it’s useful to select the result of the time function
itself, so the computed difference shows up next to the values it came from:
String query = "SELECT lastTry, LOCALTIMESTAMP, "
    + "TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) from " + rawTable
    + " WHERE (TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) > 30)";
logger.debug("Query: " + query);

Table qTable = tableEnv.sqlQuery(query);

TupleTypeInfo<Tuple3<Timestamp, Timestamp, Integer>> typeInfoTs =
    new TupleTypeInfo<>(Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP(), Types.INT());

tableEnv.toAppendStream(qTable, typeInfoTs)
    .process(new ProcessFunction<Tuple3<Timestamp, Timestamp, Integer>,
                                 Tuple3<Timestamp, Timestamp, Integer>>() {
        @Override
        public void processElement(Tuple3<Timestamp, Timestamp, Integer> t,
                                   Context context,
                                   Collector<Tuple3<Timestamp, Timestamp, Integer>> collector)
                throws Exception {
            logger.debug("QR: " + t);
            collector.collect(t);
        }
    })
    .addSink(new DiscardingSink<>());

env.execute();
19/12/19 17:50:37 DEBUG manual.KafkaRRConsumerTest: QR: (2019-12-19 16:32:40.58,2019-12-19 17:50:37.955,77)
19/12/19 17:50:46 DEBUG manual.KafkaRRConsumerTest: QR: (2019-12-19 16:42:40.58,2019-12-19 17:50:46.955,68)
19/12/19 17:50:55 DEBUG manual.KafkaRRConsumerTest: QR: (2019-12-19 16:52:40.58,2019-12-19 17:50:55.958,58)
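For anyone who wants to cross-check log lines like the ones above by hand, here is a minimal sketch in plain Java (no Flink involved). It assumes that TIMESTAMPDIFF(MINUTE, a, b) counts whole elapsed minutes and drops the remainder, which is what the logged values suggest. The class TimestampDiffCheck and the helper minutesBetween are hypothetical names made up for this illustration.

```java
import java.sql.Timestamp;
import java.time.temporal.ChronoUnit;

public class TimestampDiffCheck {

    // Hypothetical helper: whole minutes elapsed from lastTry to now.
    // ChronoUnit.MINUTES.between truncates, so 77 min 57 s yields 77.
    // Assumption: this mirrors what TIMESTAMPDIFF(MINUTE, lastTry, now) reported in the logs.
    static long minutesBetween(Timestamp lastTry, Timestamp now) {
        return ChronoUnit.MINUTES.between(lastTry.toLocalDateTime(), now.toLocalDateTime());
    }

    public static void main(String[] args) {
        // Values copied from the first logged row.
        Timestamp lastTry = Timestamp.valueOf("2019-12-19 16:32:40.58");
        Timestamp now = Timestamp.valueOf("2019-12-19 17:50:37.955");

        long minutes = minutesBetween(lastTry, now);

        // Same comparison the WHERE clause makes (> 30).
        System.out.println(minutes + " " + (minutes > 30));
    }
}
```

Running it against the first logged row gives the same 77 that the query reported, so a mismatch here would point at the input data rather than at the query.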
On 2019/12/19 21:41:17, Cindy McMullen <[email protected]> wrote:
> This code runs and returns the correct result on the initial query, but fails
> to trigger as data continues to stream in on Kafka. Is there anything
> obvious I’m missing?
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> tableEnv = StreamTableEnvironment.create(env);
>
> // Consume RedactionResults from Kafka into DataStream
> DataStream<RedactionResult> rrStream =
>     env.addSource(kafkaConsumer, "Kafka source for topic: " + getTopic());
> Table rawTable = tableEnv.fromDataStream(rrStream,
>     "lastTry, pid, tid, status, UserActionTime.proctime");
> rawTable.printSchema();
>
> // This works on initial query, but fails to trigger afterwards.
> String query = "SELECT UserActionTime, lastTry, LOCALTIMESTAMP from " + rawTable +
>     " WHERE (TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) > 1)";
> logger.debug("Query: " + query);
>
> Table qTable = tableEnv.sqlQuery(query);
>
>