Re: looking for an easier way to count the number of items in a JavaDStream

2015-12-16 Thread Bryan Cutler
To follow up on your other issue: if you are just trying to count
elements in a DStream, you can do that without an Accumulator. foreachRDD
is meant to be an output action; it does not return anything, and it
actually runs in the driver program. Because Java (before 8) handles
closures a little differently, it might be easiest to implement the
function to pass to foreachRDD as something like this:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;

// The stream's element type is assumed to be String here; substitute your own.
class MyFunc implements VoidFunction<JavaRDD<String>> {

  public long total = 0;

  @Override
  public void call(JavaRDD<String> rdd) {
    System.out.println("foo " + rdd.collect().toString());
    total += rdd.count();
  }
}

MyFunc f = new MyFunc();

inputStream.foreachRDD(f);

// f.total will hold the total element count across all RDDs
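
Since the function passed to foreachRDD runs in the driver, it is safe to
read f.total from your test afterward. As a rough sketch (assuming a
JavaStreamingContext named jssc, JUnit's assertEquals, and an expectedCount
from your fixture), the test could do:

jssc.start();
jssc.awaitTerminationOrTimeout(5000);  // let a few batches run; the timeout is arbitrary
jssc.stop();
assertEquals(expectedCount, f.total);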

Hope that helps some!

-bryan



Re: looking for an easier way to count the number of items in a JavaDStream

2015-12-16 Thread Todd Nist
Another possible alternative is to register a StreamingListener and then
reference BatchInfo.numRecords; there is a good example here:
https://gist.github.com/akhld/b10dc491aad1a2007183.

After registering the listener, simply implement the appropriate "onEvent"
method, where onEvent is onBatchStarted, onBatchCompleted, etc. For example:

public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
    System.out.println("Batch completed, Total records :"
        + batchCompleted.batchInfo().numRecords().get().toString());
}

That should be very efficient and avoid any collect()s, just to obtain the
count of records on the DStream.
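
If it helps, a fuller sketch along those lines (assuming the Spark 1.5/1.6
StreamingListener interface, which a Java implementation has to stub
completely; the class name and AtomicLong counter are illustrative, and on
these versions numRecords() returns a plain long rather than an Option):

import java.util.concurrent.atomic.AtomicLong;
import org.apache.spark.streaming.scheduler.*;

class CountListener implements StreamingListener {
    // Listener callbacks run on the driver, so a plain counter is fine.
    public final AtomicLong total = new AtomicLong();

    @Override
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) {
        total.addAndGet(batchCompleted.batchInfo().numRecords());
    }

    // The remaining callbacks are no-ops for this sketch.
    @Override public void onReceiverStarted(StreamingListenerReceiverStarted e) {}
    @Override public void onReceiverError(StreamingListenerReceiverError e) {}
    @Override public void onReceiverStopped(StreamingListenerReceiverStopped e) {}
    @Override public void onBatchSubmitted(StreamingListenerBatchSubmitted e) {}
    @Override public void onBatchStarted(StreamingListenerBatchStarted e) {}
    @Override public void onOutputOperationStarted(StreamingListenerOutputOperationStarted e) {}
    @Override public void onOutputOperationCompleted(StreamingListenerOutputOperationCompleted e) {}
}

Register it on the underlying context before starting the stream, e.g.
jssc.ssc().addStreamingListener(new CountListener());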

HTH.

-Todd



Re: looking for an easier way to count the number of items in a JavaDStream

2015-12-16 Thread Bryan Cutler
Hi Andy,

Regarding the foreachRDD return value, this JIRA, which will be in 1.6,
should take care of that and make things a little simpler:
https://issues.apache.org/jira/browse/SPARK-4557
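
As a sketch of what that buys you (assuming 1.6, where foreachRDD accepts a
VoidFunction), a lambda like the one in your example no longer has to end
with return null:

data.foreachRDD(rdd -> {
    logger.info("Begin data.foreachRDD");
    for (MyPojo pojo : rdd.collect()) {
        logger.info("\n{}", pojo.toString());
    }
});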


looking for an easier way to count the number of items in a JavaDStream

2015-12-15 Thread Andy Davidson
I am writing a JUnit test for some simple streaming code. I want to make
assertions about how many things are in a given JavaDStream. I wonder if
there is an easier way in Java to get the count?

I think there are two points of friction.

1. It is easy to create an accumulator of type double or int; however, Long
is not supported (a sketch of the workaround follows below).
2. We need to use javaDStream.foreachRDD. The Function interface must return
Void. I was not able to define an accumulator in my driver and use a lambda
function. (I am new to lambdas in Java.)
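
Here is what I mean by point 1, as a sketch (JavaSparkContext has int and
double accumulator overloads but, as far as I can tell, no long one; someRdd
is just illustrative):

Accumulator<Double> total = sc.accumulator(0.0);   // no sc.accumulator(0L) overload
total.add((double) someRdd.count());               // widen the long count to double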
Here is a little lambda example that logs my test objects. I was not able to
figure out how to get it to return a value or access an accumulator:

data.foreachRDD(rdd -> {
    logger.info("Begin data.foreachRDD");
    for (MyPojo pojo : rdd.collect()) {
        logger.info("\n{}", pojo.toString());
    }
    return null;
});



Any suggestions would be greatly appreciated.

Andy

The following code works in my driver but is a lot of code for such a
trivial computation. Because it needs the JavaSparkContext, I do not think
it would work inside a closure. I assume the workers do not have access to
the context as a global, and that shipping it in the closure is not a good
idea?

import java.io.Serializable;
import java.util.List;

import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JavaDStreamCount implements Serializable {

    private static final long serialVersionUID = -3600586183332429887L;

    public static Logger logger = LoggerFactory.getLogger(JavaDStreamCount.class);

    // Runs each RDD of the stream through a counting function and returns the total.
    public Double hack(JavaSparkContext sc, JavaDStream<MyPojo> javaDStream) {
        Count c = new Count(sc);
        javaDStream.foreachRDD(c);
        return c.getTotal().value();
    }

    class Count implements Function<JavaRDD<MyPojo>, Void> {

        private static final long serialVersionUID = -5239727633710162488L;

        Accumulator<Double> total;

        public Count(JavaSparkContext sc) {
            // Double accumulator, since this API has no long overload.
            total = sc.accumulator(0.0);
        }

        @Override
        public Void call(JavaRDD<MyPojo> rdd) throws Exception {
            List<MyPojo> data = rdd.collect();
            int dataSize = data.size();
            logger.error("data.size:{}", dataSize);
            long num = rdd.count();
            logger.error("num:{}", num);
            total.add((double) num);
            return null;
        }

        public Accumulator<Double> getTotal() {
            return total;
        }
    }
}