You’re trying to access localhost which is bound to individual containers.

Have you tried kafka-broker:9092 instead of localhost for the broker?

On Wed, Mar 11, 2026 at 18:19 Juan Romero <[email protected]> wrote:

> Hi!! I have the following services definition in docker compose file :
> version: "3"
>
>
> services:
> kafka-broker:
> image: apache/kafka:latest
> container_name: broker
> environment:
> KAFKA_NODE_ID: 1
> KAFKA_PROCESS_ROLES: broker,controller
> KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
> KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
> KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
> KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:
> CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
> KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
> KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
> KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
> KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
> KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
> KAFKA_NUM_PARTITIONS: 3
> ports:
> - "9092:9092"
>
>
> jobmanager:
> image: flink:1.19.0-java17
> depends_on:
> - kafka-broker
> ports:
> - "8081:8081"
> command: jobmanager
> environment:
> - |
> FLINK_PROPERTIES=
> jobmanager.rpc.address: jobmanager
> taskmanager:
> image: flink:1.19.0-java17
> depends_on:
> - jobmanager
> - kafka-broker
> command: taskmanager
> scale: 1
> environment:
> - |
> FLINK_PROPERTIES=
> jobmanager.rpc.address: jobmanager
> taskmanager.numberOfTaskSlots: 2
>
> The pipeline definition is this one:
>
> import json
> import typing
> import argparse
> import apache_beam as beam
> from apache_beam.io.kafka import ReadFromKafka
> from apache_beam import Pipeline
> from apache_beam.options.pipeline_options import PipelineOptions
>
>
>
> def format_message(element) :
> key = 'test-key'
> return (key.encode('utf8'), json.dumps(element).encode('utf8'))
>
> def run():
> parser = argparse.ArgumentParser()
> args, beam_args = parser.parse_known_args()
> print(beam_args)
> pipeline_options = PipelineOptions(
> beam_args,
> streaming=True,
> save_main_session=True
> )
> consumer_config = {
> 'bootstrap.servers': "localhost:9092",
> 'group.id': 'consumer_id', # Consumer group ID
> 'auto.offset.reset': 'earliest' # Start reading from the beginning if no
> offset is committed,
> }
>
> with Pipeline(options=pipeline_options) as p:
> kafka_messages = p | "ReadFromKafka" >> ReadFromKafka(
> consumer_config=consumer_config,
> topics=['test-topic']
> )
> (kafka_messages | "Prepare " >> beam.Map(format_message
> ).with_output_types(typing.Tuple[bytes, bytes])
> | "Print" >> beam.Map(print)
> )
>
> if __name__ == "__main__":
> run()
>
>
> The command im using to execute the pipeline is this :
>
> python pipeline.py --runner=FlinkRunner --flink_master=localhost:8081
> --environment_type=LOOPBACK --streaming
>
> I'm using apache beam 2.71 . This is the steps recommended by the
> documentation but i got this error :
>
> WARNING:root:Waiting for grpc channel to be ready at localhost:52001.
> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
> ERROR:root:java.lang.reflect.InaccessibleObjectException: Unable to make
> field private final byte[] java.lang.String.value accessible: module
> java.base does not "opens java.lang" to unnamed module @a4114d5
> Traceback (most recent call last):
>   File "/home/juansebastian.romero/git-repos/trusted_data/pipeline.py",
> line 46, in <module>
>     run()
>   File "/home/juansebastian.romero/git-repos/trusted_data/pipeline.py",
> line 34, in run
>     with Pipeline(options=pipeline_options) as p:
>   File
> "/home/juansebastian.romero/git-repos/trusted_data/.env/lib/python3.11/site-packages/apache_beam/pipeline.py",
> line 601, in __exit__
>     self.result.wait_until_finish()
>   File
> "/home/juansebastian.romero/git-repos/trusted_data/.env/lib/python3.11/site-packages/apache_beam/runners/portability/portable_runner.py",
> line 614, in wait_until_finish
>     raise self._runtime_exception
> RuntimeError: Pipeline
> BeamApp-juansebastian0romero-0311221307-4bfaded2_9fc33fde-6e9b-4454-9c1f-b23109ed0983
> failed in state FAILED: java.lang.reflect.InaccessibleObjectException:
> Unable to make field private final byte[] java.lang.String.value
> accessible: module java.base does not "opens java.lang" to unnamed module
> @a4114d5
>
> I have tried many things like change the image to flink1.19-java11 , and
> put the properties
>
> env.java.opts.all: >
> --add-opens=java.base/java.lang=ALL-UNNAMED
> --add-opens=java.base/java.util=ALL-UNNAMED
> --add-opens=java.base/java.io=ALL-UNNAMED
> --add-opens=java.base/java.nio=ALL-UNNAMED. , in flink config , but i
> havent beee able to solve the problem. *Anyone have an example of
> kafka reading in apache beam 2.71 running over flink. ?*
>

Reply via email to