I'm attempting to write a Java-only implementation that calls the StreamingContext.fileStream method, and I am particularly interested in setting the boolean "newFilesOnly" to false. Unfortunately my code throws an exception:
    Exception in thread "main" java.lang.InstantiationException
        at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at java.lang.Class.newInstance(Class.java:374)
        at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:83)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)

The exception is thrown whenever the files are opened, whether or not I invoke the longer form of the fileStream method. I can use the JavaStreamingContext version successfully, but that API does not expose the boolean flag. If someone sees an issue with the code below, I would be very grateful for a nudge in the right direction.

    SparkConf conf = new SparkConf();
    conf.setMaster("local[2]");
    conf.setAppName("SparkStreamingFileTest");
    conf.set("spark.cores.max", "1");
    conf.set("spark.executor.memory", "1g");

    List<String> inputjarslist = new ArrayList<String>();
    inputjarslist.add("/home/usr/target/lib/scala-library-2.10.1.jar");
    inputjarslist.add("/home/usr/target/lib/spark-assembly-1.0.2-hadoop2.2.0.jar");
    inputjarslist.add("/home/usr/target/lib/spark-streaming_2.10-1.0.2.jar");
    //Seq<String> inputjars = asScalaBuffer(inputjarslist);
    conf.setJars(inputjarslist.toArray(new String[3]));

    StreamingContext scc = new StreamingContext(conf, new Duration(10000));

    Seq<String> thejars = scc.sc().jars();
    scala.collection.Iterator iter = thejars.iterator();
    if (!(iter.hasNext()))
        System.out.println("no jars associated!!");
    while (iter.hasNext()) {
        System.out.println("Jar in system: " + iter.next());
    }

    Function1<Path, Object> f = new AbstractFunction1<Path, Object>() {
        public Boolean apply(Path input) {
            return true;
        }
    };

    ClassTag<LongWritable> k = scala.reflect.ClassTag$.MODULE$.apply(LongWritable.class);
    ClassTag<Text> v = scala.reflect.ClassTag$.MODULE$.apply(Text.class);
    ClassTag<InputFormat<LongWritable, Text>> t = scala.reflect.ClassTag$.MODULE$.apply(InputFormat.class);

    InputDStream<Tuple2<LongWritable, Text>> ans = scc.fileStream("/home/usr/testDataDirectory", f, false, k, v, t);
    //InputDStream<Tuple2<LongWritable,Text>> ans = scc.fileStream("/home/usr/testDataDirectory", k, v, t);
    ans.print();

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Java-Implementation-of-StreamingContext-fileStream-tp14863.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
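P.S. While staring at the trace, I found the InstantiationException itself is easy to reproduce outside Spark. The class names below are stand-ins of my own (not the actual Hadoop classes); the point is only that Class.newInstance() fails on an abstract class, which looks like what NewHadoopRDD.getPartitions would hit if the third ClassTag resolves to the abstract InputFormat base class rather than a concrete subclass:

```java
// Stand-alone sketch of the failure mode seen in the stack trace.
// AbstractFormat / ConcreteFormat are hypothetical stand-ins for
// the abstract o.a.h.mapreduce.InputFormat and a concrete subclass.

abstract class AbstractFormat {}               // stand-in for the abstract base class
class ConcreteFormat extends AbstractFormat {} // stand-in for a concrete input format

public class NewInstanceDemo {
    public static void main(String[] args) throws Exception {
        // A concrete class instantiates fine via reflection.
        AbstractFormat ok = ConcreteFormat.class.newInstance();
        System.out.println("concrete: " + ok.getClass().getSimpleName());

        // The abstract class fails on the same Class.newInstance()
        // code path that appears in the stack trace.
        try {
            AbstractFormat.class.newInstance();
            System.out.println("abstract: unexpectedly succeeded");
        } catch (InstantiationException e) {
            System.out.println("abstract: InstantiationException");
        }
    }
}
```

If that diagnosis is right, building the third ClassTag from a concrete input format (for example, TextInputFormat.class instead of InputFormat.class) might avoid the exception, but I have not verified this.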