If you take into account what streaming means in Spark, your goal doesn't really make sense: you have to assume that your streams are infinite and that you will have to process them indefinitely. Operations on a DStream define what you want to do with each element of each RDD, and Spark Streaming is smart enough not to apply the transformations when an RDD is empty.
The only time you probably want to know the size of the RDD is when you are going to perform a side effect, like storing something in a database, using foreachRDD, e.g.:

    val flumeStream = ...
    val transformedStream = flumeStream
      .map(... some transformation ...)
      .flatMap(... some other transformation ...)
      .distinct()
    .....
    transformedStream.foreachRDD { rdd =>
      if (rdd.count() != 0) {
        // perform some side effect that shouldn't be done
        // if a transformed batch is empty
      }
    }

2014-09-09 9:20 GMT+01:00 julyfire <hellowe...@gmail.com>:

> I'm sorry, I had an error in my code; here is the updated version:
>
>     var count = -1L // a global variable in the main object
>
>     val currentBatch = some_DStream
>     val countDStream = currentBatch.map(o => {
>       count = 0L // reset the count variable in each batch
>       o
>     })
>     countDStream.foreachRDD(rdd => count += rdd.count())
>
>     if (count > 0) {
>       currentBatch.map(...).someOtherTransformation
>     }
>
> Two problems:
> 1. the count variable just keeps accumulating and is never reset for each batch
> 2. if (count > 0) is only evaluated once, at the start of the program, so
> the next statement will never run
>
> Can you give me some suggestions? Thanks.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13781.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
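To make the advice above concrete for julyfire's two problems: a driver-side variable mutated inside `map` is not a reliable per-batch counter (closures run on executors, and `if (count > 0)` in driver code is evaluated only once, when the streaming graph is defined). A sketch of the pattern, assuming placeholder names (`ssc`, `someSource`, the transformations) since the original code elides them:

```scala
import org.apache.spark.streaming.dstream.DStream

// The batch-size check must live inside foreachRDD, which Spark Streaming
// re-evaluates on every batch interval, not in top-level driver code.
def process(currentBatch: DStream[String]): Unit = {
  currentBatch.foreachRDD { rdd =>
    // rdd.count() runs once per batch; on Spark 1.3+ rdd.isEmpty() is a
    // cheaper alternative that avoids counting the whole batch.
    if (rdd.count() > 0) {
      val transformed = rdd.map(_.trim).distinct() // placeholder transformations
      // perform the side effect only for non-empty batches
      transformed.take(10).foreach(println)
    }
  }
}
```

This removes the need for a global `count` entirely: each batch is inspected and conditionally processed in the same `foreachRDD` body, so there is nothing to reset.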