Hi,

Since Spark uses the HDFS API to access files, if you point HADOOP_CONF_DIR at the config files of your Hadoop cluster, Spark will attempt to access files on your HDFS whenever you leave the scheme and host name out of the URI (i.e. hdfs://predictionspark:9000/ in your case). If HADOOP_CONF_DIR is not configured, Spark will instead attempt to access files on the local filesystem.
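If you want to check which filesystem a scheme-less path will actually hit, here is a minimal sketch (not part of the engine, just the Hadoop FileSystem API that Spark already exposes) that asks the Hadoop configuration Spark loaded:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext

// Illustrative helper only: shows which filesystem a scheme-less path
// resolves to, based on the fs.defaultFS that Spark picked up from
// HADOOP_CONF_DIR (or the local filesystem when nothing is configured).
def resolvePath(sc: SparkContext, path: String): Unit = {
  val hadoopConf = sc.hadoopConfiguration
  val fs = FileSystem.get(hadoopConf)
  val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory)
  println(s"fs.defaultFS = ${hadoopConf.get("fs.defaultFS")}")
  println(s"$path resolves to $qualified")
}

For example, calling resolvePath(sc, "/tmp") prints an hdfs:// URI when HADOOP_CONF_DIR points at your cluster config, and a file:/ URI otherwise.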
Regardless of whether HADOOP_CONF_DIR is configured, if a file URI is hardcoded with a scheme and host name, Spark will always respect it. This is not recommended, because any change to the Hadoop configuration means the engine must be updated and recompiled. A scheme-less variant of your save method is sketched after the quoted message below. Here's a link to configuring PIO's storage backend:
http://predictionio.apache.org/system/anotherdatastore/

Regards,
Donald

On Mon, May 14, 2018 at 12:41 AM, 王斌斌 <[email protected]> wrote:
> Hi,
> I want the tmp model data to be stored in HDFS, not in the local /tmp.
> And I modified the code like this:
>
> class ALSModel(
>     override val rank: Int,
>     override val userFeatures: RDD[(Int, Array[Double])],
>     override val productFeatures: RDD[(Int, Array[Double])],
>     val userStringIntMap: BiMap[String, Int],
>     val itemStringIntMap: BiMap[String, Int])
>   extends MatrixFactorizationModel(rank, userFeatures, productFeatures)
>   with PersistentModel[ALSAlgorithmParams] {
>
>   def save(id: String, params: ALSAlgorithmParams,
>     sc: SparkContext): Boolean = {
>
>     sc.parallelize(Seq(rank))
>       .saveAsObjectFile(s"hdfs://predictionspark:9000/tmp/${id}/rank")
>     userFeatures
>       .saveAsObjectFile(s"hdfs://predictionspark:9000/tmp/${id}/userFeatures")
>     productFeatures
>       .saveAsObjectFile(s"hdfs://predictionspark:9000/tmp/${id}/productFeatures")
>     sc.parallelize(Seq(userStringIntMap))
>       .saveAsObjectFile(s"hdfs://predictionspark:9000/tmp/${id}/userStringIntMap")
>     sc.parallelize(Seq(itemStringIntMap))
>       .saveAsObjectFile(s"hdfs://predictionspark:9000/tmp/${id}/itemStringIntMap")
>     true
>   }
>
>   override def toString = {
>     s"userFeatures: [${userFeatures.count()}]" +
>       s"(${userFeatures.take(2).toList}...)" +
>       s" productFeatures: [${productFeatures.count()}]" +
>       s"(${productFeatures.take(2).toList}...)" +
>       s" userStringIntMap: [${userStringIntMap.size}]" +
>       s"(${userStringIntMap.take(2)}...)" +
>       s" itemStringIntMap: [${itemStringIntMap.size}]" +
>       s"(${itemStringIntMap.take(2)}...)"
>   }
> }
>
> object ALSModel
>   extends PersistentModelLoader[ALSAlgorithmParams, ALSModel] {
>   def apply(id: String, params: ALSAlgorithmParams,
>     sc: Option[SparkContext]) = {
>     new ALSModel(
>       rank = sc.get
>         .objectFile[Int](s"hdfs://predictionspark:9000/tmp/${id}/rank").first,
>       userFeatures = sc.get
>         .objectFile(s"hdfs://predictionspark:9000/tmp/${id}/userFeatures"),
>       productFeatures = sc.get
>         .objectFile(s"hdfs://predictionspark:9000/tmp/${id}/productFeatures"),
>       userStringIntMap = sc.get
>         .objectFile[BiMap[String, Int]](s"hdfs://predictionspark:9000/tmp/${id}/userStringIntMap").first,
>       itemStringIntMap = sc.get
>         .objectFile[BiMap[String, Int]](s"hdfs://predictionspark:9000/tmp/${id}/itemStringIntMap").first)
>   }
> }
>
> It works.
>
> But why does pio-env.sh say:
>
> # HADOOP_CONF_DIR: You must configure this if you intend to run PredictionIO
> # with Hadoop 2.
> # HADOOP_CONF_DIR=/opt/hadoop
>
> If I don't do this, it also works. So can someone explain this? And what is
> HADOOP_CONF_DIR? Is it all the configuration in the Hadoop server's etc/ directory?
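For illustration only, here is how the quoted save method could drop the hardcoded scheme and host. This is a sketch, not a tested drop-in replacement; it assumes the fields of the quoted ALSModel class (rank, userFeatures, productFeatures, userStringIntMap, itemStringIntMap). With HADOOP_CONF_DIR set, the scheme-less paths resolve to HDFS via fs.defaultFS, and to the local filesystem otherwise:

// Sketch only: same save logic as the quoted ALSModel, but the target
// filesystem now comes from fs.defaultFS instead of being compiled into
// the engine.
def save(id: String, params: ALSAlgorithmParams,
  sc: SparkContext): Boolean = {
  val base = s"/tmp/${id}"
  sc.parallelize(Seq(rank)).saveAsObjectFile(s"$base/rank")
  userFeatures.saveAsObjectFile(s"$base/userFeatures")
  productFeatures.saveAsObjectFile(s"$base/productFeatures")
  sc.parallelize(Seq(userStringIntMap)).saveAsObjectFile(s"$base/userStringIntMap")
  sc.parallelize(Seq(itemStringIntMap)).saveAsObjectFile(s"$base/itemStringIntMap")
  true
}

The loader in object ALSModel can drop the hdfs://predictionspark:9000 prefix in the same way, so a change of NameNode host or port only requires a configuration change rather than a rebuild.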
