There must be an easy way to count the number of rows in a JavaDStream.
JavaDStream<String> words;
JavaDStream<Long> hardToUse = words.count();
JavaDStream does not seem to have a collect().
The following works, but it is very clumsy:
Any suggestions would be greatly appreciated.
Andy
/**
 * Adds the row count of every RDD seen on a {@code JavaDStream} to a
 * caller-supplied {@code Accumulator}.
 *
 * @param <T> element type of the stream being counted
 */
public class JavaDStreamCount<T> implements Serializable {
    private static final long serialVersionUID = -3600586183332429887L;
    public static Logger logger =
            LoggerFactory.getLogger(JavaDStreamCount.class);

    /**
     * Passes a {@link Count} function to {@code javaDStream.foreachRDD} and
     * returns the accumulator's current value.
     *
     * <p>TODO in 1.6 should be able to use a lambda function.
     *
     * <p>NOTE(review): the value is read immediately after
     * {@code foreachRDD} returns; if {@code foreachRDD} only schedules the
     * function for later batches, this is the accumulator's value before any
     * of those batches were counted. Confirm against the caller's
     * expectations, and read the accumulator again later if a running total
     * is needed.
     *
     * @see <a href="https://issues.apache.org/jira/browse/SPARK-4557">SPARK-4557</a>
     * @param total accumulator that receives the row count of each RDD
     * @param javaDStream stream whose RDDs are counted
     * @return the value held by {@code total} when this method returns
     */
    @Deprecated
    public Double hack(Accumulator<Double> total, JavaDStream<T>
            javaDStream) {
        Count c = new Count(total);
        javaDStream.foreachRDD(c);
        return c.getTotal().value();
    }

    /**
     * Function applied to each RDD: counts its rows, logs the count and adds
     * it to the wrapped accumulator.
     */
    class Count implements Function<JavaRDD<T>,Void> {
        private static final long serialVersionUID = -5239727633710162488L;
        private final Accumulator<Double> total;

        /**
         * @param total accumulator to which every RDD's row count is added
         */
        public Count(Accumulator<Double> total) {
            this.total = total;
        }

        /**
         * Counts the rows of {@code rdd}, logs the result and adds it to the
         * accumulator.
         *
         * @param rdd the RDD to count
         * @return always {@code null}
         * @throws Exception declared by the {@code Function} interface
         */
        @Override
        public java.lang.Void call(JavaRDD<T> rdd) throws Exception {
            // Count once and reuse the result for both log lines. The size
            // was previously obtained by collecting the entire RDD into a
            // List, which is unnecessary just to learn how many rows exist.
            long num = rdd.count();
            logger.info("Accumulator name:{} data.size:{}", total.name(),
                    num);
            logger.info("num:{}", num);
            // Double.valueOf replaces the deprecated Double(double) constructor.
            total.add(Double.valueOf((double) num));
            return null;
        }

        /**
         * @return the accumulator this function adds to
         */
        public Accumulator<Double> getTotal() {
            return total;
        }
    }
}