Re: Running Beam on a native Kubernetes Flink cluster

2021-08-15 Thread Gorjan Todorovski
Yes! That did it. Changed to localhost and all works fine now.
I was wrong in thinking that it would need to connect to the Beam SDK worker from my
client machine, which is why I added the load balancer.

Thank you Jan!
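
For anyone finding this later, a sketch of the working options (placeholders
as in my original message below; the only change is environment_config, and
the variable name is just illustrative):

# Sketch of the corrected options; flink_url is a placeholder as before.
# The EXTERNAL environment now points at the worker pool sidecar on the
# same TM pod via localhost, so no load balancer is needed.
beam_pipeline_args = [
    "--runner=FlinkRunner",
    "--parallelism=4",
    f"--flink_master={flink_url}:8081",
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:5",  # was f"{beam_sdk_url}:5" via the LB
    "--flink_submit_uber_jar",
    "--worker_harness_container_image=none",
]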

On Sun, 15 Aug 2021 at 16:45, Jan Lukavský  wrote:

> Hi Gorjan,
>
> the address of localhost is hard-coded in the Python worker pool (see
> [1]). There should be no need to set up a load balancer for the worker_pool;
> if you have it as another container in each TM pod, it should suffice to
> replace {beam_sdk_url} with 'localhost'. Each TM will then have its own
> worker_pool, which should be just fine.
>
> Best,
>
>  Jan
>
> [1]
> https://github.com/apache/beam/blob/af59192ac95a564ca3d29b6385a6d1448f5a407d/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L81
> On 8/14/21 4:37 PM, Gorjan Todorovski wrote:
>
> Hi!
>
> I need help implementing a native Kubernetes Flink cluster that needs to
> run batch jobs (run by TensorFlow Extended), but I am not sure I am
> configuring it correctly, as I have issues running jobs on more than one
> task manager, while jobs run fine with a single TM.
>
> I use the following parameters for the job:
>
> "--runner=FlinkRunner",
> "--parallelism=4",
> f"--flink_master={flink_url}:8081",
> "--environment_type=EXTERNAL",
> f"--environment_config={beam_sdk_url}:5",
> "--flink_submit_uber_jar",
> "--worker_harness_container_image=none",
>
>
> I have configured the Beam workers to run as sidecars to the TM
> containers. I do this by configuring a task manager pod template via:
>
> kubernetes.pod-template-file.taskmanager
>
> which points to a template file with these contents:
>
> kind: Pod
> metadata:
>   name: taskmanager-pod-template
> spec:
>   #hostNetwork: true
>   containers:
>     - name: flink-main-container
>       #image: apache/flink:scala_2.12
>       env:
>         - name: AWS_REGION
>           value: "eu-central-1"
>         - name: S3_VERIFY_SSL
>           value: "0"
>         - name: PYTHONPATH
>           value: "/data/flink/src"
>       args: ["taskmanager"]
>       ports:
>         - containerPort: 6122 #22
>           name: rpc
>         - containerPort: 6125
>           name: query-state
>       livenessProbe:
>         tcpSocket:
>           port: 6122 #22
>         initialDelaySeconds: 30
>         periodSeconds: 60
>     - name: beam-worker-pool
>       env:
>         - name: PYTHONPATH
>           value: "/data/flink/src"
>         - name: AWS_REGION
>           value: "eu-central-1"
>         - name: S3_VERIFY_SSL
>           value: "0"
>       image: 848221505146.dkr.ecr.eu-central-1.amazonaws.com/flink-workers
>       imagePullPolicy: Always
>       args: ["--worker_pool"]
>       ports:
>         - containerPort: 5
>           name: pool
>       livenessProbe:
>         tcpSocket:
>           port: 5
>         initialDelaySeconds: 30
>         periodSeconds: 60
>
> I have also created a Kubernetes load balancer for the task managers so
> that clients can connect on port 5, and I use that address when configuring:
>
> f"--environment_config={beam_sdk_url}:5",
>
> The problem is that the Beam SDK harness on one task manager appears to
> want to connect to the endpoint running on the other task manager, but
> looks for it on localhost:
>
> Log from beam-worker-pool on TM 2:
>
> 2021/08/11 09:43:16 Failed to obtain provisioning information: failed to dial 
> server at localhost:33705
> caused by:
> context deadline exceeded
>
> The provision endpoint actually listening on port 33705 is on TM 1, while
> this worker looks for it on its own localhost, so it cannot connect to it.
>
> Here is how I tested this:
>
> ...
>
> TM 1:
> 
> $ kubectl logs my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool
> 2021/08/12 09:10:34 Starting worker pool 1: python -m 
> apache_beam.runners.worker.worker_pool_main --service_port=5 
> --container_executable=/opt/apache/beam/boot
> Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', 
> '--logging_endpoint=localhost:33383', '--artifact_endpoint=localhost:43477', 
> '--provision_endpoint=localhost:40983', '--control_endpoint=localhost:34793']
> 2021/08/12 09:13:05 Failed to obtain provisioning information: failed to dial 
> server at localhost:40983
> caused by:
> context deadline exceeded
>
> TM 2:
> $ kubectl logs my-first-flink-cluster-taskmanager-1-2 -c beam-worker-pool
> 2021/08/12 09:10:33 Starting worker pool 1: python -m 
> apache_beam.runners.worker.worker_pool_main --service_port=5 
> --container_executable=/opt/apache/beam/boot
> Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', 
> '--logging_endpoint=localhost:40497', '--artifact_endpoint=localhost:36245', 
> '--provision_endpoint=localhost:32907', '--control_endpoint=localhost:46083']
> 2021/08/12 09:13:09 Failed to obtain provisioning information: failed to dial 
> server at localhost:32907
> caused by:
> context deadline exceeded

Re: Running Beam on a native Kubernetes Flink cluster

2021-08-15 Thread Jan Lukavský

Hi Gorjan,

the address of localhost is hard-coded in the Python worker pool (see 
[1]). There should be no need to set up a load balancer for the 
worker_pool; if you have it as another container in each TM pod, it 
should suffice to replace {beam_sdk_url} with 'localhost'. Each TM will 
then have its own worker_pool, which should be just fine.


Best,

 Jan

[1] 
https://github.com/apache/beam/blob/af59192ac95a564ca3d29b6385a6d1448f5a407d/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L81
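
To illustrate the point, the worker pool spawns each SDK worker with
localhost endpoints, roughly like the sketch below (a simplified
illustration, not the actual Beam source; the boot flags mirror the
"Starting worker with command" log lines you quoted):

# Simplified sketch, not the actual Beam source: the worker pool launches
# each SDK worker with endpoints on localhost, which is why the pool must
# run on the same host/pod as the TM that provisions it.
import subprocess

def start_worker(worker_id: str, provision_port: int) -> subprocess.Popen:
    # Mirrors the "Starting worker with command ..." log lines below.
    return subprocess.Popen([
        "/opt/apache/beam/boot",
        f"--id={worker_id}",
        f"--provision_endpoint=localhost:{provision_port}",
    ])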


On 8/14/21 4:37 PM, Gorjan Todorovski wrote:

Hi!

I need help implementing a native Kubernetes Flink cluster that needs to 
run batch jobs (run by TensorFlow Extended), but I am not sure I am 
configuring it correctly, as I have issues running jobs on more than one 
task manager, while jobs run fine with a single TM.


I use the following parameters for the job:

|"--runner=FlinkRunner", "--parallelism=4", 
f"--flink_master={flink_url}:8081", "--environment_type=EXTERNAL", 
f"--environment_config={beam_sdk_url}:5", 
"--flink_submit_uber_jar", "--worker_harness_container_image=none", |


I have configured the Beam workers to run as sidecars to the TM 
containers. I do this by configuring a task manager pod template via:

kubernetes.pod-template-file.taskmanager

which points to a template file with these contents:

kind: Pod
metadata:
  name: taskmanager-pod-template
spec:
  #hostNetwork: true
  containers:
    - name: flink-main-container
      #image: apache/flink:scala_2.12
      env:
        - name: AWS_REGION
          value: "eu-central-1"
        - name: S3_VERIFY_SSL
          value: "0"
        - name: PYTHONPATH
          value: "/data/flink/src"
      args: ["taskmanager"]
      ports:
        - containerPort: 6122 #22
          name: rpc
        - containerPort: 6125
          name: query-state
      livenessProbe:
        tcpSocket:
          port: 6122 #22
        initialDelaySeconds: 30
        periodSeconds: 60
    - name: beam-worker-pool
      env:
        - name: PYTHONPATH
          value: "/data/flink/src"
        - name: AWS_REGION
          value: "eu-central-1"
        - name: S3_VERIFY_SSL
          value: "0"
      image: 848221505146.dkr.ecr.eu-central-1.amazonaws.com/flink-workers
      imagePullPolicy: Always
      args: ["--worker_pool"]
      ports:
        - containerPort: 5
          name: pool
      livenessProbe:
        tcpSocket:
          port: 5
        initialDelaySeconds: 30
        periodSeconds: 60


I have also created a Kubernetes load balancer for the task managers so 
that clients can connect on port 5, and I use that address when 
configuring:

f"--environment_config={beam_sdk_url}:5",

The problem is that the Beam SDK harness on one task manager appears to 
want to connect to the endpoint running on the other task manager, but 
looks for it on localhost:


Log from beam-worker-pool on TM 2:

2021/08/11 09:43:16 Failed to obtain provisioning information: failed to dial 
server at localhost:33705
caused by:
context deadline exceeded


The provision endpoint actually listening on port 33705 is on TM 1, 
while this worker looks for it on its own localhost, so it cannot 
connect to it.


Here is how I tested this:

...

TM 1:

$ kubectl logs my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool
2021/08/12 09:10:34 Starting worker pool 1: python -m 
apache_beam.runners.worker.worker_pool_main --service_port=5 
--container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', 
'--logging_endpoint=localhost:33383', '--artifact_endpoint=localhost:43477', 
'--provision_endpoint=localhost:40983', '--control_endpoint=localhost:34793']
2021/08/12 09:13:05 Failed to obtain provisioning information: failed to dial 
server at localhost:40983
caused by:
context deadline exceeded

TM 2:

$ kubectl logs my-first-flink-cluster-taskmanager-1-2 -c beam-worker-pool
2021/08/12 09:10:33 Starting worker pool 1: python -m 
apache_beam.runners.worker.worker_pool_main --service_port=5 
--container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', 
'--logging_endpoint=localhost:40497', '--artifact_endpoint=localhost:36245', 
'--provision_endpoint=localhost:32907', '--control_endpoint=localhost:46083']
2021/08/12 09:13:09 Failed to obtain provisioning information: failed to dial 
server at localhost:32907
caused by:
context deadline exceeded

Testing:

TM 1:

$ kubectl exec -it my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool -- bash
root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:40983
curl: (7) Failed to connect to localhost port 40983: Connection refused
root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:32907
Warning: Binary output can mess up your terminal. Use "--output -" to ...

TM 2:

root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:32907
curl: (7) Failed to connect to localhost port 32907: Connection refused
root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:40983
Warning: Binary output can mess up your terminal. Use "--output -" to ...

Running Beam on a native Kubernetes Flink cluster

2021-08-14 Thread Gorjan Todorovski
Hi!

I need help implementing a native Kubernetes Flink cluster that needs to
run batch jobs (run by TensorFlow Extended), but I am not sure I am
configuring it correctly, as I have issues running jobs on more than one
task manager, while jobs run fine with a single TM.

I use the following parameters for the job:

"--runner=FlinkRunner",
"--parallelism=4",
f"--flink_master={flink_url}:8081",
"--environment_type=EXTERNAL",
f"--environment_config={beam_sdk_url}:5",
"--flink_submit_uber_jar",
"--worker_harness_container_image=none",


I have configured the Beam workers to run as sidecars to the TM
containers. I do this by configuring a task manager pod template via:

kubernetes.pod-template-file.taskmanager

which points to a template file with these contents:

kind: Pod
metadata:
  name: taskmanager-pod-template
spec:
  #hostNetwork: true
  containers:
    - name: flink-main-container
      #image: apache/flink:scala_2.12
      env:
        - name: AWS_REGION
          value: "eu-central-1"
        - name: S3_VERIFY_SSL
          value: "0"
        - name: PYTHONPATH
          value: "/data/flink/src"
      args: ["taskmanager"]
      ports:
        - containerPort: 6122 #22
          name: rpc
        - containerPort: 6125
          name: query-state
      livenessProbe:
        tcpSocket:
          port: 6122 #22
        initialDelaySeconds: 30
        periodSeconds: 60
    - name: beam-worker-pool
      env:
        - name: PYTHONPATH
          value: "/data/flink/src"
        - name: AWS_REGION
          value: "eu-central-1"
        - name: S3_VERIFY_SSL
          value: "0"
      image: 848221505146.dkr.ecr.eu-central-1.amazonaws.com/flink-workers
      imagePullPolicy: Always
      args: ["--worker_pool"]
      ports:
        - containerPort: 5
          name: pool
      livenessProbe:
        tcpSocket:
          port: 5
        initialDelaySeconds: 30
        periodSeconds: 60
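
For completeness, the option above is set in the Flink configuration along
these lines (the file path is a placeholder):

# flink-conf.yaml (the path is a placeholder)
kubernetes.pod-template-file.taskmanager: /opt/flink/conf/taskmanager-pod-template.yaml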

I have also created a Kubernetes load balancer for the task managers so
that clients can connect on port 5, and I use that address when configuring:

f"--environment_config={beam_sdk_url}:5",

The problem is that the Beam SDK harness on one task manager appears to
want to connect to the endpoint running on the other task manager, but
looks for it on localhost:

Log from beam-worker-pool on TM 2:

2021/08/11 09:43:16 Failed to obtain provisioning information: failed
to dial server at localhost:33705
caused by:
context deadline exceeded

The provision endpoint actually listening on port 33705 is on TM 1, while
this worker looks for it on its own localhost, so it cannot connect to it.

Here is how I tested this:

...

TM 1:

$ kubectl logs my-first-flink-cluster-taskmanager-1-1 -c beam-worker-pool
2021/08/12 09:10:34 Starting worker pool 1: python -m
apache_beam.runners.worker.worker_pool_main --service_port=5
--container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1',
'--logging_endpoint=localhost:33383',
'--artifact_endpoint=localhost:43477',
'--provision_endpoint=localhost:40983',
'--control_endpoint=localhost:34793']
2021/08/12 09:13:05 Failed to obtain provisioning information: failed
to dial server at localhost:40983
caused by:
context deadline exceeded

TM 2:
$ kubectl logs my-first-flink-cluster-taskmanager-1-2 -c beam-worker-pool
2021/08/12 09:10:33 Starting worker pool 1: python -m
apache_beam.runners.worker.worker_pool_main --service_port=5
--container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1',
'--logging_endpoint=localhost:40497',
'--artifact_endpoint=localhost:36245',
'--provision_endpoint=localhost:32907',
'--control_endpoint=localhost:46083']
2021/08/12 09:13:09 Failed to obtain provisioning information: failed
to dial server at localhost:32907
caused by:
context deadline exceeded

Testing:

TM 1:

$ kubectl exec -it my-first-flink-cluster-taskmanager-1-1 -c
beam-worker-pool -- bash
root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:40983
curl: (7) Failed to connect to localhost port 40983: Connection refused

root@my-first-flink-cluster-taskmanager-1-1:/# curl localhost:32907
Warning: Binary output can mess up your terminal. Use "--output -" to ...


TM 2:
root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:32907
curl: (7) Failed to connect to localhost port 32907: Connection refused

root@my-first-flink-cluster-taskmanager-1-2:/# curl localhost:40983
Warning: Binary output can mess up your terminal. Use "--output -" to tell
Warning: curl to output it to your terminal anyway, or consider "--output

Not sure how to fix this.

Thanks, Gorjan