Hello,
I have a simple source table (using the Kafka connector) that reads and
stores data from a specific Kafka topic. I also have a print table:
> t_env.execute_sql("""
>     CREATE TABLE print (
>         window_start TIMESTAMP(3),
>         window_end TIMESTAMP(3),
>         how_any BIGINT
>     ) WITH (
>         'connector' = 'print'
>     )
> """)
> insert_statement = """
>     INSERT INTO print (
>         SELECT window_start, window_end, COUNT(*) as how_any
>         FROM TABLE(
>             TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '2' SECONDS))
>         GROUP BY window_start, window_end
>     )
> """
To print the rows from the print table, I used:
> with t_env.execute_sql(insert_statement).collect() as results:
>     for res in results:
>         print(res)
But the for loop here is useless, as the program never gets there. I see
the output of the print table in my stdout, but the program runs
indefinitely because (I guess) all the output I see is generated inside
the collect() call itself.
Is there a way to interrupt collect() here? I want to stop the program
when, for example, res contains the string '2023', but I can't, because
the program never reaches the for loop.
I also tried print() and wait(), with the same result (when reading from
the Kafka topic, the program never reaches the for loop).
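For reference, here is a minimal sketch of the behaviour I am after. It rests on two assumptions I have not verified: that collect() on a plain SELECT (rather than on the INSERT) yields the window rows one by one, and that leaving the with block closes the iterator and stops the job. The names SELECT_QUERY, first_matching_row and wait_for_match are made up for illustration; only execute_sql() and collect() come from my code above.

```python
# Hypothetical helpers; only execute_sql() and collect() appear in the original code.

# Same windowed aggregation as in the INSERT statement, but as a plain SELECT.
SELECT_QUERY = """
    SELECT window_start, window_end, COUNT(*) AS how_any
    FROM TABLE(
        TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '2' SECONDS))
    GROUP BY window_start, window_end
"""


def first_matching_row(results, needle):
    """Return the first row whose string form contains `needle`, or None."""
    for row in results:
        if needle in str(row):
            return row  # stop iterating as soon as a match is seen
    return None


def wait_for_match(t_env, query=SELECT_QUERY, needle="2023"):
    """Run `query` and return the first result row containing `needle`.

    Assumption: the object returned by collect() works as a context
    manager (as in the original snippet) and is closed on exit.
    """
    with t_env.execute_sql(query).collect() as results:
        return first_matching_row(results, needle)
```

With the existing table environment this would be called as wait_for_match(t_env). The helper only looks at str(row), so it does not depend on the exact row type that collect() returns.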
Thank you,
Marjan