Hi Marjan,

The `collect` method is used to collect the content of a table. However, `insert_statement` is an `INSERT INTO` statement, so in your example there is no table to collect from.

You could try the following code, which runs the `SELECT` query directly:

```
# Run the windowed aggregation as a plain SELECT so there is a result to collect.
sql_statement = """
    SELECT window_start, window_end, COUNT(*) as how_any
    FROM TABLE(
        TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '2' SECONDS))
    GROUP BY window_start, window_end
"""

# Iterate over the result rows as they arrive.
with t_env.execute_sql(sql_statement).collect() as results:
    for res in results:
        print(res)
```

Regards,
Dian

On Thu, Apr 14, 2022 at 6:24 PM Marjan Jordanovski <mjmarjan1...@gmail.com> wrote:

> Hello,
>
> I have a simple source table (which is using kafka connector) that's
> reading and storing data from specific kafka topic. I also have print
> table:
>
>> t_env.execute_sql("""
>>     CREATE TABLE print (
>>         window_start TIMESTAMP(3),
>>         window_end TIMESTAMP(3),
>>         how_any BIGINT
>>     ) WITH (
>>         'connector' = 'print'
>>     )
>> """)
>
>
>> insert_statement = """
>>     INSERT INTO print (
>>         SELECT window_start, window_end, COUNT(*) as how_any
>>         FROM TABLE(
>>             TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '2' SECONDS))
>>         GROUP BY window_start, window_end
>>     )
>> """
>
> Now in order to print from print table, I used:
>
>> with t_env.execute_sql(insert_statement).collect() as results:
>>     for res in results:
>>         print(res)
>
>
> But for loop here is useless, as program never gets there. I am seeing
> outputs from print table in my stdout, but program will run infinitely,
> because (I guess) all the outputs that I see are generated in collect()
> process.
>
> Is there a way to interrupt collect() here? I wanted to stop the program
> when for example res contains '2023' string, but I couldn't as
> program never gets to for loop.
> I tried with print() and wait() but same results (when grabbing from kafka
> topic, program never gets to for loop).
>
> Thank you,
> Marjan
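
On the question of stopping once a row contains '2023': here is a minimal sketch, assuming the loop runs over the result of a `SELECT` rather than an `INSERT INTO`. The helper `first_row_containing` is a hypothetical name, not part of PyFlink; it works on any iterable and stops reading at the first match. The commented usage assumes that leaving the `with` block closes the result iterator; whether that also stops the underlying job is worth verifying on your setup.

```python
def first_row_containing(rows, needle):
    """Return the first row whose string form contains `needle`, or None.

    Hypothetical helper: `rows` can be any iterable, and iteration stops
    as soon as a matching row is found.
    """
    for row in rows:
        if needle in str(row):
            return row
    return None


# Hypothetical usage with the SELECT statement (assumes `t_env` and
# `sql_statement` are defined as in the reply above):
#
# with t_env.execute_sql(sql_statement).collect() as results:
#     hit = first_row_containing(results, "2023")
#     print(hit)
```

Because the helper returns from inside the loop, no further rows are requested after the match, so the program can continue past the `with` block.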