Re: Pyflink data stream API to Table API conversion with multiple sinks.

2021-10-08 Thread Dian Fu
Hi Kamil,

I guess `statement_set.execute()` should be enough. You could also check
whether the job graph is as expected in one of the following ways:
- Call `print(statement_set.explain())`
- Check the Flink web UI to see the job graph of the running job
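For example, a minimal sketch of the first check (assuming `statement_set`
is the StatementSet you built; `explain()` only prints the plan and does
not submit the job):

# Prints the AST and the optimized execution plan of all statements
# added to the statement set, without executing them.
print(statement_set.explain())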

As for the problem you described, could you double-check whether the
FileSystem sinks are properly configured? You could refer to [1] for more
details.
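For illustration, a rough sketch of a filesystem sink DDL with an explicit
rolling policy (table name, schema, and path are made up; the option values
shown are the connector defaults). Note that for bulk formats such as Avro
and Parquet, in-progress part files only become finished files once a
checkpoint completes, so checkpointing must be enabled:

# Assumes t_env is a StreamTableEnvironment and env the underlying
# StreamExecutionEnvironment.
env.enable_checkpointing(60000)  # checkpoint every 60 s

t_env.execute_sql("""
    CREATE TABLE parquet_sink (
        id STRING,
        ts TIMESTAMP(3)
    ) WITH (
        'connector' = 'filesystem',
        'path' = '/tmp/output/parquet',
        'format' = 'parquet',
        'sink.rolling-policy.file-size' = '128MB',
        'sink.rolling-policy.rollover-interval' = '30 min',
        'sink.rolling-policy.check-interval' = '1 min'
    )
""")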

Regards,
Dian

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/filesystem/#rolling-policy


On Fri, Oct 8, 2021 at 8:47 PM Kamil ty  wrote:

> Hello,
> In my PyFlink job I have the following flow:
>
> 1. Use the Table API to get messages from Kafka
> 2. Convert the table to a data stream
> 3. Convert the data stream back to the Table API
> 4. Use a statement set to write the data to two filesystem sinks (Avro and
> Parquet)
>
> I'm able to run the job and everything seems to be working, but the files
> never fill with data and remain stuck in the in-progress state.
>
> I suspect that I'm doing something wrong with how I run .execute().
>
> Currently, at the end of my script, I use:
> statement_set.execute()
> streaming_environment.execute("My job")
>
> My question is: what is the correct way to run a job with the flow
> described above? I can share the code if needed.
>
> I would appreciate any help.
>
> Kind regards
> Kamil
>


Pyflink data stream API to Table API conversion with multiple sinks.

2021-10-08 Thread Kamil ty
Hello,
In my PyFlink job I have the following flow:

1. Use the Table API to get messages from Kafka
2. Convert the table to a data stream
3. Convert the data stream back to the Table API
4. Use a statement set to write the data to two filesystem sinks (Avro and
Parquet)
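
For illustration, a condensed sketch of this flow (all names, schemas, and
connector options here are hypothetical; the two sink tables are assumed
to be registered with similar filesystem DDL):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# 1. Table API source reading from Kafka
t_env.execute_sql("""
    CREATE TABLE kafka_source (
        id STRING,
        payload STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'input-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'my-group',
        'format' = 'json'
    )
""")

# 2. Convert the table to a data stream
ds = t_env.to_data_stream(t_env.from_path("kafka_source"))

# ... DataStream transformations would go here ...

# 3. Convert the data stream back to a table
result = t_env.from_data_stream(ds)

# 4. Write to both filesystem sinks through one statement set
statement_set = t_env.create_statement_set()
statement_set.add_insert("avro_sink", result)
statement_set.add_insert("parquet_sink", result)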

I'm able to run the job and everything seems to be working, but the files
never fill with data and remain stuck in the in-progress state.

I suspect that I'm doing something wrong with how I run .execute().

Currently, at the end of my script, I use:
statement_set.execute()
streaming_environment.execute("My job")
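
(As the reply above notes, the second call should be unnecessary here: when
every sink is registered on the statement set, a single call submits the
whole pipeline. A minimal sketch:

# Submits one job covering the full pipeline, including the DataStream
# segment created by the table <-> stream conversions; no separate
# streaming_environment.execute() call is needed.
statement_set.execute()
)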

My question is: what is the correct way to run a job with the flow
described above? I can share the code if needed.

I would appreciate any help.

Kind regards
Kamil