I have solved the problem and found the cause: I was using the Master node to upload the files to HDFS, and that upload can take up a lot of the Master's network resources. When I switched to uploading the files from another machine that is not part of the cluster, I got the correct result.
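For anyone who hits the same issue, here is a minimal sketch of uploading from a machine outside the cluster with Hadoop's FileSystem API. This is not my exact code; the namenode URI, local path, and file name are just placeholders, and only /Qimage/input matches the monitored directory from the code quoted below. (Per TD's note below, in practice it is better to upload to a temp dir first and then rename; see the sketch after the quote.)

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object UploadFromClient {
  def main(args: Array[String]): Unit = {
    // Talks directly to the namenode and datanodes, so it can run from any
    // machine that can reach HDFS -- the traffic does not go through the Master.
    val fs = FileSystem.get(new URI("hdfs://namenode:8020"), new Configuration())
    // Copy one local png into the directory monitored by the streaming job.
    fs.copyFromLocalFile(new Path("/local/images/img001.png"),
      new Path("/Qimage/input/img001.png"))
    fs.close()
  }
}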
QingFeng


Tathagata Das wrote:
> A very crucial thing to remember when using file stream is that the files
> must be written to the monitored directory "atomically". That is, once the
> file system shows the file in its listing, the file should not be appended to
> or updated after that. That often causes this kind of issue, as Spark
> Streaming may pick up the file (soon after it is visible in the listing) and
> may try to process it even before all of the data has been written.
>
> So the best way to feed data into Spark Streaming is to write the file to a
> temp dir, and then "move" / "rename" it into the monitored directory. That
> makes it "atomic". This is mentioned in the API docs of fileStream
> <http://spark.apache.org/docs/0.9.1/api/streaming/index.html#org.apache.spark.streaming.StreamingContext>.
>
> TD
>
> On Sun, May 11, 2014 at 7:30 PM, zzzzzqf12345 <zzzzzqf12345@> wrote:
>
>> When I put 200 png files into HDFS, I found that Spark Streaming could
>> detect all 200 files, but the sum of rdd.count() was less than 200, always
>> between 130 and 170. I don't know why... Is this a bug?
>> PS: When I put the 200 files into HDFS before the streaming job ran, it got
>> the correct count and the right result.
>>
>> Here is the code:
>>
>> def main(args: Array[String]) {
>>   val conf = new SparkConf().setMaster(SparkURL)
>>     .setAppName("QimageStreaming-broadcast")
>>     .setSparkHome(System.getenv("SPARK_HOME"))
>>     .setJars(SparkContext.jarOfClass(this.getClass()))
>>   conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>   conf.set("spark.kryo.registrator", "qing.hdu.Image.MyRegistrator")
>>   conf.set("spark.kryoserializer.buffer.mb", "10")
>>   val ssc = new StreamingContext(conf, Seconds(2))
>>   val inputFormatClass = classOf[QimageInputFormat[Text, Qimage]]
>>   val outputFormatClass = classOf[QimageOutputFormat[Text, Qimage]]
>>   val input_path = HdfsURL + "/Qimage/input"
>>   val output_path = HdfsURL + "/Qimage/output/"
>>   val bg_path = HdfsURL + "/Qimage/bg/"
>>   val bg = ssc.sparkContext.newAPIHadoopFile[Text, Qimage,
>>     QimageInputFormat[Text, Qimage]](bg_path)
>>   val bbg = bg.map(data => (data._1.toString(), data._2))
>>   val broadcastbg = ssc.sparkContext.broadcast(bbg)
>>   val file = ssc.fileStream[Text, Qimage,
>>     QimageInputFormat[Text, Qimage]](input_path)
>>   val qingbg = broadcastbg.value.collectAsMap
>>   val foreachFunc = (rdd: RDD[(Text, Qimage)], time: Time) => {
>>     val rddnum = rdd.count
>>     System.out.println("\n\n" + "rddnum is " + rddnum + "\n\n")
>>     if (rddnum > 0) {
>>       System.out.println("here is foreachFunc")
>>       val a = rdd.keys
>>       val b = a.first
>>       val cbg = qingbg.get(getbgID(b)).getOrElse(new Qimage)
>>       rdd.map(data => (data._1, (new QimageProc(data._1, data._2)).koutu(cbg)))
>>         .saveAsNewAPIHadoopFile(output_path, classOf[Text], classOf[Qimage],
>>           outputFormatClass)
>>     }
>>   }
>>   file.foreachRDD(foreachFunc)
>>   ssc.start()
>>   ssc.awaitTermination()
>> }
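Following TD's advice, here is a minimal sketch of the "write to a temp dir, then rename into the monitored directory" pattern, again assuming Hadoop's FileSystem API. The namenode URI, /Qimage/staging directory, local path, and file name are placeholders; /Qimage/input is the monitored directory from the code above.

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object MoveIntoMonitoredDir {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new URI("hdfs://namenode:8020"), new Configuration())

    // 1) Upload the complete file into a staging directory that the
    //    streaming job does NOT monitor.
    val staged = new Path("/Qimage/staging/img001.png")
    fs.copyFromLocalFile(new Path("/local/images/img001.png"), staged)

    // 2) Rename it into the monitored directory. On HDFS a rename within the
    //    same filesystem is an atomic metadata operation, so fileStream only
    //    ever sees a fully written file appear in its listing.
    val monitored = new Path("/Qimage/input/img001.png")
    if (!fs.rename(staged, monitored)) {
      System.err.println("rename failed: " + staged + " -> " + monitored)
    }
    fs.close()
  }
}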