Hi,

Please find below the code I have been using to consume a Kafka stream hosted on Confluent Cloud. It fails with a TypeError saying the FlinkKafkaConsumer Java class could not be found, which points at how I am registering the connector jars; the full traceback is below the code snippet. Please let me know what I am doing wrong. I am running this in Docker with Flink 1.17.1 (PyFlink under Python 3.10).
Code:

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
import glob
import os
import sys
import logging

# Set up the execution environment
env = StreamExecutionEnvironment.get_execution_environment()
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

# The SQL connector for Kafka is used here as it's a fat jar and could avoid dependency issues
env.add_jars("file://flink-sql-connector-kafka-1.17.1.jar")
env.add_classpaths("file://flink-sql-connector-kafka-1.17.1.jar")
env.add_jars("file://flink-connector-kafka_2.11-1.9.2-javadoc.jar")
env.add_classpaths("file://flink-connector-kafka_2.11-1.9.2-javadoc.jar")

# Set up the Confluent Cloud Kafka configuration
kafka_config = {
    'bootstrap.servers': 'bootstrap-server',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.jaas.config': 'org.apache.kafka.common.security.plain.PlainLoginModule required username="API_KEY" password="API_SECRET";'
}

topic = 'TOPIC_NAME'

deserialization_schema = JsonRowDeserializationSchema.Builder() \
    .type_info(Types.ROW([Types.INT(), Types.STRING()])) \
    .build()

# Set up the Kafka consumer properties
consumer_props = {
    'bootstrap.servers': kafka_config['bootstrap.servers'],
    'security.protocol': kafka_config['security.protocol'],
    'sasl.mechanism': kafka_config['sasl.mechanism'],
    'sasl.jaas.config': kafka_config['sasl.jaas.config'],
    'group.id': 'python-group-1'
}

# Create a Kafka consumer
kafka_consumer = FlinkKafkaConsumer(
    topics=topic,                                    # Kafka topic
    deserialization_schema=deserialization_schema,
    properties=consumer_props,                       # Consumer properties
)
kafka_consumer.set_start_from_earliest()

# Add the Kafka consumer as a source to the execution environment
stream = env.add_source(kafka_consumer)

# Define your data processing logic here
# For example, you can print the stream to the console
stream.print()

# Execute the job
env.execute()

Error:

Traceback (most recent call last):
  File "/home/pyflink/test.py", line 45, in <module>
    kafka_consumer = FlinkKafkaConsumer(
  File "/usr/local/lib/python3.10/dist-packages/pyflink/datastream/connectors/kafka.py", line 203, in __init__
    j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema,
  File "/usr/local/lib/python3.10/dist-packages/pyflink/datastream/connectors/kafka.py", line 161, in _get_kafka_consumer
    j_flink_kafka_consumer = j_consumer_clz(topics,
  File "/usr/local/lib/python3.10/dist-packages/pyflink/util/exceptions.py", line 185, in wrapped_call
    raise TypeError(
TypeError: Could not found the Java class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'

Cheers & Regards,
Ruthvik Kadiyala
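P.S. One thing I am unsure about: I am passing relative file:// URLs to add_jars() and add_classpaths(), while the PyFlink examples I have seen all use absolute file:/// URLs. If that is the problem, I assume the registration would need to look something like the sketch below (the jar's location is an assumption; I have only verified that add_jars() takes file URLs, not that this fixes my error):

import os
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# Resolve the jar relative to the current working directory and build an
# absolute file:/// URL, since add_jars() expects URLs rather than bare
# relative paths. Where the jar actually lives in my image is an assumption.
jar_url = "file://" + os.path.abspath("flink-sql-connector-kafka-1.17.1.jar")
env.add_jars(jar_url)
env.add_classpaths(jar_url)

Would that be the correct way to register the connector jar, or should I instead set 'pipeline.jars' / pass --jarfile as the error message suggests?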