Hi,

Below is the code I have been using to consume a Kafka stream that is
hosted on Confluent Cloud. It fails with an error about the jar files, which
is shown after the code snippet. Please let me know what I am doing wrong. I am
running this on Docker with Flink version 1.17.1.

Code:

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import FlinkKafkaConsumer
from pyflink.datastream.formats.json import JsonRowDeserializationSchema
import glob
import os
import sys
import logging

# Set up the execution environment
env = StreamExecutionEnvironment.get_execution_environment()

logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

# The SQL connector for Kafka is used here because it is a fat jar and could
# avoid dependency issues
env.add_jars("file://flink-sql-connector-kafka-1.17.1.jar")
env.add_classpaths("file://flink-sql-connector-kafka-1.17.1.jar")
env.add_jars("file://flink-connector-kafka_2.11-1.9.2-javadoc.jar")
env.add_classpaths("file://flink-connector-kafka_2.11-1.9.2-javadoc.jar")

# Set up the Confluent Cloud Kafka configuration
kafka_config = {
    'bootstrap.servers': 'bootstrap-server',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'PLAIN',
    'sasl.jaas.config':
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        'username="API_KEY" password="API_SECRET";'
}

topic = 'TOPIC_NAME'

deserialization_schema = JsonRowDeserializationSchema.Builder() \
        .type_info(Types.ROW([Types.INT(), Types.STRING()])) \
        .build()

# Set up the Kafka consumer properties
consumer_props = {
    'bootstrap.servers': kafka_config['bootstrap.servers'],
    'security.protocol': kafka_config['security.protocol'],
    'sasl.mechanism': kafka_config['sasl.mechanism'],
    'sasl.jaas.config': kafka_config['sasl.jaas.config'],
    'group.id': 'python-group-1'
}

# Create a Kafka consumer
kafka_consumer = FlinkKafkaConsumer(
    topics=topic,  # Kafka topic
    deserialization_schema=deserialization_schema,
    properties=consumer_props,  # Consumer properties
)
kafka_consumer.set_start_from_earliest()
# Add the Kafka consumer as a source to the execution environment
stream = env.add_source(kafka_consumer)

# Define your data processing logic here
# For example, you can print the stream to the console
stream.print()

# Execute the job
env.execute()

Error:

Traceback (most recent call last):
  File "/home/pyflink/test.py", line 45, in <module>
    kafka_consumer = FlinkKafkaConsumer(
  File "/usr/local/lib/python3.10/dist-packages/pyflink/datastream/connectors/kafka.py", line 203, in __init__
    j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema,
  File "/usr/local/lib/python3.10/dist-packages/pyflink/datastream/connectors/kafka.py", line 161, in _get_kafka_consumer
    j_flink_kafka_consumer = j_consumer_clz(topics,
  File "/usr/local/lib/python3.10/dist-packages/pyflink/util/exceptions.py", line 185, in wrapped_call
    raise TypeError(
TypeError: Could not found the Java class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'
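
Since the error message points at how the jar is specified, one thing that may be worth checking is the jar URL itself. The script passes "file://" followed by a bare file name, whereas the PyFlink documentation examples use absolute URLs of the form "file:///path/to/connector.jar". The following is only a minimal sketch of building such a URL with the standard library. The helper name jar_url is hypothetical, and it assumes the jar sits in the current working directory of the script, which may not match the actual Docker layout.

```python
from pathlib import Path


def jar_url(jar_name: str, base_dir: str = ".") -> str:
    """Return an absolute file:// URL for a jar located in base_dir.

    jar_url is a hypothetical helper, not part of PyFlink.
    """
    # resolve() makes the path absolute; as_uri() requires an absolute path
    return (Path(base_dir).resolve() / jar_name).as_uri()


# Assumption: the fat jar from the script above is in the working directory.
url = jar_url("flink-sql-connector-kafka-1.17.1.jar")
print(url)

# The resulting URL could then be passed to the environment, for example:
# env.add_jars(url)
```

The commented-out last line reuses the env object from the script above. Whether this resolves the error also depends on the jar actually being present at that location inside the container.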



Cheers & Regards,
Ruthvik Kadiyala


