I have solved the problem and found the reason: I was using the Master node
to upload the files to HDFS, and that upload can consume much of the
Master's network bandwidth. When I switched to uploading the files from a
machine that is not part of the cluster, I got the correct result.

QingFeng
Tathagata Das wrote
> A very crucial thing to remember when using file stream is that the files
> must be written to the monitored directory "atomically". That is, once the
> file system shows the file in its listing, the file should not be appended
> to or updated after that. This often causes these kinds of issues, as Spark
> Streaming may find the file (soon after it is visible in the listing) and
> may try to process it even before all of the data has been written.
> 
> So the best way to feed data into Spark Streaming is to write the files to
> a temp dir, and then "move" / "rename" them into the monitored directory.
> That makes the write "atomic". This is mentioned in the API docs of
> fileStream
> <http://spark.apache.org/docs/0.9.1/api/streaming/index.html#org.apache.spark.streaming.StreamingContext>.
> 
> TD
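
Following up on that: a minimal sketch of the "write to a temp dir, then
rename" pattern using the Hadoop FileSystem API. The paths, the file name,
and the bytesOfTheImage helper are hypothetical, not from the thread:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object AtomicIngest {
  def main(args: Array[String]) {
    val fs = FileSystem.get(new Configuration())

    // Hypothetical paths: a staging dir outside the monitored dir,
    // and the input dir that fileStream is watching.
    val tmp = new Path("/Qimage/tmp/img001.png")
    val dst = new Path("/Qimage/input/img001.png")

    // 1) Write the file completely while it is still in the staging dir.
    val out = fs.create(tmp)
    try {
      out.write(bytesOfTheImage) // assumed to hold the whole file
    } finally {
      out.close()
    }

    // 2) Rename into the monitored directory. Within a single HDFS
    //    file system a rename is atomic, so the stream never sees a
    //    partially written file.
    if (!fs.rename(tmp, dst)) {
      sys.error("rename failed: " + tmp + " -> " + dst)
    }
  }

  // Placeholder for however the image bytes are actually produced.
  def bytesOfTheImage: Array[Byte] = Array[Byte]()
}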
> 
> 
> 
> On Sun, May 11, 2014 at 7:30 PM, zzzzzqf12345 <zzzzzqf12345@...> wrote:
> 
>> When I put 200 PNG files into HDFS, I found that Spark Streaming could
>> detect all 200 files, but the sum of rdd.count() across the batches was
>> less than 200, always between 130 and 170. I don't know why... Is this a
>> bug?
>> PS: When I put the 200 files into HDFS before the streaming job ran, it
>> got the correct count and the right result.
>>
>> Here is the code:
>>
>> def main(args: Array[String]) {
>>   val conf = new SparkConf().setMaster(SparkURL)
>>     .setAppName("QimageStreaming-broadcast")
>>     .setSparkHome(System.getenv("SPARK_HOME"))
>>     .setJars(SparkContext.jarOfClass(this.getClass()))
>>   conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>   conf.set("spark.kryo.registrator", "qing.hdu.Image.MyRegistrator")
>>   conf.set("spark.kryoserializer.buffer.mb", "10")
>>   val ssc = new StreamingContext(conf, Seconds(2))
>>   val inputFormatClass = classOf[QimageInputFormat[Text, Qimage]]
>>   val outputFormatClass = classOf[QimageOutputFormat[Text, Qimage]]
>>   val input_path = HdfsURL + "/Qimage/input"
>>   val output_path = HdfsURL + "/Qimage/output/"
>>   val bg_path = HdfsURL + "/Qimage/bg/"
>>   val bg = ssc.sparkContext.newAPIHadoopFile[Text, Qimage,
>>     QimageInputFormat[Text, Qimage]](bg_path)
>>   val bbg = bg.map(data => (data._1.toString(), data._2))
>>   val broadcastbg = ssc.sparkContext.broadcast(bbg)
>>   val file = ssc.fileStream[Text, Qimage,
>>     QimageInputFormat[Text, Qimage]](input_path)
>>   val qingbg = broadcastbg.value.collectAsMap
>>   val foreachFunc = (rdd: RDD[(Text, Qimage)], time: Time) => {
>>     val rddnum = rdd.count
>>     System.out.println("\n\n" + "rddnum is " + rddnum + "\n\n")
>>     if (rddnum > 0) {
>>       System.out.println("here is foreachFunc")
>>       val a = rdd.keys
>>       val b = a.first
>>       val cbg = qingbg.get(getbgID(b)).getOrElse(new Qimage)
>>       rdd.map(data => (data._1, (new QimageProc(data._1, data._2)).koutu(cbg)))
>>         .saveAsNewAPIHadoopFile(output_path, classOf[Text], classOf[Qimage],
>>           outputFormatClass)
>>     }
>>   }
>>   file.foreachRDD(foreachFunc)
>>   ssc.start()
>>   ssc.awaitTermination()
>> }
>>
>>
>>
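One side note on the code above, unrelated to the missing-file issue:
ssc.sparkContext.broadcast(bbg) broadcasts the RDD object itself, which is
not what broadcast variables are for (the code only works because
broadcastbg.value happens to be read back on the driver). The usual pattern
is to collect the small dataset on the driver first and broadcast the
resulting map. A minimal sketch, reusing the names from the code above:

val bg = ssc.sparkContext.newAPIHadoopFile[Text, Qimage,
  QimageInputFormat[Text, Qimage]](bg_path)
// Collect the (small) background images to the driver first...
val qingbg = bg.map(data => (data._1.toString(), data._2)).collectAsMap
// ...then broadcast the plain map so each executor gets one copy,
// and read it inside the foreachRDD function via broadcastbg.value.
val broadcastbg = ssc.sparkContext.broadcast(qingbg)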