Hi Phil,

Your code looks good. What I meant is: how do you run the Python script?
Are you using the Flink CLI, i.e. running a command like `flink run -t ..
-py job.py -j /path/to/flink-sql-kafka-connector.jar`? If that's the case,
the `-j /path/to/flink-sql-kafka-connector.jar` option is necessary so
that, on the client side, Flink can generate the job graph successfully.

As for the second question: yes, in your case your client’s environment
needs to have the flink-sql-kafka-connector JAR. You can check the doc
<https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#deployment-modes>
for more details. In short, your YAML shows that you are using session mode,
which needs the connector JAR on the client side to generate the job graph.
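
If it helps, here is a minimal sketch of making the connector JAR visible to
the PyFlink client via the `pipeline.jars` option instead of the CLI `-j`
flag. The JAR path below is only an assumption for illustration; point it at
wherever the JAR actually lives in your flink_app container:

    from pyflink.table import EnvironmentSettings, TableEnvironment

    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
    t_env = TableEnvironment.create(env_settings)

    # Let the client load the Kafka SQL connector so it can build the job graph.
    # The path is an assumption for this sketch; adjust it to your setup.
    t_env.get_config().set(
        "pipeline.jars",
        "file:///opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar",
    )

    # The rest of your job (CREATE TABLE ... WITH ('connector' = 'kafka', ...))
    # can stay as it is.

Either way (`-j` on the CLI or `pipeline.jars` in code), the point is that the
client process itself must be able to find the connector factory.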


Best,
Biao Geng

Phil Stavridis <phi...@gmail.com> wrote on Wednesday, 10 Apr 2024 at 18:14:

> Hi Biao,
>
> For submitting the job, I run t_env.execute_sql.
> Shouldn’t that be sufficient for submitting the job using the Table API
> with PyFlink? Isn’t that the recommended way for submitting and running
> PyFlink jobs on a running Flink cluster? The Flink cluster runs without
> issues, but there is no running job listed when the flink_app container,
> which has the client code, runs.
> I could submit a job (collect) using the Flink SQL console, and it was
> available on the Running Jobs tab, but that was also not ingesting data
> from the Kafka topic.
> Does my client’s environment need to have the flink-sql-kafka-connector JAR?
> Or is it needed only on the classpath of the Flink JobManager and TaskManager?
>
> Kind regards
> Phil
>
>
> On 10 Apr 2024, at 03:33, Biao Geng <biaoge...@gmail.com> wrote:
>
> Hi Phil,
>
> Thanks for sharing the detailed information about the job. As for your
> question: how do you submit the job? After applying your YAML file, I think
> you will successfully launch a Flink cluster with 1 JM and 1 TM. Then you
> would submit the PyFlink job to that cluster. Since the error you showed is
> a client-side error, it is possible that your client's env does not contain
> the flink-sql-kafka-connector JAR, which may lead to the exception.
> By the way, the "Table API" code in your mail is actually using the Flink
> SQL API, so the flink-sql-kafka-connector JAR is required, which is exactly
> what you have prepared. For PyFlink's Table API, you can have a look at
> this document:
> https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/table/word_count.html
>
> Best,
> Biao Geng
>
>
> Phil Stavridis <phi...@gmail.com> wrote on Wednesday, 10 Apr 2024 at 03:10:
>
>> Hello,
>>
>> I have set up Flink and Kafka containers using docker-compose, to test
>> how Flink works for processing Kafka messages.
>> I primarily want to check how the Table API works, but also how the
>> DataStream API would process the Kafka messages.
>>
>> I have included the main part of the docker-compose.yaml file I am using
>> for the Flink cluster. I have commented out some of the JAR files I have
>> tried, but I have mainly used the flink-sql-connector-kafka JAR. I would
>> like to confirm whether I am using any wrong configuration, which JARs I
>> should be using for each API, and whether I just need one JAR for both the
>> Table and DataStream APIs.
>> I have also included the Flink client module I have used for both the
>> Table and the DataStream APIs, together with the error messages.
>> Any idea what is missing or whether any of the configuration is wrong?
>>
>> docker-compose.yml
>> flink_jobmanager:
>>   image: flink:1.18.1-scala_2.12
>>   container_name: flink_jobmanager
>>   ports:
>>     - "8081:8081"
>>   command: jobmanager
>>   volumes:
>>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>   environment:
>>     - |
>>       FLINK_PROPERTIES=
>>       jobmanager.rpc.address: jobmanager
>>   networks:
>>     - standard
>>   depends_on:
>>     - kafka
>>
>> flink_taskmanager:
>>   image: flink:1.18.1-scala_2.12
>>   container_name: flink_taskmanager
>>   volumes:
>>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>   depends_on:
>>     - kafka
>>     - jobmanager
>>   command: taskmanager
>>   environment:
>>     - |
>>       FLINK_PROPERTIES=
>>       jobmanager.rpc.address: jobmanager
>>       taskmanager.numberOfTaskSlots: 2
>>   networks:
>>     - standard
>> Table API:
>> def process_table():
>>     print('Running the Table job now.')
>>
>>     env_settings = (
>>         EnvironmentSettings
>>         .new_instance()
>>         .in_streaming_mode()
>>         # .with_configuration(config)
>>         .build()
>>     )
>>     t_env = TableEnvironment.create(env_settings)
>>
>>     t_env.execute_sql(
>>         f"""
>>         CREATE TABLE kafka_test_logs (
>>             action2 STRING
>>         ) WITH (
>>             'connector' = 'kafka',
>>             'topic' = 'test_logs',
>>             'properties.bootstrap.servers' = 'kafka:9092',
>>             'properties.group.id' = 'flink_group',
>>             'scan.startup.mode' = 'earliest-offset',
>>             'format' = 'json',
>>             'json.ignore-parse-errors' = 'true'
>>         )
>>         """)
>>
>>     t_env.execute_sql("""
>>         SELECT COUNT(*) AS message_count
>>         FROM kafka_test_logs
>>     """).print()
>>
>>     print('Table job has now completed.')
>>
>>
>> if __name__ == "__main__":
>>     process_table()
>> error:
>> flink_app | Caused by: org.apache.flink.table.api.ValidationException:
>> Could not find any factory for identifier 'kafka' that implements
>> 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
>> flink_app |
>> flink_app | Available factory identifiers are:
>> flink_app |
>> flink_app | blackhole
>> flink_app | datagen
>> flink_app | filesystem
>> flink_app | print
>> flink_app | python-input-format
>> The '/opt/flink/lib' directory has the JARs from the image as well as the
>> JARs I added in the docker-compose.yml file.
>>
>> Stream API:
>> def process_stream():
>>     env = StreamExecutionEnvironment.get_execution_environment()
>>
>>     properties = {
>>         "bootstrap.servers": 'kafka:9092',
>>         "group.id": "test_group"
>>     }
>>
>>     kafka_consumer = FlinkKafkaConsumer(
>>         topics='test_logs',
>>         deserialization_schema=SimpleStringSchema(),
>>         properties=properties)
>>
>>     data_stream = env.add_source(kafka_consumer)
>>
>>     parsed_stream = data_stream.map(lambda x: json.loads(x),
>>                                     output_type=Types.ROW([Types.STRING()]))
>>     parsed_stream.print()
>>
>>     env.execute("Kafka DataStream Test")
>>
>>
>> if __name__ == "__main__":
>>     process_stream()
>> error:
>> TypeError: Could not found the Java class
>> 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java
>> dependencies could be specified via command line argument '--jarfile' or
>> the config option 'pipeline.jars'
>>
>> I have used the JAR dependencies based on:
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/configuration/overview/#flink-apis
>>
>> Kind regards
>> Phil
>>
>
>
