This is how I do it:

    val tmp = test.map(x => (x, 1L)).reduceByWindow(
      { case ((word1, count1), (word2, count2)) => (word1 + " " + word2, count1 + count2) },
      Seconds(10), Seconds(10))
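To make the types in that call concrete, here is a small sketch using plain Scala collections instead of Spark. It assumes a Seq can stand in for the elements of one window; the names WindowReduceSketch, Pair, window, combine, merged and perWord are made up for illustration, and the sample data is hypothetical.

```scala
object WindowReduceSketch {
  type Pair = (String, Long)

  // Hypothetical stand-in for the elements that fall into one window.
  val window: Seq[Pair] = Seq(("a", 1L), ("b", 1L), ("a", 1L))

  // Same shape as the function passed to reduceByWindow: (Pair, Pair) => Pair.
  val combine: (Pair, Pair) => Pair = {
    case ((word1, count1), (word2, count2)) => (word1 + " " + word2, count1 + count2)
  }

  // Collapses the whole window into a single pair: joined words, total count.
  val merged: Pair = window.reduce(combine)

  // window.reduce(_ + _) would not compile here: Tuple2 has no `+`
  // that takes another tuple, so the elements cannot be added directly.

  // Per-word totals need a grouping step first, then a sum over the Long counts.
  val perWord: Map[String, Long] =
    window.groupBy(_._1).map { case (word, pairs) => word -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    println(merged)
    println(perWord)
  }
}
```

The point is only that the reduce function sees whole (String, Long) elements, so it has to take and return tuples; adding counts per word is a different operation.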
In your case you actually have a type mismatch.

Thanks
Best Regards

On Sat, Jan 31, 2015 at 5:30 AM, Eduardo Costa Alfaia <e.costaalf...@unibs.it> wrote:
> Hi Guys,
>
> some idea how solve this error
>
> [error] /sata_disk/workspace/spark-1.1.1/examples/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala:76: missing parameter type for expanded function ((x$6, x$7) => x$6.$plus(x$7))
> [error] val wordCounts = words.map(x => (x, 1L)).reduceByWindow(_ + _, _ - _, Minutes(1), Seconds(2), 2)
>
> Thanks
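On the quoted line itself: after map(x => (x, 1L)) the elements are (String, Long) pairs, so `_ + _` and `_ - _` cannot be applied to them by reduceByWindow, and reduceByWindow has no overload that takes a trailing partition count. If the intent was a per-word count, one likely fix is reduceByKeyAndWindow, which applies the functions to the Long values of each key. A rough sketch, assuming `words` is a DStream[String] and Spark 1.1.x; the object and method names are made up for illustration:

```scala
import org.apache.spark.streaming.{Minutes, Seconds}
// Brings the pair-DStream implicits into scope (assumed necessary on Spark 1.1.x).
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

object WindowedWordCountSketch {
  // `words` is assumed to be the stream of tokens from the example.
  def countByWord(words: DStream[String]): DStream[(String, Long)] =
    words
      .map(x => (x, 1L))
      // Both functions now operate on Long counts per key, so _ + _ and _ - _ type-check.
      // Arguments: reduce function, inverse reduce function, window, slide, partitions.
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(1), Seconds(2), 2)
}
```

Note that the variant with an inverse function needs checkpointing enabled on the StreamingContext (ssc.checkpoint with some directory) before the stream is started.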