I think you may not need to call reduceByKey() after mapToPair(), because
updateStateByKey() already sums the values for each key. For reference, here
is a sample written in Scala that uses a text file stream instead of a socket:

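import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Brings the pair-DStream operations (updateStateByKey) into scope on older Spark releases
import org.apache.spark.streaming.StreamingContext._

object LocalStatefulWordCount extends App {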
  val sparkConf = new SparkConf().setAppName("HdfsWordCount")
  val ssc = new StreamingContext(sparkConf, Seconds(2))

  // A checkpoint directory must be set for updateStateByKey
  // Note: the checkpoint directory cannot be the source directory
  ssc.checkpoint("./checkpoint")

  val updateFunc = (values: Seq[Int], state: Option[Int]) => {
    val currentCount = values.foldLeft(0)(_ + _)
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }

  val lines = ssc.textFileStream("/Users/twer/workspace/scala101/data") // local directory
  val wordDstream = lines.flatMap(_.split(" ")).map(x => (x, 1))
  val statefulWordCount = wordDstream.updateStateByKey[Int](updateFunc)
  statefulWordCount.print()

  ssc.start()
  ssc.awaitTermination()
}
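
To make the point about reduceByKey() concrete, here is a minimal sketch in plain Java, without Spark, of what the update function does from one batch to the next. The class StatefulCountSketch and the helper applyBatch are hypothetical names used only for illustration, and java.util.Optional stands in for the Optional type used by the Spark Java API. It only imitates the per-key behaviour; it is not how Spark implements it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;

public class StatefulCountSketch {

    // Same arithmetic as updateFunc: sum this batch's values and add the previous total.
    static Optional<Integer> update(List<Integer> values, Optional<Integer> state) {
        int currentCount = 0;
        for (int v : values) {
            currentCount += v;
        }
        return Optional.of(currentCount + state.orElse(0));
    }

    // Hypothetical stand-in for one streaming batch (not a Spark API):
    // pairs every word with 1, groups the 1s by word, then calls update()
    // once per key that is either in this batch or already in the state.
    static void applyBatch(Map<String, Integer> state, String line) {
        Map<String, List<Integer>> grouped = new HashMap<>();
        for (String word : line.split(" ")) {
            grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
        }
        Set<String> keys = new HashSet<>(state.keySet());
        keys.addAll(grouped.keySet());
        for (String key : keys) {
            List<Integer> values = grouped.getOrDefault(key, new ArrayList<>());
            Optional<Integer> previous = Optional.ofNullable(state.get(key));
            update(values, previous).ifPresent(total -> state.put(key, total));
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new TreeMap<>();
        applyBatch(state, "a b a");   // first batch
        applyBatch(state, "a c");     // second batch
        System.out.println(state);    // prints {a=3, b=1, c=1}
    }
}
```

The un-reduced pairs (a, 1), (a, 1) reach update() as the list [1, 1] and are summed there, so a separate reduceByKey() step does not change the totals. Note also that the Scala sample calls print() on the stream returned by updateStateByKey; Spark Streaming only executes transformations that feed an output operation, so it may be worth checking that the Java program does the same.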




Zhang Yi / 张逸
Lead Consultant

Email
yizh...@thoughtworks.com

Telephone
+86 15023157626








On Sunday, November 16, 2014 at 6:19 PM, Bahubali Jain wrote:

> Hi,  
> Can anybody help me with this, please? I haven't been able to find the problem :(
>  
> Thanks.  
> On Nov 15, 2014 4:48 PM, "Bahubali Jain" <bahub...@gmail.com> wrote:
> > Hi,
> > I am trying to use Spark Streaming, but I am struggling with word count :(
> > I want to consolidate the output of the word count (not on a per-window
> > basis), so I am using updateStateByKey(), but for some reason this is not
> > working. The function itself is not being invoked (I do not see the
> > System.out output on the console).
> >  
> >  
> > public final class WordCount {
> >   private static final Pattern SPACE = Pattern.compile(" ");
> >  
> >   public static void main(String[] args) {
> >         if (args.length < 2) {
> >           System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
> >           System.exit(1);
> >         }
> >  
> >         // Create the context with a 1 second batch size
> >         SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
> >         JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
> >         ssc.checkpoint("/tmp/worcount");
> >         // Create a JavaReceiverInputDStream on target ip:port and count the
> >         // words in input stream of \n delimited text (eg. generated by 'nc')
> >         // Note that no duplication in storage level only for running locally.
> >         // Replication necessary in distributed scenario for fault tolerance.
> >         JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
> >                 args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
> >         JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
> >           @Override
> >           public Iterable<String> call(String x) {
> >             return Lists.newArrayList(SPACE.split(x));
> >           }
> >         });
> >  
> >         JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
> >           new PairFunction<String, String, Integer>() {
> >             @Override
> >             public Tuple2<String, Integer> call(String s) {
> >                 System.err.println("Got "+s);
> >               return new Tuple2<String, Integer>(s, 1);
> >             }
> >           }).reduceByKey(new Function2<Integer, Integer, Integer>() {
> >             @Override
> >             public Integer call(Integer i1, Integer i2) {
> >               return i1 + i2;
> >             }
> >           });
> >  
> >         wordCounts.print();
> >         wordCounts.updateStateByKey(new updateFunction());
> >         ssc.start();
> >         ssc.awaitTermination();
> >   }
> > }
> >  
> > class updateFunction implements Function2<List<Integer>, Optional<Integer>, Optional<Integer>>
> > {
> >  
> >       @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
> >            
> >          Integer x = new Integer(0);
> >          for (Integer i:values)
> >              x = x+i;
> >         // add the new values with the previous running count to get the new count
> >         Integer newSum = state.or(0)+x;
> >         System.out.println("Newsum is "+newSum);
> >         return Optional.of(newSum);
> >        
> >       };
> >  
> > }  
