Alright, I have also asked this question on Stack Overflow:
http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark

The code there is pretty neat.
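
For the archives, the heart of the fix is to stop capturing the
GraphDatabaseService in the closure: obtain the database inside
foreachPartition, on the worker, so Spark never has to serialize it. Below
is a minimal sketch of that pattern (imports elided, as in the original
snippet), assuming a hypothetical GraphDB holder that lazily opens one
embedded database per executor JVM; the holder and the store path are
illustrative, not from the linked answer:

output.foreachRDD(
    new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
        @Override
        public Void call(JavaPairRDD<String, ArrayList<String>> rdd, Time time) {
            rdd.foreachPartition(
                new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {
                    @Override
                    public void call(Iterator<Tuple2<String, ArrayList<String>>> records) {
                        // Looked up here, on the worker, so the handle is never
                        // part of the serialized closure; one transaction per
                        // partition instead of one per record.
                        GraphDatabaseService graphDB = GraphDB.instance();
                        try (Transaction tx = graphDB.beginTx()) {
                            while (records.hasNext()) {
                                Tuple2<String, ArrayList<String>> record = records.next();
                                if (Neo4jOperations.getHMacFromValue(graphDB, record._1) == null) {
                                    Neo4jOperations.createHMac(graphDB, record._1);
                                }
                            }
                            tx.success();
                        }
                    }
                });
            return null;
        }
    });

// Hypothetical holder: lazily opens one embedded database per executor JVM.
class GraphDB {
    private static GraphDatabaseService instance;

    static synchronized GraphDatabaseService instance() {
        if (instance == null) {
            instance = new GraphDatabaseFactory().newEmbeddedDatabase("/path/to/graph.db");
        }
        return instance;
    }
}

One caveat: an embedded database opened on each executor gives every worker
its own separate store, so on a real cluster you would normally point the
workers at a single Neo4j server instead (see the HTTP sketch at the bottom
of this mail).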

On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das <t...@databricks.com> wrote:

> I am not sure if you realized, but the code snippet is pretty mangled in
> the email we received. It might be a good idea to put the code in a
> pastebin or gist; that is much easier for everyone to read.
>
>
> On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r <gautam1...@gmail.com> wrote:
>
>> I'm trying to use Neo4j with Apache Spark Streaming, but serializability
>> is proving to be an issue.
>>
>> Basically, I want Apache Spark to parse and bundle my data in real time.
>> After the data has been bundled, it should be stored in the database,
>> Neo4j. However, I am getting this error:
>>
>> org.apache.spark.SparkException: Task not serializable
>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>>     at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
>>     at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
>>     at twoGrams.Main$4.call(Main.java:102)
>>     at twoGrams.Main$4.call(Main.java:1)
>>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>     at scala.util.Try$.apply(Try.scala:161)
>>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>>     ... 17 more
>> Here is my code:
>>
>> output is a stream of type JavaPairDStream<String, ArrayList<String>>:
>>
>> output.foreachRDD(
>>     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
>>
>>         @Override
>>         public Void call(JavaPairRDD<String, ArrayList<String>> rdd, Time time)
>>                 throws Exception {
>>             rdd.foreach(
>>                 new VoidFunction<Tuple2<String, ArrayList<String>>>() {
>>
>>                     @Override
>>                     public void call(Tuple2<String, ArrayList<String>> record)
>>                             throws Exception {
>>                         try (Transaction tx = graphDB.beginTx()) {
>>                             if (Neo4jOperations.getHMacFromValue(graphDB, record._1) != null)
>>                                 System.out.println("Already in database: " + record._1);
>>                             else {
>>                                 Neo4jOperations.createHMac(graphDB, record._1);
>>                             }
>>                             tx.success();
>>                         }
>>                     }
>>                 });
>>             return null;
>>         }
>>     });
>> The Neo4jOperations class:
>>
>> public class Neo4jOperations {
>>
>>     public static Node getHMacFromValue(GraphDatabaseService graphDB, String value) {
>>         try (ResourceIterator<Node> HMacs =
>>                 graphDB.findNodesByLabelAndProperty(DynamicLabel.label("HMac"),
>>                         "value", value).iterator()) {
>>             // Guard against an empty result: next() on an empty iterator
>>             // would throw NoSuchElementException instead of returning null.
>>             return HMacs.hasNext() ? HMacs.next() : null;
>>         }
>>     }
>>
>>     public static void createHMac(GraphDatabaseService graphDB, String value) {
>>         Node HMac = graphDB.createNode(DynamicLabel.label("HMac"));
>>         HMac.setProperty("value", value);
>>         HMac.setProperty("time", new SimpleDateFormat("yyyyMMdd_HHmmss")
>>                 .format(Calendar.getInstance().getTime()));
>>     }
>> }
>> I know that I have to serialize the Neo4jOperations class, but I'm not
>> able to figure out how. Or is there another way to achieve this?
>>
>> Also, how can I store the output of Spark Streaming in a database?
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Using-Neo4j-with-Apache-Spark-tp22012.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
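
As for the second question in the quoted mail (storing Spark Streaming
output in a database in general): the same pattern applies to any store.
Create the connection on the worker, inside foreachPartition, and write a
batch per partition. Against a standalone Neo4j 2.x server you can also
avoid the embedded API entirely and POST Cypher to the transactional HTTP
endpoint. A rough sketch follows; the URL and the hand-rolled JSON escaping
are illustrative, and a real driver or JSON library would be safer:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public final class Neo4jHttp {
    // Neo4j 2.x transactional Cypher endpoint; host and port are illustrative.
    private static final String ENDPOINT =
            "http://localhost:7474/db/data/transaction/commit";

    // MERGE is idempotent, so it replaces the check-then-create pair above.
    public static void mergeHMac(String value) throws Exception {
        String payload = "{\"statements\":[{"
                + "\"statement\":\"MERGE (h:HMac {value: {v}})\","
                + "\"parameters\":{\"v\":\"" + value.replace("\"", "\\\"") + "\"}}]}";
        HttpURLConnection conn = (HttpURLConnection) new URL(ENDPOINT).openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(payload.getBytes(StandardCharsets.UTF_8));
        }
        if (conn.getResponseCode() >= 300) {
            throw new IllegalStateException("Neo4j returned " + conn.getResponseCode());
        }
        conn.disconnect();
    }
}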


-- 
Gautam
