Hi Nick,

You need to use a different method to attach the sink to your job: sink_to
(sinkTo in the Java API). KinesisStreamsSink implements the newer Sink
interface, while add_sink expects the old SinkFunction. You can see this by
looking at the method signatures [1] and the usage examples in the
documentation [2].

[1]
https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/data_stream.py#L811-L837
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/#kinesis-streams-sink

Best,
Aleksandr


On Thu, 23 May 2024 at 17:19, Nick Hecht <nick.he...@zainartech.com> wrote:

> Hello,
>
> I am currently having issues trying to use the python flink 1.18
> Datastream api with the Amazon Kinesis Data Streams Connector.
>
> From the documentation
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/
>  I have downloaded the "flink-connector-kinesis" jar
> https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kinesis/4.2.0-1.18/flink-sql-connector-kinesis-4.2.0-1.18.jar
>
> and i have added it in my code:
>
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
> env.enable_checkpointing(5000)
>
> env.add_jars(
>     "file:///workspaces/flink/flink-sql-connector-kinesis-4.2.0-1.18.jar",
> )
>
> and it has worked perfectly so far when setting up my kinesis source,  i
> recently added a kinesis sink to complete my pipeline (was testing with
> print before)
>
> # ds = ds.print()
> sink = KinesisStreamsSink.builder() \
>     .set_kinesis_client_properties(config) \
>     .set_serialization_schema(SimpleStringSchema()) \
>     .set_partition_key_generator(PartitionKeyGenerator.fixed()) \
>     .set_stream_name(stream_name) \
>     .build()
>
> ds = ds.add_sink(sink)
>
> s_env.execute('pipeline')
>
> now when i run my python flink application it throws an error at my
> add_sink call with the following exception:
>
> > python locations_flink_app.py
>
> 2024-05-23 14:53:10,219 - INFO -
> apache_beam.typehints.native_type_compatibility - 315 - Using Any for
> unsupported type: typing.Sequence[~T]
> 2024-05-23 14:53:10,287 - INFO - apache_beam.io.gcp.bigquery - 436 - No
> module named google.cloud.bigquery_storage_v1. As a result, the
> ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
> Traceback (most recent call last):
>   File "locations_flink_app.py", line 90, in <module>
>     setup_flink_app(s_env, props)
>   File "locations_flink_app.py", line 71, in setup_flink_app
>     ds = ds.add_sink(create_sink(
>   File
> "/usr/local/lib/python3.8/site-packages/pyflink/datastream/data_stream.py",
> line 819, in add_sink
>     return
> DataStreamSink(self._j_data_stream.addSink(sink_func.get_java_function()))
>   File "/usr/local/lib/python3.8/site-packages/py4j/java_gateway.py", line
> 1322, in __call__
>     return_value = get_return_value(
>   File
> "/usr/local/lib/python3.8/site-packages/pyflink/util/exceptions.py", line
> 146, in deco
>     return f(*a, **kw)
>   File "/usr/local/lib/python3.8/site-packages/py4j/protocol.py", line
> 330, in get_return_value
>     raise Py4JError(
> py4j.protocol.Py4JError: An error occurred while calling o245.addSink.
> Trace:
> org.apache.flink.api.python.shaded.py4j.Py4JException: Method
> addSink([class org.apache.flink.connector.kinesis.sink.KinesisStreamsSink])
> does not exist
>         at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
>         at
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
>         at
> org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274)
>         at
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>         at
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>         at
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>
> when i open the jar i downloaded
> (flink-sql-connector-kinesis-4.2.0-1.18.jar)   i can see it actually has
> the classes i need
> Downloads\flink-sql-connector-kinesis-4.2.0-1.18.zip\org\apache\flink\connector\kinesis\sink
> has  KinesisStreamsSink.class    [class
> org.apache.flink.connector.kinesis.sink.KinesisStreamsSink]
>
> If I remove the sink the source still works perfectly fine
> (FlinkKinesisConsumer),  but I don't understand what I'm missing. The jar
> I'm using should have everything.
>
> anyone else have similar issues?  or know what I might need to do?
>
>
> Thank you,
>
> Nick Hecht
>