Hi Rahul,

Can you try calling sc.stop() at the end of your program, so Spark
can clean up after itself? (SparkContext's shutdown method is stop(),
not close().)
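
For example, a minimal sketch of the shutdown pattern (your existing
logic goes inside the try block):

#######################
from pyspark import SparkContext, SparkConf

conf = (SparkConf().setMaster("local[6]").setAppName("My app")
        .set("spark.executor.memory", "1g"))
sc = SparkContext(conf=conf)

try:
    # ... build the training data, train, and write predictions ...
    pass
finally:
    # Stopping the context lets Spark shut down its workers and remove
    # its temp directories cleanly, which should avoid the failed
    # delete you are seeing on exit.
    sc.stop()
###################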

On Tue, Jul 8, 2014 at 12:40 PM, Rahul Bhojwani
<rahulbhojwani2...@gmail.com> wrote:
> Here I am adding my code. Please have a look and help me out if you can.
> Thanks
> #######################
>
> import tokenizer
> import gettingWordLists as gl
> from pyspark.mllib.classification import NaiveBayes
> from numpy import array
> from pyspark import SparkContext, SparkConf
>
> conf = (SparkConf().setMaster("local[6]").setAppName("My
> app").set("spark.executor.memory", "1g"))
>
> sc = SparkContext(conf=conf)
> # Getting the positive dict:
>
> pos_list = gl.getPositiveList()
> neg_list = gl.getNegativeList()
>
> #print neg_list
> tok = tokenizer.Tokenizer(preserve_case=False)
> train_data = []
>
> with open("training_file_coach.csv","r") as train_file:
>     for line in train_file:
>         tokens = line.split("######")
>         msg = tokens[0]
>         sentiment = tokens[1]
>         pos_count = 0
>         neg_count = 0
> #        print sentiment + "\n\n"
> #        print msg
>         tokens = set(tok.tokenize(msg))
>         for i in tokens:
>             if i.encode('utf-8') in pos_list:
>                 pos_count+=1
>             if i.encode('utf-8') in neg_list:
>                 neg_count+=1
>         if 'NEG' in sentiment:
>             label = 0.0
>         else:
>             label = 1.0
>
>         feature = []
>         feature.append(label)
>         feature.append(float(pos_count))
>         feature.append(float(neg_count))
>         train_data.append(feature)
>
> # Each row is [label, pos_count, neg_count]; MLlib reads the first
> # column as the label when given plain arrays.
> model = NaiveBayes.train(sc.parallelize(array(train_data)))
>
>
> file_predicted = open("predicted_file_coach.csv","w")
>
> with open("prediction_file_coach.csv","r") as predict_file:
>     for line in predict_file:
>         msg = line.rstrip("\n")  # strip the trailing newline only
>         pos_count = 0
>         neg_count = 0
> #        print sentiment + "\n\n"
> #        print msg
>         tokens = set(tok.tokenize(msg))
>         for i in tokens:
>             if i.encode('utf-8') in pos_list:
>                 pos_count+=1
>             if i.encode('utf-8') in neg_list:
>                 neg_count+=1
>         prediction = model.predict(array([float(pos_count),
>                                           float(neg_count)]))
>         if prediction == 0:
>             sentiment = "NEG"
>         elif prediction == 1:
>             sentiment = "POS"
>         else:
>             # sentiment would keep its value from the previous line here
>             print "ERROR: unexpected prediction %s" % prediction
>
>         feature = []
>         feature.append(float(prediction))
>         feature.append(float(pos_count))
>         feature.append(float(neg_count))
>         print feature
>         train_data.append(feature)
>         # Note: retraining on every input line re-ships the whole
>         # training set to Spark each time, which is very expensive.
>         model = NaiveBayes.train(sc.parallelize(array(train_data)))
>         file_predicted.write(msg + "######" + sentiment + "\n")
>
> file_predicted.close()
> ###################
>
> If you can have a look at the code and help me out, it would be great.
>
> Thanks
>
>
> On Wed, Jul 9, 2014 at 12:54 AM, Rahul Bhojwani
> <rahulbhojwani2...@gmail.com> wrote:
>>
>> Hi Marcelo.
>> Thanks for the quick reply. Can you suggest how to increase the memory
>> limits, or how else to tackle this problem? I am a novice. If you want,
>> I can post my code here.
>>
>>
>> Thanks
>>
>>
>> On Wed, Jul 9, 2014 at 12:50 AM, Marcelo Vanzin <van...@cloudera.com>
>> wrote:
>>>
>>> This is generally a side effect of your executor being killed. For
>>> example, YARN will do that if you're going over the requested memory
>>> limits.
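>>>
>>> If it is a memory limit, you can request more when you build the
>>> SparkConf. A sketch, with example sizes you would adjust to your
>>> setup (note that in local mode the executors run inside the driver
>>> JVM, so the driver's memory is what actually matters there):
>>>
>>> from pyspark import SparkContext, SparkConf
>>>
>>> conf = (SparkConf()
>>>         .setMaster("local[6]")
>>>         .setAppName("My app")
>>>         .set("spark.executor.memory", "2g"))
>>> sc = SparkContext(conf=conf)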
>>>
>>> On Tue, Jul 8, 2014 at 12:17 PM, Rahul Bhojwani
>>> <rahulbhojwani2...@gmail.com> wrote:
>>> > Hi,
>>> >
>>> > I am getting this error. Can anyone help explain why it is
>>> > happening?
>>> >
>>> > ########
>>> >
>>> > Exception in thread "delete Spark temp dir
>>> >
>>> > C:\Users\shawn\AppData\Local\Temp\spark-27f60467-36d4-4081-aaf5-d0ad42dda560"
>>> >  java.io.IOException: Failed to delete:
>>> >
>>> > C:\Users\shawn\AppData\Local\Temp\spark-27f60467-36d4-4081-aaf5-d0ad42dda560\tmp
>>> > cmenlp
>>> >         at
>>> > org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:483)
>>> >         at
>>> >
>>> > org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:479)
>>> >         at
>>> >
>>> > org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(Utils.scala:478)
>>> >         at
>>> >
>>> > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>> >         at
>>> > scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>>> >         at
>>> > org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:478)
>>> >         at org.apache.spark.util.Utils$$anon$4.run(Utils.scala:212)
>>> > PS>
>>> > ############
>>> >
>>> >
>>> >
>>> >
>>> > Thanks in advance
>>> > --
>>> > Rahul K Bhojwani
>>> > 3rd Year B.Tech
>>> > Computer Science and Engineering
>>> > National Institute of Technology, Karnataka
>>>
>>>
>>>
>>> --
>>> Marcelo
>>
>>
>>
>>
>> --
>> Rahul K Bhojwani
>> 3rd Year B.Tech
>> Computer Science and Engineering
>> National Institute of Technology, Karnataka
>
>
>
>
> --
> Rahul K Bhojwani
> 3rd Year B.Tech
> Computer Science and Engineering
> National Institute of Technology, Karnataka



-- 
Marcelo
