Thanks Marcelo. I am having another problem: my code was running properly and then it suddenly stopped with this error:
java.lang.OutOfMemoryError: Java heap space
        at java.io.BufferedOutputStream.<init>(Unknown Source)
        at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:62)

Can you help with that?

On Wed, Jul 9, 2014 at 2:07 AM, Marcelo Vanzin <van...@cloudera.com> wrote:
> Sorry, that would be sc.stop() (not close).
>
> On Tue, Jul 8, 2014 at 1:31 PM, Marcelo Vanzin <van...@cloudera.com> wrote:
> > Hi Rahul,
> >
> > Can you try calling "sc.close()" at the end of your program, so Spark
> > can clean up after itself?
> >
> > On Tue, Jul 8, 2014 at 12:40 PM, Rahul Bhojwani
> > <rahulbhojwani2...@gmail.com> wrote:
> >> Here I am adding my code, if you can have a look to help me out.
> >> Thanks
> >> #######################
> >>
> >> import tokenizer
> >> import gettingWordLists as gl
> >> from pyspark.mllib.classification import NaiveBayes
> >> from numpy import array
> >> from pyspark import SparkContext, SparkConf
> >>
> >> conf = (SparkConf().setMaster("local[6]").setAppName("My app")
> >>         .set("spark.executor.memory", "1g"))
> >>
> >> sc = SparkContext(conf=conf)
> >>
> >> # Getting the positive dict:
> >> pos_list = []
> >> pos_list = gl.getPositiveList()
> >> neg_list = gl.getNegativeList()
> >>
> >> # print neg_list
> >> tok = tokenizer.Tokenizer(preserve_case=False)
> >> train_data = []
> >>
> >> with open("training_file_coach.csv", "r") as train_file:
> >>     for line in train_file:
> >>         tokens = line.split("######")
> >>         msg = tokens[0]
> >>         sentiment = tokens[1]
> >>         pos_count = 0
> >>         neg_count = 0
> >>         # print sentiment + "\n\n"
> >>         # print msg
> >>         tokens = set(tok.tokenize(msg))
> >>         for i in tokens:
> >>             if i.encode('utf-8') in pos_list:
> >>                 pos_count += 1
> >>             if i.encode('utf-8') in neg_list:
> >>                 neg_count += 1
> >>         if sentiment.__contains__('NEG'):
> >>             label = 0.0
> >>         else:
> >>             label = 1.0
> >>
> >>         feature = []
> >>         feature.append(label)
> >>         feature.append(float(pos_count))
> >>         feature.append(float(neg_count))
> >>         train_data.append(feature)
> >>
> >> train_file.close()
> >>
> >> model = NaiveBayes.train(sc.parallelize(array(train_data)))
> >>
> >> file_predicted = open("predicted_file_coach.csv", "w")
> >>
> >> with open("prediction_file_coach.csv", "r") as predict_file:
> >>     for line in predict_file:
> >>         msg = line[0:-1]
> >>         pos_count = 0
> >>         neg_count = 0
> >>         # print sentiment + "\n\n"
> >>         # print msg
> >>         tokens = set(tok.tokenize(msg))
> >>         for i in tokens:
> >>             if i.encode('utf-8') in pos_list:
> >>                 pos_count += 1
> >>             if i.encode('utf-8') in neg_list:
> >>                 neg_count += 1
> >>         prediction = model.predict(array([float(pos_count), float(neg_count)]))
> >>         if prediction == 0:
> >>             sentiment = "NEG"
> >>         elif prediction == 1:
> >>             sentiment = "POS"
> >>         else:
> >>             print "ERROR\n\n\n\n\n\n\nERROR"
> >>
> >>         feature = []
> >>         feature.append(float(prediction))
> >>         feature.append(float(pos_count))
> >>         feature.append(float(neg_count))
> >>         print feature
> >>         train_data.append(feature)
> >>         model = NaiveBayes.train(sc.parallelize(array(train_data)))
> >>         file_predicted.write(msg + "######" + sentiment + "\n")
> >>
> >> file_predicted.close()
> >> ###################
> >>
> >> If you can have a look at the code and help me out, it would be great.
> >>
> >> Thanks
> >>
> >> On Wed, Jul 9, 2014 at 12:54 AM, Rahul Bhojwani
> >> <rahulbhojwani2...@gmail.com> wrote:
> >>> Hi Marcelo.
> >>> Thanks for the quick reply. Can you suggest how to increase the memory
> >>> limits or how to tackle this problem? I am a novice. If you want, I can
> >>> post my code here.
> >>>
> >>> Thanks
> >>>
> >>> On Wed, Jul 9, 2014 at 12:50 AM, Marcelo Vanzin <van...@cloudera.com>
> >>> wrote:
> >>>> This is generally a side effect of your executor being killed. For
> >>>> example, YARN will do that if you're going over the requested memory
> >>>> limits.
> >>>>
> >>>> On Tue, Jul 8, 2014 at 12:17 PM, Rahul Bhojwani
> >>>> <rahulbhojwani2...@gmail.com> wrote:
> >>>> > Hi,
> >>>> >
> >>>> > I am getting this error. Can anyone help explain why this error is
> >>>> > coming?
> >>>> >
> >>>> > ########
> >>>> > Exception in thread "delete Spark temp dir
> >>>> > C:\Users\shawn\AppData\Local\Temp\spark-27f60467-36d4-4081-aaf5-d0ad42dda560"
> >>>> > java.io.IOException: Failed to delete:
> >>>> > C:\Users\shawn\AppData\Local\Temp\spark-27f60467-36d4-4081-aaf5-d0ad42dda560\tmp cmenlp
> >>>> >     at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:483)
> >>>> >     at org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:479)
> >>>> >     at org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:478)
> >>>> >     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> >>>> >     at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> >>>> >     at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:478)
> >>>> >     at org.apache.spark.util.Utils$$anon$4.run(Utils.scala:212)
> >>>> > ############
> >>>> >
> >>>> > Thanks in advance
> >>>> > --
> >>>> > Rahul K Bhojwani
> >>>> > 3rd Year B.Tech
> >>>> > Computer Science and Engineering
> >>>> > National Institute of Technology, Karnataka
> >>>>
> >>>> --
> >>>> Marcelo
> >>>
> >>> --
> >>> Rahul K Bhojwani
> >>> 3rd Year B.Tech
> >>> Computer Science and Engineering
> >>> National Institute of Technology, Karnataka
> >>
> >> --
> >> Rahul K Bhojwani
> >> 3rd Year B.Tech
> >> Computer Science and Engineering
> >> National Institute of Technology, Karnataka
> >
> > --
> > Marcelo
>
> --
> Marcelo

--
Rahul K Bhojwani
3rd Year B.Tech
Computer Science and Engineering
National Institute of Technology, Karnataka
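[For readers hitting the same "Java heap space" error in local mode: the stack trace points at the driver-side JVM, and the `spark.executor.memory` setting in the quoted code does not raise the driver's heap. A sketch of one common way to give the driver more memory, assuming the script is launched via spark-submit; the filename and the 2g value are hypothetical examples:]

```shell
# Launch the PySpark script with a larger driver heap (2g is an example value).
spark-submit --driver-memory 2g sentiment_script.py
```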
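[Also relevant to the heap growth: the prediction loop in the quoted code calls `NaiveBayes.train` once per input line, launching a full Spark job over an ever-growing `train_data` on every iteration. Collecting the new examples and retraining once after the loop performs the same updates with a single job. A pyspark-free sketch of the batching pattern, using a hypothetical stand-in `predict` function rather than MLlib's model:]

```python
# Existing labeled examples in the thread's format: [label, pos_count, neg_count].
train_data = [[1.0, 2.0, 0.0]]
new_examples = []

def predict(features):
    # Hypothetical stand-in for model.predict: positive iff pos_count >= neg_count.
    return 1.0 if features[0] >= features[1] else 0.0

# Hypothetical (pos_count, neg_count) pairs for incoming messages.
for pos_count, neg_count in [(3, 1), (0, 2)]:
    label = predict([float(pos_count), float(neg_count)])
    new_examples.append([label, float(pos_count), float(neg_count)])

# Retrain once here, after the loop, instead of once per line inside it.
train_data.extend(new_examples)
```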
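[A smaller, tangential point about the loops in the quoted code: `i.encode('utf-8') in pos_list` performs a linear scan of the word list for every token. Building a set from each list once makes every membership test constant-time. A minimal sketch, with hypothetical word lists standing in for `gl.getPositiveList()` and `gl.getNegativeList()`:]

```python
# Hypothetical word lists standing in for the gettingWordLists helpers.
pos_list = ["good", "great", "happy"]
neg_list = ["bad", "awful", "sad"]

# Build the sets once, outside the per-line loop.
pos_set = set(pos_list)
neg_set = set(neg_list)

# Per-message counting then costs O(1) per token instead of O(len(list)).
tokens = {"good", "awful", "spark"}
pos_count = sum(1 for t in tokens if t in pos_set)
neg_count = sum(1 for t in tokens if t in neg_set)
```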