Alright, I have also asked this question on Stack Overflow: http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark
The code there is pretty neat.

On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das <t...@databricks.com> wrote:
> I am not sure if you realized, but the code snippet is pretty mangled in
> the email we received. It might be a good idea to put the code in a
> pastebin or gist; that is much, much easier for everyone to read.
>
> On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r <gautam1...@gmail.com> wrote:
>
>> I'm trying to use Neo4j with Apache Spark Streaming, but serializability
>> is proving to be an issue.
>>
>> Basically, I want Apache Spark to parse and bundle my data in real time.
>> After the data has been bundled, it should be stored in the database,
>> Neo4j. However, I am getting this error:
>>
>> org.apache.spark.SparkException: Task not serializable
>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>>     at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
>>     at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
>>     at twoGrams.Main$4.call(Main.java:102)
>>     at twoGrams.Main$4.call(Main.java:1)
>>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>     at scala.util.Try$.apply(Try.scala:161)
>>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>>     ...
>>     17 more
>>
>> Here is my code:
>>
>> output is a stream of type JavaPairDStream<String, ArrayList<String>>:
>>
>> output.foreachRDD(
>>     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
>>
>>         @Override
>>         public Void call(JavaPairRDD<String, ArrayList<String>> rdd,
>>                 Time time) throws Exception {
>>             rdd.foreach(
>>                 new VoidFunction<Tuple2<String, ArrayList<String>>>() {
>>
>>                     @Override
>>                     public void call(Tuple2<String, ArrayList<String>> tuple)
>>                             throws Exception {
>>                         try (Transaction tx = graphDB.beginTx()) {
>>                             if (Neo4jOperations.getHMacFromValue(graphDB, tuple._1) != null)
>>                                 System.out.println("Already in database: " + tuple._1);
>>                             else
>>                                 Neo4jOperations.createHMac(graphDB, tuple._1);
>>                             tx.success();
>>                         }
>>                     }
>>                 });
>>             return null;
>>         }
>>     });
>>
>> Neo4jOperations class:
>>
>> public class Neo4jOperations {
>>
>>     public static Node getHMacFromValue(GraphDatabaseService graphDB, String value) {
>>         try (ResourceIterator<Node> HMacs =
>>                 graphDB.findNodesByLabelAndProperty(DynamicLabel.label("HMac"),
>>                         "value", value).iterator()) {
>>             // Guard against an empty iterator rather than letting next()
>>             // throw NoSuchElementException when no node matches.
>>             return HMacs.hasNext() ? HMacs.next() : null;
>>         }
>>     }
>>
>>     public static void createHMac(GraphDatabaseService graphDB, String value) {
>>         Node HMac = graphDB.createNode(DynamicLabel.label("HMac"));
>>         HMac.setProperty("value", value);
>>         HMac.setProperty("time", new SimpleDateFormat("yyyyMMdd_HHmmss")
>>                 .format(Calendar.getInstance().getTime()));
>>     }
>> }
>>
>> I know that I have to serialize the class Neo4jOperations, but I'm not
>> able to figure out how. Or is there any other way to achieve this?
>>
>> Also, how can I store the output of Spark Streaming in a database?
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Using-Neo4j-with-Apache-Spark-tp22012.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
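The root cause in the trace above is that the anonymous VoidFunction captures graphDB, an EmbeddedGraphDatabase that does not implement Serializable, so Spark's ClosureCleaner fails while serializing the closure for shipment to executors. The same failure can be reproduced without Spark; the following sketch uses a hypothetical EmbeddedDb class standing in for Neo4j's EmbeddedGraphDatabase:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class CaptureDemo {

    // Stand-in for org.neo4j.kernel.EmbeddedGraphDatabase: a resource
    // that does not implement Serializable.
    static class EmbeddedDb {
    }

    // A Serializable task that holds the resource as a field --
    // structurally what the anonymous VoidFunction above does.
    static class BadTask implements Serializable {
        private static final long serialVersionUID = 1L;
        final EmbeddedDb db = new EmbeddedDb(); // dragged into serialization
    }

    // Returns the class named by the NotSerializableException, or null
    // if serialization unexpectedly succeeds.
    static String offendingClass(Object task) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(task);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage(); // the non-serializable class name
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(offendingClass(new BadTask())); // prints CaptureDemo$EmbeddedDb
    }
}
```

Java serialization walks every field reachable from the task object, which is why the trace shows the nested writeObject0 / defaultWriteFields frames before the "Caused by" line names the database class.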
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>

--
Gautam
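The usual answer to this question is not to make Neo4jOperations serializable but to keep the database handle out of the serialized closure entirely: hold the GraphDatabaseService in a lazily initialized static field, so each worker JVM opens its own handle at call time and only a stateless, trivially serializable function object travels over the wire. A minimal, Spark-free sketch of that holder pattern, with a hypothetical GraphDb interface standing in for Neo4j's GraphDatabaseService:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class HolderDemo {

    // Hypothetical stand-in for Neo4j's GraphDatabaseService.
    interface GraphDb {
        String name();
    }

    // Lazy, per-JVM holder: the handle lives in a static field, so it is
    // created on each worker and never serialized with the closure.
    static class GraphDbHolder {
        private static GraphDb instance;

        static synchronized GraphDb get() {
            if (instance == null) {
                instance = () -> "embedded-db"; // open the real database here
            }
            return instance;
        }
    }

    // The function shipped to workers: Serializable, but it holds no
    // database reference; it looks the handle up when invoked.
    static class StoreHMac implements Serializable {
        private static final long serialVersionUID = 1L;

        String call(String value) {
            GraphDb db = GraphDbHolder.get();
            return db.name() + ":" + value;
        }
    }

    // Round-trip an object through Java serialization, as Spark would.
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        StoreHMac fn = roundTrip(new StoreHMac()); // serializes cleanly
        System.out.println(fn.call("abc123"));     // prints embedded-db:abc123
    }
}
```

In the real job, GraphDbHolder.get() would open the embedded database (and typically register a shutdown hook); combining this with foreachPartition instead of foreach also lets one transaction cover a whole partition rather than opening one per record.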