I've had success using the Java API from PyFlink via pyflink.java_gateway.
Something like:

from pyflink.java_gateway import get_gateway
jvm = get_gateway().jvm

# then perhaps something like:
FlinkKinesisConsumer = (
    jvm.org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
)

There's also a nice java_utils.py
<https://github.com/apache/flink/blob/release-1.15.0/flink-python/pyflink/util/java_utils.py>
with helpers that may help.

Not sure if this will work; you might need to go through the Python env's
underlying Java StreamExecutionEnvironment to do it?  Here's an example
<https://github.com/apache/flink/blob/release-1.15.0/flink-python/pyflink/datastream/stream_execution_environment.py#L922-L937>
of how the Python StreamExecutionEnvironment calls out to the Java one.

BTW: I'm not an authority, nor have I really tried this, so take this
advice with a grain of salt!  :)

Good luck!

On Fri, Jun 24, 2022 at 9:06 AM John Tipper <john_tip...@hotmail.com> wrote:

> Hi all,
>
> There are a number of connectors which do not appear to be in the Python
> API v1.15.0, e.g. Kinesis. I can see that it's possible to use these
> connectors by using the Table API:
>
> CREATE TABLE my_table (...)
> WITH ('connector' = 'kinesis' ...)
>
>
> I guess if you wanted the stream as a DataStream you'd create the Table
> and then convert it into a DataStream?
>
> Is there a way of directly instantiating these connectors in PyFlink
> without needing to use SQL like this (and without having to wait until
> v1.16)? e.g. the Java API looks like this:
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
>     "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
>
>
> Many thanks,
>
> John
>
