Re: Flink 1.18.1 cannot read from Kafka

2024-04-21 Thread Phil Stavridis
Thanks Biao.

Kind regards
Phil

> On 14 Apr 2024, at 18:04, Biao Geng  wrote:
> 
> Hi Phil,
> 
> You can check my github link for a detailed tutorial and example codes :).
> 
> Best,
> Biao Geng
> 
> Phil Stavridis <phi...@gmail.com> wrote on Fri, 12 Apr 2024 at 19:10:
>> Hi Biao,
>> 
>> Thanks for looking into it and providing a detailed example.
>> I am not sure I am following some of the setup and I just want to make sure 
>> I run what you have tested without messing up anything with the setup. 
>> Especially for how you create the Docker images etc. Do you have this 
>> committed somewhere or could you perhaps provide a JAR file with the 
>> implementation? Thanks.
>> 
>>> On 11 Apr 2024, at 19:43, Biao Geng wrote:
>>> 
>>> Hi Phil,
>>> I tested it in my local docker environment and found that we do need to use 
>>> "flink run" to submit the job to the session cluster. Simply using `python 
>>> xx.py` may just launch a local mini cluster and would not submit the job to 
>>> the cluster you created. Also note that all required dependencies (e.g. 
>>> the kafka connector) need to be available on the client as well as in the 
>>> cluster.
>>> 
>>> Here is my test code, which works in my env; hope this helps:
>>> my flink-cluster-and-client.yaml:
>>> version: "2.2"
>>> services:
>>>   jobmanager:
>>>     image: pyflink:latest
>>>     ports:
>>>       - "8081:8081"
>>>     command: jobmanager
>>>     environment:
>>>       - |
>>>         FLINK_PROPERTIES=
>>>         jobmanager.rpc.address: jobmanager
>>> 
>>>   taskmanager:
>>>     image: pyflink:latest
>>>     depends_on:
>>>       - jobmanager
>>>     command: taskmanager
>>>     scale: 1
>>>     environment:
>>>       - |
>>>         FLINK_PROPERTIES=
>>>         jobmanager.rpc.address: jobmanager
>>>         taskmanager.numberOfTaskSlots: 2
>>> 
>>>   client:
>>>     image: pyflink-client:latest
>>>     depends_on:
>>>       - jobmanager
>>>     command: /opt/flink/run.sh
>>>     environment:
>>>       - |
>>>         FLINK_PROPERTIES=
>>>         rest.address: jobmanager
>>>         rest.bind-address: jobmanager
>>>     scale: 1
>>> 
>>> my read_kafka.py:
>>> from pyflink.table import EnvironmentSettings, TableEnvironment
>>> 
>>> def process_table():
>>>     env_settings = (
>>>         EnvironmentSettings
>>>         .new_instance()
>>>         .in_streaming_mode()
>>>         .build()
>>>     )
>>>     t_env = TableEnvironment.create(env_settings)
>>> 
>>>     t_env.execute_sql(
>>>         """
>>>         CREATE TABLE kafka_input_topic (
>>>             action STRING
>>>         ) WITH (
>>>             'connector' = 'kafka',
>>>             'topic' = 'input_topic',
>>>             'properties.bootstrap.servers' = 'kafka:9092',
>>>             'properties.group.id' = 'test',
>>>             'scan.startup.mode' = 'earliest-offset',
>>>             'format' = 'json',
>>>             'json.ignore-parse-errors' = 'true'
>>>         )
>>>         """)
>>> 
>>>     t_env.execute_sql(
>>>         """
>>>         CREATE TABLE print_sink (
>>>             action STRING
>>>         ) WITH (
>>>             'connector' = 'print'
>>>         )
>>>         """)
>>> 
>>>     t_env.execute_sql("""
>>>         INSERT INTO print_sink SELECT * FROM kafka_input_topic
>>>     """).print()
>>> 
>>> if __name__ == "__main__":
>>>     process_table()
>>> 
>>> 
>>> the run.sh is just one line to use `flink run`:
>>> #!/bin/bash
>>> flink run -py read_kafka.py
>>> 
>>> my dockerfile to build pyflink:latest image:
>>> 
>>> FROM flink:latest
>>> 
>>> # install python3 and pip3
>>> RUN apt-get update -y && \
>>>     apt-get install -y python3 python3-pip python3-dev && \
>>>     rm -rf /var/lib/apt/lists/*
>>> RUN ln -s /usr/bin/python3 /usr/bin/python
>>> 
>>> # install PyFlink
>>> 
>>> COPY apache-flink*.tar.gz /
>>> RUN pip3 install /apache-flink-libraries*.tar.gz && \
>>>     pip3 install /apache-flink*.tar.gz
>>> COPY flink-sql-connector-kafka-3.1.0-1.18.jar /opt/flink/lib
>>> 
>>> my dockerfile to build pyflink-client:latest image:
>>> FROM pyflink:latest
>>> COPY read_kafka.py /opt/flink/
>>> COPY run.sh /opt/flink
>>> RUN chmod 777 /opt/flink/run.sh
>>> 
>>> Possible output (dependent on the content of the kafka topic):
>>> 
>>> 
>>> Best,
>>> Biao Geng
>>> 
>>> Biao Geng <biaoge...@gmail.com> wrote on Thu, 11 Apr 2024 at 09:53:
 Hi Phil,
 "should this be run in the Flink JobManager?” It should be fine to run in 
 your client container. 
 "examples of running flink run in a Dockerized application" This sql 
 client example 
 
  can be used as a good example.
 It is somehow strange to learn that the pipeline.jars does not work. I 
 will try to run the pyflink using kafka example in docker locally later 
 today. 

RE: Flink 1.18.1 cannot read from Kafka

2024-04-14 Thread Sohil Shah
Hi Phil,


> if __name__ == "__main__":
>     process_table()
> error:
> link_app | Caused by: org.apache.flink.table.api.ValidationException:
> Could not find any factory for identifier 'kafka' that implements
> 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
> flink_app |
> flink_app | Available factory identifiers are:
> flink_app |
> flink_app | blackhole
> flink_app | datagen
> flink_app | filesystem
> flink_app | print
> flink_app | python-input-format
> The '/opt/flink/lib' has the JARs from both the image and the JARs I added
> in the docker-compose.yml file.


Based on the stacktrace it seems your Kafka data is being processed, but
table creation, which by default I believe uses 'print' or 'datagen', has an
issue with the factory that creates tables dynamically.

You could try 'blackhole' to at least confirm that Kafka data is in fact being
processed by Flink, setting the table creation issue aside. Note: 'blackhole'
is not a solution, as it is basically '/dev/null' and is usually only used for
testing.

After that you may have to look into the DynamicTableFactory section in the
reference documentation, as it has more step-by-step details.
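
Below is a minimal, untested sketch of that idea in PyFlink. The table name,
topic and connector options are copied from the example posted earlier in this
thread and are assumptions about the actual job, not a verified fix:

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Kafka source table, same shape as the one discussed earlier in the thread.
t_env.execute_sql("""
    CREATE TABLE kafka_input_topic (
        action STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'input_topic',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'test',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json',
        'json.ignore-parse-errors' = 'true'
    )
""")

# Blackhole sink: discards every row, so if this pipeline runs, the source
# side (Kafka connector + JSON deserialization) is working.
t_env.execute_sql("""
    CREATE TABLE blackhole_sink (
        action STRING
    ) WITH (
        'connector' = 'blackhole'
    )
""")

t_env.execute_sql(
    "INSERT INTO blackhole_sink SELECT * FROM kafka_input_topic"
).wait()

Keep in mind that the 'kafka' factory still has to be found on the classpath
for the source table, so this only isolates the sink side of the pipeline.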

Thanks
Sohil


Re: Flink 1.18.1 cannot read from Kafka

2024-04-14 Thread Biao Geng
Hi Phil,

You can check my github link for a detailed tutorial and example codes :).

Best,
Biao Geng

Phil Stavridis wrote on Fri, 12 Apr 2024 at 19:10:

> Hi Biao,
>
> Thanks for looking into it and providing a detailed example.
> I am not sure I am following some of the setup and I just want to make
> sure I run what you have tested without messing up anything with the setup.
> Especially for how you create the Docker images etc. Do you have this
> committed somewhere or could you perhaps provide a JAR file with the
> implementation? Thanks.
>
> On 11 Apr 2024, at 19:43, Biao Geng  wrote:
>
> Hi Phil,
> I tested it in my local docker environment and found that we do need to use
> "flink run" to submit the job to the session cluster. Simply using `python
> xx.py` may just launch a local mini cluster and would not submit the job to
> the cluster you created. Also note that all required dependencies (e.g.
> the kafka connector) need to be available on the client as well as in the
> cluster.
>
> Here is my test code, which works in my env; hope this helps:
> my flink-cluster-and-client.yaml:
> version: "2.2"
> services:
>   jobmanager:
>     image: pyflink:latest
>     ports:
>       - "8081:8081"
>     command: jobmanager
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>
>   taskmanager:
>     image: pyflink:latest
>     depends_on:
>       - jobmanager
>     command: taskmanager
>     scale: 1
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>         taskmanager.numberOfTaskSlots: 2
>
>   client:
>     image: pyflink-client:latest
>     depends_on:
>       - jobmanager
>     command: /opt/flink/run.sh
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         rest.address: jobmanager
>         rest.bind-address: jobmanager
>     scale: 1
>
> my read_kafka.py:
> from pyflink.table import EnvironmentSettings, TableEnvironment
>
> def process_table():
>     env_settings = (
>         EnvironmentSettings
>         .new_instance()
>         .in_streaming_mode()
>         .build()
>     )
>     t_env = TableEnvironment.create(env_settings)
>
>     t_env.execute_sql(
>         """
>         CREATE TABLE kafka_input_topic (
>             action STRING
>         ) WITH (
>             'connector' = 'kafka',
>             'topic' = 'input_topic',
>             'properties.bootstrap.servers' = 'kafka:9092',
>             'properties.group.id' = 'test',
>             'scan.startup.mode' = 'earliest-offset',
>             'format' = 'json',
>             'json.ignore-parse-errors' = 'true'
>         )
>         """)
>
>     t_env.execute_sql(
>         """
>         CREATE TABLE print_sink (
>             action STRING
>         ) WITH (
>             'connector' = 'print'
>         )
>         """)
>
>     t_env.execute_sql("""
>         INSERT INTO print_sink SELECT * FROM kafka_input_topic
>     """).print()
>
> if __name__ == "__main__":
>     process_table()
>
>
> the run.sh is just one line to use `flink run`:
> #!/bin/bash
> flink run -py read_kafka.py
>
> my dockerfile to build pyflink:latest image:
>
> FROM flink:latest
>
> # install python3 and pip3
> RUN apt-get update -y && \
>     apt-get install -y python3 python3-pip python3-dev && \
>     rm -rf /var/lib/apt/lists/*
> RUN ln -s /usr/bin/python3 /usr/bin/python
>
> # install PyFlink
>
> COPY apache-flink*.tar.gz /
> RUN pip3 install /apache-flink-libraries*.tar.gz && \
>     pip3 install /apache-flink*.tar.gz
> COPY flink-sql-connector-kafka-3.1.0-1.18.jar /opt/flink/lib
>
> my dockerfile to build pyflink-client:latest image:
> FROM pyflink:latest
> COPY read_kafka.py /opt/flink/
> COPY run.sh /opt/flink
> RUN chmod 777 /opt/flink/run.sh
>
> Possible output (dependent on the content of the kafka topic):
> 
>
> Best,
> Biao Geng
>
Biao Geng wrote on Thu, 11 Apr 2024 at 09:53:
>
>> Hi Phil,
>> "should this be run in the Flink JobManager?” It should be fine to run in
>> your client container.
>> "examples of running flink run in a Dockerized application" This sql
>> client example
>> 
>>  can
>> be used as a good example.
>> It is somehow strange to learn that the pipeline.jars does not work. I
>> will try to run the pyflink using kafka example in docker locally later
>> today. Besides that, I see that in your all 3 containers, the connector jar
>> are all located in /opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar,
>> could you please give a try of using `table_env.get_config().set(
>> "pipeline.classpaths", "file:///my/jar/path/connector.jar")` in your
>> python script?
>>
>> Best,
>> Biao Geng
>>
>>
>> Phil Stavridis  于2024年4月11日周四 00:49写道:
>>
>>> Hi Biao,
>>>
>>> I will check out running with flink run, but should this be run in the
>>> Flink JobManager? Would that mean that the container for the Flink
>>> JobManager would require both Python installed and a copy of the
>>> flink_client.py module? Are there some examples of running flink run in a
>>> Dockerized application instead of the local CLI?
>>> I have added the pipeline.jars in the flink_client module but it is
>>> still complaining about not finding kafka and the DynamicTable 

Re: Flink 1.18.1 cannot read from Kafka

2024-04-10 Thread Phil Stavridis
Hi Biao,

I will check out running with flink run, but should this be run in the Flink 
JobManager? Would that mean that the container for the Flink JobManager would 
require both Python installed and a copy of the flink_client.py module? Are 
there some examples of running flink run in a Dockerized application instead of 
the local CLI?
I have added the pipeline.jars in the flink_client module, but it is still 
complaining about not finding kafka and the DynamicTableFactory class on the 
class path. 
Thanks for your feedback.

Kind regards
Phil

> On 10 Apr 2024, at 16:21, Biao Geng  wrote:
> 
> Hi Phil,
> It should be totally ok to use `python -m flink_client.job`. It just seems to 
> me that the flink cli is being used more often.
> And yes, you also need to add the sql connector jar to the flink_client 
> container. After putting the jar in your client container, add code like 
> `table_env.get_config().set("pipeline.jars", 
> "file:///path/in/container/to/connector.jar")` to your job.py. Then on the 
> client side, Flink can see the connector jar, and the jar would be 
> uploaded to the cluster as well. See the doc of Jar Dependencies Management 
> for more details.
> 
> Best,
> Biao Geng
> 
> 
> Phil Stavridis <phi...@gmail.com> wrote on Wed, 10 Apr 2024 at 22:04:
>> Hi Biao,
>> 
>> 1. I have a Flink client container like this:
>> # Flink client
>> flink_client:
>>   container_name: flink_client
>>   image: flink-client:local
>>   build:
>>     context: .
>>     dockerfile: flink_client/Dockerfile
>>   networks:
>>     - standard
>>   depends_on:
>>     - jobmanager
>>     - kafka
>> 
>> The flink_client/Dockerfile has this bash file, which only runs the Flink 
>> client when the Flink cluster is up and running:
>> Dockerfile:
>> ENTRYPOINT ["./run_when_ready.sh"]
>> 
>> run_when_ready.sh:
>> 
>> #!/bin/bash
>> 
>> wait_for_jobmanager() {
>> echo "Waiting for Flink JobManager to be ready..."
>> while ! nc -z jobmanager 8081; do
>> sleep 1
>> done
>> echo "Flink JobManager is ready."
>> }
>> 
>> # Call the function
>> wait_for_jobmanager
>> 
>> # Now run the PyFlink job
>> python -m flink_client.job
>> flink_client.job.py is the file I attached earlier which has:
>> 
>> if __name__ == "__main__":
>>     process_table()
>> 
>> 
>> So it runs that file which should submit the Table job to the Flink cluster 
>> as that file runs:
>> t_env.execute_sql("""
>>     SELECT COUNT(*) AS message_count
>>     FROM kafka_test_logs
>> """).print()
>> 
>> 
>> I don’t use flink run -t …
>> Is this the wrong way to run PyFlink jobs?
>> 
>> Also, do I need to add the flink-sql-connector.jar to the 
>> flink_client/Dockerfile, i.e. COPY /libs/*.jar /opt/flink/lib/, and, instead 
>> of relying on environment.execute_sql(…), do I need to run it with 
>> flink run -t and the JAR file as you mentioned earlier?
>> 
>> 
>>> On 10 Apr 2024, at 11:37, Biao Geng wrote:
>>> 
>>> Hi Phil,
>>> 
>>> Your codes look good. I mean how do you run the python script. Maybe you 
>>> are using flink cli? i.e. run commands like ` flink run -t .. -py job.py -j 
>>> /path/to/flink-sql-kafka-connector.jar`. If that's the case, the `-j 
>>> /path/to/flink-sql-kafka-connector.jar` is necessary so that in client 
>>> side, flink can generate the job graph successfully.
>>> 
>>> As for the second question, in your case, yes, your client’s env needs to 
>>> have the flink-sql-kafka-connector JAR. You can check the doc 
>>> for more details. In short, your yaml shows that you are using the session 
>>> mode, which needs the connector jar to generate the job graph on the client side.
>>> 
>>> 
>>> Best,
>>> Biao Geng
>>> 
>>> Phil Stavridis <phi...@gmail.com> wrote on Wed, 10 Apr 2024 at 18:14:
 Hi Biao,
 
 For submitting the job, I run t_env.execute_sql.
 Shouldn’t that be sufficient for submitting the job using the Table API 
 with PyFlink? Isn’t that the recommended way for submitting and running 
 PyFlink jobs on a running Flink cluster? The Flink cluster runs without 
 issues, but there is no Running Job when the container flink_app, that has 
 the client code, runs. 
 I could submit a job, collect, using the Flink SQL console, and it was 
 available on the Running Jobs tab, but that was also not ingesting data 
 from the Kafka topic.
 Does my client’s env need to have the flink-sql-kafka-connector JAR? Is 
 that needed just in the class path of the Flink JobManager and TaskManager?
 
 Kind regards
 Phil
 
 
> On 10 Apr 2024, at 03:33, Biao Geng wrote:
> 
> Hi Phil,
> 
> Thanks for sharing the detailed information of the job. For your question, 
> how do you submit the 

Re: Flink 1.18.1 cannot read from Kafka

2024-04-10 Thread Biao Geng
Hi Phil,
It should be totally ok to use `python -m flink_client.job`. It just seems
to me that the flink cli is being used more often.
And yes, you also need to add the sql connector jar to the flink_client
container. After putting the jar in your client container, add code
like `table_env.get_config().set("pipeline.jars",
"file:///path/in/container/to/connector.jar")` to your job.py. Then on the
client side, Flink can see the connector jar, and the jar would be
uploaded to the cluster as well. See the doc of Jar Dependencies Management
for more details.
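
For illustration, here is a minimal, untested sketch of where that setting
could go in job.py. The jar path is an assumption based on the /opt/flink/lib
location mentioned elsewhere in this thread, not a confirmed detail of the
actual setup:

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Point at the connector jar as it exists inside the *client* container;
# the jar is then shipped to the cluster together with the job.
t_env.get_config().set(
    "pipeline.jars",
    "file:///opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar",
)

# ... then define the Kafka source table and call execute_sql as before.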

Best,
Biao Geng


Phil Stavridis wrote on Wed, 10 Apr 2024 at 22:04:

> Hi Biao,
>
> 1. I have a Flink client container like this:
> # Flink client
> flink_client:
> container_name: flink_client
> image: flink-client:local
> build:
> context: .
> dockerfile: flink_client/Dockerfile
> networks:
> - standard
> depends_on:
> - jobmanager
> - Kafka
>
> The flink_client/Dockerfile has this bash file, which only runs the Flink
> client when the Flink cluster is up and running:
> Dockerfile:
> ENTRYPOINT ["./run_when_ready.sh"]
>
> run_when_ready.sh:
>
> #!/bin/bash
>
> wait_for_jobmanager() {
> echo "Waiting for Flink JobManager to be ready..."
> while ! nc -z jobmanager 8081; do
> sleep 1
> done
> echo "Flink JobManager is ready."
> }
>
> # Call the function
> wait_for_jobmanager
>
> # Now run the PyFlink job
> python -m flink_client.job
> flink_client.job.py is the file I attached earlier which has:
>
> if __name__ == "__main__":
>     process_table()
>
> So it runs that file which should submit the Table job to the Flink
> cluster as that file runs:
>
> t_env.execute_sql("""
>     SELECT COUNT(*) AS message_count
>     FROM kafka_test_logs
> """).print()
>
> I don’t use flink run -t …
> Is this the wrong way to run PyFlink jobs?
>
> Also, do I need to add the flink-sql-connector.jar to the
> flink_client/Dockerfile, i.e. COPY /libs/*.jar /opt/flink/lib/, and, instead
> of relying on environment.execute_sql(…), do I need to run it with
> flink run -t and the JAR file as you mentioned earlier?
>
>
> On 10 Apr 2024, at 11:37, Biao Geng  wrote:
>
> Hi Phil,
>
> Your codes look good. I mean how do you run the python script. Maybe you
> are using flink cli? i.e. run commands like ` flink run -t .. -py job.py -j
> /path/to/flink-sql-kafka-connector.jar`. If that's the case, the `-j
> /path/to/flink-sql-kafka-connector.jar` is necessary so that in client
> side, flink can generate the job graph successfully.
>
> As for the second question, in your case, yes, your client’s env needs to
> have the flink-sql-kafka-connector JAR. You can check the doc for
> more details. In short, your yaml shows that you are using the session
> mode, which needs the connector jar to generate the job graph on the client side.
>
>
> Best,
> Biao Geng
>
Phil Stavridis wrote on Wed, 10 Apr 2024 at 18:14:
>
>> Hi Biao,
>>
>> For submitting the job, I run t_env.execute_sql.
>> Shouldn’t that be sufficient for submitting the job using the Table API
>> with PyFlink? Isn’t that the recommended way for submitting and running
>> PyFlink jobs on a running Flink cluster? The Flink cluster runs without
>> issues, but there is no Running Job when the container flink_app, that has
>> the client code, runs.
>> I could submit a job, collect, using the Flink SQL console, and it was
>> available on the Running Jobs tab, but that was also not ingesting data
>> from the Kafka topic.
>> Does my client’s env need to have the flink-sql-kafka-connector JAR? Is
>> that needed just in the class path of the Flink JobManager and TaskManager?
>>
>> Kind regards
>> Phil
>>
>>
>> On 10 Apr 2024, at 03:33, Biao Geng  wrote:
>>
>> Hi Phil,
>>
>> Thanks for sharing the detailed information of the job. For your question,
>> how do you submit the job? After applying your yaml file, I think you will
>> successfully launch a flink cluster with 1 JM and 1 TM. Then you would
>> submit the pyflink job to the flink cluster. As the error you showed is the
>> client-side error, it is possible that your client's env does not contain
>> the flink-sql-kafka-connector jar which may lead to the exception.
>> By the way, the "Table API" codes in your mail is actually using the
>> flink SQL API, so the flink-sql-kafka-connector jar is required, which is
>> exactly what you have prepared. For pyflink's table API, you can have a
>> look at this document:
>> https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/table/word_count.html
>>
>> Best,
>> Biao Geng
>>
>>
>> Phil Stavridis  于2024年4月10日周三 03:10写道:
>>
>>> Hello,
>>>
>>> I have set up Flink and Kafka containers using docker-compose, for
>>> testing how Flink works for processing Kafka messages.
>>> I primarily want to check how the Table API works but also how the
>>> Stream API would 

Re: Flink 1.18.1 cannot read from Kafka

2024-04-10 Thread Phil Stavridis
Hi Biao,

1. I have a Flink client container like this:
# Flink client
flink_client:
  container_name: flink_client
  image: flink-client:local
  build:
    context: .
    dockerfile: flink_client/Dockerfile
  networks:
    - standard
  depends_on:
    - jobmanager
    - kafka

The flink_client/Dockerfile has this bash file, which only runs the Flink 
client when the Flink cluster is up and running:
Dockerfile:
ENTRYPOINT ["./run_when_ready.sh"]

run_when_ready.sh:

#!/bin/bash

wait_for_jobmanager() {
echo "Waiting for Flink JobManager to be ready..."
while ! nc -z jobmanager 8081; do
sleep 1
done
echo "Flink JobManager is ready."
}

# Call the function
wait_for_jobmanager

# Now run the PyFlink job
python -m flink_client.job
flink_client.job.py is the file I attached earlier which has:

if __name__ == "__main__":
    process_table()


So it runs that file which should submit the Table job to the Flink cluster as 
that file runs:
t_env.execute_sql("""
    SELECT COUNT(*) AS message_count
    FROM kafka_test_logs
""").print()


I don’t use flink run -t …
Is this the wrong way to run PyFlink jobs?

Also, do I need to add the flink-sql-connector.jar to the 
flink_client/Dockerfile, i.e. COPY /libs/*.jar /opt/flink/lib/, and, instead of 
relying on environment.execute_sql(…), do I need to run it with flink run -t 
and the JAR file as you mentioned earlier?


> On 10 Apr 2024, at 11:37, Biao Geng  wrote:
> 
> Hi Phil,
> 
> Your codes look good. I mean how do you run the python script. Maybe you are 
> using flink cli? i.e. run commands like ` flink run -t .. -py job.py -j 
> /path/to/flink-sql-kafka-connector.jar`. If that's the case, the `-j 
> /path/to/flink-sql-kafka-connector.jar` is necessary so that in client side, 
> flink can generate the job graph successfully.
> 
> As for the second question, in your case, yes, your client’s env needs to have 
> the flink-sql-kafka-connector JAR. You can check the doc 
> for more details. In short, your yaml shows that you are using the session 
> mode, which needs the connector jar to generate the job graph on the client side.
> 
> 
> Best,
> Biao Geng
> 
> Phil Stavridis <phi...@gmail.com> wrote on Wed, 10 Apr 2024 at 18:14:
>> Hi Biao,
>> 
>> For submitting the job, I run t_env.execute_sql.
>> Shouldn’t that be sufficient for submitting the job using the Table API with 
>> PyFlink? Isn’t that the recommended way for submitting and running PyFlink 
>> jobs on a running Flink cluster? The Flink cluster runs without issues, but 
>> there is no Running Job when the container flink_app, that has the client 
>> code, runs. 
>> I could submit a job, collect, using the Flink SQL console, and it was 
>> available on the Running Jobs tab, but that was also not ingesting data from 
>> the Kafka topic.
>> Does my client’s env need to have the flink-sql-kafka-connector JAR? Is that 
>> needed just in the class path of the Flink JobManager and TaskManager?
>> 
>> Kind regards
>> Phil
>> 
>> 
>>> On 10 Apr 2024, at 03:33, Biao Geng wrote:
>>> 
>>> Hi Phil,
>>> 
>>> Thanks for sharing the detailed information of the job. For your question, 
>>> how do you submit the job? After applying your yaml file, I think you will 
>>> successfully launch a flink cluster with 1 JM and 1 TM. Then you would 
>>> submit the pyflink job to the flink cluster. As the error you showed is the 
>>> client-side error, it is possible that your client's env does not contain 
>>> the flink-sql-kafka-connector jar which may lead to the exception.
>>> By the way, the "Table API" codes in your mail is actually using the flink 
>>> SQL API, so the flink-sql-kafka-connector jar is required, which is exactly 
>>> what you have prepared. For pyflink's table API, you can have a look at 
>>> this document: 
>>> https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/table/word_count.html
>>> 
>>> Best,
>>> Biao Geng
>>> 
>>> 
>>> Phil Stavridis <phi...@gmail.com> wrote on Wed, 10 Apr 2024 at 03:10:
 Hello,
 
 I have set up Flink and Kafka containers using docker-compose, for testing 
 how Flink works for processing Kafka messages.
 I primarily want to check how the Table API works but also how the Stream 
 API would process the Kafka messages.
 
 I have included the main part of the docker-compose.yaml file I am using 
 for the Flink cluster. I have commented out some of the JAR files I have 
 tried out but I have mainly used the flink-sql-connector-kafka JAR. I 
 would like to confirm if I am using any wrong configurations, which JARs 
 should I be using for each API and if just need to use one JAR for both 
 Table and Datastream APIs?
 I have also included the Flink client module I have used for both the 
 Table and the Datastream APIs and the error messages.
 Any idea what is missing or if there is any configuration 

Re: Flink 1.18.1 cannot read from Kafka

2024-04-10 Thread Biao Geng
Hi Phil,

Your codes look good. I meant: how do you run the python script? Maybe you
are using the flink cli, i.e. run commands like `flink run -t .. -py job.py -j
/path/to/flink-sql-kafka-connector.jar`. If that's the case, the `-j
/path/to/flink-sql-kafka-connector.jar` is necessary so that on the client
side, Flink can generate the job graph successfully.

As for the second question, in your case, yes, your client’s env needs to
have the flink-sql-kafka-connector JAR. You can check the doc for
more details. In short, your yaml shows that you are using the session
mode, which needs the connector jar to generate the job graph on the client side.
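
A related, minimal and untested sketch of pointing the client at the connector
jar from the Python script itself, via the "pipeline.classpaths" option that is
also suggested elsewhere in this thread (the jar path is an assumption based on
the /opt/flink/lib mount used in this setup):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Make the connector jar visible on the classpath so the 'kafka' factory can
# be found when the job graph is generated.
t_env.get_config().set(
    "pipeline.classpaths",
    "file:///opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar",
)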


Best,
Biao Geng

Phil Stavridis wrote on Wed, 10 Apr 2024 at 18:14:

> Hi Biao,
>
> For submitting the job, I run t_env.execute_sql.
> Shouldn’t that be sufficient for submitting the job using the Table API
> with PyFlink? Isn’t that the recommended way for submitting and running
> PyFlink jobs on a running Flink cluster? The Flink cluster runs without
> issues, but there is no Running Job when the container flink_app, that has
> the client code, runs.
> I could submit a job, collect, using the Flink SQL console, and it was
> available on the Running Jobs tab, but that was also not ingesting data
> from the Kafka topic.
> Does my client’s env need to have the flink-sql-kafka-connector JAR? Is
> that needed just in the class path of the Flink JobManager and TaskManager?
>
> Kind regards
> Phil
>
>
> On 10 Apr 2024, at 03:33, Biao Geng  wrote:
>
> Hi Phil,
>
> Thanks for sharing the detailed information of the job. For your question,
> how do you submit the job? After applying your yaml file, I think you will
> successfully launch a flink cluster with 1 JM and 1 TM. Then you would
> submit the pyflink job to the flink cluster. As the error you showed is the
> client-side error, it is possible that your client's env does not contain
> the flink-sql-kafka-connector jar which may lead to the exception.
> By the way, the "Table API" codes in your mail is actually using the flink
> SQL API, so the flink-sql-kafka-connector jar is required, which is exactly
> what you have prepared. For pyflink's table API, you can have a look at
> this document:
> https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/table/word_count.html
>
> Best,
> Biao Geng
>
>
Phil Stavridis wrote on Wed, 10 Apr 2024 at 03:10:
>
>> Hello,
>>
>> I have set up Flink and Kafka containers using docker-compose, for
>> testing how Flink works for processing Kafka messages.
>> I primarily want to check how the Table API works but also how the Stream
>> API would process the Kafka messages.
>>
>> I have included the main part of the docker-compose.yaml file I am using
>> for the Flink cluster. I have commented out some of the JAR files I have
>> tried out but I have mainly used the flink-sql-connector-kafka JAR. I would
>> like to confirm if I am using any wrong configurations, which JARs should I
>> be using for each API and if I just need to use one JAR for both Table and
>> Datastream APIs?
>> I have also included the Flink client module I have used for both the
>> Table and the Datastream APIs and the error messages.
>> Any idea what is missing or if there is any configuration that seems
>> wrong?
>>
>> docker-compose.yml
>> flink_jobmanager:
>>   image: flink:1.18.1-scala_2.12
>>   container_name: flink_jobmanager
>>   ports:
>>     - "8081:8081"
>>   command: jobmanager
>>   volumes:
>>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>   environment:
>>     - |
>>       FLINK_PROPERTIES=
>>       jobmanager.rpc.address: jobmanager
>>   networks:
>>     - standard
>>   depends_on:
>>     - kafka
>>
>> flink_taskmanager:
>>   image: flink:1.18.1-scala_2.12
>>   container_name: flink_taskmanager
>>   volumes:
>>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>   depends_on:
>>     - kafka
>>     - jobmanager
>>   command: taskmanager
>>   environment:
>>     - |
>>       FLINK_PROPERTIES=
>>       jobmanager.rpc.address: jobmanager
>>       taskmanager.numberOfTaskSlots: 2
>>   networks:
>>     - standard
>> Table API:
>> def process_table():
>>
>>     print('Running the Table job now.')
>>     env_settings = (
>>         EnvironmentSettings
>> 

Re: Flink 1.18.1 cannot read from Kafka

2024-04-10 Thread Phil Stavridis
Hi Biao,

For submitting the job, I run t_env.execute_sql.
Shouldn’t that be sufficient for submitting the job using the Table API with 
PyFlink? Isn’t that the recommended way for submitting and running PyFlink jobs 
on a running Flink cluster? The Flink cluster runs without issues, but there is 
no Running Job when the container flink_app, which has the client code, runs. 
I could submit a job (a collect) using the Flink SQL console, and it was 
available on the Running Jobs tab, but that was also not ingesting data from 
the Kafka topic.
Does my client’s env need to have the flink-sql-kafka-connector JAR? Is that 
needed just in the class path of the Flink JobManager and TaskManager?

Kind regards
Phil


> On 10 Apr 2024, at 03:33, Biao Geng  wrote:
> 
> Hi Phil,
> 
> Thanks for sharing the detailed information of the job. For your question, how 
> do you submit the job? After applying your yaml file, I think you will 
> successfully launch a flink cluster with 1 JM and 1 TM. Then you would submit 
> the pyflink job to the flink cluster. As the error you showed is the 
> client-side error, it is possible that your client's env does not contain the 
> flink-sql-kafka-connector jar which may lead to the exception.
> By the way, the "Table API" codes in your mail is actually using the flink 
> SQL API, so the flink-sql-kafka-connector jar is required, which is exactly 
> what you have prepared. For pyflink's table API, you can have a look at this 
> document: 
> https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/table/word_count.html
> 
> Best,
> Biao Geng
> 
> 
> Phil Stavridis <phi...@gmail.com> wrote on Wed, 10 Apr 2024 at 03:10:
>> Hello,
>> 
>> I have set up Flink and Kafka containers using docker-compose, for testing 
>> how Flink works for processing Kafka messages.
>> I primarily want to check how the Table API works but also how the Stream 
>> API would process the Kafka messages.
>> 
>> I have included the main part of the docker-compose.yaml file I am using for 
>> the Flink cluster. I have commented out some of the JAR files I have tried 
>> out but I have mainly used the flink-sql-connector-kafka JAR. I would like 
>> to confirm if I am using any wrong configurations, which JARs should I be 
>> using for each API and if I just need to use one JAR for both Table and 
>> Datastream APIs?
>> I have also included the Flink client module I have used for both the Table 
>> and the Datastream APIs and the error messages.
>> Any idea what is missing or if there is any configuration that seems wrong?
>> 
>> docker-compose.yml
>> flink_jobmanager:
>>   image: flink:1.18.1-scala_2.12
>>   container_name: flink_jobmanager
>>   ports:
>>     - "8081:8081"
>>   command: jobmanager
>>   volumes:
>>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>   environment:
>>     - |
>>       FLINK_PROPERTIES=
>>       jobmanager.rpc.address: jobmanager
>>   networks:
>>     - standard
>>   depends_on:
>>     - kafka
>>
>> flink_taskmanager:
>>   image: flink:1.18.1-scala_2.12
>>   container_name: flink_taskmanager
>>   volumes:
>>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>>   depends_on:
>>     - kafka
>>     - jobmanager
>>   command: taskmanager
>>   environment:
>>     - |
>>       FLINK_PROPERTIES=
>>       jobmanager.rpc.address: jobmanager
>>       taskmanager.numberOfTaskSlots: 2
>>   networks:
>>     - standard
>> Table API:
>> def process_table():
>> 
>>     print('Running the Table job now.')
>>     env_settings = (
>>         EnvironmentSettings
>>         .new_instance()
>>         .in_streaming_mode()
>>         # .with_configuration(config)
>>         .build()
>>     )
>>     t_env = TableEnvironment.create(env_settings)
>> 
>>     t_env.execute_sql(
>>         f"""
>>         CREATE TABLE kafka_test_logs (
>>             action2 STRING
>>         ) WITH (
>>             'connector' = 'kafka',
>>             'topic' = 'test_logs',
>>             'properties.bootstrap.servers' = 'kafka:9092',
>>             'properties.group.id' = 'flink_group',
>>             'scan.startup.mode' = 'earliest-offset',
>>             'format' = 'json',
>>             'json.ignore-parse-errors' = 'true'
>>         )
>>         """)
>> 
>>     t_env.execute_sql("""
>>         SELECT COUNT(*) AS message_count
>>         FROM kafka_test_logs
>>     """).print()
>> 
>>     print('Table job has now completed.')
>> 
>> 
>> if __name__ == "__main__":
>>     process_table()
>> error:
>> 

Re: Flink 1.18.1 cannot read from Kafka

2024-04-09 Thread Biao Geng
Hi Phil,

Thanks for sharing the detailed information of the job. For your question,
how do you submit the job? After applying your yaml file, I think you will
successfully launch a flink cluster with 1 JM and 1 TM. Then you would
submit the pyflink job to the flink cluster. As the error you showed is a
client-side error, it is possible that your client's env does not contain
the flink-sql-kafka-connector jar, which may lead to the exception.
By the way, the "Table API" codes in your mail are actually using the flink
SQL API, so the flink-sql-kafka-connector jar is required, which is exactly
what you have prepared. For pyflink's table API, you can have a look at
this document:
https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/table/word_count.html
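
For reference, here is a minimal, self-contained Table API sketch in the
spirit of that word_count example (an illustration only, not the job being
discussed; it needs no external connector, so it can be used to confirm that
PyFlink itself runs before the Kafka pieces are added):

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col, lit

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Build a tiny in-memory table and count the rows per action value.
table = t_env.from_elements(
    [("pageview",), ("click",), ("pageview",)],
    ["action"],
)
table.group_by(col("action")) \
     .select(col("action"), lit(1).count.alias("cnt")) \
     .execute().print()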

Best,
Biao Geng


Phil Stavridis wrote on Wed, 10 Apr 2024 at 03:10:

> Hello,
>
> I have set up Flink and Kafka containers using docker-compose, for testing
> how Flink works for processing Kafka messages.
> I primarily want to check how the Table API works but also how the Stream
> API would process the Kafka messages.
>
> I have included the main part of the docker-compose.yaml file I am using
> for the Flink cluster. I have commented out some of the JAR files I have
> tried out but I have mainly used the flink-sql-connector-kafka JAR. I would
> like to confirm if I am using any wrong configurations, which JARs should I
> be using for each API and if I just need to use one JAR for both Table and
> Datastream APIs?
> I have also included the Flink client module I have used for both the
> Table and the Datastream APIs and the error messages.
> Any idea what is missing or if there is any configuration that seems wrong?
>
> docker-compose.yml
> flink_jobmanager:
>   image: flink:1.18.1-scala_2.12
>   container_name: flink_jobmanager
>   ports:
>     - "8081:8081"
>   command: jobmanager
>   volumes:
>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>   environment:
>     - |
>       FLINK_PROPERTIES=
>       jobmanager.rpc.address: jobmanager
>   networks:
>     - standard
>   depends_on:
>     - kafka
>
> flink_taskmanager:
>   image: flink:1.18.1-scala_2.12
>   container_name: flink_taskmanager
>   volumes:
>     - ./libs/flink-sql-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-sql-connector-kafka-3.1.0-1.18.jar
>     # - ./libs/flink-connector-kafka-3.1.0-1.18.jar:/opt/flink/lib/flink-connector-kafka-3.1.0-1.18.jar
>     # - ./libs/flink-table-api-java-bridge-1.18.1.jar:/opt/flink/lib/flink-table-api-java-bridge-1.18.1.jar
>     # - ./libs/flink-table-api-scala-bridge_2.12-1.18.1.jar:/opt/flink/lib/flink-table-api-scala-bridge_2.12-1.18.1.jar
>   depends_on:
>     - kafka
>     - jobmanager
>   command: taskmanager
>   environment:
>     - |
>       FLINK_PROPERTIES=
>       jobmanager.rpc.address: jobmanager
>       taskmanager.numberOfTaskSlots: 2
>   networks:
>     - standard
> Table API:
> def process_table():
>
>     print('Running the Table job now.')
>     env_settings = (
>         EnvironmentSettings
>         .new_instance()
>         .in_streaming_mode()
>         # .with_configuration(config)
>         .build()
>     )
>     t_env = TableEnvironment.create(env_settings)
>
>     t_env.execute_sql(
>         f"""
>         CREATE TABLE kafka_test_logs (
>             action2 STRING
>         ) WITH (
>             'connector' = 'kafka',
>             'topic' = 'test_logs',
>             'properties.bootstrap.servers' = 'kafka:9092',
>             'properties.group.id' = 'flink_group',
>             'scan.startup.mode' = 'earliest-offset',
>             'format' = 'json',
>             'json.ignore-parse-errors' = 'true'
>         )
>         """)
>
>     t_env.execute_sql("""
>         SELECT COUNT(*) AS message_count
>         FROM kafka_test_logs
>     """).print()
>
>     print('Table job has now completed.')
>
> if __name__ == "__main__":
>     process_table()
> error:
> link_app | Caused by: org.apache.flink.table.api.ValidationException:
> Could not find any factory for identifier 'kafka' that implements
> 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
> flink_app |
> flink_app | Available factory identifiers are:
> flink_app |
> flink_app | blackhole
> flink_app | datagen
> flink_app | filesystem
> flink_app | print
> flink_app | python-input-format
> The '/opt/flink/lib' has the JARs from both the image and the JARs I added
> in the docker-compose.yml file.
>
> Stream API:
> def process_stream():
>     env = StreamExecutionEnvironment.get_execution_environment()
>
>     properties = {
>         "bootstrap.servers": 'kafka:9092',
>         "group.id": "test_group"
>     }
>
>     kafka_consumer = FlinkKafkaConsumer(
>         topics='test_logs',
>         deserialization_schema=SimpleStringSchema(),
>         properties=properties)
>
>     data_stream = env.add_source(kafka_consumer)
>
>     parsed_stream = data_stream.map(lambda x: json.loads(x),
>                                     output_type=Types.ROW([Types.STRING()]))
>