I'm attempting to code a Java only implementation accessing the
StreamingContext.fileStream method and am especially interested in setting
the boolean "newFilesOnly" to false. Unfortunately my code throws
exceptions:

Exception in thread "main" java.lang.InstantiationException
        at
sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at java.lang.Class.newInstance(Class.java:374)
        at 
org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:83)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)

whenever the files are opened. The exceptions are generated whether or not I
invoke the longer form of the fileStream method. I can use the
JavaStreamingContext version successfully, but don't have access to the
boolean flag in this case. If someone sees an issue with the code below, I
would be very grateful for a nudge in the right direction.

                SparkConf conf = new SparkConf();
                conf.setMaster("local[2]");
                conf.setAppName("SparkStreamingFileTest");
                conf.set("spark.cores.max", "1");
                conf.set("spark.executor.memory","1g");

                List<String> inputjarslist = new ArrayList<String>();
               
inputjarslist.add("/home/usr/target/lib/scala-library-2.10.1.jar");
               
inputjarslist.add("/home/usr/target/lib/spark-assembly-1.0.2-hadoop2.2.0.jar");
               
inputjarslist.add("/home/usr/target/lib/spark-streaming_2.10-1.0.2.jar");

                //Seq<String> inputjars = asScalaBuffer(inputjarslist);
                conf.setJars(inputjarslist.toArray(new String[3]));

                StreamingContext scc = new StreamingContext(conf, new
Duration(10000));

                Seq<String> thejars = scc.sc().jars();
                scala.collection.Iterator iter = thejars.iterator();
                if(!(iter.hasNext())) System.out.println("no jars
associated!!");
                while (iter.hasNext()) {
                        System.out.println("Jar in system: "+iter.next());
                }

                Function1<Path,Object> f = new
AbstractFunction1<Path,Object>() {
                        public Boolean apply(Path input){
                          return true;
                        }
                };

                //scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class);

                ClassTag <LongWritable> k =
scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class);
                ClassTag <Text> v
=scala.reflect.ClassTag$.MODULE$.apply(Text.class);
                ClassTag <InputFormat&lt;LongWritable,Text>> t =
scala.reflect.ClassTag$.MODULE$.apply(InputFormat.class);

                InputDStream<Tuple2&lt;LongWritable,Text>> ans =
scc.fileStream("/home/usr/testDataDirectory", f, false, k, v, t);
                //InputDStream<Tuple2&lt;LongWritable,Text>> ans =
scc.fileStream("/home/usr/testDataDirectory",k,v,t);

                ans.print();





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Java-Implementation-of-StreamingContext-fileStream-tp14863.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to