[ https://issues.apache.org/jira/browse/SPARK-21561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-21561.
-------------------------------
    Resolution: Invalid

This isn't a place to ask for input on your code -- you'd have to show a reproducible bug here that you've narrowed down.

> spark-streaming-kafka-010 DSteam is not pulling anything from Kafka
> -------------------------------------------------------------------
>
>                 Key: SPARK-21561
>                 URL: https://issues.apache.org/jira/browse/SPARK-21561
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.1.1
>            Reporter: Vlad Badelita
>              Labels: kafka-0.10, spark-streaming
>
> I am trying to use spark-streaming-kafka-0.10 to pull messages from a Kafka
> topic (broker version 0.10). I have checked that messages are being produced,
> and a plain KafkaConsumer pulls them successfully. When I use the Spark
> Streaming API, however, I get nothing. If I use KafkaUtils.createRDD and
> specify offset ranges manually, it works. But when I use createDirectStream,
> all the RDDs are empty, and when I check the partition offsets, every
> partition reports 0.
> Here is what I tried:
> {code:scala}
> import org.apache.kafka.common.serialization.StringDeserializer
> import org.apache.spark.{SparkConf, TaskContext}
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka010._
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>
> object KafkaStream {
>   def main(args: Array[String]): Unit = {
>     val sparkConf = new SparkConf().setAppName("kafkastream")
>     // 3-second micro-batches
>     val ssc = new StreamingContext(sparkConf, Seconds(3))
>     val topics = Array("my_topic")
>     val kafkaParams = Map[String, Object](
>       "bootstrap.servers" -> "hostname:6667",
>       "key.deserializer" -> classOf[StringDeserializer],
>       "value.deserializer" -> classOf[StringDeserializer],
>       "group.id" -> "my_group",
>       // Only used when the group has no committed offset for a partition
>       "auto.offset.reset" -> "earliest",
>       "enable.auto.commit" -> (true: java.lang.Boolean)
>     )
>     val stream = KafkaUtils.createDirectStream[String, String](
>       ssc,
>       PreferConsistent,
>       Subscribe[String, String](topics, kafkaParams)
>     )
>     stream.foreachRDD { rdd =>
>       // The cast only works on RDDs produced directly by the stream
>       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>       rdd.foreachPartition { iter =>
>         // Runs on executors: prints this partition's [fromOffset, untilOffset) range
>         val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
>         println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
>       }
>       val rddCount = rdd.count()
>       println(s"rdd count: $rddCount")
>       // stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>     }
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
> {code}
> All partitions show offset ranges from 0 to 0, and all RDDs are empty. I would
> like the stream to start from the beginning of each partition and also pick up
> everything that is produced to it afterwards.
> I have also tried spark-streaming-kafka-0.8, and it works. I think it is a 0.10
> issue, because everything else works fine. Thank you!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
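For reference, the Kafka consumer setting auto.offset.reset only applies when the consumer group has no committed offset for a partition, or when the committed offset is no longer in the log; otherwise consumption resumes from the committed offset. Because the params above set both group.id and enable.auto.commit, this matters for the goal of starting "from the beginning of a partition". The following is a hypothetical, stdlib-only Scala model of that rule; StartingOffsetModel, PartitionLog and startingOffset are illustrative names, not Kafka or Spark API.

```scala
// Hypothetical, stdlib-only model of where a Kafka consumer starts reading a
// partition. Illustrative names; not Kafka or Spark API.
object StartingOffsetModel {
  sealed trait ResetPolicy
  case object Earliest extends ResetPolicy // models auto.offset.reset=earliest
  case object Latest extends ResetPolicy   // models auto.offset.reset=latest

  /** Offsets currently stored for one partition: logStart until logEnd (exclusive). */
  final case class PartitionLog(logStart: Long, logEnd: Long)

  /**
   * Returns the first offset the consumer will read.
   * A committed offset that is still inside the log wins; the reset policy is
   * consulted only when there is no committed offset or it is out of range.
   */
  def startingOffset(log: PartitionLog, committed: Option[Long], reset: ResetPolicy): Long =
    committed match {
      case Some(off) if off >= log.logStart && off <= log.logEnd => off
      case _ =>
        reset match {
          case Earliest => log.logStart
          case Latest   => log.logEnd
        }
    }
}
```

In this model, once a group has a valid committed offset, "earliest" is ignored; it only decides the start position for a group that has never committed (or whose committed offset has been deleted from the log).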
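A direct stream does not consume continuously. At each batch interval, the driver looks up the latest offset of every partition and turns "where the previous batch stopped" up to "latest offset" into one offset range per partition, which is what HasOffsetRanges exposes. A range is half-open, so its record count is untilOffset - fromOffset (Spark's OffsetRange also exposes this as count()), and a 0 to 0 range is an empty partition. Below is a simplified, hypothetical model of that bookkeeping in plain Scala. BatchRange and nextBatch are illustrative names, and the optional per-partition cap only loosely stands in for rate limiting such as spark.streaming.kafka.maxRatePerPartition. This is not Spark's implementation.

```scala
// Hypothetical, simplified model of per-batch offset bookkeeping in a direct
// stream. Illustrative names; not Spark's implementation.
object DirectStreamBatchModel {
  /** One partition's slice of a batch: offsets fromOffset until untilOffset (exclusive). */
  final case class BatchRange(partition: Int, fromOffset: Long, untilOffset: Long) {
    // Half-open interval, so the record count is a plain difference.
    def count: Long = untilOffset - fromOffset
  }

  /**
   * Builds one batch. Each partition reads from its current position up to the
   * latest offset the driver saw, optionally capped at maxPerPartition records.
   * Returns the ranges (sorted by partition) and the positions for the next batch.
   */
  def nextBatch(
      current: Map[Int, Long],
      latest: Map[Int, Long],
      maxPerPartition: Option[Long] = None
  ): (Seq[BatchRange], Map[Int, Long]) = {
    val ranges = current.toSeq.sortBy(_._1).map { case (partition, from) =>
      // A partition missing from `latest` is treated as having no new data.
      val latestOffset = latest.getOrElse(partition, from)
      val capped = maxPerPartition.fold(latestOffset)(cap => math.min(latestOffset, from + cap))
      // Never move backwards, even if the reported latest offset is lower.
      BatchRange(partition, from, math.max(from, capped))
    }
    (ranges, ranges.map(r => r.partition -> r.untilOffset).toMap)
  }
}
```

In this model, if the driver keeps seeing a latest offset of 0 for every partition, every batch is 0 to 0 and every RDD is empty, which is the symptom described in the report; as soon as the latest offset advances, the next batch covers exactly the new records.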