Hello,
I’ve been trying to write a pipeline in Python with apache-beam==2.38.0, which
starts with a ReadFromKafka step.
I have a Flink cluster running in docker-compose, along with the Beam Flink job
server image, which should serve the expansion service on port 8097, according
to the docs:
https://beam.apache.org/releases/pydoc/2.38.0/apache_beam.io.kafka.html
However, when trying to launch the pipeline, the ReadFromKafka step fails with:
failed to connect to all addresses; last error: UNAVAILABLE:
ipv4:127.0.0.1:8097: Socket closed
I have tried a range of possible values for the expansion service passed to the
ReadFromKafka step, including the internal docker compose DNS name and my LAN
IP address, and they all fail for various reasons. I have also tried different
runners and changed every setting I could, but no luck.
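As a side check, here is a minimal sketch for testing whether the ports published by the job server container accept TCP connections from the host. The helper name port_is_open is hypothetical, and the port list is taken from the docker-compose file below. An open port only shows that something accepts the connection (possibly just Docker's port forwarding). It does not prove that a gRPC expansion service is answering behind it.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        # create_connection raises OSError (including timeouts) on failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Ports published by beam-flink-jobserver in the docker-compose file.
    for port in (8097, 8098, 8099):
        state = "open" if port_is_open("localhost", port) else "unreachable"
        print(f"localhost:{port} is {state}")
```

If a port shows as open here while the pipeline still reports "Socket closed", the connection is probably being accepted and then dropped, which would point at the service inside the container rather than at host networking.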
What am I doing wrong?
Here are a minimal Beam pipeline and docker-compose file to reproduce:
pipeline.py
import apache_beam as beam
from apache_beam import window
from apache_beam.transforms.trigger import (
    AccumulationMode,
    Repeatedly,
    AfterCount,
)
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.io.kafka import ReadFromKafka

pipeline_options = PipelineOptions(
    [
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",
        "--environment_type=LOOPBACK",
    ]
)
pipeline_options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | ReadFromKafka(
            consumer_config={"bootstrap.servers": "redpanda:9092"},
            topics=["my-topic"],
            expansion_service="localhost:8097",
        )
        | beam.WindowInto(
            window.GlobalWindows(),
            trigger=Repeatedly(AfterCount(1)),
            accumulation_mode=AccumulationMode.DISCARDING,
        )
        | beam.Map(print)
    )
docker-compose.yml
version: '3.7'
services:
  flink-jobmanager:
    image: flink:1.14
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  flink-taskmanager:
    image: flink:1.14
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
  beam-flink-jobserver:
    image: apache/beam_flink1.14_job_server:latest
    ports:
      - "8097:8097"
      - "8098:8098"
      - "8099:8099"
Thanks in advance for your help!
Best,
Florentin