[jira] [Commented] (SPARK-21561) spark-streaming-kafka-010 DStream is not pulling anything from Kafka

2017-07-31 Thread Vlad Badelita (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-21561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16107324#comment-16107324 ]

Vlad Badelita commented on SPARK-21561:
---

Sorry, I didn't know where else to ask about this issue. I have since found 
out it was an actual bug that someone else also experienced:
https://issues.apache.org/jira/browse/SPARK-18779

However, it was a Kafka bug, not a spark-streaming one, and it was fixed here:
https://issues.apache.org/jira/browse/KAFKA-4547

Changing the kafka-clients version to 0.10.2.1 fixed it for me.
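
In case it helps anyone else, here is roughly what the workaround looked like 
in the build (a sketch, assuming an sbt build and the Spark 2.1.1 artifacts; 
dependencyOverrides forces the transitive kafka-clients version):

{code:scala}
// build.sbt (sketch): pin kafka-clients to 0.10.2.1, overriding the older
// version that spark-streaming-kafka-0-10 pulls in transitively.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.1.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.1"
)

dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.10.2.1"
{code}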

> spark-streaming-kafka-010 DStream is not pulling anything from Kafka
> ---
>
> Key: SPARK-21561
> URL: https://issues.apache.org/jira/browse/SPARK-21561
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.1.1
> Reporter: Vlad Badelita
> Labels: kafka-0.10, spark-streaming
>
> I am trying to use spark-streaming-kafka-0.10 to pull messages from a Kafka 
> topic (broker version 0.10). I have checked that messages are being produced, 
> and I used a KafkaConsumer to pull them successfully. Now, when I try to use 
> the Spark Streaming API, I am not getting anything. If I just use 
> KafkaUtils.createRDD and specify some offset ranges manually, it works. But 
> when I try to use createDirectStream, all the RDDs are empty, and when I 
> check the partition offsets it simply reports that all partitions are at 0. 
> Here is what I tried:
> {code:scala}
> import org.apache.kafka.common.serialization.StringDeserializer
> import org.apache.spark.{SparkConf, TaskContext}
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka010._
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
> import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>
> val sparkConf = new SparkConf().setAppName("kafkastream")
> val ssc = new StreamingContext(sparkConf, Seconds(3))
> val topics = Array("my_topic")
>
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> "hostname:6667",
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> classOf[StringDeserializer],
>   "group.id" -> "my_group",
>   "auto.offset.reset" -> "earliest",
>   "enable.auto.commit" -> (true: java.lang.Boolean)
> )
>
> val stream = KafkaUtils.createDirectStream[String, String](
>   ssc,
>   PreferConsistent,
>   Subscribe[String, String](topics, kafkaParams)
> )
>
> stream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   rdd.foreachPartition { iter =>
>     val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
>     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
>   }
>
>   val rddCount = rdd.count()
>   println(s"rdd count: $rddCount")
>
>   // stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> }
>
> ssc.start()
> ssc.awaitTermination()
> {code}
> All partitions show offset ranges from 0 to 0 and all RDDs are empty. I would 
> like the stream to start from the beginning of each partition but also pick 
> up everything that is being produced to it.
> I have also tried using spark-streaming-kafka-0.8, and it does work, so I 
> think this is a 0.10 issue. Everything else works fine. Thank you!




[jira] [Created] (SPARK-21561) spark-streaming-kafka-010 DStream is not pulling anything from Kafka

2017-07-28 Thread Vlad Badelita (JIRA)
Vlad Badelita created SPARK-21561:
-

Summary: spark-streaming-kafka-010 DStream is not pulling anything from Kafka
Key: SPARK-21561
URL: https://issues.apache.org/jira/browse/SPARK-21561
Project: Spark
Issue Type: Bug
Components: DStreams
Affects Versions: 2.1.1
Reporter: Vlad Badelita


I am trying to use spark-streaming-kafka-0.10 to pull messages from a Kafka 
topic (broker version 0.10). I have checked that messages are being produced, 
and I used a KafkaConsumer to pull them successfully. Now, when I try to use 
the Spark Streaming API, I am not getting anything. If I just use 
KafkaUtils.createRDD and specify some offset ranges manually, it works. But 
when I try to use createDirectStream, all the RDDs are empty, and when I check 
the partition offsets it simply reports that all partitions are at 0. Here is 
what I tried:

{code:scala}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val sparkConf = new SparkConf().setAppName("kafkastream")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val topics = Array("my_topic")

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "hostname:6667",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my_group",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (true: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }

  val rddCount = rdd.count()
  println(s"rdd count: $rddCount")

  // stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

ssc.start()
ssc.awaitTermination()
{code}

All partitions show offset ranges from 0 to 0 and all RDDs are empty. I would 
like the stream to start from the beginning of each partition but also pick up 
everything that is being produced to it.
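
For reference, the KafkaUtils.createRDD call mentioned above, which does 
return records, was roughly this (a sketch; the partition and offset range are 
hardcoded for illustration, and it reuses ssc, kafkaParams, and the imports 
from the snippet above):

{code:scala}
import scala.collection.JavaConverters._

// Manually specified range: partition 0 of my_topic, offsets 0 (inclusive)
// up to 100 (exclusive).
val offsetRanges = Array(OffsetRange("my_topic", 0, 0L, 100L))

// The 0.10 createRDD takes the Kafka params as a java.util.Map.
val rdd = KafkaUtils.createRDD[String, String](
  ssc.sparkContext, kafkaParams.asJava, offsetRanges, PreferConsistent)

println(s"batch count: ${rdd.count()}")  // this one returns the records
{code}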

I have also tried using spark-streaming-kafka-0.8, and it does work (roughly 
the setup sketched below), so I think this is a 0.10 issue. Everything else 
works fine. Thank you!
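
A sketch of the 0.8 variant, for comparison (the 0.8 connector lives in a 
different package, takes plain-string params, and uses smallest/largest 
instead of earliest/latest):

{code:scala}
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils  // 0.8 connector package

// Brokers are configured via metadata.broker.list in the 0.8 API.
val kafkaParams08 = Map[String, String](
  "metadata.broker.list" -> "hostname:6667",
  "auto.offset.reset" -> "smallest"
)

val stream08 = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder](ssc, kafkaParams08, Set("my_topic"))
{code}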


