Hello Akhil,

Thank you for taking the time to write a detailed answer. I managed to solve it in a very similar manner.
Kind regards,

Emre Sevinç


On Mon, Feb 2, 2015 at 8:22 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

> Hi Emre,
>
> This is how you do that in Scala:
>
>     val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
>       "/home/akhld/sigmoid", (t: Path) => true, true)
>
> In Java you can do something like:
>
>     jssc.ssc().<LongWritable, Text, SequenceFileInputFormat>fileStream(
>         "/home/akhld/sigmoid",
>         new AbstractFunction1<Path, Object>() {
>           @Override
>           public Boolean apply(Path input) {
>             // file filtering logic here
>             return true;
>           }
>         },
>         true,
>         ClassTag$.MODULE$.apply(LongWritable.class),
>         ClassTag$.MODULE$.apply(Text.class),
>         ClassTag$.MODULE$.apply(SequenceFileInputFormat.class));
>
> Thanks
> Best Regards
>
> On Mon, Feb 2, 2015 at 6:34 PM, Emre Sevinc <emre.sev...@gmail.com> wrote:
>
>> Hello,
>>
>> I'm using Apache Spark Streaming 1.2.0 and trying to define a file filter
>> for file names when creating an InputDStream
>> <https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/dstream/InputDStream.html>
>> by invoking the fileStream
>> <https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.Function1,%20boolean,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29>
>> method. My code works perfectly fine when I don't use a file filter, e.g.
>> by invoking the other fileStream method (described here:
>> https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/streaming/StreamingContext.html#fileStream%28java.lang.String,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag,%20scala.reflect.ClassTag%29
>> ).
>>
>> According to the documentation of the *fileStream* method, I can pass it a
>>
>>     scala.Function1<org.apache.hadoop.fs.Path,Object> filter
>>
>> but so far I have not been able to create such a file filter. My initial
>> attempts have been:
>>
>> 1- Tried to implement it as:
>>
>>     Function1<Path, Object> fileFilter = new Function1<Path, Object>() {
>>       @Override
>>       public Object apply(Path v1) {
>>         return true;
>>       }
>>
>>       @Override
>>       public <A> Function1<A, Object> compose(Function1<A, Path> g) {
>>         return Function1$class.compose(this, g);
>>       }
>>
>>       @Override
>>       public <A> Function1<Path, A> andThen(Function1<Object, A> g) {
>>         return Function1$class.andThen(this, g);
>>       }
>>     };
>>
>> But apparently my implementation of andThen is wrong, and I couldn't
>> understand how I should implement it. The compiler complains that the
>> anonymous class:
>>
>>     is not abstract and does not override abstract method
>>     <A>andThen$mcVJ$sp(scala.Function1<scala.runtime.BoxedUnit,A>) in
>>     scala.Function1
>>
>> 2- Tried to implement it as:
>>
>>     Function1<Path, Object> fileFilter = new AbstractFunction1<Path, Object>() {
>>       @Override
>>       public Object apply(Path v1) {
>>         return true;
>>       }
>>     };
>>
>> This one compiles, but when I run it I get an exception:
>>
>>     2015-02-02 13:42:50 ERROR OneForOneStrategy:66 - myModule$1
>>     java.io.NotSerializableException: myModule$1
>>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>       at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>>       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>       at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
>>       at org.apache.spark.streaming.DStreamGraph$$anonfun$writeObject$1.apply$mcV$sp(DStreamGraph.scala:169)
>>       at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
>>       at org.apache.spark.streaming.DStreamGraph.writeObject(DStreamGraph.scala:164)
>>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>       at java.lang.reflect.Method.invoke(Method.java:483)
>>       at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>       at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>       at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>>       at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>       at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>       at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>       at org.apache.spark.streaming.CheckpointWriter.write(Checkpoint.scala:184)
>>       at org.apache.spark.streaming.scheduler.JobGenerator.doCheckpoint(JobGenerator.scala:263)
>>       at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:167)
>>       at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
>>       at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>       at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1.aroundReceive(JobGenerator.scala:74)
>>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>       at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>       at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>       at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> Any ideas on how I can implement a *fileFilter* to pass to the *fileStream*
>> method, so that I can make Spark Streaming process only the file name
>> patterns I want?
>>
>> --
>> Emre Sevinç

--
Emre Sevinc
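[Editor's note] The thread doesn't show the final fix, but the `NotSerializableException` in attempt 2 is the classic anonymous-inner-class capture problem: an anonymous class declared inside a driver class holds a hidden reference to its enclosing instance, and when Spark checkpoints the DStream graph it tries to serialize that instance too. The likely remedy, consistent with Emre's "solved it in a very similar manner", is to define the filter as a named static class extending `AbstractFunction1<Path, Object>` that also implements `java.io.Serializable`. The sketch below demonstrates the underlying mechanism in plain Java with no Spark dependency; the names `Filter`, `JsonFilter`, and `FilterSerializationDemo` are hypothetical stand-ins.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class FilterSerializationDemo {

    // Hypothetical stand-in for scala.Function1<Path, Object>.
    interface Filter extends Serializable {
        boolean accept(String path);
    }

    // A named static class: serializable, because it carries no hidden
    // reference to an enclosing instance.
    static class JsonFilter implements Filter {
        public boolean accept(String path) {
            return path.endsWith(".json");
        }
    }

    // Serializes an object, returning true on success and false when the
    // object graph contains something non-serializable.
    static boolean serializes(Object o) throws IOException {
        try (ObjectOutputStream oos =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }

    // An anonymous class created in an instance method captures `this` of
    // FilterSerializationDemo, which is NOT Serializable -- the same trap
    // as the anonymous AbstractFunction1 in the question.
    Filter anonymousFilter() {
        return new Filter() {
            public boolean accept(String path) {
                return true;
            }
        };
    }

    public static void main(String[] args) throws IOException {
        System.out.println(serializes(new JsonFilter()));       // prints true
        System.out.println(
            serializes(new FilterSerializationDemo().anonymousFilter())); // prints false
    }
}
```

Applied to the Spark case, the same principle suggests replacing the anonymous `AbstractFunction1<Path, Object>` with a top-level (or static nested) class that extends `AbstractFunction1<Path, Object>` and additionally implements `java.io.Serializable`, then passing an instance of it to `fileStream`.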