I want to pull HDFS paths from Kafka and build text streams based on those paths. I currently have:

    val lines = KafkaUtils.createStream(/* params here */).map(_._2)
    val buffer = new ArrayBuffer[String]()

    lines.foreachRDD(rdd => {
      if (!rdd.partitions.isEmpty) {
        rdd.collect().foreach(line => { buffer += line })
      }
    })

    buffer.foreach(path => {
      streamingContext.textFileStream(path).foreachRDD(rdd => {
        println(s"${path} => ${rdd.count()}")
      })
    })

    streamingContext.start
    streamingContext.awaitTermination

It's not actually counting any of the files in the paths, and I know the paths are valid. Can someone tell me if this is possible and if so, give me a pointer on how to fix this?

Thanks,
Mike