The problem is not related with Kafka broker. The problem is related when i
submit the job to apache flink

El El mié, 11 mar. 2026 a la(s) 5:29 p.m., Nathan Fisher <
[email protected]> escribió:

> You’re trying to access localhost which is bound to individual containers.
>
> Have you tried kafka-broker:9092 instead of localhost for the broker?
>
> On Wed, Mar 11, 2026 at 18:19 Juan Romero <[email protected]> wrote:
>
>> Hi!! I have the following services definition in docker compose file :
>> version: "3"
>>
>>
>> services:
>> kafka-broker:
>> image: apache/kafka:latest
>> container_name: broker
>> environment:
>> KAFKA_NODE_ID: 1
>> KAFKA_PROCESS_ROLES: broker,controller
>> KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
>> KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
>> KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
>> KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:
>> CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
>> KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
>> KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
>> KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
>> KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
>> KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
>> KAFKA_NUM_PARTITIONS: 3
>> ports:
>> - "9092:9092"
>>
>>
>> jobmanager:
>> image: flink:1.19.0-java17
>> depends_on:
>> - kafka-broker
>> ports:
>> - "8081:8081"
>> command: jobmanager
>> environment:
>> - |
>> FLINK_PROPERTIES=
>> jobmanager.rpc.address: jobmanager
>> taskmanager:
>> image: flink:1.19.0-java17
>> depends_on:
>> - jobmanager
>> - kafka-broker
>> command: taskmanager
>> scale: 1
>> environment:
>> - |
>> FLINK_PROPERTIES=
>> jobmanager.rpc.address: jobmanager
>> taskmanager.numberOfTaskSlots: 2
>>
>> The pipeline definition is this one:
>>
>> import json
>> import typing
>> import argparse
>> import apache_beam as beam
>> from apache_beam.io.kafka import ReadFromKafka
>> from apache_beam import Pipeline
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>>
>>
>> def format_message(element) :
>> key = 'test-key'
>> return (key.encode('utf8'), json.dumps(element).encode('utf8'))
>>
>> def run():
>> parser = argparse.ArgumentParser()
>> args, beam_args = parser.parse_known_args()
>> print(beam_args)
>> pipeline_options = PipelineOptions(
>> beam_args,
>> streaming=True,
>> save_main_session=True
>> )
>> consumer_config = {
>> 'bootstrap.servers': "localhost:9092",
>> 'group.id': 'consumer_id', # Consumer group ID
>> 'auto.offset.reset': 'earliest' # Start reading from the beginning if no
>> offset is committed,
>> }
>>
>> with Pipeline(options=pipeline_options) as p:
>> kafka_messages = p | "ReadFromKafka" >> ReadFromKafka(
>> consumer_config=consumer_config,
>> topics=['test-topic']
>> )
>> (kafka_messages | "Prepare " >> beam.Map(format_message
>> ).with_output_types(typing.Tuple[bytes, bytes])
>> | "Print" >> beam.Map(print)
>> )
>>
>> if __name__ == "__main__":
>> run()
>>
>>
>> The command im using to execute the pipeline is this :
>>
>> python pipeline.py --runner=FlinkRunner --flink_master=localhost:8081
>> --environment_type=LOOPBACK --streaming
>>
>> I'm using apache beam 2.71 . This is the steps recommended by the
>> documentation but i got this error :
>>
>> WARNING:root:Waiting for grpc channel to be ready at localhost:52001.
>> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
>> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
>> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
>> WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
>> ERROR:root:java.lang.reflect.InaccessibleObjectException: Unable to make
>> field private final byte[] java.lang.String.value accessible: module
>> java.base does not "opens java.lang" to unnamed module @a4114d5
>> Traceback (most recent call last):
>>   File "/home/juansebastian.romero/git-repos/trusted_data/pipeline.py",
>> line 46, in <module>
>>     run()
>>   File "/home/juansebastian.romero/git-repos/trusted_data/pipeline.py",
>> line 34, in run
>>     with Pipeline(options=pipeline_options) as p:
>>   File
>> "/home/juansebastian.romero/git-repos/trusted_data/.env/lib/python3.11/site-packages/apache_beam/pipeline.py",
>> line 601, in __exit__
>>     self.result.wait_until_finish()
>>   File
>> "/home/juansebastian.romero/git-repos/trusted_data/.env/lib/python3.11/site-packages/apache_beam/runners/portability/portable_runner.py",
>> line 614, in wait_until_finish
>>     raise self._runtime_exception
>> RuntimeError: Pipeline
>> BeamApp-juansebastian0romero-0311221307-4bfaded2_9fc33fde-6e9b-4454-9c1f-b23109ed0983
>> failed in state FAILED: java.lang.reflect.InaccessibleObjectException:
>> Unable to make field private final byte[] java.lang.String.value
>> accessible: module java.base does not "opens java.lang" to unnamed module
>> @a4114d5
>>
>> I have tried many things like change the image to flink1.19-java11 , and
>> put the properties
>>
>> env.java.opts.all: >
>> --add-opens=java.base/java.lang=ALL-UNNAMED
>> --add-opens=java.base/java.util=ALL-UNNAMED
>> --add-opens=java.base/java.io=ALL-UNNAMED
>> --add-opens=java.base/java.nio=ALL-UNNAMED. , in flink config , but i
>> havent beee able to solve the problem. *Anyone have an example of
>> kafka reading in apache beam 2.71 running over flink. ?*
>>
>

Reply via email to