[ 
https://issues.apache.org/jira/browse/SPARK-23739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415093#comment-16415093
 ] 

Davide Isoardi commented on SPARK-23739:
----------------------------------------

We know that kafka client 0.8 has not this class, but the druid packet has.

Is it possible that this issue is cause by druid packet? In this case, can not 
you install druid and use structured streaming for get data by Kafka?

> Spark structured streaming long running problem
> -----------------------------------------------
>
>                 Key: SPARK-23739
>                 URL: https://issues.apache.org/jira/browse/SPARK-23739
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: Florencio
>            Priority: Critical
>              Labels: spark, streaming, structured
>
> I had a problem with long running spark structured streaming in spark 2.1. 
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.common.requests.LeaveGroupResponse.
> The detailed error is the following:
> 18/03/16 16:10:57 INFO StreamExecution: Committed offsets for batch 2110. 
> Metadata OffsetSeqMetadata(0,1521216656590)
> 18/03/16 16:10:57 INFO KafkaSource: GetBatch called with start = 
> Some(\{"TopicName":{"2":5520197,"1":5521045,"3":5522054,"0":5527915}}), end = 
> \{"TopicName":{"2":5522730,"1":5523577,"3":5524586,"0":5530441}}
> 18/03/16 16:10:57 INFO KafkaSource: Partitions added: Map()
> 18/03/16 16:10:57 ERROR StreamExecution: Query [id = 
> a233b9ff-cc39-44d3-b953-a255986c04bf, runId = 
> 8520e3c0-2455-4ac1-9021-8518fb58b3f8] terminated with error
> java.util.zip.ZipException: invalid code lengths set
>  at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:164)
>  at java.io.FilterInputStream.read(FilterInputStream.java:133)
>  at java.io.FilterInputStream.read(FilterInputStream.java:107)
>  at 
> org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:354)
>  at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:322)
>  at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:322)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1303)
>  at org.apache.spark.util.Utils$.copyStream(Utils.scala:362)
>  at 
> org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:45)
>  at 
> org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:83)
>  at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:173)
>  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>  at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
>  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
>  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
>  at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>  at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>  at org.apache.spark.rdd.RDD.map(RDD.scala:369)
>  at org.apache.spark.sql.kafka010.KafkaSource.getBatch(KafkaSource.scala:287)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:503)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$6.apply(StreamExecution.scala:499)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at 
> org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
>  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> 18/03/16 16:10:57 ERROR ClientUtils: Failed to close coordinator
> java.lang.NoClassDefFoundError: 
> org/apache/kafka/common/requests/LeaveGroupResponse
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.sendLeaveGroupRequest(AbstractCoordinator.java:575)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:566)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:555)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:377)
>  at org.apache.kafka.clients.ClientUtils.closeQuietly(ClientUtils.java:66)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1383)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1364)
>  at org.apache.spark.sql.kafka010.KafkaSource.stop(KafkaSource.scala:311)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$stopSources$1.apply(StreamExecution.scala:574)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$stopSources$1.apply(StreamExecution.scala:572)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution.stopSources(StreamExecution.scala:572)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:325)
>  at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.common.requests.LeaveGroupResponse
>  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>  ... 15 more
> 18/03/16 16:10:57 WARN StreamExecution: Failed to stop streaming source: 
> KafkaSource[Subscribe[TPusciteStazMinuto]]. Resources may have leaked.
> org.apache.kafka.common.KafkaException: Failed to close kafka consumer
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to