Hi Marjan,

The `collect` method is used to collect the content of a table. However,
`insert_statement` is an `INSERT INTO` statement, so there is no table to
collect from in your example. You could try running the `SELECT` query
directly and collecting its result:
```
sql_statement = """
        SELECT window_start, window_end, COUNT(*) as how_any
        FROM TABLE(
            TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '2' SECONDS))
        GROUP BY window_start, window_end
    """

with t_env.execute_sql(sql_statement).collect() as results:
    for res in results:
        print(res)
```
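
To stop once a row contains '2023', one option is to break out of the loop
when the match is seen. The sketch below is only an illustration:
`consume_until` is a hypothetical helper (not part of PyFlink), and the plain
tuples stand in for the rows that `collect()` would yield. It assumes that
leaving the `with` block closes the iterator so the job can be released.

```python
def consume_until(results, needle):
    """Read rows from `results` and stop at the first row whose text
    contains `needle`. Returns the rows read so far, including the match.

    `results` can be any iterable; with PyFlink it would be the iterator
    returned by `collect()` (assumption: rows print via str()).
    """
    seen = []
    for row in results:
        seen.append(row)
        if needle in str(row):
            break  # stop reading; the caller then leaves the `with` block
    return seen


# Stand-in data for illustration only (not real Flink output).
sample_rows = [("2022-12-31", 3), ("2023-01-01", 5), ("2023-01-02", 7)]
for row in consume_until(sample_rows, "2023"):
    print(row)

# With PyFlink this would be used roughly as follows (not run here):
#
# with t_env.execute_sql(sql_statement).collect() as results:
#     rows = consume_until(results, "2023")
```

Keeping the stop condition in a small function makes it easy to try out on
ordinary Python data before pointing it at the Kafka-backed table.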

Regards,
Dian


On Thu, Apr 14, 2022 at 6:24 PM Marjan Jordanovski <mjmarjan1...@gmail.com>
wrote:

> Hello,
>
> I have a simple source table (using the Kafka connector) that reads and
> stores data from a specific Kafka topic. I also have a print table:
>
>> t_env.execute_sql("""
>>         CREATE TABLE print (
>>             window_start TIMESTAMP(3),
>>             window_end TIMESTAMP(3),
>>             how_any BIGINT
>>         ) WITH (
>>             'connector' = 'print'
>>         )
>>     """)
>
>
>>     insert_statement = """
>>     INSERT INTO print (
>>         SELECT window_start, window_end, COUNT(*) as how_any
>>         FROM TABLE(
>>             TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '2' SECONDS))
>>         GROUP BY window_start, window_end
>>     )
>>     """
>
> Now, in order to print from the print table, I used:
>
>> with t_env.execute_sql(insert_statement).collect() as results:
>>         for res in results:
>>                 print(res)
>
>
> But the for loop here is useless, as the program never gets there. I see
> output from the print table in my stdout, but the program runs forever,
> because (I guess) all the output that I see is generated inside the
> collect() call.
>
> Is there a way to interrupt collect() here? I wanted to stop the program
> when, for example, res contains the string '2023', but I couldn't, as the
> program never gets to the for loop.
> I tried with print() and wait(), with the same result (when reading from
> the Kafka topic, the program never gets to the for loop).
>
> Thank you,
> Marjan
>
