The problem is not related to the Kafka broker. It occurs when I submit the job to Apache Flink.
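
A minimal sketch, assuming the --add-opens flags listed in the quoted message below are meant to go on a single env.java.opts.all line under FLINK_PROPERTIES. The helper names are hypothetical, and whether this setting resolves the InaccessibleObjectException is not confirmed anywhere in this thread.

```python
# Hypothetical helpers (not part of Beam or Flink): they only build the text
# of one configuration line from the packages named in the quoted message.
JAVA_BASE_PACKAGES = ["java.lang", "java.util", "java.io", "java.nio"]


def add_opens_flags(packages):
    """Return space-separated --add-opens flags for the given java.base packages."""
    return " ".join(
        f"--add-opens=java.base/{pkg}=ALL-UNNAMED" for pkg in packages
    )


def flink_property_line(packages):
    """Return one `key: value` line to paste under FLINK_PROPERTIES."""
    return f"env.java.opts.all: {add_opens_flags(packages)}"


# Print the line so it can be copied into the compose file by hand.
print(flink_property_line(JAVA_BASE_PACKAGES))
```

The printed line would go under FLINK_PROPERTIES for both the jobmanager and the taskmanager services, next to jobmanager.rpc.address.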

On Wed, Mar 11, 2026 at 5:29 PM, Nathan Fisher <[email protected]> wrote:

> You’re trying to access localhost which is bound to individual containers.
>
> Have you tried kafka-broker:9092 instead of localhost for the broker?
>
> On Wed, Mar 11, 2026 at 18:19 Juan Romero <[email protected]> wrote:
>
>> Hi!! I have the following service definitions in my Docker Compose file:
>>
>> version: "3"
>>
>> services:
>>   kafka-broker:
>>     image: apache/kafka:latest
>>     container_name: broker
>>     environment:
>>       KAFKA_NODE_ID: 1
>>       KAFKA_PROCESS_ROLES: broker,controller
>>       KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
>>       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
>>       KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
>>       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
>>       KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
>>       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>>       KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>>       KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>>       KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
>>       KAFKA_NUM_PARTITIONS: 3
>>     ports:
>>       - "9092:9092"
>>
>>   jobmanager:
>>     image: flink:1.19.0-java17
>>     depends_on:
>>       - kafka-broker
>>     ports:
>>       - "8081:8081"
>>     command: jobmanager
>>     environment:
>>       - |
>>         FLINK_PROPERTIES=
>>         jobmanager.rpc.address: jobmanager
>>
>>   taskmanager:
>>     image: flink:1.19.0-java17
>>     depends_on:
>>       - jobmanager
>>       - kafka-broker
>>     command: taskmanager
>>     scale: 1
>>     environment:
>>       - |
>>         FLINK_PROPERTIES=
>>         jobmanager.rpc.address: jobmanager
>>         taskmanager.numberOfTaskSlots: 2
>>
>> The pipeline definition is this one:
>>
>> import json
>> import typing
>> import argparse
>> import apache_beam as beam
>> from apache_beam.io.kafka import ReadFromKafka
>> from apache_beam import Pipeline
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>>
>> def format_message(element):
>>     # Pair every element with a fixed key; both sides are UTF-8 bytes.
>>     key = 'test-key'
>>     return (key.encode('utf8'), json.dumps(element).encode('utf8'))
>>
>>
>> def run():
>>     parser = argparse.ArgumentParser()
>>     args, beam_args = parser.parse_known_args()
>>     print(beam_args)
>>     pipeline_options = PipelineOptions(
>>         beam_args,
>>         streaming=True,
>>         save_main_session=True
>>     )
>>     consumer_config = {
>>         'bootstrap.servers': "localhost:9092",
>>         'group.id': 'consumer_id',  # Consumer group ID
>>         'auto.offset.reset': 'earliest'  # Start reading from the beginning if no offset is committed
>>     }
>>
>>     with Pipeline(options=pipeline_options) as p:
>>         kafka_messages = p | "ReadFromKafka" >> ReadFromKafka(
>>             consumer_config=consumer_config,
>>             topics=['test-topic']
>>         )
>>         (kafka_messages
>>          | "Prepare " >> beam.Map(format_message).with_output_types(typing.Tuple[bytes, bytes])
>>          | "Print" >> beam.Map(print)
>>         )
>>
>>
>> if __name__ == "__main__":
>>     run()
>>
>> The command I'm using to execute the pipeline is this:
>>
>> python pipeline.py --runner=FlinkRunner --flink_master=localhost:8081 --environment_type=LOOPBACK --streaming
>>
>> I'm using Apache Beam 2.71. These are the steps recommended by the
>> documentation, but I got this error:
>>
>> WARNING:root:Waiting for grpc channel to be ready at localhost:52001.
>> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
>> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
>> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
>> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
>> ERROR:root:java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @a4114d5
>> Traceback (most recent call last):
>>   File "/home/juansebastian.romero/git-repos/trusted_data/pipeline.py", line 46, in <module>
>>     run()
>>   File "/home/juansebastian.romero/git-repos/trusted_data/pipeline.py", line 34, in run
>>     with Pipeline(options=pipeline_options) as p:
>>   File "/home/juansebastian.romero/git-repos/trusted_data/.env/lib/python3.11/site-packages/apache_beam/pipeline.py", line 601, in __exit__
>>     self.result.wait_until_finish()
>>   File "/home/juansebastian.romero/git-repos/trusted_data/.env/lib/python3.11/site-packages/apache_beam/runners/portability/portable_runner.py", line 614, in wait_until_finish
>>     raise self._runtime_exception
>> RuntimeError: Pipeline BeamApp-juansebastian0romero-0311221307-4bfaded2_9fc33fde-6e9b-4454-9c1f-b23109ed0983 failed in state FAILED: java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @a4114d5
>>
>> I have tried many things, like changing the image to flink1.19-java11 and
>> putting the properties
>>
>> env.java.opts.all: >
>>   --add-opens=java.base/java.lang=ALL-UNNAMED
>>   --add-opens=java.base/java.util=ALL-UNNAMED
>>   --add-opens=java.base/java.io=ALL-UNNAMED
>>   --add-opens=java.base/java.nio=ALL-UNNAMED
>>
>> in the Flink config, but I haven't been able to solve the problem.
>> *Does anyone have an example of reading from Kafka in Apache Beam 2.71
>> running on Flink?*
