Sorry, I sent the last message in a hurry. Here is the Beam Java-to-Kafka dependency:
Is something missing here?

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-kafka</artifactId>
    <version>2.35.0</version>
</dependency>
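For what it's worth, as far as I know kafka-clients is a "provided" dependency of beam-sdks-java-io-kafka, so the application POM has to declare both together. A sketch of the combined fragment (versions are just the ones quoted in this thread, not a recommendation):

```xml
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-kafka</artifactId>
    <version>2.35.0</version>
</dependency>
<!-- kafka-clients is not pulled in transitively at runtime, so pin it explicitly -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>
```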


On Tue, Feb 1, 2022 at 9:01 AM Utkarsh Parekh <utkarsh.s.par...@gmail.com>
wrote:

> Here it is
>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka-clients</artifactId>
>     <version>2.8.0</version>
> </dependency>
>
>
> On Tue, Feb 1, 2022 at 8:53 AM Alexey Romanenko <aromanenko....@gmail.com>
> wrote:
>
>> Hmm, this is strange. Which version of Kafka client do you use while
>> running it with Beam?
>>
>> On 1 Feb 2022, at 16:56, Utkarsh Parekh <utkarsh.s.par...@gmail.com>
>> wrote:
>>
>> Hi Alexey,
>>
>> First of all, thank you for the response! Yes, I did have it in the
>> consumer configuration and tried to increase "session.timeout.ms".
>>
>> On the consumer side, I have the following settings so far:
>>
>> props.put("sasl.mechanism", SASL_MECHANISM);
>> props.put("security.protocol", SECURITY_PROTOCOL);
>> props.put("sasl.jaas.config", saslJaasConfig);
>> props.put("request.timeout.ms", 60000);
>> props.put("session.timeout.ms", 60000);
>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, AUTO_OFFSET_RESET_CONFIG);
>> props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
>>
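(A side note: KafkaIO takes its consumer overrides as a Map rather than a Properties object, via KafkaIO.Read#withConsumerConfigUpdates. A minimal sketch of collecting the settings above into such a map — the SASL constants and JAAS string here are placeholders, not the actual values from this thread:

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaConsumerConfig {
    // Placeholder values standing in for the constants used in the thread.
    static final String SASL_MECHANISM = "PLAIN";
    static final String SECURITY_PROTOCOL = "SASL_SSL";

    // Builds the consumer overrides that would be handed to
    // KafkaIO.Read#withConsumerConfigUpdates(Map) in the Beam pipeline.
    static Map<String, Object> consumerConfig(String saslJaasConfig, String consumerGroup) {
        Map<String, Object> config = new HashMap<>();
        config.put("sasl.mechanism", SASL_MECHANISM);
        config.put("security.protocol", SECURITY_PROTOCOL);
        config.put("sasl.jaas.config", saslJaasConfig);
        config.put("request.timeout.ms", 60000);
        config.put("session.timeout.ms", 60000);
        config.put("auto.offset.reset", "latest");
        config.put("group.id", consumerGroup);
        return config;
    }

    public static void main(String[] args) {
        Map<String, Object> config =
            consumerConfig("<jaas config here>", "test-group");
        System.out.println(config.get("session.timeout.ms"));
    }
}
```
)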
>>
>> It works fine using the following code in a Databricks notebook. The
>> problem occurs when I run it through Apache Beam and KafkaIO (just
>> providing more context in case it helps you understand the problem):
>>
>> val df = spark.readStream
>>     .format("kafka")
>>     .option("subscribe", TOPIC)
>>     .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
>>     .option("kafka.sasl.mechanism", "PLAIN")
>>     .option("kafka.security.protocol", "SASL_SSL")
>>     .option("kafka.sasl.jaas.config", EH_SASL)
>>     .option("kafka.request.timeout.ms", "60000")
>>     .option("kafka.session.timeout.ms", "60000")
>>     .option("failOnDataLoss", "false")
>>     // .option("kafka.group.id", "testsink")
>>     .option("startingOffsets", "latest")
>>     .load()
>>
>> Utkarsh
>>
>> On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko <aromanenko....@gmail.com>
>> wrote:
>>
>>> Hi Utkarsh,
>>>
>>> Can it be related to this configuration problem?
>>>
>>> https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received
>>>
>>> Did you check timeout settings?
>>>
>>> —
>>> Alexey
>>>
>>>
>>> On 1 Feb 2022, at 02:27, Utkarsh Parekh <utkarsh.s.par...@gmail.com>
>>> wrote:
>>>
>>> Hello,
>>>
>>> I'm doing POC with KafkaIO and spark runner on Azure Databricks. I'm
>>> trying to create a simple streaming app with Apache Beam, where it reads
>>> data from an Azure event hub and produces messages into another Azure event
>>> hub.
>>>
>>> I'm creating and running spark jobs on Azure Databricks.
>>>
>>> The problem is the consumer (uses SparkRunner) is not able to read data
>>> from Event hub (queue). There is no activity and no errors on the Spark
>>> cluster.
>>>
>>> I would appreciate it if anyone could help to fix this issue.
>>>
>>> Thank you
>>>
>>> Utkarsh
>>>
>>>
>>>
>>