PHOENIX-4415 Ignore CURRENT_SCN property if set in Pig Storer
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a3db3356 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a3db3356 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a3db3356 Branch: refs/heads/4.x-HBase-1.2 Commit: a3db33567b4eb91f9c7b41d0232331da1f7873bb Parents: ef174bb Author: James Taylor <jtay...@salesforce.com> Authored: Wed Nov 8 03:13:53 2017 +0000 Committer: James Taylor <jtay...@salesforce.com> Committed: Sat Dec 16 16:42:54 2017 -0800 ---------------------------------------------------------------------- .../phoenix/mapreduce/PhoenixOutputFormat.java | 13 ++++++++++- .../phoenix/mapreduce/PhoenixRecordWriter.java | 8 ++++++- .../phoenix/mapreduce/util/ConnectionUtil.java | 23 ++++++++++++++++---- .../org/apache/phoenix/util/PropertiesUtil.java | 9 +++++++- .../java/org/apache/phoenix/pig/BasePigIT.java | 4 ++++ .../apache/phoenix/pig/PhoenixHBaseStorage.java | 12 ++++++---- 6 files changed, 58 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/a3db3356/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java index e55b977..4217e40 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java @@ -19,6 +19,8 @@ package org.apache.phoenix.mapreduce; import java.io.IOException; import java.sql.SQLException; +import java.util.Collections; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,6 +38,15 @@ import org.apache.hadoop.mapreduce.lib.db.DBWritable; */ public class PhoenixOutputFormat <T extends DBWritable> extends OutputFormat<NullWritable,T> { private static final Log LOG = LogFactory.getLog(PhoenixOutputFormat.class); + private final Set<String> propsToIgnore; + + public PhoenixOutputFormat() { + this(Collections.<String>emptySet()); + } + + public PhoenixOutputFormat(Set<String> propsToIgnore) { + this.propsToIgnore = propsToIgnore; + } @Override public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException { @@ -52,7 +63,7 @@ public class PhoenixOutputFormat <T extends DBWritable> extends OutputFormat<Nul @Override public RecordWriter<NullWritable, T> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { try { - return new PhoenixRecordWriter<T>(context.getConfiguration()); + return new PhoenixRecordWriter<T>(context.getConfiguration(), propsToIgnore); } catch (SQLException e) { LOG.error("Error calling PhoenixRecordWriter " + e.getMessage()); throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/phoenix/blob/a3db3356/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java index 70ee3f5..52f2fe3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.Collections; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -46,7 +48,11 @@ public class PhoenixRecordWriter<T extends DBWritable> extends RecordWriter<Nul private long numRecords = 0; public PhoenixRecordWriter(final Configuration configuration) throws SQLException { - this.conn = ConnectionUtil.getOutputConnection(configuration); + this(configuration, Collections.<String>emptySet()); + } + + public PhoenixRecordWriter(final Configuration configuration, Set<String> propsToIgnore) throws SQLException { + this.conn = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore); this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration); final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration); this.statement = this.conn.prepareStatement(upsertQuery); http://git-wip-us.apache.org/repos/asf/phoenix/blob/a3db3356/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java index ada3816..56a5ef5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java @@ -20,15 +20,16 @@ package org.apache.phoenix.mapreduce.util; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; -import java.util.Iterator; -import java.util.Map; +import java.util.Collections; import java.util.Properties; +import java.util.Set; -import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; +import com.google.common.base.Preconditions; + /** * Utility class to return a {@link Connection} . */ @@ -74,15 +75,29 @@ public class ConnectionUtil { * Create the configured output Connection. * * @param conf configuration containing the connection information + * @return the configured output connection + */ + public static Connection getOutputConnectionWithoutTheseProps(final Configuration conf, Set<String> ignoreTheseProps) throws SQLException { + return getOutputConnection(conf, new Properties(), ignoreTheseProps); + } + + /** + * Create the configured output Connection. + * + * @param conf configuration containing the connection information * @param props custom connection properties * @return the configured output connection */ public static Connection getOutputConnection(final Configuration conf, Properties props) throws SQLException { + return getOutputConnection(conf, props, Collections.<String>emptySet()); + } + + public static Connection getOutputConnection(final Configuration conf, Properties props, Set<String> withoutTheseProps) throws SQLException { Preconditions.checkNotNull(conf); return getConnection(PhoenixConfigurationUtil.getOutputCluster(conf), PhoenixConfigurationUtil.getClientPort(conf), PhoenixConfigurationUtil.getZNodeParent(conf), - PropertiesUtil.combineProperties(props, conf)); + PropertiesUtil.combineProperties(props, conf, withoutTheseProps)); } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/a3db3356/phoenix-core/src/main/java/org/apache/phoenix/util/PropertiesUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PropertiesUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PropertiesUtil.java index f6eb5c5..685b8cb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PropertiesUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PropertiesUtil.java @@ -17,10 +17,13 @@ */ package org.apache.phoenix.util; +import java.util.Collections; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; +import java.util.Set; + import org.apache.hadoop.conf.Configuration; public class PropertiesUtil { @@ -50,13 +53,17 @@ public class PropertiesUtil { * properties contained in conf */ public static Properties combineProperties(Properties props, final Configuration conf) { + return combineProperties(props, conf, Collections.<String>emptySet()); + } + + public static Properties combineProperties(Properties props, final Configuration conf, Set<String> withoutTheseProps) { Iterator<Map.Entry<String, String>> iterator = conf.iterator(); Properties copy = deepCopy(props); if (iterator != null) { while (iterator.hasNext()) { Map.Entry<String, String> entry = iterator.next(); // set the property from config only if props doesn't have it already - if (copy.getProperty(entry.getKey()) == null) { + if (copy.getProperty(entry.getKey()) == null && !withoutTheseProps.contains(entry.getKey())) { copy.setProperty(entry.getKey(), entry.getValue()); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/a3db3356/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java index 94ccc25..4de9854 100644 --- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java +++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java @@ -29,8 +29,10 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; import org.apache.phoenix.end2end.Shadower; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.pig.ExecType; @@ -62,6 +64,8 @@ public class BasePigIT extends BaseHBaseManagedTimeIT { public void setUp() throws Exception { conf = getTestClusterConfig(); conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + // Set CURRENT_SCN to confirm that it's ignored + conf.set(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(System.currentTimeMillis()+QueryConstants.MILLIS_IN_DAY)); pigServer = new PigServer(ExecType.LOCAL, conf); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); conn = DriverManager.getConnection(getUrl(), props); http://git-wip-us.apache.org/repos/asf/phoenix/blob/a3db3356/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java index a9f0c8f..e061c1c 100644 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java +++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java @@ -21,6 +21,9 @@ import java.io.IOException; import java.sql.SQLException; import java.util.List; import java.util.Properties; +import java.util.Set; +import java.util.HashSet; +import java.util.Arrays; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -43,6 +46,7 @@ import org.apache.phoenix.pig.util.TableSchemaParserFunction; import org.apache.phoenix.pig.util.TypeUtil; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.PhoenixRuntime; import org.apache.pig.ResourceSchema; import org.apache.pig.ResourceSchema.ResourceFieldSchema; import org.apache.pig.StoreFuncInterface; @@ -87,14 +91,15 @@ import org.slf4j.LoggerFactory; public class PhoenixHBaseStorage implements StoreFuncInterface { private static final Logger LOG = LoggerFactory.getLogger(PhoenixHBaseStorage.class); - + private static final Set<String> PROPS_TO_IGNORE = new HashSet<>(Arrays.asList(PhoenixRuntime.CURRENT_SCN_ATTRIB)); + private Configuration config; private RecordWriter<NullWritable, PhoenixRecordWritable> writer; private List<ColumnInfo> columnInfo = null; private String contextSignature = null; private ResourceSchema schema; private long batchSize; - private final PhoenixOutputFormat outputFormat = new PhoenixOutputFormat(); + private final PhoenixOutputFormat outputFormat = new PhoenixOutputFormat<PhoenixRecordWritable>(PROPS_TO_IGNORE); // Set of options permitted private final static Options validOptions = new Options(); private final static CommandLineParser parser = new GnuParser(); @@ -228,5 +233,4 @@ public class PhoenixHBaseStorage implements StoreFuncInterface { schema = s; getUDFProperties().setProperty(contextSignature + SCHEMA, ObjectSerializer.serialize(schema)); } - -} \ No newline at end of file +}