Thanks a ton! That worked.

However, this may have a performance issue: for each partition, I would need
to restart the server. That was the basic reason I was creating the graphDb
object outside this loop.
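
One possible way around the per-partition restart is the pool idea from the
quoted reply: keep a single instance per executor JVM in a static holder and
fetch it from inside foreachPartition, so that nothing is captured in the task
closure. Below is a minimal plain-Java sketch of that idea, not tested against
Spark or Neo4j. PerJvmHolder, FakeGraphDb and GraphDbHolder are hypothetical
names; FakeGraphDb stands in for the real GraphDatabaseService, and the sketch
assumes the database can safely stay open for the life of the executor.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Creates one instance lazily and reuses it for the life of the JVM. */
final class PerJvmHolder<T> {
    private final Supplier<T> factory;
    private volatile T instance;

    PerJvmHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T local = instance;
        if (local == null) {
            synchronized (this) {
                local = instance;
                if (local == null) {
                    // First caller in this JVM pays the start-up cost.
                    local = factory.get();
                    instance = local;
                }
            }
        }
        return local;
    }
}

/** Hypothetical stand-in for an expensive, non-serializable database handle. */
final class FakeGraphDb {
    static final AtomicInteger OPENED = new AtomicInteger();

    FakeGraphDb() {
        OPENED.incrementAndGet(); // counts how often the "database" is opened
    }

    void write(String value) {
        // A real implementation would open a transaction and store the value.
    }
}

/** Static field: lives in each JVM and is never part of a serialized closure. */
final class GraphDbHolder {
    static final PerJvmHolder<FakeGraphDb> DB = new PerJvmHolder<>(FakeGraphDb::new);

    private GraphDbHolder() {}
}

final class HolderDemo {
    public static void main(String[] args) {
        // Simulate three partitions processed by the same executor JVM.
        for (int partition = 0; partition < 3; partition++) {
            FakeGraphDb db = GraphDbHolder.DB.get();
            db.write("partition-" + partition);
        }
        System.out.println("opened " + FakeGraphDb.OPENED.get() + " time(s)");
    }
}
```

Inside foreachPartition the call would then be GraphDbHolder.DB.get() rather
than a reference to a driver-side graphDB variable. Shutting the instance down
(for example from a JVM shutdown hook) is left out of the sketch.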

On Fri, Mar 13, 2015 at 5:34 AM, Tathagata Das <t...@databricks.com> wrote:

> (Putting user@spark back in the to list)
>
> In the gist, you are creating the graphDB object way outside the
> RDD.foreachPartition. As I said last time, create the graphDB object inside
> the RDD.foreachPartition. You are creating it outside DStream.foreachRDD
> and then using it from inside the rdd.foreachPartition. That brings
> the graphDB object into the task closure, and hence the system tries to
> serialize the graphDB object when it serializes the closure. If you
> create the graphDB object inside the RDD.foreachPartition, then the closure
> will not refer to any prior graphDB object and therefore will not serialize
> anything.
>
> On Thu, Mar 12, 2015 at 3:46 AM, Gautam Bajaj <gautam1...@gmail.com>
> wrote:
>
>> Here: https://gist.github.com/d34th4ck3r/0c99d1e9fa288e0cc8ab
>>
>> I'll add the flag and send you the stack trace; I have meetings now.
>>
>> On Thu, Mar 12, 2015 at 6:28 PM, Tathagata Das <t...@databricks.com>
>> wrote:
>>
>>> Could you show us that version of the code?
>>>
>>> It also helps to turn on the Java flag for extended debug info
>>> (-Dsun.io.serialization.extendedDebugInfo=true). That will show the
>>> lineage of objects leading to the nonserializable one.
>>> On Mar 12, 2015 1:32 AM, "Gautam Bajaj" <gautam1...@gmail.com> wrote:
>>>
>>>> I tried that too. It results in the same serializability issue.
>>>>
>>>> The GraphDatabaseService that I'm using is created via GraphDatabaseFactory():
>>>> http://neo4j.com/api_docs/2.0.0/org/neo4j/graphdb/factory/GraphDatabaseFactory.html
>>>>
>>>> On Thu, Mar 12, 2015 at 5:21 PM, Tathagata Das <t...@databricks.com>
>>>> wrote:
>>>>
>>>>> What is the GraphDatabaseService object that you are using? Instead of
>>>>> creating it on the driver (outside foreachRDD), can you create it
>>>>> inside the RDD.foreach?
>>>>>
>>>>> In general, the right pattern for doing this is in the programming guide:
>>>>>
>>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>>>>>
>>>>> So you should be doing (sorry for writing in scala)
>>>>>
>>>>> dstream.foreachRDD { (rdd: RDD[_], time: Time) =>
>>>>>     rdd.foreachPartition { iterator =>
>>>>>         // Create GraphDatabaseService object, or fetch it from a
>>>>>         // pool of GraphDatabaseService objects
>>>>>         // Use it to send the whole partition to Neo4j
>>>>>         // Destroy the object or release it to the pool
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj <gautam1...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Neo4j is running externally. It has nothing to do with Spark
>>>>>> processes.
>>>>>>
>>>>>> Basically, the problem is that I'm unable to figure out a way to store
>>>>>> the output of Spark in the database, because Spark Streaming requires
>>>>>> the Neo4j Core Java API objects to be serializable as well.
>>>>>>
>>>>>> The answer there points to using the REST API, but its performance is
>>>>>> really poor compared to the Core Java API:
>>>>>> http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/
>>>>>>
>>>>>> On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das <t...@databricks.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Well, the answers you got there are correct as well.
>>>>>>> Unfortunately, I am not familiar enough with Neo4j to comment any
>>>>>>> further. Is the Neo4j graph database running externally (outside the
>>>>>>> Spark cluster), within the driver process, or on all the executors?
>>>>>>> Can you clarify that?
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj <gautam1...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Alright, I have also asked this question on Stack Overflow:
>>>>>>>> http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark
>>>>>>>>
>>>>>>>> The code there is pretty neat.
>>>>>>>>
>>>>>>>> On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das <t...@databricks.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I am not sure if you realized, but the code snippet is pretty
>>>>>>>>> mangled in the email we received. It might be a good idea to put the
>>>>>>>>> code in pastebin or a gist; that is much easier for everyone to read.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r <gautam1...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I'm trying to use Neo4j with Apache Spark Streaming, but I am
>>>>>>>>>> running into a serializability issue.
>>>>>>>>>>
>>>>>>>>>> Basically, I want Apache Spark to parse and bundle my data in
>>>>>>>>>> real time. After the data has been bundled, it should be stored
>>>>>>>>>> in the database, Neo4j. However, I am getting this error:
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.SparkException: Task not serializable
>>>>>>>>>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>>>>>>>>>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>>>>>>>>>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>>>>>>>>>>     at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
>>>>>>>>>>     at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
>>>>>>>>>>     at twoGrams.Main$4.call(Main.java:102)
>>>>>>>>>>     at twoGrams.Main$4.call(Main.java:1)
>>>>>>>>>>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>>>>>>>>>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>>>>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>>>>>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>>>>>>     at scala.util.Try$.apply(Try.scala:161)
>>>>>>>>>>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>>>>>>>>>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>> Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase
>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>>>>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>>>>>>>>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>>>>>>>>>>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>>>>>>>>>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>>>>>>>>>>     ... 17 more
>>>>>>>>>>
>>>>>>>>>> Here is my code:
>>>>>>>>>>
>>>>>>>>>> output is a stream of type JavaPairDStream<String, ArrayList<String>>:
>>>>>>>>>>
>>>>>>>>>> output.foreachRDD(
>>>>>>>>>>     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
>>>>>>>>>>         @Override
>>>>>>>>>>         public Void call(JavaPairRDD<String, ArrayList<String>> arg0,
>>>>>>>>>>                          Time arg1) throws Exception {
>>>>>>>>>>             // TODO Auto-generated method stub
>>>>>>>>>>             arg0.foreach(
>>>>>>>>>>                 new VoidFunction<Tuple2<String, ArrayList<String>>>() {
>>>>>>>>>>                     @Override
>>>>>>>>>>                     public void call(Tuple2<String, ArrayList<String>> arg0)
>>>>>>>>>>                             throws Exception {
>>>>>>>>>>                         // TODO Auto-generated method stub
>>>>>>>>>>                         try (Transaction tx = graphDB.beginTx()) {
>>>>>>>>>>                             if (Neo4jOperations.getHMacFromValue(graphDB, arg0._1) != null)
>>>>>>>>>>                                 System.out.println("Already in Database:" + arg0._1);
>>>>>>>>>>                             else {
>>>>>>>>>>                                 Neo4jOperations.createHMac(graphDB, arg0._1);
>>>>>>>>>>                             }
>>>>>>>>>>                             tx.success();
>>>>>>>>>>                         }
>>>>>>>>>>                     }
>>>>>>>>>>                 });
>>>>>>>>>>             return null;
>>>>>>>>>>         }
>>>>>>>>>>     });
>>>>>>>>>>
>>>>>>>>>> Neo4jOperations class:
>>>>>>>>>>
>>>>>>>>>> public class Neo4jOperations {
>>>>>>>>>>
>>>>>>>>>>     public static Node getHMacFromValue(GraphDatabaseService graphDB,
>>>>>>>>>>                                         String value) {
>>>>>>>>>>         try (ResourceIterator<Node> HMacs = graphDB
>>>>>>>>>>                 .findNodesByLabelAndProperty(DynamicLabel.label("HMac"), "value", value)
>>>>>>>>>>                 .iterator()) {
>>>>>>>>>>             // Return null when nothing matches; the caller checks for null.
>>>>>>>>>>             return HMacs.hasNext() ? HMacs.next() : null;
>>>>>>>>>>         }
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     public static void createHMac(GraphDatabaseService graphDB,
>>>>>>>>>>                                   String value) {
>>>>>>>>>>         Node HMac = graphDB.createNode(DynamicLabel.label("HMac"));
>>>>>>>>>>         HMac.setProperty("value", value);
>>>>>>>>>>         HMac.setProperty("time", new SimpleDateFormat("yyyyMMdd_HHmmss")
>>>>>>>>>>                 .format(Calendar.getInstance().getTime()));
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> I know that I have to serialize the class Neo4jOperations, but
>>>>>>>>>> I'm unable to figure out how. Or is there any other way to achieve
>>>>>>>>>> this?
>>>>>>>>>>
>>>>>>>>>> Also, how can I store the output of Spark Streaming in a database?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Gautam
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Gautam
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Gautam
>>>>
>>>
>>
>>
>> --
>> Gautam
>>
>
>


-- 
Gautam
