I also occasionally get this error when I run a streaming pipeline on a new cluster instead of an existing one.
https://issues.apache.org/jira/browse/BEAM-12032?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel

On Tue, Feb 1, 2022 at 1:59 PM Utkarsh Parekh <utkarsh.s.par...@gmail.com> wrote:

> If you tested earlier with the same stack, which version did you use?
>
> *Can you enable debug logs to check what’s happening there?*
> So far, the following warning from log4j on Databricks is the only thing I
> have received (there are no other errors).
>
> *Can you make sure that there is no issue with firewall or something?*
> No, I don't think so, because it works fine locally and in a Databricks
> notebook.
>
> *Can you run this pipeline locally against a real Kafka server, not Azure
> Event Hub, to make sure that it works fine?*
> Yes, it works fine with both Azure Event Hubs and Kafka.
>
>
>
> org.springframework.core.convert.support.DefaultConversionService.getSharedInstance()'
>     at org.springframework.expression.spel.support.StandardTypeConverter.<init>(StandardTypeConverter.java:46)
>     at org.springframework.expression.spel.support.StandardEvaluationContext.getTypeConverter(StandardEvaluationContext.java:197)
>     at org.springframework.expression.spel.support.ReflectiveMethodResolver.resolve(ReflectiveMethodResolver.java:115)
>     at org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:201)
>     at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:130)
>     at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
>     at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
>     at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
>     at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
>     at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
>     at org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateAssign(ConsumerSpEL.java:124)
>     at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:85)
>     at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.startIfNeeded(MicrobatchSource.java:207)
>     at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.start(MicrobatchSource.java:227)
>     at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:172)
>     at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:111)
>     at org.apache.spark.streaming.StateSpec$.$anonfun$function$1(StateSpec.scala:181)
>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.$anonfun$updateRecordWithData$3(MapWithStateRDD.scala:57)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>     at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:159)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
>     at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:393)
>     at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1486)
>     at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1413)
>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1477)
>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1296)
>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:391)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:342)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
>     at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
>     at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
>     at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
>     at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
>     at org.apache.spark.scheduler.Task.doRunTask(Task.scala:153)
>     at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:122)
>     at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
>     at org.apache.spark.scheduler.Task.run(Task.scala:93)
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:824)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1621)
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:827)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>     at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:683)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
>
> On Tue, Feb 1, 2022 at 12:07 PM Alexey Romanenko <aromanenko....@gmail.com>
> wrote:
>
>> Well, personally I didn’t test with this version, but it should be fine…
>> Can you enable debug logs to check what’s happening there?
>> Can you make sure that there is no issue with a firewall or something similar?
>> Can you run this pipeline locally against a real Kafka server, not Azure
>> Event Hub, to make sure that it works fine?
>> Otherwise, the worker process would need to be debugged remotely.
>>
>> On 1 Feb 2022, at 19:18, Utkarsh Parekh <utkarsh.s.par...@gmail.com>
>> wrote:
>>
>> Sorry, I sent the last message in a hurry. Here is the Beam Java dependency
>> for Kafka. Is something missing here?
>>
>> <dependency>
>>   <groupId>org.apache.beam</groupId>
>>   <artifactId>beam-sdks-java-io-kafka</artifactId>
>>   <version>2.35.0</version>
>> </dependency>
>>
>>
>> On Tue, Feb 1, 2022 at 9:01 AM Utkarsh Parekh <utkarsh.s.par...@gmail.com>
>> wrote:
>>
>>> Here it is:
>>>
>>> <dependency>
>>>   <groupId>org.apache.kafka</groupId>
>>>   <artifactId>kafka-clients</artifactId>
>>>   <version>2.8.0</version>
>>> </dependency>
>>>
>>>
>>> On Tue, Feb 1, 2022 at 8:53 AM Alexey Romanenko <
>>> aromanenko....@gmail.com> wrote:
>>>
>>>> Hmm, this is strange. Which version of the Kafka client do you use when
>>>> running it with Beam?
>>>>
>>>> On 1 Feb 2022, at 16:56, Utkarsh Parekh <utkarsh.s.par...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi Alexey,
>>>>
>>>> First of all, thank you for the response! Yes, I did have it in the consumer
>>>> configuration and tried increasing "session.timeout.ms".
>>>>
>>>> On the consumer side, I have the following settings so far:
>>>>
>>>> props.put("sasl.mechanism", SASL_MECHANISM);
>>>> props.put("security.protocol", SECURITY_PROTOCOL);
>>>> props.put("sasl.jaas.config", saslJaasConfig);
>>>> props.put("request.timeout.ms", 60000);
>>>> props.put("session.timeout.ms", 60000);
>>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
>>>>     AUTO_OFFSET_RESET_CONFIG);
>>>> props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
>>>>
>>>>
>>>> It works fine using the following code in a Databricks notebook.
>>>> The problem only occurs when I run it through Apache Beam and KafkaIO
>>>> (just providing more context in case it helps you understand the problem):
>>>>
>>>> val df = spark.readStream
>>>>   .format("kafka")
>>>>   .option("subscribe", TOPIC)
>>>>   .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
>>>>   .option("kafka.sasl.mechanism", "PLAIN")
>>>>   .option("kafka.security.protocol", "SASL_SSL")
>>>>   .option("kafka.sasl.jaas.config", EH_SASL)
>>>>   .option("kafka.request.timeout.ms", "60000")
>>>>   .option("kafka.session.timeout.ms", "60000")
>>>>   .option("failOnDataLoss", "false")
>>>>   //.option("kafka.group.id", "testsink")
>>>>   .option("startingOffsets", "latest")
>>>>   .load()
>>>>
>>>> Utkarsh
>>>>
>>>> On Tue, Feb 1, 2022 at 6:20 AM Alexey Romanenko <
>>>> aromanenko....@gmail.com> wrote:
>>>>
>>>>> Hi Utkarsh,
>>>>>
>>>>> Could it be related to this configuration problem?
>>>>>
>>>>> https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide#no-records-received
>>>>>
>>>>> Did you check the timeout settings?
>>>>>
>>>>> —
>>>>> Alexey
>>>>>
>>>>>
>>>>> On 1 Feb 2022, at 02:27, Utkarsh Parekh <utkarsh.s.par...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> I'm doing a POC with KafkaIO and the Spark runner on Azure Databricks.
>>>>> I'm trying to create a simple streaming app with Apache Beam that reads
>>>>> data from an Azure event hub and produces messages into another Azure
>>>>> event hub.
>>>>>
>>>>> I'm creating and running Spark jobs on Azure Databricks.
>>>>>
>>>>> The problem is that the consumer (which uses SparkRunner) is not able to
>>>>> read data from the event hub (queue). There is no activity and there are
>>>>> no errors on the Spark cluster.
>>>>>
>>>>> I would appreciate it if anyone could help fix this issue.
>>>>>
>>>>> Thank you
>>>>>
>>>>> Utkarsh
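One way to narrow down the `DefaultConversionService.getSharedInstance()` failure in the quoted trace is to check, from inside the job, which JAR actually supplied that Spring class and whether that copy has the method. The sketch below is a minimal diagnostic. It rests on an assumption the thread does not confirm: that the truncated error is a missing-method linkage error caused by an older spring-core on the Databricks classpath shadowing the version that KafkaIO's `ConsumerSpEL` expects. `ClasspathCheck` and its methods are hypothetical names. The code uses only standard JDK reflection and does not fix anything by itself.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

/**
 * Hypothetical diagnostic helper: reports where a class was loaded from and
 * whether it exposes a given public no-argument method.
 */
public final class ClasspathCheck {

    private ClasspathCheck() {}

    /** Returns the JAR/directory the class came from, or a note if unknown (e.g. JDK classes). */
    public static String locationOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        if (src == null || src.getLocation() == null) {
            return "(bootstrap or unknown location)";
        }
        return src.getLocation().toString();
    }

    /** True if the class has a public method (static or instance) with this name and no parameters. */
    public static boolean hasNoArgMethod(Class<?> cls, String methodName) {
        try {
            Method m = cls.getMethod(methodName);
            return m != null;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** One-line report; loads the class without initializing it, via this class's loader. */
    public static String report(String className, String methodName) {
        try {
            Class<?> cls = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return className + " from " + locationOf(cls)
                + "; has " + methodName + "(): " + hasNoArgMethod(cls, methodName);
        } catch (ClassNotFoundException e) {
            return className + " is not on the classpath";
        }
    }

    public static void main(String[] args) {
        System.out.println(report(
            "org.springframework.core.convert.support.DefaultConversionService",
            "getSharedInstance"));
    }
}
```

If this is run from the same job JAR (on the driver and, ideally, inside an executor task), the reported location shows whether the class came from the Beam bundle or from a cluster-provided JAR. If `getSharedInstance()` is reported as missing, the cluster's copy is the likely winner on the classpath.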
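The Spark Structured Streaming notebook reads Event Hubs successfully while the Beam job does not, so it may help to derive the Beam consumer properties directly from the working notebook options. That way, the two configurations cannot drift apart. The sketch below relies on Spark's convention that source options prefixed with `kafka.` are passed to the Kafka consumer with the prefix removed. Two parts are assumptions rather than anything stated in the thread. First, mapping `startingOffsets` to `auto.offset.reset` is only an approximation: it matters only when the consumer group has no committed offsets. Second, `SparkKafkaOptions` is a hypothetical helper. The JAAS helper follows the SASL PLAIN format with `username="$ConnectionString"` that Azure Event Hubs documents for Kafka clients.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: turns Spark Kafka source options into Kafka consumer properties. */
public final class SparkKafkaOptions {

    private static final String KAFKA_PREFIX = "kafka.";

    private SparkKafkaOptions() {}

    public static Map<String, Object> toConsumerConfig(Map<String, String> sparkOptions) {
        Map<String, Object> config = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : sparkOptions.entrySet()) {
            String key = e.getKey();
            String value = e.getValue();
            if (key.startsWith(KAFKA_PREFIX)) {
                // "kafka.session.timeout.ms" -> "session.timeout.ms"
                config.put(key.substring(KAFKA_PREFIX.length()), value);
            } else if (key.equals("startingOffsets")
                    && (value.equals("latest") || value.equals("earliest"))) {
                // Approximation (assumption): only applies when no committed offsets exist.
                config.put("auto.offset.reset", value);
            }
            // Spark-only options such as "subscribe" or "failOnDataLoss" are skipped.
        }
        return config;
    }

    /** SASL/PLAIN JAAS line for Azure Event Hubs connection-string authentication. */
    public static String eventHubsJaasConfig(String connectionString) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
            + " username=\"$ConnectionString\" password=\"" + connectionString + "\";";
    }

    public static void main(String[] args) {
        Map<String, String> notebookOptions = new LinkedHashMap<>();
        notebookOptions.put("subscribe", "my-topic"); // placeholder topic name
        notebookOptions.put("kafka.sasl.mechanism", "PLAIN");
        notebookOptions.put("kafka.security.protocol", "SASL_SSL");
        notebookOptions.put("kafka.request.timeout.ms", "60000");
        notebookOptions.put("kafka.session.timeout.ms", "60000");
        notebookOptions.put("failOnDataLoss", "false");
        notebookOptions.put("startingOffsets", "latest");
        System.out.println(toConsumerConfig(notebookOptions));
    }
}
```

The resulting map could then be passed to KafkaIO, for example through `KafkaIO.Read#withConsumerConfigUpdates`, with the topic set separately through `withTopic`. Options such as `failOnDataLoss` have no consumer-side equivalent and are simply dropped.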