Control who can manage Flink jobs

2023-11-30 Thread Поротиков Станислав Вячеславович via user
Hello!
Is there any way to control who can manage (submit/cancel) jobs on a Flink
cluster? We have multiple teams, and I am looking for a way to use
Beam+Flink safely.
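
One possible direction (an illustrative sketch of mine, not from this thread): the open-source Flink distribution has no built-in authentication or authorization on its REST API, so submit/cancel access can be gated by putting an authenticating reverse proxy in front of the JobManager's REST endpoint. The upstream URL, tokens, and port below are hypothetical.

# Sketch: token-checking reverse proxy in front of the Flink REST API.
# Tokens, team names, and the upstream address are hypothetical.
from flask import Flask, Response, abort, request
import requests

FLINK_REST = "http://jobmanager:8081"   # assumed upstream Flink REST endpoint
TEAM_TOKENS = {"team-a-secret": "team-a", "team-b-secret": "team-b"}

app = Flask(__name__)

@app.route("/", defaults={"path": ""}, methods=["GET", "POST", "PATCH", "DELETE"])
@app.route("/<path:path>", methods=["GET", "POST", "PATCH", "DELETE"])
def proxy(path):
    # Reads stay open; mutating calls (submit, cancel) require a known token.
    token = request.headers.get("Authorization", "").removeprefix("Bearer ")
    if request.method != "GET" and token not in TEAM_TOKENS:
        abort(403)
    upstream = requests.request(
        request.method, f"{FLINK_REST}/{path}",
        params=request.args, data=request.get_data(), timeout=30)
    return Response(upstream.content, upstream.status_code)

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)

Teams would then talk to the proxy rather than the JobManager directly, with the JobManager's own port kept off the shared network.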

Best regards,
Stanislav Porotikov
Web Services Operations Engineer
SRS Team



Error while trying to connect to Kafka from Flink runner

2023-11-22 Thread Поротиков Станислав Вячеславович via user
Hello!
I am trying to run a Beam pipeline on top of Flink in a local docker-compose
environment. I wrote my own Dockerfile for the Flink jobmanager and taskmanager.
I need to connect to a secure Kafka cluster via Kerberos.
Dockerfile for my-image-apache-beam/flink:1.16-java11:

FROM flink:1.16-java11

# Python SDK harness
COPY --from=apache/beam_python3.10_sdk /opt/apache/beam/ /opt/apache/beam/

# Java SDK harness
COPY --from=apache/beam_java11_sdk:2.51.0 /opt/apache/beam/ /opt/apache/beam_java/

# Kerberos client configuration
COPY krb5.conf /etc/
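
(Note connecting the pieces: the Java SDK harness copied to /opt/apache/beam_java/ is what the PROCESS environment configured in the pipeline further below executes via its boot command.)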
My docker-compose.yml:

version: "2.2"
services:
  jobmanager:
    image: my-image-apache-beam/flink:1.16-java11
    ports:
      - "8081:8081"
    volumes:
      - artifacts:/tmp/beam-artifact-staging
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: registry.kontur.host/srs/apache-beam/flink:1.16-java11
    depends_on:
      - jobmanager
    command: taskmanager
    ports:
      - "8100-8200:8100-8200"
    volumes:
      - artifacts:/tmp/beam-artifact-staging
    scale: 1
    extra_hosts:
      - "host.docker.internal:host-gateway"
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        taskmanager.memory.process.size: 2Gb

  # Beam job server: translates submitted portable pipelines for Flink
  beam_job_server:
    image: apache/beam_flink1.16_job_server
    command: --flink-master=jobmanager --job-host=0.0.0.0
    ports:
      - "8097:8097"
      - "8098:8098"
      - "8099:8099"
    volumes:
      - artifacts:/tmp/beam-artifact-staging

  # External Python SDK harness, referenced by --environment_config below
  python-worker-harness:
    image: "apache/beam_python3.10_sdk"
    command: "-worker_pool"
    ports:
      - "5:5"
    volumes:
      - artifacts:/tmp/beam-artifact-staging

volumes:
  artifacts:
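
As a quick sanity check (a minimal sketch of my own, assuming the stack runs on localhost), the job server ports published above can be probed from the host before submitting:

# Minimal sketch: confirm the Beam job server ports from docker-compose
# are reachable before submitting the pipeline.
import socket

def reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

for name, port in [("expansion", 8097), ("artifact", 8098), ("job", 8099)]:
    state = "reachable" if reachable("localhost", port) else "unreachable"
    print(f"{name} endpoint localhost:{port}: {state}")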
And finally, my pipeline:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka, default_io_expansion_service

import os
import logging


job_server = "localhost"

# Submit through the Beam job server from docker-compose; the Python SDK
# harness runs as an EXTERNAL worker pool.
pipeline_external_environment = [
    "--runner=PortableRunner",
    f"--job_endpoint={job_server}:8099",
    f"--artifact_endpoint={job_server}:8098",
    "--environment_type=EXTERNAL",
    "--environment_config=python-worker-harness:5",
]

# The cross-language Kafka transforms run in a PROCESS environment that
# executes the Java SDK harness baked into the Flink image.
kafka_process_expansion_service = default_io_expansion_service(
    append_args=[
        "--defaultEnvironmentType=PROCESS",
        '--defaultEnvironmentConfig={"command":"/opt/apache/beam_java/boot"}',
    ]
)


def run():
    pipeline_options = PipelineOptions(pipeline_external_environment)

    sasl_kerberos_principal = os.getenv('SASL_KERBEROS_PRINCIPAL')
    sasl_kerberos_password = os.getenv('SASL_KERBEROS_PASSWORD')

    source_config = {
        'bootstrap.servers': 'kafka-host1:9093,kafka-host2:9093,kafka-host3:9093',
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanism': 'GSSAPI',
        'sasl.kerberos.service.name': 'kafka',
        'sasl.kerberos.principal': f'{sasl_kerberos_principal}',
        'sasl.kerberos.kinit.cmd': f'kinit -R || echo {sasl_kerberos_password} | kinit {sasl_kerberos_principal}',
        'sasl.jaas.config': f'com.sun.security.auth.module.Krb5LoginModule required debug=true principal={sasl_kerberos_principal} useTicketCache=true;',
        'group.id': 'test_group_1',
        'auto.offset.reset': 'earliest',
    }

    source_topic = 'Test_Source2-0_0_0_0.id-0'
    sink_topic = 'Beam.Test'

    with beam.Pipeline(options=pipeline_options) as pipeline:
        outputs = (
            pipeline
            | 'Read topic from Kafka' >> ReadFromKafka(
                consumer_config=source_config,
                topics=[source_topic],
                expansion_service=kafka_process_expansion_service)
            # The same config dict is reused as the producer config.
            | 'Write topic to Kafka' >> WriteToKafka(
                producer_config=source_config,
                topic=sink_topic,
                expansion_service=kafka_process_expansion_service)
        )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
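
With the stack above running, the script is launched from the host with the Kerberos credentials in the environment, e.g. (assuming the file is saved as pipeline.py, a name I'm choosing here): SASL_KERBEROS_PRINCIPAL=... SASL_KERBEROS_PASSWORD=... python pipeline.py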

But I got stuck with the error below:
INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Receive slot request 2f0a7a3cd89226651c2f84bd11e23321 for job 1dc3e31750be59cab4f2fcd0710b255e from resource manager with leader id .
2023-11-22 12:52:29,065 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Allocated slot for 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,065 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 1dc3e31750be59cab4f2fcd0710b255e for job leader monitoring.
2023-11-22 12:52:29,066 INF