I tried that too. It results in the same serializability issue. The GraphDatabaseService that I'm using is created via GraphDatabaseFactory(): http://neo4j.com/api_docs/2.0.0/org/neo4j/graphdb/factory/GraphDatabaseFactory.html
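(For reference, a minimal Java sketch of the foreachPartition pattern recommended in the reply quoted below, with the GraphDatabaseService constructed inside the partition closure so nothing non-serializable is captured. This is only an illustrative sketch, not code from this thread: the store path is a placeholder, the code is assumed to live in a static method such as main, and the Neo4jOperations helpers are the ones from further down the thread. Note also that a single embedded Neo4j store can normally be opened by only one JVM at a time, so this pattern really fits a database the executors can reach, rather than a driver-local embedded store.)

    // Sketch only: connection-per-partition pattern in Java.
    // Assumed imports: org.apache.spark.api.java.*, org.apache.spark.api.java.function.*,
    // org.apache.spark.streaming.Time, org.neo4j.graphdb.*,
    // org.neo4j.graphdb.factory.GraphDatabaseFactory, scala.Tuple2,
    // java.util.ArrayList, java.util.Iterator.
    output.foreachRDD(
        new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
            @Override
            public Void call(JavaPairRDD<String, ArrayList<String>> rdd, Time time)
                    throws Exception {
                rdd.foreachPartition(
                    new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {
                        @Override
                        public void call(Iterator<Tuple2<String, ArrayList<String>>> records)
                                throws Exception {
                            // Created on the worker, inside the closure, so it is never serialized.
                            // "/path/to/graph.db" is a placeholder.
                            GraphDatabaseService graphDB =
                                new GraphDatabaseFactory().newEmbeddedDatabase("/path/to/graph.db");
                            try (Transaction tx = graphDB.beginTx()) {
                                while (records.hasNext()) {
                                    Tuple2<String, ArrayList<String>> record = records.next();
                                    if (Neo4jOperations.getHMacFromValue(graphDB, record._1) == null) {
                                        Neo4jOperations.createHMac(graphDB, record._1);
                                    }
                                }
                                tx.success();  // one transaction per partition
                            } finally {
                                graphDB.shutdown();  // or release to a pool instead
                            }
                        }
                    });
                return null;
            }
        });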
On Thu, Mar 12, 2015 at 5:21 PM, Tathagata Das <t...@databricks.com> wrote:

> What is the GraphDatabaseService object that you are using? Instead of
> creating it on the driver (outside foreachRDD), can you create it inside
> the RDD.foreach?
>
> In general, the right pattern for doing this is in the programming guide:
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>
> So you should be doing (sorry for writing in Scala):
>
> dstream.foreachRDD((rdd: RDD, time: Time) => {
>   rdd.foreachPartition(iterator =>
>     // Create a GraphDatabaseService object, or fetch it from a pool of
>     // GraphDatabaseService objects
>     // Use it to send the whole partition to Neo4j
>     // Destroy the object or release it to the pool
>   )
> })
>
> On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj <gautam1...@gmail.com> wrote:
>
>> Neo4j is running externally. It has nothing to do with the Spark processes.
>>
>> Basically, the problem is that I'm unable to figure out a way to store the
>> output of Spark in the database, as Spark Streaming requires the Neo4j Core
>> Java API to be serializable as well.
>>
>> The answer points to using the REST API, but its performance is really poor
>> compared to the Core Java API:
>> http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/
>>
>> On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das <t...@databricks.com> wrote:
>>
>>> Well, the answers you got there are correct as well.
>>> Unfortunately I am not familiar enough with Neo4j to comment any more.
>>> Is the Neo4j graph database running externally (outside the Spark cluster),
>>> or within the driver process, or on all the executors? Can you clarify that?
>>>
>>> TD
>>>
>>> On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj <gautam1...@gmail.com> wrote:
>>>
>>>> Alright, I have also asked this question on Stack Overflow:
>>>> http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark
>>>>
>>>> The code there is pretty neat.
>>>>
>>>> On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>
>>>>> I am not sure if you realized, but the code snippet is pretty mangled
>>>>> in the email we received. It might be a good idea to put the code in a
>>>>> pastebin or gist; that is much, much easier for everyone to read.
>>>>>
>>>>> On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r <gautam1...@gmail.com> wrote:
>>>>>
>>>>>> I'm trying to use Neo4j with Apache Spark Streaming, but serializability
>>>>>> is proving to be an issue.
>>>>>>
>>>>>> Basically, I want Apache Spark to parse and bundle my data in real time.
>>>>>> After the data has been bundled, it should be stored in the database,
>>>>>> Neo4j.
>>>>>> However, I am getting this error:
>>>>>>
>>>>>> org.apache.spark.SparkException: Task not serializable
>>>>>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>>>>>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>>>>>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>>>>>>     at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
>>>>>>     at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
>>>>>>     at twoGrams.Main$4.call(Main.java:102)
>>>>>>     at twoGrams.Main$4.call(Main.java:1)
>>>>>>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>>>>>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>>     at scala.util.Try$.apply(Try.scala:161)
>>>>>>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>>>>>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase
>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>>>>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>>>>>>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>>>>>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>>>>>>     ... 17 more
>>>>>>
>>>>>> Here is my code:
>>>>>>
>>>>>> output is a stream of type JavaPairDStream<String, ArrayList<String>>:
>>>>>>
>>>>>> output.foreachRDD(
>>>>>>     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
>>>>>>
>>>>>>         @Override
>>>>>>         public Void call(JavaPairRDD<String, ArrayList<String>> arg0, Time arg1)
>>>>>>                 throws Exception {
>>>>>>             arg0.foreach(
>>>>>>                 new VoidFunction<Tuple2<String, ArrayList<String>>>() {
>>>>>>
>>>>>>                     @Override
>>>>>>                     public void call(Tuple2<String, ArrayList<String>> arg0)
>>>>>>                             throws Exception {
>>>>>>                         try (Transaction tx = graphDB.beginTx()) {
>>>>>>                             if (Neo4jOperations.getHMacFromValue(graphDB, arg0._1) != null)
>>>>>>                                 System.out.println("Already in database: " + arg0._1);
>>>>>>                             else
>>>>>>                                 Neo4jOperations.createHMac(graphDB, arg0._1);
>>>>>>                             tx.success();
>>>>>>                         }
>>>>>>                     }
>>>>>>                 });
>>>>>>             return null;
>>>>>>         }
>>>>>>     });
>>>>>>
>>>>>> Neo4jOperations class:
>>>>>>
>>>>>> public class Neo4jOperations {
>>>>>>
>>>>>>     public static Node getHMacFromValue(GraphDatabaseService graphDB, String value) {
>>>>>>         try (ResourceIterator<Node> HMacs = graphDB.findNodesByLabelAndProperty(
>>>>>>                 DynamicLabel.label("HMac"), "value", value).iterator()) {
>>>>>>             return HMacs.next();
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     public static void createHMac(GraphDatabaseService graphDB, String value) {
>>>>>>         Node HMac = graphDB.createNode(DynamicLabel.label("HMac"));
>>>>>>         HMac.setProperty("value", value);
>>>>>>         HMac.setProperty("time", new SimpleDateFormat("yyyyMMdd_HHmmss")
>>>>>>                 .format(Calendar.getInstance().getTime()));
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> I know that I have to serialize the Neo4jOperations class, but I'm unable
>>>>>> to figure out how. Or is there any other way to achieve this?
>>>>>>
>>>>>> Also, how can I store the output of Spark Streaming in a database?
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Using-Neo4j-with-Apache-Spark-tp22012.html
>>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>>>
>>>>>> ---------------------------------------------------------------------
>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Gautam
>>>
>>
>> --
>> Gautam
>

--
Gautam
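(On the last two questions in the quoted message: rather than making Neo4jOperations serializable, the usual approach is to create the database handle on whichever JVM actually does the writing. If the embedded database exists only in the driver JVM, one option, at the cost of pulling every micro-batch back to the driver, is to collect each RDD inside foreachRDD and write it locally, so nothing Neo4j-related ever enters an executor closure. A minimal sketch under stated assumptions: the code sits in a static context, graphDB is a static field opened once on the driver via GraphDatabaseFactory, the Neo4jOperations helpers are the ones from the thread, and each micro-batch is small enough to collect.)

    // Sketch only: driver-side writes, so the GraphDatabaseService never enters an executor closure.
    // Assumed imports as in the earlier sketch, plus java.util.List.
    // Assumes a static field initialized once on the driver, e.g.
    //   static GraphDatabaseService graphDB =
    //       new GraphDatabaseFactory().newEmbeddedDatabase("/path/to/graph.db");  // placeholder path
    output.foreachRDD(
        new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
            @Override
            public Void call(JavaPairRDD<String, ArrayList<String>> rdd, Time time)
                    throws Exception {
                // collect() brings the batch to the driver; the loop below never leaves the driver JVM
                List<Tuple2<String, ArrayList<String>>> batch = rdd.collect();
                try (Transaction tx = graphDB.beginTx()) {
                    for (Tuple2<String, ArrayList<String>> record : batch) {
                        if (Neo4jOperations.getHMacFromValue(graphDB, record._1) == null) {
                            Neo4jOperations.createHMac(graphDB, record._1);
                        }
                    }
                    tx.success();  // one transaction per micro-batch
                }
                return null;
            }
        });

For larger batches, the foreachPartition pattern sketched near the top of the thread is the better fit, provided the target database is one the executors can actually connect to.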