Hi Sean/All,

I am importing, among various other things, the newer mapreduce version:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;

I am using Java 7 and the CDH 5.1.2 distribution.

I was also looking at the JavaPairDStream source code. This is the definition of saveAsNewAPIHadoopFiles there:

  /**
   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
   * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
   */
  def saveAsNewAPIHadoopFiles(
      prefix: String,
      suffix: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
      conf: Configuration = new Configuration) {
    dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
  }

I have no experience with Scala. What does this mean in Scala:

  Class[_ <: NewOutputFormat[_, _]]

I was also looking at the source code for the regular JavaPairRDD. There, saveAsNewAPIHadoopFile is defined as follows:

  /** Output the RDD to any Hadoop-supported file system. */
  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[F],
      conf: Configuration) {
    rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
  }

I know that I have run the non-streaming version of wordcount, and this works:

  wordCounts.saveAsNewAPIHadoopFile(outputDir, Text.class, IntWritable.class,
      TextOutputFormat.class, clstrConf);

On Sat, Oct 4, 2014 at 1:33 AM, Sean Owen <so...@cloudera.com> wrote:

> Are you importing the '.mapred.'
> version of TextOutputFormat instead
> of the new API '.mapreduce.' version?
>
> On Sat, Oct 4, 2014 at 1:08 AM, Abraham Jacob <abe.jac...@gmail.com> wrote:
> > Hi All,
> >
> > Would really appreciate if someone in the community can help me with this. I
> > have a simple Java spark streaming application - NetworkWordCount:
> >
> >     SparkConf sparkConf = new SparkConf()
> >         .setMaster("yarn-cluster").setAppName("Streaming WordCount");
> >     JavaStreamingContext jssc =
> >         new JavaStreamingContext(sparkConf, new Duration(1000));
> >     JavaReceiverInputDStream<String> lines =
> >         jssc.socketTextStream(hostname, port);
> >     JavaDStream<String> words = lines.flatMap(new SplitLines());
> >     JavaPairDStream<String, Integer> wordMap = words.mapToPair(new MapWords());
> >     JavaPairDStream<String, Integer> wordCount =
> >         wordMap.reduceByKey(new ReduceWords());
> >     wordCount.saveAsNewAPIHadoopFiles(output + "/wordcount", "txt",
> >         Text.class, IntWritable.class, TextOutputFormat.class, clstrConf);
> >     jssc.start();
> >     jssc.awaitTermination();
> >
> > I have an issue with this line:
> >
> >     wordCount.saveAsNewAPIHadoopFiles(output + "/wordcount", "txt",
> >         Text.class, IntWritable.class, TextOutputFormat.class, clstrConf);
> >
> > It complains of the following:
> >
> >     The method saveAsNewAPIHadoopFiles(String, String, Class<?>, Class<?>,
> >     Class<? extends OutputFormat<?,?>>, Configuration) in the type
> >     JavaPairDStream<String,Integer> is not applicable for the arguments
> >     (String, String, Class<Text>, Class<IntWritable>, Class<TextOutputFormat>,
> >     Configuration)
> >
> > In the saveAsNewAPIHadoopFile for JavaPairRDD this worked perfectly fine...
> >
> > Would appreciate if someone could help me with this...?
> >
> > --
> > abe
>
> --

~
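If I am reading the two signatures correctly, the difference seems to be this: the Scala existential Class[_ <: NewOutputFormat[_, _]] surfaces in Java as the bounded wildcard Class<? extends OutputFormat<?,?>> (which is exactly what the compiler error shows), and a raw class literal like TextOutputFormat.class, whose static type is Class<TextOutputFormat>, does not conform to that bound. The Class[F] form on JavaPairRDD instead gives javac an inference site, where the raw type is accepted via unchecked conversion. Here is a stdlib-only sketch of the same mismatch and the usual double-cast workaround; ArrayList/List merely stand in for TextOutputFormat/OutputFormat, and the method names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;

public class ExistentialBound {
    // Mirrors JavaPairDStream.saveAsNewAPIHadoopFiles: the Scala existential
    // Class[_ <: NewOutputFormat[_, _]] appears in Java as a bounded wildcard.
    static String takesBounded(Class<? extends List<?>> cls) {
        return cls.getSimpleName();
    }

    // Mirrors JavaPairRDD.saveAsNewAPIHadoopFile: a real type parameter F,
    // which javac can satisfy from a raw class literal via unchecked conversion.
    static <F extends List<?>> String takesParam(Class<F> cls) {
        return cls.getSimpleName();
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // takesBounded(ArrayList.class);  // does not compile: Class<ArrayList>
        // is not a Class<? extends List<?>>, because the raw type ArrayList is
        // not a subtype of the parameterized List<?>.

        // Workaround: widen to Class<?> first, then cast to the wildcard type
        // (an unchecked cast, hence the @SuppressWarnings above).
        Class<? extends List<?>> widened =
                (Class<? extends List<?>>) (Class<?>) ArrayList.class;
        System.out.println(takesBounded(widened)); // prints "ArrayList"

        // The type-parameter version accepts the raw literal directly.
        System.out.println(takesParam(ArrayList.class)); // prints "ArrayList"
    }
}
```

If that analysis holds, the same double cast on the streaming call, i.e. passing (Class<? extends OutputFormat<?,?>>) (Class<?>) TextOutputFormat.class, should get it past the compiler, modulo an unchecked-cast warning.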