Hello!
I am trying to run a Beam pipeline on top of Flink in a local docker-compose environment. I wrote my own Dockerfile for the Flink jobmanager and taskmanager. I need to connect to a secure Kafka cluster through Kerberos.
Dockerfile for my-image-apache-beam/flink:1.16-java11:
FROM flink:1.16-java11
# python SDK
COPY --from=apache/beam_python3.10_sdk /opt/apache/beam/ /opt/apache/beam/
# java SDK
COPY --from=apache/beam_java11_sdk:2.51.0 /opt/apache/beam/ /opt/apache/beam_java/
COPY krb5.conf /etc/
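The krb5.conf itself is the usual MIT layout; a minimal sketch (realm and KDC names here are placeholders, not my real ones):

[libdefaults]
    default_realm = EXAMPLE.COM
    dns_lookup_kdc = false

[realms]
    EXAMPLE.COM = {
        kdc = kdc.example.com
    }

The image is then built with something like docker build -t my-image-apache-beam/flink:1.16-java11 .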
My docker-compose.yml:
version: "2.2"
services:
jobmanager:
image: my-image-apache-beam/flink:1.16-java11
ports:
- "8081:8081"
volumes:
- artifacts:/tmp/beam-artifact-staging
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager:
image: registry.kontur.host/srs/apache-beam/flink:1.16-java11
depends_on:
- jobmanager
command: taskmanager
ports:
- "8100-8200:8100-8200"
volumes:
- artifacts:/tmp/beam-artifact-staging
scale: 1
extra_hosts:
- "host.docker.internal:host-gateway"
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
taskmanager.memory.process.size: 2Gb
beam_job_server:
image: apache/beam_flink1.16_job_server
command: --flink-master=jobmanager --job-host=0.0.0.0
ports:
- "8097:8097"
- "8098:8098"
- "8099:8099"
volumes:
- artifacts:/tmp/beam-artifact-staging
python-worker-harness:
image: "apache/beam_python3.10_sdk"
command: "-worker_pool"
ports:
- "5:5"
volumes:
- artifacts:/tmp/beam-artifact-staging
volumes:
artifacts:
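For context, I bring the whole stack up with nothing more than:

docker compose up -d
docker compose ps

after which the Flink UI is reachable on localhost:8081 and the Beam job server on localhost:8099.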
And finally, my pipeline:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import (
    ReadFromKafka,
    WriteToKafka,
    default_io_expansion_service,
)
import os
import logging

# Submit through the portable job server; the Python SDK harness runs in the
# external python-worker-harness container from docker-compose.yml.
job_server = "localhost"
pipeline_external_environment = [
    "--runner=PortableRunner",
    f"--job_endpoint={job_server}:8099",
    f"--artifact_endpoint={job_server}:8098",
    "--environment_type=EXTERNAL",
    "--environment_config=python-worker-harness:5",
]

# Run the Java side of the Kafka transforms as a local process inside the
# Flink taskmanager, via the Java SDK harness copied in the Dockerfile.
kafka_process_expansion_service = default_io_expansion_service(
    append_args=[
        "--defaultEnvironmentType=PROCESS",
        "--defaultEnvironmentConfig={\"command\":\"/opt/apache/beam_java/boot\"}",
    ]
)


def run():
    pipeline_options = PipelineOptions(pipeline_external_environment)
    sasl_kerberos_principal = os.getenv('SASL_KERBEROS_PRINCIPAL')
    sasl_kerberos_password = os.getenv('SASL_KERBEROS_PASSWORD')
    source_config = {
        'bootstrap.servers': 'kafka-host1:9093,kafka-host2:9093,kafka-host3:9093',
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanism': 'GSSAPI',
        'sasl.kerberos.service.name': 'kafka',
        'sasl.kerberos.principal': f'{sasl_kerberos_principal}',
        # Try to renew an existing ticket, otherwise kinit with the password.
        'sasl.kerberos.kinit.cmd': (
            f'kinit -R || echo {sasl_kerberos_password} '
            f'| kinit {sasl_kerberos_principal}'
        ),
        'sasl.jaas.config': (
            'com.sun.security.auth.module.Krb5LoginModule required debug=true '
            f'principal={sasl_kerberos_principal} useTicketCache=true;'
        ),
        'group.id': 'test_group_1',
        'auto.offset.reset': 'earliest',
    }
    source_topic = 'Test_Source2-0_0_0_0.id-0'
    sink_topic = 'Beam.Test'
    with beam.Pipeline(options=pipeline_options) as pipeline:
        outputs = (
            pipeline
            | 'Read topic from Kafka' >> ReadFromKafka(
                consumer_config=source_config,
                topics=[source_topic],
                expansion_service=kafka_process_expansion_service,
            )
            | 'Write topic to Kafka' >> WriteToKafka(
                producer_config=source_config,
                topic=sink_topic,
                expansion_service=kafka_process_expansion_service,
            )
        )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
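I launch it from the host roughly like this (the script name and credential values are placeholders):

export SASL_KERBEROS_PRINCIPAL='user@EXAMPLE.COM'
export SASL_KERBEROS_PASSWORD='***'
python pipeline.py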
But I got stuck with the error below:
INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot request 2f0a7a3cd89226651c2f84bd11e23321 for job 1dc3e31750be59cab4f2fcd0710b255e from resource manager with leader id .
2023-11-22 12:52:29,065 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated slot for 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,065 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 1dc3e31750be59cab4f2fcd0710b255e for job leader monitoring.
2023-11-22 12:52:29,066 INF