I have a DirectStream and process data from Kafka,
val directKafkaStream = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams1, topics1.toSet)
directKafkaStream.foreachRDD { rdd =>
val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
When I have added a new DirectStream and do a union between both it doesn't
work. I thought that it was the same type, but I got a ClassCastException
val directKafkaStream = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams1, topics1.toSet)
val directKafkaStream2 = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams2, topics2.toSet)
val kafkaStream = directKafkaStream.union(directKafkaStream2)
kafkaStream.foreachRDD { rdd =>
val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
-->Exception
Exception in thread "main" java.lang.ClassCastException:
org.apache.spark.rdd.UnionRDD cannot be cast to
org.apache.spark.streaming.kafka.HasOffsetRanges
at
com.produban.metrics.MetricsSpark$$anonfun$main$1.apply(MetricsSpark.scala:72)
I guessed that rdd.union(rdd2) gives same type of RDD..