Hi,
Can anybody help me with this, please? I haven't been able to find the problem
:(

Thanks.
On Nov 15, 2014 4:48 PM, "Bahubali Jain" <bahub...@gmail.com> wrote:

> Hi,
> I'm trying to use Spark Streaming, but I am struggling with word count :(
> I want to consolidate the output of the word count (not on a per-window
> basis), so I am using updateStateByKey(), but for some reason this is not
> working. The update function itself is not being invoked (I do not see its
> System.out output on the console).
>
>
> import java.util.List;
> import java.util.regex.Pattern;
>
> import com.google.common.base.Optional;
> import com.google.common.collect.Lists;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.StorageLevels;
> import org.apache.spark.api.java.function.FlatMapFunction;
> import org.apache.spark.api.java.function.Function2;
> import org.apache.spark.api.java.function.PairFunction;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import scala.Tuple2;
>
> public final class WordCount {
>   private static final Pattern SPACE = Pattern.compile(" ");
>
>   public static void main(String[] args) {
>     if (args.length < 2) {
>       System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
>       System.exit(1);
>     }
>
>     // Create the context with a 1 second batch size
>     SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
>     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
>     // Stateful operations such as updateStateByKey require a checkpoint directory
>     ssc.checkpoint("/tmp/worcount");
>
>     // Create a JavaReceiverInputDStream on target ip:port and count the words
>     // in the input stream of \n-delimited text (e.g. generated by 'nc').
>     // Note that the storage level has no replication; this is only suitable
>     // for running locally. Replication is necessary in a distributed scenario
>     // for fault tolerance.
>     JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
>         args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
>
>     // Split each line into words
>     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
>       @Override
>       public Iterable<String> call(String x) {
>         return Lists.newArrayList(SPACE.split(x));
>       }
>     });
>
>     // Count each word within the current batch
>     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
>         new PairFunction<String, String, Integer>() {
>           @Override
>           public Tuple2<String, Integer> call(String s) {
>             System.err.println("Got " + s);
>             return new Tuple2<String, Integer>(s, 1);
>           }
>         }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>           @Override
>           public Integer call(Integer i1, Integer i2) {
>             return i1 + i2;
>           }
>         });
>
>     wordCounts.print();
>
>     wordCounts.updateStateByKey(new UpdateFunction());
>
>     ssc.start();
>     ssc.awaitTermination();
>   }
> }
>
> // Adds the counts from the current batch to the previous running count
> class UpdateFunction implements
>     Function2<List<Integer>, Optional<Integer>, Optional<Integer>> {
>
>   @Override
>   public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
>     int sum = 0;
>     for (Integer i : values) {
>       sum += i;
>     }
>     // Add the new values to the previous running count to get the new count
>     Integer newSum = state.or(0) + sum;
>     System.out.println("Newsum is " + newSum);
>     return Optional.of(newSum);
>   }
> }
>
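A likely cause, based on how Spark Streaming schedules work: transformations such as updateStateByKey() are lazy and return a new DStream. In the quoted code that returned stream is discarded, so no output operation (such as print()) is ever registered on it, and Spark has no reason to run the update function. Keeping the result and calling print() on the DStream that updateStateByKey() returns should make the update function run on every batch (the checkpoint directory it needs is already set).

To illustrate what updateStateByKey() then does per key, here is a minimal plain-Java sketch with no Spark dependency. It is a simplified model under stated assumptions, not Spark's implementation: it uses java.util.Optional instead of Guava's Optional, and the names RunningCountSketch, updateState and applyBatch are hypothetical. Each call to applyBatch stands for one micro-batch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical, Spark-free model of updateStateByKey's running-count semantics.
public class RunningCountSketch {

  // Hypothetical stand-in for the quoted update function: adds this batch's
  // values to the previous running count (0 if the key has no state yet).
  static Optional<Integer> updateState(List<Integer> values, Optional<Integer> state) {
    int sum = state.orElse(0);
    for (int v : values) {
      sum += v;
    }
    return Optional.of(sum);
  }

  // Models one micro-batch: pairs every word with 1, groups the values by key,
  // then applies updateState to each key and stores the result as new state.
  static void applyBatch(Map<String, Integer> state, List<String> words) {
    Map<String, List<Integer>> grouped = new HashMap<>();
    for (String w : words) {
      grouped.computeIfAbsent(w, k -> new ArrayList<>()).add(1);
    }
    // Keys that already have state are updated too, with an empty value list.
    for (String key : state.keySet()) {
      grouped.putIfAbsent(key, new ArrayList<>());
    }
    for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
      Optional<Integer> next =
          updateState(e.getValue(), Optional.ofNullable(state.get(e.getKey())));
      // An empty result removes the key; otherwise the new count is stored.
      if (next.isPresent()) {
        state.put(e.getKey(), next.get());
      } else {
        state.remove(e.getKey());
      }
    }
  }

  public static void main(String[] args) {
    Map<String, Integer> state = new HashMap<>();
    applyBatch(state, List.of("a", "b", "a")); // batch 1
    applyBatch(state, List.of("a", "c"));      // batch 2
    // TreeMap gives a stable, sorted print order
    System.out.println(new TreeMap<>(state)); // prints {a=3, b=1, c=1}
  }
}
```

The counts accumulate across batches instead of restarting each second, which is what wordCounts.print() alone cannot show. Keys such as "b" that receive no new values in a batch still keep their earlier count.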
