I guess maybe you don't need to invoke reduceByKey() after mapToPair(), because updateStateByKey() already covers the aggregation. For your reference, here is a sample written in Scala, using a text file stream instead of a socket:
object LocalStatefulWordCount extends App {
  val sparkConf = new SparkConf().setAppName("HdfsWordCount")
  val ssc = new StreamingContext(sparkConf, Seconds(2))

  // must set a checkpoint directory for updateStateByKey
  // note: the checkpoint directory cannot be the source directory
  ssc.checkpoint("./checkpoint")

  val updateFunc = (values: Seq[Int], state: Option[Int]) => {
    val currentCount = values.foldLeft(0)(_ + _)
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }

  val lines = ssc.textFileStream("/Users/twer/workspace/scala101/data") // local directory
  val wordDstream = lines.flatMap(_.split(" ")).map(x => (x, 1))
  val statefulWordCount = wordDstream.updateStateByKey[Int](updateFunc)
  statefulWordCount.print()

  ssc.start()
  ssc.awaitTermination()
}

Zhang Yi / 张逸
Lead Consultant
Email yizh...@thoughtworks.com
Telephone +86 15023157626

On Sunday, November 16, 2014 at 6:19 PM, Bahubali Jain wrote:
> Hi,
> Can anybody help me with this please? I haven't been able to find the problem :(
>
> Thanks.
> On Nov 15, 2014 4:48 PM, "Bahubali Jain" <bahub...@gmail.com> wrote:
> > Hi,
> > I am trying to use Spark Streaming, but I am struggling with word count :(
> > I want to consolidate the output of the word count (not on a per-window basis), so
> > I am using updateStateByKey(), but for some reason this is not working.
> > The function itself is not being invoked (I do not see the sysout output on the console).
> >
> > public final class WordCount {
> >     private static final Pattern SPACE = Pattern.compile(" ");
> >
> >     public static void main(String[] args) {
> >         if (args.length < 2) {
> >             System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
> >             System.exit(1);
> >         }
> >
> >         // Create the context with a 1 second batch size
> >         SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
> >         JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
> >         ssc.checkpoint("/tmp/worcount");
> >         // Create a JavaReceiverInputDStream on target ip:port and count the
> >         // words in input stream of \n delimited text (eg. generated by 'nc')
> >         // Note that no duplication in storage level only for running locally.
> >         // Replication necessary in distributed scenario for fault tolerance.
> >         JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
> >                 args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
> >         JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
> >             @Override
> >             public Iterable<String> call(String x) {
> >                 return Lists.newArrayList(SPACE.split(x));
> >             }
> >         });
> >
> >         JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
> >                 new PairFunction<String, String, Integer>() {
> >                     @Override
> >                     public Tuple2<String, Integer> call(String s) {
> >                         System.err.println("Got " + s);
> >                         return new Tuple2<String, Integer>(s, 1);
> >                     }
> >                 }).reduceByKey(new Function2<Integer, Integer, Integer>() {
> >                     @Override
> >                     public Integer call(Integer i1, Integer i2) {
> >                         return i1 + i2;
> >                     }
> >                 });
> >
> >         wordCounts.print();
> >         wordCounts.updateStateByKey(new updateFunction());
> >         ssc.start();
> >         ssc.awaitTermination();
> >     }
> > }
> >
> > class updateFunction implements Function2<List<Integer>, Optional<Integer>, Optional<Integer>> {
> >
> >     @Override
> >     public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
> >         Integer x = new Integer(0);
> >         for (Integer i : values)
> >             x = x + i;
> >         // add the new values to the previous running count to get the new count
> >         Integer newSum = state.or(0) + x;
> >         System.out.println("Newsum is " + newSum);
> >         return Optional.of(newSum);
> >     };
> > }
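To illustrate what the updateStateByKey update function is expected to do across micro-batches, here is a minimal plain-Java sketch of the same cumulative-sum logic, runnable without Spark. Note the hedges: Spark 1.x actually passes Guava's Optional into the update function, so java.util.Optional is only a stand-in here, and the batch data and class name are made up for the demonstration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class StatefulCountSketch {

    // Mirrors the update function's contract: sum the new values seen for a
    // key in this batch and add them to the previous running count (if any).
    // (java.util.Optional stands in for the Guava Optional Spark 1.x uses.)
    static Optional<Integer> update(List<Integer> values, Optional<Integer> state) {
        int current = 0;
        for (int v : values) {
            current += v;
        }
        return Optional.of(state.orElse(0) + current);
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();

        // Two simulated micro-batches of words (hypothetical sample data):
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("spark", "streaming", "spark"),
                Arrays.asList("spark"));

        for (List<String> batch : batches) {
            // Group (word, 1) pairs by key, as the DStream would per batch.
            Map<String, List<Integer>> grouped = new HashMap<>();
            for (String w : batch) {
                grouped.computeIfAbsent(w, k -> new ArrayList<>()).add(1);
            }
            // Apply the update function to fold this batch into the state.
            for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
                Optional<Integer> prev = Optional.ofNullable(state.get(e.getKey()));
                state.put(e.getKey(), update(e.getValue(), prev).get());
            }
        }

        System.out.println(state.get("spark"));     // cumulative count across batches: 3
        System.out.println(state.get("streaming")); // 1
    }
}
```

The point of the sketch is that the running total lives entirely in the state argument, which is why the separate reduceByKey() pass is redundant, and why the stream returned by updateStateByKey() (not the pre-update wordCounts) is the one whose output you should print.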