This is an automated email from the ASF dual-hosted git repository. pboado pushed a commit to branch 5.x-cdh6 in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit d066f400f8fab830597a1b5a37d1d647705aa70f Author: Thomas D'Silva <tdsi...@apache.org> AuthorDate: Fri Mar 1 00:52:36 2019 +0000 PHOENIX-5141 Use HBaseFactoryProvider.getConfigurationFactory to get the config in PhoenixRDD (addendum) --- .../main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala | 7 +++++-- .../main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala | 8 ++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala index d555954..9377986 100644 --- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/ConfigurationUtil.scala @@ -17,6 +17,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants} import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver import org.apache.phoenix.mapreduce.util.{ColumnInfoToStringEncoderDecoder, PhoenixConfigurationUtil} +import org.apache.phoenix.query.HBaseFactoryProvider import org.apache.phoenix.util.{ColumnInfo, PhoenixRuntime} import scala.collection.JavaConversions._ @@ -28,8 +29,8 @@ object ConfigurationUtil extends Serializable { // Create an HBaseConfiguration object from the passed in config, if present val config = conf match { - case Some(c) => HBaseConfiguration.create(c) - case _ => HBaseConfiguration.create() + case Some(c) => HBaseFactoryProvider.getConfigurationFactory.getConfiguration(c) + case _ => HBaseFactoryProvider.getConfigurationFactory.getConfiguration() } // Set the tenantId in the config if present @@ -41,6 +42,8 @@ object ConfigurationUtil extends Serializable { // Set the table to save to PhoenixConfigurationUtil.setOutputTableName(config, tableName) PhoenixConfigurationUtil.setPhysicalTableName(config, tableName) + // disable property provider evaluation + PhoenixConfigurationUtil.setPropertyPolicyProviderDisabled(config); // Infer column names from the DataFrame schema PhoenixConfigurationUtil.setUpsertColumnNames(config, Array(columns : _*)) diff --git a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala index 3b0289d..85a6d8a 100644 --- a/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala +++ b/phoenix-spark/src/main/scala/org/apache/phoenix/spark/DataFrameFunctions.scala @@ -28,7 +28,7 @@ class DataFrameFunctions(data: DataFrame) extends Serializable { saveToPhoenix(parameters("table"), zkUrl = parameters.get("zkUrl"), tenantId = parameters.get("TenantId"), skipNormalizingIdentifier=parameters.contains("skipNormalizingIdentifier")) } - def saveToPhoenix(tableName: String, conf: Configuration = new Configuration, + def saveToPhoenix(tableName: String, conf: Option[Configuration] = None, zkUrl: Option[String] = None, tenantId: Option[String] = None, skipNormalizingIdentifier: Boolean = false): Unit = { // Retrieve the schema field names and normalize to Phoenix, need to do this outside of mapPartitions @@ -36,7 +36,7 @@ class DataFrameFunctions(data: DataFrame) extends Serializable { // Create a configuration object to use for saving - @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, tenantId, Some(conf)) + @transient val outConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrl, tenantId, conf) // Retrieve the zookeeper URL val zkUrlFinal = ConfigurationUtil.getZookeeperURL(outConfig) @@ -45,9 +45,9 @@ class DataFrameFunctions(data: DataFrame) extends Serializable { val phxRDD = data.rdd.mapPartitions{ rows => // Create a within-partition config to retrieve the ColumnInfo list - @transient val partitionConfig = ConfigurationUtil.getOutputConfiguration(tableName, fieldArray, zkUrlFinal, tenantId) + @transient val partitionConfig = ConfigurationUtil.getOutputCon figuration(tableName, fieldArray, zkUrlFinal, tenantId) @transient val columns = PhoenixConfigurationUtil.getUpsertColumnMetadataList(partitionConfig).toList - + rows.map { row => val rec = new PhoenixRecordWritable(columns) row.toSeq.foreach { e => rec.add(e) }