Hi PySpark experts, I'm trying to implement a naive Bayes library with the same interface as pyspark.mllib.classification.NaiveBayes, i.e. train() and predict().
I have finished train(LabeledPoint), but I'm running into trouble in predict() because of the SPARK-5063 issue:

*Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.*

Basically, the model is saved as a member variable of the class, e.g. self.weight_map. self.weight_map comes from an RDD (after some transformations) and can be converted to another data structure with an action such as collect() or collectAsMap(). The input to predict(test_tfidf) has to be an RDD of tf-idf vectors, and the output has to be something I can zip() with test_labels.

Here is the code in question:

    def predict(self, test_tfidf):
        return test_tfidf.map(lambda v: MyNaiveBayes._predict_v(self.weight_map, v))

No matter which non-RDD data structure I use for self.weight_map, it always raises the SPARK-5063 error.

By the way, I also tried:

1. broadcast:

    def predict(self, bb, test_tfidf):
        return test_tfidf.map(lambda v: MyNaiveBayes._predict_v(bb.value, v))

    bb = sc.broadcast(MyNaiveBayes.weight_map)
    predict(bb, test_tfidf)

   This gives the same error.

2. cartesian:

    ret = test_tfidf.cartesian(self.weight_map).groupByKey()

   But I need to zip the output of model.predict() with test_labels, and this approach triggers the partition inconsistency issue with zip():

    test_labels.zip(model.predict(test_tfidf))

Any suggestions for the right data structure for self.weight_map in my MyNaiveBayes class?
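One thing that may be worth checking: a lambda that mentions self.weight_map closes over self, so PySpark has to serialize the whole object, including any RDD or SparkContext it still holds. Below is a minimal sketch of a predict() that avoids this by copying the model into local names first. The weight_map layout ({label: (log_prior, {feature_index: log_likelihood})}) and the dict-based sparse vectors are assumptions made for illustration, not the layout from the post, and the scoring rule is a generic multinomial naive Bayes score rather than the actual _predict_v.

```python
class MyNaiveBayes(object):
    """Hypothetical sketch: the model is held as plain Python data, not an RDD."""

    def __init__(self, weight_map):
        # Assumed layout (not from the original post):
        # {label: (log_prior, {feature_index: log_likelihood})}
        # In Spark this could be built on the driver, e.g. via collectAsMap().
        self.weight_map = weight_map

    @staticmethod
    def _predict_v(weight_map, v):
        # v is assumed to be a sparse tf-idf vector: {feature_index: value}.
        best_label, best_score = None, float("-inf")
        for label, (log_prior, log_likelihoods) in weight_map.items():
            score = log_prior + sum(
                value * log_likelihoods.get(index, 0.0)
                for index, value in v.items()
            )
            if score > best_score:
                best_label, best_score = label, score
        return best_label

    def predict(self, test_tfidf):
        # Bind everything the lambda needs to local names. The closure then
        # holds a dict and a plain function, not self or the class.
        weight_map = self.weight_map
        predict_v = MyNaiveBayes._predict_v
        return test_tfidf.map(lambda v: predict_v(weight_map, v))
```

Because predict() only uses map(), the result keeps the partitioning and per-partition element counts of test_tfidf, which is what zip() needs as long as test_labels was derived from the same parent RDD in the same way. For a large model, the local dict could instead be wrapped with sc.broadcast() on the driver and read through .value inside the lambda.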