Hi Sean/All,

I am importing among various other things the newer mapreduce version -

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;


I am using Java 7 and CDH5.1.2 distribution.

Also I was looking at the JavaPairDStream source code  -  Here this is what
I have for the definition of saveAsNewAPIHadoopFiles

/**
   * Save each RDD in `this` DStream as a Hadoop file. The file name at
each batch interval is
   * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
   */
  def saveAsNewAPIHadoopFiles(
      prefix: String,
      suffix: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
      conf: Configuration = new Configuration) {
    dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass,
outputFormatClass, conf)
  }


I have no experience in Scala.  What does this mean in Scala -
Class[_ <: NewOutputFormat[_, _]]


Also I was looking at the source code for the regular JavaPairRDD. In here
saveAsNewAPIHadoopFile is defined as follows -

 /** Output the RDD to any Hadoop-supported file system. */  def
saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](  path: String,
keyClass: Class[_],      valueClass: Class[_],      outputFormatClass:
Class[F],      conf: Configuration) {    rdd.saveAsNewAPIHadoopFile(path,
keyClass, valueClass, outputFormatClass, conf)  }

I know that I have run the non-streaming version of wordcount and this
works....

wordCounts.saveAsNewAPIHadoopFile(outputDir, Text.class, IntWritable.class,
TextOutputFormat.class, clstrConf);






On Sat, Oct 4, 2014 at 1:33 AM, Sean Owen <so...@cloudera.com> wrote:

> Are you importing the '.mapred.' version of TextOutputFormat instead
> of the new API '.mapreduce.' version?
>
> On Sat, Oct 4, 2014 at 1:08 AM, Abraham Jacob <abe.jac...@gmail.com>
> wrote:
> > Hi All,
> >
> >
> > Would really appreciate if someone in the community can help me with
> this. I
> > have a simple Java spark streaming application - NetworkWordCount
> >
> >                 SparkConf sparkConf = new
> > SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
> > JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new
> > Duration(1000));
> > JavaReceiverInputDStream<String> lines = jssc.socketTextStream(hostname,
> > port);
> > JavaDStream<String> words = lines.flatMap(new SplitLines());
> > JavaPairDStream<String, Integer> wordMap = words.mapToPair(new
> MapWords());
> > JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new
> > ReduceWords());
> > wordCount.saveAsNewAPIHadoopFiles(output + "/wordcount", "txt",
> Text.class,
> > IntWritable.class, TextOutputFormat.class, clstrConf);
> > jssc.start();
> > jssc.awaitTermination();
> >
> >
> > I have an issue with this line
> >
> > wordCount.saveAsNewAPIHadoopFiles(output + "/wordcount", "txt",
> Text.class,
> > IntWritable.class, TextOutputFormat.class, clstrConf);
> >
> > it complains of the following -
> >
> > The method saveAsNewAPIHadoopFiles(String, String, Class<?>, Class<?>,
> > Class<? extends OutputFormat<?,?>>, Configuration) in the type
> > JavaPairDStream<String,Integer> is not applicable for the arguments
> (String,
> > String, Class<Text>, Class<IntWritable>, Class<TextOutputFormat>,
> > Configuration)
> >
> >
> > In the saveAsNewAPIHadoopFile for JavaPairRDD this worked perfectly
> fine...
> >
> >
> > Would appreciate if someone could help me with this...?
> >
> > --
> > abe
>



-- 
~

Reply via email to