Never mind.  The code is correct; the input test data was not. All is well.

 FWIW, it’s useful while debugging to select the results of the time function 
itself:

String query = "SELECT lastTry, LOCALTIMESTAMP, TIMESTAMPDIFF(MINUTE, lastTry, 
LOCALTIMESTAMP) from " + rawTable +
    " WHERE (TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) > 30)";
logger.debug("Query: " + query);

Table qTable = tableEnv.sqlQuery(query);

TupleTypeInfo<Tuple3<Timestamp,Timestamp, Integer>> typeInfoTs =
    new TupleTypeInfo<>( Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP(), 
Types.INT());
tableEnv.toAppendStream(qTable, typeInfoTs)
    .process(new ProcessFunction<Tuple3<Timestamp,Timestamp, Integer>, 
Tuple3<Timestamp,Timestamp, Integer>>() {
      @Override
      public void processElement(Tuple3<Timestamp, Timestamp, Integer> t, 
Context context,
                                 Collector<Tuple3<Timestamp,Timestamp, 
Integer>> collector) throws Exception {
        logger.debug("QR: " + t);
        collector.collect(t);
      }
    })
.addSink(new DiscardingSink<>());
env.execute();
19/12/19 17:50:37 DEBUG manual.KafkaRRConsumerTest: QR: (2019-12-19 
16:32:40.58,2019-12-19 17:50:37.955,77)
19/12/19 17:50:46 DEBUG manual.KafkaRRConsumerTest: QR: (2019-12-19 
16:42:40.58,2019-12-19 17:50:46.955,68)
19/12/19 17:50:55 DEBUG manual.KafkaRRConsumerTest: QR: (2019-12-19 
16:52:40.58,2019-12-19 17:50:55.958,58)


On 2019/12/19 21:41:17, Cindy McMullen <c...@oracle.com> wrote: 
> This code runs and returns the correct result on the initial query, but fails 
> to trigger as data continues to stream in on Kafka.  Is there anything 
> obvious I’m missing?> 
> 
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);> 
> tableEnv = StreamTableEnvironment.create(env);> 
> 
> // Consume RedactionResults from Kafka into DataStream> 
> DataStream<RedactionResult> rrStream => 
>     env.addSource(kafkaConsumer, "Kafka source for topic: " + getTopic());> 
>  Table rawTable = tableEnv.fromDataStream(rrStream, "lastTry, pid, tid, 
> status, UserActionTime.proctime");> 
> rawTable.printSchema();> 
> 
> // This works on initial query, but fails to trigger afterwards.> 
> String query = "SELECT UserActionTime, lastTry, LOCALTIMESTAMP from " + 
> rawTable +> 
>     " WHERE (TIMESTAMPDIFF(MINUTE, lastTry, LOCALTIMESTAMP) > 1)";> 
> logger.debug("Query: " + query);> 
> 
> Table qTable = tableEnv.sqlQuery(query);> 
> 
> 

Reply via email to