Hi all,

I'm still dealing with the PyFlink deployment issue described below; I noticed I accidentally didn't forward it to the mailing list. My job is stuck in `Initializing` and the logs don't really give me a clue what is going on. In my IDE it runs fine. The command I use to submit to the cluster:
export PYFLINK_CLIENT_EXECUTABLE=~/Documents/stateflow-evaluation/venv/bin/python

./flink run \
  --target remote \
  -m localhost:8081 \
  -pyarch venv.zip \
  --pyExecutable venv.zip/venv/bin/python \
  --parallelism 1 \
  --python ~/Documents/stateflow-evaluation/pyflink_runtime.py \
  --jarfile ~/Documents/stateflow-evaluation/benchmark/bin/combined.jar
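Incidentally, the interpreter path passed to --pyExecutable has to correspond to an entry inside the archive. A quick way to verify that (a minimal stdlib snippet, assuming venv.zip sits in the working directory):

import zipfile

# -pyarch extracts venv.zip into a directory named after the archive, so
# "venv.zip/venv/bin/python" should correspond to the entry "venv/bin/python".
with zipfile.ZipFile("venv.zip") as zf:
    print("venv/bin/python" in zf.namelist())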
I hope someone can help me with this because it is a blocker for me.

Thanks in advance,
Wouter

---------- Forwarded message ---------
From: Wouter Zorgdrager <zorgdrag...@gmail.com>
Date: Thu, 8 Jul 2021 at 12:20
Subject: Re: PyFlink performance and deployment issues
To: Xingbo Huang <hxbks...@gmail.com>

Hi Xingbo, all,

Regarding point 2, I actually made a mistake there: I picked port 8081 (the WebUI port) rather than the job submission port (--target remote -m localhost:8081). For some reason this does not give an error or warning and just starts a local cluster. However, now I have another issue: my job is stuck at initialization. Here is an excerpt from the JM log:

2021-07-08 12:12:18,094 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying _stream_key_by_map_operator (1/1) (attempt #0) with attempt id ca9abcc644c05f62a47b83f391c85cd9 to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Process-Stateful-User (1/1) (f40fac621cb94c79cdb82146ae5521bb) switched from SCHEDULED to DEPLOYING.
2021-07-08 12:12:18,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Process-Stateful-User (1/1) (attempt #0) with attempt id f40fac621cb94c79cdb82146ae5521bb to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,098 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (58deef879a00052ba6b3447917005c35) switched from SCHEDULED to DEPLOYING.
2021-07-08 12:12:18,098 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (attempt #0) with attempt id 58deef879a00052ba6b3447917005c35 to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,484 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Process-Stateful-User (1/1) (f40fac621cb94c79cdb82146ae5521bb) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,488 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - _stream_key_by_map_operator (1/1) (ca9abcc644c05f62a47b83f391c85cd9) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,489 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (58deef879a00052ba6b3447917005c35) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,490 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Route-Incoming-Events -> (Filter-On-User -> Map -> (Filter-Init-User -> Init-User, Filter-Stateful-User), Filter -> Map) (1/1) (c48649bd76abaf77486104e8cfcee7d8) switched from DEPLOYING to INITIALIZING.

I run with parallelism 1, and these are the latest log lines from the TM (there is no obvious error):

2021-07-08 12:12:18,729 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle size is configured to 5.
2021-07-08 12:12:18,729 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle time is configured to 1 milliseconds.
2021-07-08 12:12:18,791 WARN org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Error while loading kafka-version.properties: inStream parameter is null
2021-07-08 12:12:18,792 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: unknown
2021-07-08 12:12:18,792 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: unknown
2021-07-08 12:12:18,792 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1625739138789
2021-07-08 12:12:18,806 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (1/1) to produce into default topic client_reply
2021-07-08 12:12:18,815 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle size is configured to 5.
2021-07-08 12:12:18,816 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle time is configured to 1 milliseconds.
2021-07-08 12:12:19,119 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata [] - [Producer clientId=producer-1] Cluster ID: NRL80FEjRjWzJKcqedz80A
2021-07-08 12:12:20,138 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost
2021-07-08 12:12:20,138 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
2021-07-08 12:12:20,139 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2021-07-08 12:12:20,139 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2021-07-08 12:12:20,139 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2021-07-08 12:12:20,141 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 1
2021-07-08 12:12:20,142 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2021-07-08 12:12:20,953 INFO org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - PYTHONPATH of python worker: null
2021-07-08 12:12:20,953 INFO org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - Python working dir of python worker: /tmp/python-dist-98c4419a-2048-4d9c-a082-7f83ef35c35c/python-archives
2021-07-08 12:12:27,869 INFO org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - Python interpreter path: venv.zip/venv/bin/python

I trimmed some of the Kafka config output. Any thoughts on what I am doing wrong? It seems to be stuck setting up the Python environment/connections. Thanks!

Wouter

On Thu, 8 Jul 2021 at 07:17, Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi Wouter,
> Sorry for the late reply. I will try to answer your questions in detail.
>
> 1. >>> Performance problem.
> When running a udf job locally, Beam uses a loopback mode to connect back to the Python process that compiled the job, so the job starts up faster than with PyFlink on a cluster, which has to create a new Python process to execute the udf code.
>
> 2. >>> However, this command created a local MiniCluster again rather than submitting it to my remote cluster.
>
> I was able to successfully submit a Python job to a standalone cluster using the following commands:
>
> ./bin/start-cluster.sh
> ./bin/flink run --target remote \
>   -m localhost:8086 \
>   -pyarch /Users/duanchen/venv/venv.zip \
>   -pyexec venv.zip/venv/bin/python \
>   --parallelism 1 \
>   --python /Users/duanchen/sourcecode/pyflink-performance-demo/python/flink/flink-perf-test.py \
>   --jarfile /Users/duanchen/sourcecode/pyflink-performance-demo/java/target/flink-perf-tests-0.1.jar
>
> The situation you encountered is very strange.
>
> 3. >>> In my second attempt, I tried deploying it to a Kubernetes cluster using the following command:
>
> When running in Application mode, you should make sure that all paths are accessible by the JobManager of your application. The path ~/Documents/runtime.py is on your client side, right? You need to use a path that exists inside your k8s cluster, e.g. one baked into the container image. This part of the documentation does not explain these implicit things well.
>
> 4. >>> Lastly, I wondered if it is possible to set a key for events sent to the KafkaProducer.
>
> You can check whether the Kafka Table Connector [1] meets your needs.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#features
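> For example, a minimal sketch of a keyed Kafka sink table in PyFlink (table, topic, and field names here are just illustrative):
>
> from pyflink.table import EnvironmentSettings, TableEnvironment
>
> t_env = TableEnvironment.create(
>     EnvironmentSettings.new_instance().in_streaming_mode().build())
> t_env.execute_sql("""
>     CREATE TABLE client_reply_sink (
>         user_id STRING,
>         payload STRING
>     ) WITH (
>         'connector' = 'kafka',
>         'topic' = 'client_reply',
>         'properties.bootstrap.servers' = 'localhost:9092',
>         'key.format' = 'raw',
>         'key.fields' = 'user_id',   -- user_id becomes the Kafka message key
>         'value.format' = 'json'
>     )
> """)
>
> Every row written to such a table carries user_id as the Kafka message key.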
> Best,
> Xingbo
>
> On Tue, 6 Jul 2021 at 16:58, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
>
>> Dear community,
>>
>> I have been struggling a lot with the deployment of my PyFlink job. Moreover, the performance seems to be very disappointing, especially the latency at low throughput. I have been playing around with configuration values, but it has not been improving.
>> In short, I have a DataStream job with multiple Python operators, including a ProcessFunction. The job reads from Kafka and writes to Kafka again. For single events, E2E latency has been somewhere between 600 ms and 2000 ms. When I increase throughput, latency gets into the order of seconds.
>> This is when I configure my job like this:
>>
>> config.set_integer("python.fn-execution.bundle.time", 1)
>> config.set_integer("python.fn-execution.bundle.size", 1)
>>
>> I tried several configuration values, but the results are similar. Interestingly, I have a similar Python streaming application written in Apache Beam which does have low latency: single events are processed in < 30 ms. If I recall correctly, it uses the same technique of bundling elements and sending them to Python processes.
>> On the other hand, Beam uses an in-memory runner when running locally, which might change the situation. I'm not sure how that compares to a local Flink MiniCluster.
>>
>> I hoped that performance might improve when I deploy this on a (remote) Flink cluster. Unfortunately, I had a lot of trouble deploying this PyFlink job to a remote Flink cluster. In my first attempt, I created a local TM + JM setup and tried to deploy it using the "./flink run" command. However, this command created a local MiniCluster again rather than submitting it to my remote cluster. The full command was:
>>
>> ./flink run --target remote \
>>   -m localhost:8081 \
>>   -pyarch venv.zip \
>>   -pyexec venv.zip/venv/bin/python \
>>   --parallelism 4 \
>>   --python ~/Documents/runtime.py \
>>   --jarfile ~/Documents/combined.jar
>>
>> Note that venv.zip stores all the Python dependencies for my PyFlink job, whereas combined.jar stores the Java dependencies. I tried several variants of this command, but it *never* submitted to my running JobManager and always ran locally.
>> In my second attempt, I tried deploying it to a Kubernetes cluster using the following command:
>>
>> ./flink run-application \
>>   --target kubernetes-application \
>>   -Dkubernetes.cluster-id=flink-cluster \
>>   -Dtaskmanager.memory.process.size=4096m \
>>   -Dkubernetes.taskmanager.cpu=2 \
>>   -Dkubernetes.service-account=flink-service-account \
>>   -Dtaskmanager.numberOfTaskSlots=4 \
>>   -Dkubernetes.container.image=pyflink:latest \
>>   -pyarch venv.zip \
>>   -pyexec venv.zip/venv/bin/python \
>>   --parallelism 4 \
>>   -py ~/Documents/runtime.py \
>>   --jarfile ~/Documents/combined.jar
>>
>> I created the pyflink:latest image by following the documentation here [1]. It was unclear to me if I had to include my project files in this Docker image.
>> When running it like this, it did submit to the remote K8s cluster, but I got an exception that it could not find my runtime.py file in some sort of tmp folder.
>>
>> Lastly, I wondered if it is possible to set a key for events sent to the KafkaProducer. Right now, it seems you can only configure some (static) properties and the serializer. Is there a workaround to set the key as well as the value of an event in PyFlink?
>>
>> I hope you can help me out with my struggles! Thanks in advance.
>>
>> Regards,
>> Wouter
>>
>> [1] - https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/docker/#enabling-python
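PS: in case the overall shape of the job matters, it boils down to something like the following minimal skeleton (topic names, properties, and the operator bodies are placeholders rather than my actual code):

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

# The Kafka connector classes are shipped via --jarfile combined.jar.
consumer = FlinkKafkaConsumer(
    "events", SimpleStringSchema(),
    {"bootstrap.servers": "localhost:9092", "group.id": "stateflow"})
producer = FlinkKafkaProducer(
    "client_reply", SimpleStringSchema(),
    {"bootstrap.servers": "localhost:9092"})

ds = env.add_source(consumer)
# Stand-in for the actual Python operators (keyed ProcessFunction etc.).
ds.map(lambda e: e, output_type=Types.STRING()).add_sink(producer)

env.execute("pyflink_runtime")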