Hi! I have the following services definition in my Docker Compose file:
version: "3"

services:
  kafka-broker:
    image: apache/kafka:latest
    container_name: broker
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
    ports:
      - "9092:9092"

  jobmanager:
    image: flink:1.19.0-java17
    depends_on:
      - kafka-broker
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  taskmanager:
    image: flink:1.19.0-java17
    depends_on:
      - jobmanager
      - kafka-broker
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

The pipeline definition is this one:

import json
import typing
import argparse

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam import Pipeline
from apache_beam.options.pipeline_options import PipelineOptions


def format_message(element):
    key = 'test-key'
    return (key.encode('utf8'), json.dumps(element).encode('utf8'))


def run():
    parser = argparse.ArgumentParser()
    args, beam_args = parser.parse_known_args()
    print(beam_args)
    pipeline_options = PipelineOptions(
        beam_args,
        streaming=True,
        save_main_session=True
    )
    consumer_config = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'consumer_id',       # Consumer group ID
        'auto.offset.reset': 'earliest', # Start from the beginning if no offset is committed
    }

    with Pipeline(options=pipeline_options) as p:
        kafka_messages = p | "ReadFromKafka" >> ReadFromKafka(
            consumer_config=consumer_config,
            topics=['test-topic']
        )
        (kafka_messages
         | "Prepare" >> beam.Map(format_message).with_output_types(typing.Tuple[bytes, bytes])
         | "Print" >> beam.Map(print)
         )


if __name__ == "__main__":
    run()
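For what it's worth, format_message can be sanity-checked on its own, outside the pipeline. A minimal standalone sketch (the sample element is made up for illustration):

```python
import json

def format_message(element):
    # Same logic as in the pipeline: fixed key, JSON-encoded value, both as bytes.
    key = 'test-key'
    return (key.encode('utf8'), json.dumps(element).encode('utf8'))

key, value = format_message({'user': 'alice', 'count': 3})
print(key)    # b'test-key'
print(value)  # b'{"user": "alice", "count": 3}'
```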


The command I'm using to execute the pipeline is:

python pipeline.py --runner=FlinkRunner --flink_master=localhost:8081 \
  --environment_type=LOOPBACK --streaming
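Note that none of these flags are declared on the script's own ArgumentParser, so parse_known_args leaves them all in beam_args, which is what forwards them to PipelineOptions. A stdlib-only sketch of that split (flag values copied from the command above):

```python
import argparse

# The script's parser defines no flags of its own, so every runner flag
# falls through into the "unknown" list returned by parse_known_args.
parser = argparse.ArgumentParser()
argv = [
    '--runner=FlinkRunner',
    '--flink_master=localhost:8081',
    '--environment_type=LOOPBACK',
    '--streaming',
]
args, beam_args = parser.parse_known_args(argv)
print(beam_args)  # all four flags pass through untouched
```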

I'm using Apache Beam 2.71. These are the steps recommended by the
documentation, but I got this error:

WARNING:root:Waiting for grpc channel to be ready at localhost:52001.
WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
WARNING:root:Waiting for grpc channel to be ready at localhost:39153.
ERROR:root:java.lang.reflect.InaccessibleObjectException: Unable to make
field private final byte[] java.lang.String.value accessible: module
java.base does not "opens java.lang" to unnamed module @a4114d5
Traceback (most recent call last):
  File "/home/juansebastian.romero/git-repos/trusted_data/pipeline.py",
line 46, in <module>
    run()
  File "/home/juansebastian.romero/git-repos/trusted_data/pipeline.py",
line 34, in run
    with Pipeline(options=pipeline_options) as p:
  File
"/home/juansebastian.romero/git-repos/trusted_data/.env/lib/python3.11/site-packages/apache_beam/pipeline.py",
line 601, in __exit__
    self.result.wait_until_finish()
  File
"/home/juansebastian.romero/git-repos/trusted_data/.env/lib/python3.11/site-packages/apache_beam/runners/portability/portable_runner.py",
line 614, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline
BeamApp-juansebastian0romero-0311221307-4bfaded2_9fc33fde-6e9b-4454-9c1f-b23109ed0983
failed in state FAILED: java.lang.reflect.InaccessibleObjectException:
Unable to make field private final byte[] java.lang.String.value
accessible: module java.base does not "opens java.lang" to unnamed module
@a4114d5

I have tried many things, like changing the image to flink:1.19-java11, and
putting these properties in the Flink config:

env.java.opts.all: >
  --add-opens=java.base/java.lang=ALL-UNNAMED
  --add-opens=java.base/java.util=ALL-UNNAMED
  --add-opens=java.base/java.io=ALL-UNNAMED
  --add-opens=java.base/java.nio=ALL-UNNAMED

but I haven't been able to solve the problem. *Does anyone have an example of
reading from Kafka in Apache Beam 2.71 running on Flink?*
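In compose form, what I tried looks roughly like this (a sketch, assuming Flink picks up env.java.opts.all from the same FLINK_PROPERTIES block used above; the taskmanager service would get the same lines):

```yaml
  jobmanager:
    image: flink:1.19.0-java17
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        env.java.opts.all: --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED
```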
