I have a DirectStream and process data from Kafka, val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams1, topics1.toSet) directKafkaStream.foreachRDD { rdd => val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
When I have added a new DirectStream and do a union between both it doesn't work. I thought that it was the same type, but I got a ClassCastException val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams1, topics1.toSet) val directKafkaStream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams2, topics2.toSet) val kafkaStream = directKafkaStream.union(directKafkaStream2) kafkaStream.foreachRDD { rdd => val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges -->Exception Exception in thread "main" java.lang.ClassCastException: org.apache.spark.rdd.UnionRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges at com.produban.metrics.MetricsSpark$$anonfun$main$1.apply(MetricsSpark.scala:72) I guessed that rdd.union(rdd2) gives same type of RDD..