Hi all,

I'm still dealing with the PyFlink deployment issue as described below. I
see that I accidentally didn't forward it to the mailing list. Anyways, my
job is stuck in `Initializing` and the logs don't really give me a clue
what is going on.
In my IDE it runs fine. The command I use to submit to the cluster:

export PYFLINK_CLIENT_EXECUTABLE=~/Documents/stateflow-evaluation/venv/bin/python

./flink run \
  --target remote \
  -m localhost:8081 \
  -pyarch venv.zip \
  --pyExecutable venv.zip/venv/bin/python \
  --parallelism 1 \
  --python ~/Documents/stateflow-evaluation/pyflink_runtime.py \
  --jarfile ~/Documents/stateflow-evaluation/benchmark/bin/combined.jar

I hope someone can help me with this because it is a blocker for me.

Thanks in advance,
Wouter
---------- Forwarded message ---------
From: Wouter Zorgdrager <zorgdrag...@gmail.com>
Date: Thu, 8 Jul 2021 at 12:20
Subject: Re: PyFlink performance and deployment issues
To: Xingbo Huang <hxbks...@gmail.com>


Hi Xingbo, all,

Regarding point 2, I actually made a mistake there. I picked port 8081
(WebUI port) rather than the job submission port (--target remote -m
localhost:8081). For some reason, this does not give an error or warning
and just starts a local cluster. However, I now have another issue: my job
is stuck at initialization. Here is an excerpt from the JM log:

2021-07-08 12:12:18,094 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying _stream_key_by_map_operator (1/1) (attempt #0) with attempt id ca9abcc644c05f62a47b83f391c85cd9 to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Process-Stateful-User (1/1) (f40fac621cb94c79cdb82146ae5521bb) switched from SCHEDULED to DEPLOYING.
2021-07-08 12:12:18,097 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Process-Stateful-User (1/1) (attempt #0) with attempt id f40fac621cb94c79cdb82146ae5521bb to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,098 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (58deef879a00052ba6b3447917005c35) switched from SCHEDULED to DEPLOYING.
2021-07-08 12:12:18,098 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (attempt #0) with attempt id 58deef879a00052ba6b3447917005c35 to 127.0.1.1:38179-f09c77 @ wouter (dataPort=40987) with allocation id d6b810455e97d0a952fb825ccec27c45
2021-07-08 12:12:18,484 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Process-Stateful-User (1/1) (f40fac621cb94c79cdb82146ae5521bb) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,488 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - _stream_key_by_map_operator (1/1) (ca9abcc644c05f62a47b83f391c85cd9) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,489 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) (58deef879a00052ba6b3447917005c35) switched from DEPLOYING to INITIALIZING.
2021-07-08 12:12:18,490 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Custom Source -> Route-Incoming-Events -> (Filter-On-User -> Map -> (Filter-Init-User -> Init-User, Filter-Stateful-User), Filter -> Map) (1/1) (c48649bd76abaf77486104e8cfcee7d8) switched from DEPLOYING to INITIALIZING.
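
To see at a glance which tasks never leave INITIALIZING, the state
transitions in a JM log like the one above can be summarised with a small
script. This is only a sketch and not part of Flink: the regular expression
is inferred from the "switched from X to Y" lines in this excerpt, the helper
names are made up for illustration, and it assumes each log entry sits on a
single line, as in the original log file.

```python
import re

# Pattern inferred from lines such as
# "... - Process-Stateful-User (1/1) (<32 hex chars>) switched from DEPLOYING to INITIALIZING."
STATE_CHANGE = re.compile(
    r" - (?P<task>.+?) \((?P<subtask>\d+/\d+)\) \((?P<attempt>[0-9a-f]{32})\) "
    r"switched from (?P<old>[A-Z_]+) to (?P<new>[A-Z_]+)\."
)


def latest_states(log_text):
    """Map (task name, subtask index) to the last state seen in the log."""
    states = {}
    for line in log_text.splitlines():
        match = STATE_CHANGE.search(line)
        if match:
            states[(match["task"], match["subtask"])] = match["new"]
    return states


def tasks_not_running(log_text):
    """Return the sorted names of tasks whose last seen state is not RUNNING."""
    return sorted(
        task for (task, _), state in latest_states(log_text).items()
        if state != "RUNNING"
    )


# Three entries copied from the excerpt above, joined back onto single lines.
PREFIX = "INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - "
SAMPLE_LOG = "\n".join([
    "2021-07-08 12:12:18,097 " + PREFIX
    + "Process-Stateful-User (1/1) (f40fac621cb94c79cdb82146ae5521bb) "
    + "switched from SCHEDULED to DEPLOYING.",
    "2021-07-08 12:12:18,484 " + PREFIX
    + "Process-Stateful-User (1/1) (f40fac621cb94c79cdb82146ae5521bb) "
    + "switched from DEPLOYING to INITIALIZING.",
    "2021-07-08 12:12:18,489 " + PREFIX
    + "Map-Egress -> (Filter -> Kafka-To-Client -> Sink: Unnamed, "
    + "Filter -> Kafka-To-Internal -> Sink: Unnamed) (1/1) "
    + "(58deef879a00052ba6b3447917005c35) switched from DEPLOYING to INITIALIZING.",
])

for (task, subtask), state in latest_states(SAMPLE_LOG).items():
    print(f"{state:<13} {task} ({subtask})")
```

Run over the full JM log, this would list every task together with the last
state it reached, which makes it easier to tell whether all tasks are stuck
or only the ones containing Python operators.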

I run with parallelism 1, and these are the last log lines from the TM
(there is no obvious error):
2021-07-08 12:12:18,729 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle size is configured to 5.
2021-07-08 12:12:18,729 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle time is configured to 1 milliseconds.
2021-07-08 12:12:18,791 WARN org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Error while loading kafka-version.properties: inStream parameter is null
2021-07-08 12:12:18,792 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: unknown
2021-07-08 12:12:18,792 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: unknown
2021-07-08 12:12:18,792 INFO org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1625739138789
2021-07-08 12:12:18,806 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (1/1) to produce into default topic client_reply
2021-07-08 12:12:18,815 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle size is configured to 5.
2021-07-08 12:12:18,816 INFO org.apache.flink.streaming.api.operators.AbstractStreamOperator [] - The maximum bundle time is configured to 1 milliseconds.
2021-07-08 12:12:19,119 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata [] - [Producer clientId=producer-1] Cluster ID: NRL80FEjRjWzJKcqedz80A
2021-07-08 12:12:20,138 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost
2021-07-08 12:12:20,138 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
2021-07-08 12:12:20,139 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2021-07-08 12:12:20,139 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2021-07-08 12:12:20,139 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2021-07-08 12:12:20,141 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 1
2021-07-08 12:12:20,142 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2021-07-08 12:12:20,953 INFO org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - PYTHONPATH of python worker: null
2021-07-08 12:12:20,953 INFO org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - Python working dir of python worker: /tmp/python-dist-98c4419a-2048-4d9c-a082-7f83ef35c35c/python-archives
2021-07-08 12:12:27,869 INFO org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - Python interpreter path: venv.zip/venv/bin/python
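
Since nothing is logged as an error, one way to narrow things down is to
look at where the TM log pauses. The following is only a sketch with made-up
helper names: it assumes every line starts with the 23-character timestamp
shown above (one entry per line, as in the original log file), and a long
pause only hints at where to look, it does not prove a cause.

```python
from datetime import datetime

# Timestamp layout taken from the log lines above, e.g. "2021-07-08 12:12:20,953".
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S,%f"
TIMESTAMP_LENGTH = 23


def parse_timestamp(line):
    """Parse the leading timestamp of a single log line."""
    return datetime.strptime(line[:TIMESTAMP_LENGTH], TIMESTAMP_FORMAT)


def largest_gap(lines):
    """Return (gap in seconds, line before the gap, line after the gap)."""
    stamped = [(parse_timestamp(line), line) for line in lines]
    gaps = [
        ((later[0] - earlier[0]).total_seconds(), earlier[1], later[1])
        for earlier, later in zip(stamped, stamped[1:])
    ]
    return max(gaps, key=lambda gap: gap[0])


# The last three TM entries from the excerpt above, on single lines.
MANAGER = "INFO org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - "
SAMPLE_LINES = [
    "2021-07-08 12:12:20,953 " + MANAGER + "PYTHONPATH of python worker: null",
    "2021-07-08 12:12:20,953 " + MANAGER + "Python working dir of python worker: "
    + "/tmp/python-dist-98c4419a-2048-4d9c-a082-7f83ef35c35c/python-archives",
    "2021-07-08 12:12:27,869 " + MANAGER
    + "Python interpreter path: venv.zip/venv/bin/python",
]

gap_seconds, before, after = largest_gap(SAMPLE_LINES)
print(f"{gap_seconds:.3f}s between:\n  {before}\n  {after}")
```

In this excerpt the longest pause, just under 7 seconds, falls between the
"Python working dir" line and the "Python interpreter path" line, after which
the log simply stops.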

I trimmed some of the Kafka config output. Any thoughts on what I am doing
wrong? It seems to be stuck setting up the Python environment/connections.
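
One thing that can be checked on the client side is whether the interpreter
path matches the layout of the archive. The sketch below rests on an
assumption: that the first component of the -pyexec value (venv.zip) names
the directory the archive is extracted into, so the remainder
(venv/bin/python) must exist as an entry inside the zip. The helper names are
hypothetical, and the in-memory archive only stands in for the real venv.zip.

```python
import io
import zipfile


def split_pyexec(pyexec):
    """Split 'venv.zip/venv/bin/python' into ('venv.zip', 'venv/bin/python')."""
    archive_name, _, inner_path = pyexec.partition("/")
    return archive_name, inner_path


def archive_contains(archive_file, inner_path):
    """Return True if the zip archive (path or file object) has this entry."""
    with zipfile.ZipFile(archive_file) as archive:
        return inner_path in archive.namelist()


# Stand-in archive for illustration; pass the path to the real venv.zip instead.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("venv/bin/python", "")

archive_name, inner_path = split_pyexec("venv.zip/venv/bin/python")
print(archive_name, inner_path, archive_contains(buffer, inner_path))
```

This only confirms that the entry exists in the archive. It does not show
whether the interpreter is actually usable on the TaskManager machine.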

Thanks!
Wouter

On Thu, 8 Jul 2021 at 07:17, Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi Wouter,
> Sorry for the late reply. I will try to answer your questions in detail.
>
> 1. >>> Performance problem.
> When running a UDF job locally, Beam uses a loopback mode to connect back
> to the Python process that compiled the job, so the job starts up faster
> than in PyFlink, which creates a new Python process to execute the UDF
> code.
>
> 2. >>> However, this command created a local MiniCluster again rather than
> submitting it to my remote cluster.
> I was able to successfully submit a Python job to a standalone cluster
> with the following commands:
>
> ./bin/start-cluster.sh
> ./bin/flink run --target remote \
> -m localhost:8086 \
> -pyarch /Users/duanchen/venv/venv.zip \
> -pyexec venv.zip/venv/bin/python \
> --parallelism 1 \
> --python /Users/duanchen/sourcecode/pyflink-performance-demo/python/flink/flink-perf-test.py \
> --jarfile /Users/duanchen/sourcecode/pyflink-performance-demo/java/target/flink-perf-tests-0.1.jar
>
> The situation you encountered is very strange.
>
> 3. >>> In my second attempt, I tried deploying it to a Kubernetes cluster
> using the following command:
>
> When running in Application mode, you should make sure that all paths are
> accessible by the JobManager of your application. The path
> ~/Documents/runtime.py is on your client side, right? You need to use a
> path inside your k8s cluster. This part of the documentation does not
> explain these implicit requirements well.
>
> 4. >>> Lastly, I wondered if it is possible to set a key for events sent
> to the KafkaProducer.
> You can check whether the Kafka Table Connector [1] meets your needs.
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#features
>
> Best,
> Xingbo
>
> On Tue, 6 Jul 2021 at 16:58, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
>
>> Dear community,
>>
>> I have been struggling a lot with the deployment of my PyFlink job.
>> Moreover, the performance is very disappointing, especially the latency
>> at low throughput. I have been playing around with configuration values,
>> but it has not improved.
>> In short, I have a DataStream job with multiple Python operators
>> including a ProcessFunction. The job reads from Kafka and writes to Kafka
>> again. For single events, E2E latency has been somewhere between 600ms and
>> 2000ms. When I increase throughput, latency is in the order of seconds.
>> This is when I configure my job like this:
>>         config.set_integer("python.fn-execution.bundle.time", 1)
>>         config.set_integer("python.fn-execution.bundle.size", 1)
>> I tried several configuration values, but the results are similar.
>> Interestingly, I have a similar Python streaming application written in
>> Apache Beam which does have low latency: single events are processed in
>> under 30 ms. If I recall correctly, it uses the same technique of bundling
>> and sending to Python processes.
>> On the other hand, Beam uses an in-memory runner when running locally,
>> which might change the situation. I'm not sure how that compares to a local
>> Flink MiniCluster.
>>
>> I hoped that performance might improve when I deploy this on a (remote)
>> Flink cluster. Unfortunately, I had a lot of trouble deploying this PyFlink
>> job to a remote Flink cluster. In my first attempt, I created a local TM +
>> JM setup and tried to deploy it using the "./flink run" command.
>> However, this command created a local MiniCluster again rather than
>> submitting it to my remote cluster. The full command was:
>> ./flink run --target remote \
>> -m localhost:8081 \
>> -pyarch venv.zip \
>> -pyexec venv.zip/venv/bin/python \
>> --parallelism 4 \
>> --python ~/Documents/runtime.py \
>> --jarfile ~/Documents/combined.jar
>>
>> Note that venv.zip stores all the Python dependencies for my PyFlink job
>> whereas combined.jar stores the Java dependencies. I tried several
>> variants of this command, but it *never* submitted to my running
>> JobManager and always ran it locally.
>> In my second attempt, I tried deploying it to a Kubernetes cluster using
>> the following command:
>>
>> ./flink run-application \
>> --target kubernetes-application \
>> -Dkubernetes.cluster-id=flink-cluster \
>> -Dtaskmanager.memory.process.size=4096m \
>> -Dkubernetes.taskmanager.cpu=2 \
>> -Dkubernetes.service-account=flink-service-account \
>> -Dtaskmanager.numberOfTaskSlots=4 \
>> -Dkubernetes.container.image=pyflink:latest \
>> -pyarch venv.zip \
>> -pyexec venv.zip/venv/bin/python \
>> --parallelism 4 \
>> -py ~/Documents/runtime.py \
>> --jarfile ~/Documents/combined.jar
>>
>> I created the pyflink:latest image by following the documentation here
>> [1]. It was unclear to me whether I had to include my project files in
>> this Docker image.
>> When running it like this, it did submit to the remote K8s cluster, but
>> I got an exception that it could not find my runtime.py file in some
>> sort of tmp folder.
>>
>> Lastly, I wondered if it is possible to set a key for events sent to the
>> KafkaProducer. Right now, it seems you can only configure some (static)
>> properties and the serializer.
>> Is there a workaround to be able to set the key and value of an event
>> in PyFlink?
>>
>> I hope you can help me out with my struggles! Thanks in advance.
>>
>> Regards,
>> Wouter
>>
>> [1] -
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/docker/#enabling-python
>>
>>
>
