Thank you for the quick answers, Utkarsh, but unfortunately I don’t see the real 
cause of this right now. It seems it will require some remote debugging on 
your side to see what the workers are actually doing.
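
One thing that could be checked without a debugger: the quoted trace goes through Spring's DefaultConversionService.getSharedInstance(), and its first line (the exception type) is cut off. If that turns out to be a linkage error such as NoSuchMethodError (an assumption on my part, not something the logs confirm), it would be worth seeing which jar that class is loaded from on the cluster. A minimal sketch using only JDK reflection; the class name ClasspathProbe and its helper methods are made up for illustration:

```java
import java.security.CodeSource;
import java.util.Arrays;

// Hypothetical helper, not part of Beam, Spark, or Spring.
public class ClasspathProbe {

    /** Returns the jar or directory a class was loaded from, or a note if that is unknown. */
    public static String locationOf(String className) {
        try {
            Class<?> cls = Class.forName(className);
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            // Classes from the JDK itself usually have no code source.
            if (src == null || src.getLocation() == null) {
                return "(bootstrap or unknown)";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not loadable: " + e + ")";
        }
    }

    /** Returns true if the class can be loaded and exposes a public method with this name. */
    public static boolean hasMethod(String className, String methodName) {
        try {
            return Arrays.stream(Class.forName(className).getMethods())
                    .anyMatch(m -> m.getName().equals(methodName));
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class and method names taken from the quoted stack trace.
        String cls = "org.springframework.core.convert.support.DefaultConversionService";
        System.out.println(cls + " loaded from: " + locationOf(cls));
        System.out.println("has getSharedInstance(): " + hasMethod(cls, "getSharedInstance"));
    }
}
```

Running this on the driver only shows the driver's classpath. To see what the workers load, the same two calls would have to be made from code that executes on the executors, for example inside a DoFn.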


> On 1 Feb 2022, at 22:59, Utkarsh Parekh <utkarsh.s.par...@gmail.com> wrote:
> 
> If you tested earlier with the same stack, which version did you use?
> 
> Can you enable debug logs to check what’s happening there? So far, the 
> following warning is the only thing I received from log4j on 
> Databricks (no errors other than that).
> 
> Can you make sure that there is no issue with firewall or something? No, I 
> don't think so, because it's working fine locally and in a Databricks notebook.
> 
> Can you run this pipeline locally against a real Kafka server, not Azure 
> Event Hub, to make sure that it works fine? Yes, it's working fine with both 
> Azure Event Hub and Kafka.
> 
> 
> org.springframework.core.convert.support.DefaultConversionService.getSharedInstance()'
> at 
> org.springframework.expression.spel.support.StandardTypeConverter.<init>(StandardTypeConverter.java:46)
> at 
> org.springframework.expression.spel.support.StandardEvaluationContext.getTypeConverter(StandardEvaluationContext.java:197)
> at 
> org.springframework.expression.spel.support.ReflectiveMethodResolver.resolve(ReflectiveMethodResolver.java:115)
> at 
> org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:201)
> at 
> org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:130)
> at 
> org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
> at 
> org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
> at 
> org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
> at 
> org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
> at 
> org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
> at 
> org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateAssign(ConsumerSpEL.java:124)
> at 
> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:85)
> at 
> org.apache.beam.runners.spark.io.MicrobatchSource$Reader.startIfNeeded(MicrobatchSource.java:207)
> at 
> org.apache.beam.runners.spark.io.MicrobatchSource$Reader.start(MicrobatchSource.java:227)
> at 
> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:172)
> at 
> org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:111)
> at 
> org.apache.spark.streaming.StateSpec$.$anonfun$function$1(StateSpec.scala:181)
> at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.$anonfun$updateRecordWithData$3(MapWithStateRDD.scala:57)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at 
> org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
> at 
> org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
> at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:393)
> at 
> org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1486)
> at 
> org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1413)
> at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1477)
> at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1296)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:391)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:342)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
> at 
> org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
> at 
> com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
> at 
> org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
> at 
> com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
> at org.apache.spark.scheduler.Task.doRunTask(Task.scala:153)
> at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:122)
> at 
> com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
> at org.apache.spark.scheduler.Task.run(Task.scala:93)
> at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:824)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1621)
> at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:827)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at 
> com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:683)
> at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
> 
> On Tue, Feb 1, 2022 at 12:07 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:
> Well, personally I didn’t test with this version, but it should be fine… 
> Can you enable debug logs to check what’s happening there? 
> Can you make sure that there is no issue with firewall or something? 
> Can you run this pipeline locally against a real Kafka server, not Azure 
> Event Hub, to make sure that it works fine?
> Otherwise, we would need to debug the worker process remotely.
> 
>> On 1 Feb 2022, at 19:18, Utkarsh Parekh <utkarsh.s.par...@gmail.com> wrote:
>> 
>> Sorry, I sent the last message in a hurry. Here is the Beam Java Kafka IO 
>> dependency. Is something missing here?
>> 
>> <dependency>
>>     <groupId>org.apache.beam</groupId>
>>     <artifactId>beam-sdks-java-io-kafka</artifactId>
>>     <version>2.35.0</version>
>> </dependency>
>> 
>> On Tue, Feb 1, 2022 at 9:01 AM Utkarsh Parekh <utkarsh.s.par...@gmail.com> wrote:
>> Here it is 
>> 
>> <dependency>
>>     <groupId>org.apache.kafka</groupId>
>>     <artifactId>kafka-clients</artifactId>
>>     <version>2.8.0</version>
>> </dependency>
>> 
>> On Tue, Feb 1, 2022 at 8:53 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>> Hmm, this is strange. Which version of the Kafka client do you use when 
>> running it with Beam?
>> 
>>> On 1 Feb 2022, at 16:56, Utkarsh Parekh <utkarsh.s.par...@gmail.com> wrote:
>>> 
>>> Hi Alexey, 
>>> 
>>> First of all, thank you for the response! Yes, I did have it in the consumer 
>>> configuration, and I tried increasing "session.timeout".
>>> 
>>> On the consumer side, I have the following settings so far:
>>> props.put("sasl.mechanism", SASL_MECHANISM);
>>> props.put("security.protocol", SECURITY_PROTOCOL);
>>> props.put("sasl.jaas.config", saslJaasConfig);
>>> props.put("request.timeout.ms", 60000);
>>> props.put("session.timeout.ms", 60000);
>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
>>> AUTO_OFFSET_RESET_CONFIG);
>>> props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
>>> 
>>> It works fine using the following code in a Databricks notebook. The problem 
>>> only occurs when I run it through Apache Beam and KafkaIO (just 
>>> providing more context in case it helps you understand the problem).
>>> 
>>> val df = spark.readStream
>>>     .format("kafka")
>>>     .option("subscribe", TOPIC)
>>>     .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
>>>     .option("kafka.sasl.mechanism", "PLAIN")
>>>     .option("kafka.security.protocol", "SASL_SSL")
>>>     .option("kafka.sasl.jaas.config", EH_SASL)
>>>     .option("kafka.request.timeout.ms", "60000")
>>>     .option("kafka.session.timeout.ms", "60000")
>>>     .option("failOnDataLoss", "false")
>>>     //.option("kafka.group.id", "testsink")
>>>     .option("startingOffsets", "latest")
>>>     .load()
>>> 
>>> Utkarsh
>>> 
>>> On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko <aromanenko....@gmail.com> wrote:
>>> Hi Utkarsh,
>>> 
>>> Can it be related to this configuration problem?
>>> https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received
>>> 
>>> Did you check timeout settings?
>>> 
>>> —
>>> Alexey      
>>> 
>>> 
>>>> On 1 Feb 2022, at 02:27, Utkarsh Parekh <utkarsh.s.par...@gmail.com> wrote:
>>>> 
>>>> Hello,
>>>> 
>>>> I'm doing a POC with KafkaIO and the Spark runner on Azure Databricks. I'm 
>>>> trying to create a simple streaming app with Apache Beam that reads 
>>>> data from one Azure event hub and produces messages into another Azure 
>>>> event hub. 
>>>> 
>>>> I'm creating and running Spark jobs on Azure Databricks.
>>>> 
>>>> The problem is that the consumer (which uses SparkRunner) is not able to read 
>>>> data from the event hub (queue). There is no activity and there are no errors 
>>>> on the Spark cluster.
>>>> 
>>>> I would appreciate it if anyone could help to fix this issue.
>>>> 
>>>> Thank you
>>>> 
>>>> Utkarsh
>>> 
>> 
> 
