There must be an easy way to count the number of rows in a JavaDStream.

JavaDStream<String> words;

JavaDStream<Long> hardToUse = words.count();





JavaDStream does not seem to have a collect().



The following works, but it is very clumsy:



Any suggestions would be greatly appreciated.



Andy



/**
 * Workaround for counting the elements that flow through a {@link JavaDStream}.
 *
 * <p>A per-batch callback adds the element count of every RDD in the stream to a
 * caller-supplied {@code Accumulator<Double>}.
 *
 * @param <T> element type of the stream being counted
 */
public class JavaDStreamCount<T> implements Serializable {

    private static final long serialVersionUID = -3600586183332429887L;

    public static Logger logger = LoggerFactory.getLogger(JavaDStreamCount.class);

    /**
     * Registers a callback on {@code javaDStream} that adds the size of each batch to
     * {@code total}, then returns the accumulator's value as read right after registration.
     *
     * <p>TODO in 1.6 should be able to use a lambda function
     *
     * <p>NOTE(review): the accumulator is read immediately after {@code foreachRDD} is
     * called. If batches are only processed later (e.g. once the streaming context is
     * started), the returned value does not yet include any batch of this stream; callers
     * that need the running total should read {@code total.value()} themselves afterwards.
     * Confirm against the Spark Streaming version in use.
     *
     * @see <a href="https://issues.apache.org/jira/browse/SPARK-4557">SPARK-4557</a>
     * @param total accumulator that receives the element count of every batch
     * @param javaDStream stream whose elements are counted
     * @return the value of {@code total} at the time this method returns
     */
    @Deprecated
    public Double hack(Accumulator<Double> total, JavaDStream<T> javaDStream) {
        Count c = new Count(total);
        javaDStream.foreachRDD(c);
        return c.getTotal().value();
    }

    /**
     * Per-batch callback that adds the number of elements in each RDD to an accumulator.
     *
     * <p>This is a non-static inner class, so every instance carries an implicit reference
     * to its enclosing {@link JavaDStreamCount}.
     */
    class Count implements Function<JavaRDD<T>, Void> {

        private static final long serialVersionUID = -5239727633710162488L;

        private final Accumulator<Double> total;

        /**
         * @param total accumulator to which batch sizes are added
         */
        public Count(Accumulator<Double> total) {
            this.total = total;
        }

        /**
         * Counts the elements of {@code rdd}, logs the count and adds it to the accumulator.
         *
         * @param rdd the RDD of the current batch
         * @return always {@code null}
         * @throws Exception declared by the {@code Function} contract
         */
        @Override
        public Void call(JavaRDD<T> rdd) throws Exception {
            // Count once with count(); the previous collect() copied every element of the
            // batch to the caller just to obtain the same number that count() returns.
            long num = rdd.count();

            // Both log lines are kept so existing log output keeps its shape.
            logger.info("Accumulator name:{} data.size:{}", total.name(), num);
            logger.info("num:{}", num);

            // Primitive cast + autoboxing replaces the deprecated Double(double) constructor.
            total.add((double) num);
            return null;
        }

        /**
         * @return the accumulator this callback adds to
         */
        public Accumulator<Double> getTotal() {
            return total;
        }
    }
}






Reply via email to