Hi Andy,
Regarding the foreachRDD return value, this JIRA, which will be in 1.6, should
take care of that and make things a little simpler:
https://issues.apache.org/jira/browse/SPARK-4557
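For anyone hitting the same friction: as I understand it, the change in that JIRA means foreachRDD takes a void-returning functional interface, so a lambda no longer has to end with `return null`. The difference can be sketched with plain `java.util.function` interfaces, no Spark on the classpath (`Consumer` stands in for Spark's `VoidFunction`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class VoidReturnDemo {
    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();

        // Pre-1.6 style: foreachRDD took a Function<JavaRDD<T>, Void>,
        // so every lambda had to end with `return null;`.
        Function<String, Void> oldStyle = s -> {
            sink.add(s);
            return null; // boilerplate forced by the Void return type
        };
        oldStyle.apply("before");

        // 1.6 style: foreachRDD takes a VoidFunction (analogous to
        // java.util.function.Consumer), so the body is just the side effect.
        Consumer<String> newStyle = sink::add;
        newStyle.accept("after");

        System.out.println(sink); // [before, after]
    }
}
```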
On Dec 15, 2015 6:55 PM, "Andy Davidson" <[email protected]>
wrote:
> I am writing a JUnit test for some simple streaming code. I want to make
> assertions about how many things are in a given JavaDStream. I wonder if there
> is an easier way in Java to get the count?
>
> I think there are two points of friction.
>
>
> 1. It is easy to create an accumulator of type double or int; however,
> Long is not supported.
> 2. We need to use javaDStream.foreachRDD. The Function interface must
> return Void. I was not able to define an accumulator in my driver and
> use it from a lambda function. (I am new to lambdas in Java.)
>
> Here is a little lambda example that logs my test objects. I was not able
> to figure out how to return a value or access an accumulator.
>
> data.foreachRDD(rdd -> {
>     logger.info("Begin data.foreachRDD");
>     for (MyPojo pojo : rdd.collect()) {
>         logger.info("\n{}", pojo.toString());
>     }
>     return null;
> });
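[Editor's note: the capture pattern the lambda above is missing, counting by mutating an accumulator created in the driver rather than returning a value, can be sketched in plain Java without Spark on the classpath. Here `LongAdder` and `forEachBatch` are only local stand-ins for Spark's `Accumulator` and `foreachRDD`.]

```java
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;

public class CaptureCountDemo {
    // Stand-in for foreachRDD: invokes a void callback once per "batch".
    static <T> void forEachBatch(List<List<T>> batches, Consumer<List<T>> action) {
        batches.forEach(action);
    }

    public static void main(String[] args) {
        // In Spark this would be sc.accumulator(0.0) created in the driver;
        // LongAdder is only a local stand-in for the capture pattern.
        LongAdder total = new LongAdder();

        List<List<String>> batches = List.of(List.of("a", "b"), List.of("c"));

        // The lambda captures `total` from the enclosing scope and mutates
        // it as a side effect, so the callback itself returns nothing.
        forEachBatch(batches, batch -> total.add(batch.size()));

        System.out.println(total.sum()); // 3
    }
}
```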
>
>
> Any suggestions would be greatly appreciated.
>
> Andy
>
> The following code works in my driver, but it is a lot of code for such a
> trivial computation. Because it needs the JavaSparkContext, I do not think
> it would work inside a closure. I assume the workers do not have access to
> the context as a global, and that shipping it in the closure is not a good
> idea?
>
> public class JavaDStreamCount<T> implements Serializable {
>     private static final long serialVersionUID = -3600586183332429887L;
>     public static Logger logger = LoggerFactory.getLogger(JavaDStreamCount.class);
>
>     public Double hack(JavaSparkContext sc, JavaDStream<T> javaDStream) {
>         Count c = new Count(sc);
>         javaDStream.foreachRDD(c);
>         return c.getTotal().value();
>     }
>
>     class Count implements Function<JavaRDD<T>, Void> {
>         private static final long serialVersionUID = -5239727633710162488L;
>         Accumulator<Double> total;
>
>         public Count(JavaSparkContext sc) {
>             total = sc.accumulator(0.0);
>         }
>
>         @Override
>         public java.lang.Void call(JavaRDD<T> rdd) throws Exception {
>             List<T> data = rdd.collect();
>             int dataSize = data.size();
>             logger.error("data.size:{}", dataSize);
>             long num = rdd.count();
>             logger.error("num:{}", num);
>             total.add((double) num);
>             return null;
>         }
>
>         public Accumulator<Double> getTotal() {
>             return total;
>         }
>     }
> }