Hi Biao,

I will check out running with flink run, but should this be run in the Flink 
JobManager? Would that mean that the container for the Flink JobManager would 
require both Python installed and a copy of the flink_client.py module? Are 
there some examples of running flink run in a Dockerized application instead of 
the local CLI?
I have set pipeline.jars in the flink_client module, but it still complains 
that it cannot find a factory for 'kafka' implementing DynamicTableFactory in 
the classpath. 
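For reference, here is a minimal sketch of how the pipeline.jars value can be assembled, assuming the connector JAR has been copied into the client container. The option takes a semicolon-separated list of file:// URLs. The helper name `to_pipeline_jars` and the path in `CONNECTOR_JARS` are illustrative assumptions, not part of the actual flink_client module.

```python
from pathlib import PurePosixPath


def to_pipeline_jars(jar_paths):
    """Build a value for the `pipeline.jars` option from absolute jar paths.

    The option takes a ';'-separated list of URLs, so each absolute path
    is turned into file://<path>.
    """
    urls = []
    for raw in jar_paths:
        path = PurePosixPath(raw)
        if not path.is_absolute():
            raise ValueError(f"pipeline.jars needs absolute paths, got: {raw}")
        if path.suffix != ".jar":
            raise ValueError(f"not a jar file: {raw}")
        urls.append("file://" + str(path))
    return ";".join(urls)


# Assumed location inside the client container; adjust to wherever the
# Dockerfile actually copies the connector.
CONNECTOR_JARS = ["/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar"]

if __name__ == "__main__":
    print(to_pipeline_jars(CONNECTOR_JARS))
    # In job.py the value would be applied before the CREATE TABLE statement runs:
    # t_env.get_config().set("pipeline.jars", to_pipeline_jars(CONNECTOR_JARS))
```

A relative path or a path missing inside the container is one possible reason the factory is still not found, so it may be worth checking that the file exists at that exact location in the flink_client image.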
Thanks for your feedback.

Kind regards
Phil

> On 10 Apr 2024, at 16:21, Biao Geng <biaoge...@gmail.com> wrote:
> 
> Hi Phil,
> It should be totally OK to use `python -m flink_client.job`. It just seems to 
> me that the Flink CLI is used more often.
> And yes, you also need to add the SQL connector jar to the flink_client 
> container. After putting the jar in your client container, add code like 
> `table_env.get_config().set("pipeline.jars", 
> "file:///path/in/container/to/connector.jar")` to your job.py. Then, on the 
> client side, Flink can see the connector jar, and the jar is uploaded to 
> the cluster as well. See the Jar Dependencies Management doc 
> <https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/python/dependency_management/>
>  for more details.
> 
> Best,
> Biao Geng
> 
> 
> On Wed, 10 Apr 2024 at 22:04, Phil Stavridis <phi...@gmail.com> wrote:
>> Hi Biao,
>> 
>> 1. I have a Flink client container like this:
>> # Flink client
>> flink_client:
>>   container_name: flink_client
>>   image: flink-client:local
>>   build:
>>     context: .
>>     dockerfile: flink_client/Dockerfile
>>   networks:
>>     - standard
>>   depends_on:
>>     - jobmanager
>>     - Kafka
>> 
>> The flink_client/Dockerfile has this bash file, which runs the Flink 
>> client only once the Flink cluster is up and running:
>> Dockerfile:
>> ENTRYPOINT ["./run_when_ready.sh"]
>> 
>> run_when_ready.sh:
>> 
>> #!/bin/bash
>> 
>> wait_for_jobmanager() {
>>   echo "Waiting for Flink JobManager to be ready..."
>>   while ! nc -z jobmanager 8081; do
>>     sleep 1
>>   done
>>   echo "Flink JobManager is ready."
>> }
>> 
>> # Call the function
>> wait_for_jobmanager
>> 
>> # Now run the PyFlink job
>> python -m flink_client.job
>> 
>> flink_client.job.py is the file I attached earlier, which has:
>> 
>>>>>> if __name__ == "__main__":
>>>>>>  process_table()
>> 
>> 
>> So it runs that file which should submit the Table job to the Flink cluster 
>> as that file runs:
>>>>>> t_env.execute_sql("""
>>>>>> SELECT COUNT(*) AS message_count
>>>>>> FROM kafka_test_logs
>>>>>> """).print()
>> 
>> 
>> I don’t use flink run -t …
>> Is this the wrong way to run PyFlink jobs?
>> 
>> Also, do I need to add the flink-sql-connector.jar to the 
>> flink_client/Dockerfile, i.e. COPY /libs/*.jar /opt/flink/lib/, and, instead 
>> of relying on environment.execute_sql(…), run it with flink run -t and the 
>> JAR file as you mentioned earlier?
>> 
>> 
>>> On 10 Apr 2024, at 11:37, Biao Geng <biaoge...@gmail.com> wrote:
>>> 
>>> Hi Phil,
>>> 
>>> Your code looks good. What I meant was: how do you run the Python script? 
>>> Maybe you are using the Flink CLI, i.e. running commands like `flink run 
>>> -t .. -py job.py -j /path/to/flink-sql-kafka-connector.jar`? If that's the 
>>> case, the `-j /path/to/flink-sql-kafka-connector.jar` is necessary so that, 
>>> on the client side, Flink can generate the job graph successfully.
>>> 
>>> As for the second question: in your case, yes, your client's env needs to 
>>> have the flink-sql-kafka-connector JAR. You can check the doc 
>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#deployment-modes>
>>>  for more details. In short, your yaml shows that you are using session 
>>> mode, which needs the connector jar to generate the job graph on the client 
>>> side.
>>> 
>>> 
>>> Best,
>>> Biao Geng
>>> 
>>> On Wed, 10 Apr 2024 at 18:14, Phil Stavridis <phi...@gmail.com> wrote:
>>>> Hi Biao,
>>>> 
>>>> For submitting the job, I run t_env.execute_sql.
>>>> Shouldn’t that be sufficient for submitting the job using the Table API 
>>>> with PyFlink? Isn’t that the recommended way for submitting and running 
>>>> PyFlink jobs on a running Flink cluster? The Flink cluster runs without 
>>>> issues, but there is no Running Job when the container flink_app, that has 
>>>> the client code, runs. 
>>>> I could submit a job, collect, using the Flink SQL console, and it was 
>>>> available on the Running Jobs tab, but that was also not ingesting data 
>>>> from the Kafka topic.
>>>> Does my client’s env need to have the flink-sql-kafka-connector JAR? Is 
>>>> that needed just in the class path of the Flink JobManager and TaskManager?
>>>> 
>>>> Kind regards
>>>> Phil
>>>> 
>>>> 
>>>>> On 10 Apr 2024, at 03:33, Biao Geng <biaoge...@gmail.com> wrote:
>>>>> 
>>>>> Hi Phil,
>>>>> 
>>>>> Thanks for sharing the detailed information about the job. As for your 
>>>>> question, how do you submit the job? After applying your yaml file, I 
>>>>> think you will successfully launch a Flink cluster with 1 JM and 1 TM. 
>>>>> Then you would submit the PyFlink job to the Flink cluster. As the error 
>>>>> you showed is a client-side error, it is possible that your client's env 
>>>>> does not contain the flink-sql-kafka-connector jar, which may lead to the 
>>>>> exception.
>>>>> By the way, the "Table API" code in your mail is actually using the 
>>>>> Flink SQL API, so the flink-sql-kafka-connector jar is required, which is 
>>>>> exactly what you have prepared. For PyFlink's Table API, you can have a 
>>>>> look at this document: 
>>>>> https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/table/word_count.html
>>>>> 
>>>>> Best,
>>>>> Biao Geng
>>>>> 
>>>>> 
>>>>> On Wed, 10 Apr 2024 at 03:10, Phil Stavridis <phi...@gmail.com> wrote:
>>>>>> Hello,
>>>>>> 
>>>>>> I have set up Flink and Kafka containers using docker-compose, for 
>>>>>> testing how Flink works for processing Kafka messages.
>>>>>> I primarily want to check how the Table API works but also how the 
>>>>>> Stream API would process the Kafka messages.
>>>>>> 
>>>>>> I have included the main part of the docker-compose.yaml file I am using 
>>>>>> for the Flink cluster. I have commented out some of the JAR files I have 
>>>>>> tried out, but I have mainly used the flink-sql-connector-kafka JAR. I 
>>>>>> would like to confirm whether I am using any wrong configurations, which 
>>>>>> JARs I should be using for each API, and whether I need just one JAR for 
>>>>>> both the Table and DataStream APIs.
>>>>>> I have also included the Flink client module I have used for both the 
>>>>>> Table and the Datastream APIs and the error messages.
>>>>>> Any idea what is missing or if there is any configuration that seems 
>>>>>> wrong?
>>>>>> 
>>>>>> docker-compose.yml
>>>>>> flink_jobmanager:
>>>>>>   image: flink:1.18.1-scala_2.12
>>>>>>   container_name: flink_jobmanager
>>>>>>   ports:
>>>>>>     - "8081:8081"
>>>>>>   command: jobmanager
>>>>>>   volumes:
>>>>>>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>>>>>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>>>>>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>>>>>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>>>>>   environment:
>>>>>>     - |
>>>>>>       FLINK_PROPERTIES=
>>>>>>       jobmanager.rpc.address: jobmanager
>>>>>>   networks:
>>>>>>     - standard
>>>>>>   depends_on:
>>>>>>     - kafka
>>>>>> 
>>>>>> flink_taskmanager:
>>>>>>   image: flink:1.18.1-scala_2.12
>>>>>>   container_name: flink_taskmanager
>>>>>>   volumes:
>>>>>>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>>>>>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>>>>>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>>>>>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>>>>>   depends_on:
>>>>>>     - kafka
>>>>>>     - jobmanager
>>>>>>   command: taskmanager
>>>>>>   environment:
>>>>>>     - |
>>>>>>       FLINK_PROPERTIES=
>>>>>>       jobmanager.rpc.address: jobmanager
>>>>>>       taskmanager.numberOfTaskSlots: 2
>>>>>>   networks:
>>>>>>     - standard
>>>>>> 
>>>>>> Table API:
>>>>>> from pyflink.table import EnvironmentSettings, TableEnvironment
>>>>>> 
>>>>>> 
>>>>>> def process_table():
>>>>>>     print('Running the Table job now.')
>>>>>>     env_settings = (
>>>>>>         EnvironmentSettings
>>>>>>         .new_instance()
>>>>>>         .in_streaming_mode()
>>>>>>         # .with_configuration(config)
>>>>>>         .build()
>>>>>>     )
>>>>>>     t_env = TableEnvironment.create(env_settings)
>>>>>> 
>>>>>>     # Register the Kafka-backed source table.
>>>>>>     t_env.execute_sql(
>>>>>>         f"""
>>>>>>         CREATE TABLE kafka_test_logs (
>>>>>>             action2 STRING
>>>>>>         ) WITH (
>>>>>>             'connector' = 'kafka',
>>>>>>             'topic' = 'test_logs',
>>>>>>             'properties.bootstrap.servers' = 'kafka:9092',
>>>>>>             'properties.group.id' = 'flink_group',
>>>>>>             'scan.startup.mode' = 'earliest-offset',
>>>>>>             'format' = 'json',
>>>>>>             'json.ignore-parse-errors' = 'true'
>>>>>>         )
>>>>>>         """)
>>>>>> 
>>>>>>     # Submit the query and print its results.
>>>>>>     t_env.execute_sql("""
>>>>>>         SELECT COUNT(*) AS message_count
>>>>>>         FROM kafka_test_logs
>>>>>>     """).print()
>>>>>> 
>>>>>>     print('Table job has now completed.')
>>>>>> 
>>>>>> 
>>>>>> if __name__ == "__main__":
>>>>>>     process_table()
>>>>>> error:
>>>>>> flink_app | Caused by: org.apache.flink.table.api.ValidationException: 
>>>>>> Could not find any factory for identifier 'kafka' that implements 
>>>>>> 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
>>>>>> flink_app |
>>>>>> flink_app | Available factory identifiers are:
>>>>>> flink_app |
>>>>>> flink_app | blackhole
>>>>>> flink_app | datagen
>>>>>> flink_app | filesystem
>>>>>> flink_app | print
>>>>>> flink_app | python-input-format
>>>>>> The '/opt/flink/lib' directory has the JARs from both the image and the 
>>>>>> JARs I added in the docker-compose.yml file.
>>>>>> 
>>>>>> Stream API:
>>>>>> import json
>>>>>> 
>>>>>> from pyflink.common import Types
>>>>>> from pyflink.common.serialization import SimpleStringSchema
>>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>>> from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
>>>>>> 
>>>>>> 
>>>>>> def process_stream():
>>>>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>>>> 
>>>>>>     properties = {
>>>>>>         "bootstrap.servers": 'kafka:9092',
>>>>>>         "group.id": "test_group"
>>>>>>     }
>>>>>> 
>>>>>>     kafka_consumer = FlinkKafkaConsumer(
>>>>>>         topics='test_logs',
>>>>>>         deserialization_schema=SimpleStringSchema(),
>>>>>>         properties=properties)
>>>>>> 
>>>>>>     data_stream = env.add_source(kafka_consumer)
>>>>>> 
>>>>>>     parsed_stream = data_stream.map(lambda x: json.loads(x),
>>>>>>                                     output_type=Types.ROW([Types.STRING()]))
>>>>>>     parsed_stream.print()
>>>>>> 
>>>>>>     env.execute("Kafka DataStream Test")
>>>>>> 
>>>>>> 
>>>>>> if __name__ == "__main__":
>>>>>>     process_stream()
>>>>>> error:
>>>>>> TypeError: Could not found the Java class 
>>>>>> 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The 
>>>>>> Java dependencies could be specified via command line argument 
>>>>>> '--jarfile' or the config option 'pipeline.jars'
>>>>>> 
>>>>>> I have used the JAR dependencies based on:
>>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/configuration/overview/#flink-apis
>>>>>> 
>>>>>> Kind regards
>>>>>> Phil
>>>> 
>> 
