Thanks a ton! That worked. However, this may have a performance issue: for each partition I'd need to restart the server, which was the basic reason I was creating the graphDb object outside this loop.
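The per-partition setup cost can be avoided with the "pool" variant of the pattern TD describes below: a lazily initialized, per-JVM singleton that every partition running in that executor reuses, so only the first partition per JVM pays the startup cost. The sketch below is only an illustration: the `GraphDbHolder` class is made up, and a plain `Object` stands in for the non-serializable `GraphDatabaseService`.

```java
// Hypothetical sketch: one database handle per executor JVM instead of one
// per partition. In real code the Object would be a GraphDatabaseService
// created via new GraphDatabaseFactory().newEmbeddedDatabase(path).
public class GraphDbHolder {
    private static Object graphDb;    // stand-in for GraphDatabaseService
    private static int initCount = 0; // exposed so reuse can be observed

    // Called from inside rdd.foreachPartition; the first caller on this JVM
    // creates the handle, later partitions on the same JVM reuse it. Because
    // the handle is fetched inside the task, it is never part of the closure
    // and is never serialized.
    public static synchronized Object get() {
        if (graphDb == null) {
            initCount++;
            graphDb = new Object();
        }
        return graphDb;
    }

    public static synchronized int getInitCount() {
        return initCount;
    }
}
```

Calling `GraphDbHolder.get()` from several partitions on the same executor returns the same instance, so the expensive startup happens once per JVM rather than once per partition.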
On Fri, Mar 13, 2015 at 5:34 AM, Tathagata Das <t...@databricks.com> wrote:

> (Putting user@spark back in the to list)
>
> In the gist, you are creating the graphDB object way outside the
> RDD.foreachPartition. As I said last time, create the graphDB object
> inside the RDD.foreachPartition. You are creating it outside
> DStream.foreachRDD, and then using it from inside rdd.foreachPartition.
> That brings the graphDB object into the task closure, and hence the
> system tries to serialize the graphDB object when it serializes the
> closure. If you create the graphDB object inside the
> RDD.foreachPartition, then the closure will not refer to any prior
> graphDB object and therefore will not serialize anything.
>
> On Thu, Mar 12, 2015 at 3:46 AM, Gautam Bajaj <gautam1...@gmail.com> wrote:
>
>> Here: https://gist.github.com/d34th4ck3r/0c99d1e9fa288e0cc8ab
>>
>> I'll add the flag and send you the stack trace; I have meetings now.
>>
>> On Thu, Mar 12, 2015 at 6:28 PM, Tathagata Das <t...@databricks.com> wrote:
>>
>>> Could you show us that version of the code?
>>>
>>> It also helps to turn on the Java flag for extended debug info. That
>>> will show the lineage of objects leading to the non-serializable one.
>>>
>>> On Mar 12, 2015 1:32 AM, "Gautam Bajaj" <gautam1...@gmail.com> wrote:
>>>
>>>> I tried that too. It results in the same serializability issue.
>>>>
>>>> The GraphDatabaseService that I'm using comes from GraphDatabaseFactory():
>>>> http://neo4j.com/api_docs/2.0.0/org/neo4j/graphdb/factory/GraphDatabaseFactory.html
>>>>
>>>> On Thu, Mar 12, 2015 at 5:21 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>
>>>>> What is the GraphDatabaseService object that you are using? Instead
>>>>> of creating them on the driver (outside foreachRDD), can you create
>>>>> them inside the RDD.foreach?
>>>>>
>>>>> In general, the right pattern for doing this is in the programming guide:
>>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>>>>>
>>>>> So you should be doing (sorry for writing in Scala):
>>>>>
>>>>> dstream.foreachRDD((rdd: RDD, time: Time) => {
>>>>>   rdd.foreachPartition(iterator => {
>>>>>     // Create a GraphDatabaseService object, or fetch it from a
>>>>>     // pool of GraphDatabaseService objects
>>>>>     // Use it to send the whole partition to Neo4j
>>>>>     // Destroy the object, or release it to the pool
>>>>>   })
>>>>> })
>>>>>
>>>>> On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj <gautam1...@gmail.com> wrote:
>>>>>
>>>>>> Neo4j is running externally. It has nothing to do with the Spark
>>>>>> processes.
>>>>>>
>>>>>> Basically, the problem is that I'm unable to figure out a way to
>>>>>> store the output of Spark in the database, as Spark Streaming
>>>>>> requires the Neo4j Core Java API to be serializable as well.
>>>>>>
>>>>>> The answer points to using the REST API, but its performance is
>>>>>> really poor compared to the Core Java API:
>>>>>> http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/
>>>>>>
>>>>>> On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>>>
>>>>>>> Well, the answers you got there are correct as well.
>>>>>>> Unfortunately, I am not familiar enough with Neo4j to comment any
>>>>>>> further. Is the Neo4j graph database running externally (outside
>>>>>>> the Spark cluster), within the driver process, or on all the
>>>>>>> executors? Can you clarify that?
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>> On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj <gautam1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Alright, I have also asked this question on StackOverflow:
>>>>>>>> http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark
>>>>>>>>
>>>>>>>> The code there is pretty neat.
>>>>>>>>
>>>>>>>> On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>>>>>
>>>>>>>>> I am not sure if you realized, but the code snippet is pretty
>>>>>>>>> mangled up in the email we received. It might be a good idea to
>>>>>>>>> put the code in pastebin or gist; it is much, much easier for
>>>>>>>>> everyone to read.
>>>>>>>>>
>>>>>>>>> On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r <gautam1...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I'm trying to use Neo4j with Apache Spark Streaming, but I am
>>>>>>>>>> finding serializability to be an issue.
>>>>>>>>>>
>>>>>>>>>> Basically, I want Apache Spark to parse and bundle my data in
>>>>>>>>>> real time. After the data has been bundled, it should be stored
>>>>>>>>>> in the database, Neo4j.
>>>>>>>>>> However, I am getting this error:
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.SparkException: Task not serializable
>>>>>>>>>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>>>>>>>>>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>>>>>>>>>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>>>>>>>>>>     at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
>>>>>>>>>>     at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
>>>>>>>>>>     at twoGrams.Main$4.call(Main.java:102)
>>>>>>>>>>     at twoGrams.Main$4.call(Main.java:1)
>>>>>>>>>>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>>>>>>>>>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>>>>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>>>>>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>>>>>>     at scala.util.Try$.apply(Try.scala:161)
>>>>>>>>>>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>>>>>>>>>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>> Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase
>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>>>>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>>>>>>>>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>>>>>>>>>>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>>>>>>>>>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>>>>>>>>>>     ... 17 more
>>>>>>>>>>
>>>>>>>>>> Here is my code:
>>>>>>>>>>
>>>>>>>>>> output is a stream of type JavaPairDStream<String, ArrayList<String>>:
>>>>>>>>>>
>>>>>>>>>> output.foreachRDD(
>>>>>>>>>>     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
>>>>>>>>>>
>>>>>>>>>>         @Override
>>>>>>>>>>         public Void call(JavaPairRDD<String, ArrayList<String>> arg0, Time arg1)
>>>>>>>>>>                 throws Exception {
>>>>>>>>>>             arg0.foreach(
>>>>>>>>>>                 new VoidFunction<Tuple2<String, ArrayList<String>>>() {
>>>>>>>>>>
>>>>>>>>>>                     @Override
>>>>>>>>>>                     public void call(Tuple2<String, ArrayList<String>> arg0)
>>>>>>>>>>                             throws Exception {
>>>>>>>>>>                         try (Transaction tx = graphDB.beginTx()) {
>>>>>>>>>>                             if (Neo4jOperations.getHMacFromValue(graphDB, arg0._1) != null)
>>>>>>>>>>                                 System.out.println("Already in database: " + arg0._1);
>>>>>>>>>>                             else {
>>>>>>>>>>                                 Neo4jOperations.createHMac(graphDB, arg0._1);
>>>>>>>>>>                             }
>>>>>>>>>>                             tx.success();
>>>>>>>>>>                         }
>>>>>>>>>>                     }
>>>>>>>>>>                 });
>>>>>>>>>>             return null;
>>>>>>>>>>         }
>>>>>>>>>>     });
>>>>>>>>>>
>>>>>>>>>> Neo4jOperations class:
>>>>>>>>>>
>>>>>>>>>> public class Neo4jOperations {
>>>>>>>>>>
>>>>>>>>>>     public static Node getHMacFromValue(GraphDatabaseService graphDB, String value) {
>>>>>>>>>>         try (ResourceIterator<Node> HMacs =
>>>>>>>>>>                 graphDB.findNodesByLabelAndProperty(DynamicLabel.label("HMac"),
>>>>>>>>>>                         "value", value).iterator()) {
>>>>>>>>>>             return HMacs.next();
>>>>>>>>>>         }
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     public static void createHMac(GraphDatabaseService graphDB, String value) {
>>>>>>>>>>         Node HMac = graphDB.createNode(DynamicLabel.label("HMac"));
>>>>>>>>>>         HMac.setProperty("value", value);
>>>>>>>>>>         HMac.setProperty("time",
>>>>>>>>>>                 new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> I know that I have to serialize the Neo4jOperations class, but
>>>>>>>>>> I'm unable to figure out how. Or is there any other way to
>>>>>>>>>> achieve this?
>>>>>>>>>>
>>>>>>>>>> Also, how can I store the output of Spark Streaming in a database?
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> View this message in context:
>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Using-Neo4j-with-Apache-Spark-tp22012.html
>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>>>>>>>
>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>>>
>>>>>>>> --
>>>>>>>> Gautam

--
Gautam
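The NotSerializableException in the thread above can be reproduced without Spark or Neo4j: serializing any anonymous function that captures a reference to a non-serializable object fails in exactly this way, while a function that creates the object inside its own body serializes fine. A self-contained Java sketch, where the made-up `FakeGraphDb` class stands in for `org.neo4j.kernel.EmbeddedGraphDatabase`:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCapture {
    // Stand-in for the embedded database handle: deliberately NOT Serializable.
    static class FakeGraphDb {}

    // Mimics what Spark's ClosureCleaner does: try to serialize the task.
    static boolean serializes(Serializable task) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(task);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        FakeGraphDb graphDb = new FakeGraphDb();

        // Captures the outer graphDb reference, like the foreach closure
        // in the thread: the whole FakeGraphDb gets dragged into the
        // serialized closure, and serialization fails.
        Runnable bad = (Runnable & Serializable) () -> graphDb.hashCode();

        // Creates the handle inside the task body, so the closure carries
        // no foreign references and serializes cleanly.
        Runnable good = (Runnable & Serializable) () -> new FakeGraphDb().hashCode();

        System.out.println(serializes((Serializable) bad));   // false
        System.out.println(serializes((Serializable) good));  // true
    }
}
```

This is the same reason TD's foreachPartition pattern works: constructing (or fetching) the database handle inside the task body keeps it out of the closure that Spark must serialize and ship to executors.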