PHOENIX-4415 Ignore CURRENT_SCN property if set in Pig Storer

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d6e61af8
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d6e61af8
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d6e61af8

Branch: refs/heads/system-catalog
Commit: d6e61af807f7a4e605c61217bac556ffe00ea237
Parents: 1c3387d
Author: James Taylor <jtay...@salesforce.com>
Authored: Thu Dec 7 19:13:25 2017 -0800
Committer: James Taylor <jtay...@salesforce.com>
Committed: Thu Dec 7 19:13:25 2017 -0800

----------------------------------------------------------------------
 .../phoenix/mapreduce/PhoenixOutputFormat.java  | 13 ++++++++++-
 .../phoenix/mapreduce/PhoenixRecordWriter.java  |  8 ++++++-
 .../phoenix/mapreduce/util/ConnectionUtil.java  | 23 ++++++++++++++++----
 .../org/apache/phoenix/util/PropertiesUtil.java |  9 +++++++-
 .../java/org/apache/phoenix/pig/BasePigIT.java  |  4 ++++
 .../apache/phoenix/pig/PhoenixHBaseStorage.java | 12 ++++++----
 6 files changed, 58 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d6e61af8/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
index e55b977..4217e40 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java
@@ -19,6 +19,8 @@ package org.apache.phoenix.mapreduce;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,6 +38,15 @@ import org.apache.hadoop.mapreduce.lib.db.DBWritable;
  */
 public class PhoenixOutputFormat <T extends DBWritable> extends OutputFormat<NullWritable,T> {
     private static final Log LOG = LogFactory.getLog(PhoenixOutputFormat.class);
+    private final Set<String> propsToIgnore;
+    
+    public PhoenixOutputFormat() {
+        this(Collections.<String>emptySet());
+    }
+    
+    public PhoenixOutputFormat(Set<String> propsToIgnore) {
+        this.propsToIgnore = propsToIgnore;
+    }
     
     @Override
     public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {      
@@ -52,7 +63,7 @@ public class PhoenixOutputFormat <T extends DBWritable> extends OutputFormat<Nul
     @Override
     public RecordWriter<NullWritable, T> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
         try {
-            return new PhoenixRecordWriter<T>(context.getConfiguration());
+            return new PhoenixRecordWriter<T>(context.getConfiguration(), propsToIgnore);
         } catch (SQLException e) {
             LOG.error("Error calling PhoenixRecordWriter "  + e.getMessage());
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d6e61af8/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
index 70ee3f5..52f2fe3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,7 +48,11 @@ public class PhoenixRecordWriter<T extends DBWritable>  extends RecordWriter<Nul
     private long numRecords = 0;
     
     public PhoenixRecordWriter(final Configuration configuration) throws SQLException {
-        this.conn = ConnectionUtil.getOutputConnection(configuration);
+        this(configuration, Collections.<String>emptySet());
+    }
+    
+    public PhoenixRecordWriter(final Configuration configuration, Set<String> propsToIgnore) throws SQLException {
+        this.conn = ConnectionUtil.getOutputConnectionWithoutTheseProps(configuration, propsToIgnore);
         this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration);
         final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
         this.statement = this.conn.prepareStatement(upsertQuery);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d6e61af8/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
index ada3816..56a5ef5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
@@ -20,15 +20,16 @@ package org.apache.phoenix.mapreduce.util;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.Map;
+import java.util.Collections;
 import java.util.Properties;
+import java.util.Set;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Utility class to return a {@link Connection} .
  */
@@ -74,15 +75,29 @@ public class ConnectionUtil {
      * Create the configured output Connection.
      *
      * @param conf configuration containing the connection information
+     * @return the configured output connection
+     */
+    public static Connection getOutputConnectionWithoutTheseProps(final Configuration conf, Set<String> ignoreTheseProps) throws SQLException {
+        return getOutputConnection(conf, new Properties(), ignoreTheseProps);
+    }
+    
+    /**
+     * Create the configured output Connection.
+     *
+     * @param conf configuration containing the connection information
      * @param props custom connection properties
      * @return the configured output connection
      */
     public static Connection getOutputConnection(final Configuration conf, Properties props) throws SQLException {
+        return getOutputConnection(conf, props, Collections.<String>emptySet());
+    }
+    
+    public static Connection getOutputConnection(final Configuration conf, Properties props, Set<String> withoutTheseProps) throws SQLException {
         Preconditions.checkNotNull(conf);
                return getConnection(PhoenixConfigurationUtil.getOutputCluster(conf),
                                PhoenixConfigurationUtil.getClientPort(conf),
                                PhoenixConfigurationUtil.getZNodeParent(conf),
-                               PropertiesUtil.combineProperties(props, conf));
+                               PropertiesUtil.combineProperties(props, conf, withoutTheseProps));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d6e61af8/phoenix-core/src/main/java/org/apache/phoenix/util/PropertiesUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PropertiesUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PropertiesUtil.java
index f6eb5c5..685b8cb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PropertiesUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PropertiesUtil.java
@@ -17,10 +17,13 @@
  */
 package org.apache.phoenix.util;
 
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.Set;
+
 import org.apache.hadoop.conf.Configuration;
 
 public class PropertiesUtil {
@@ -50,13 +53,17 @@ public class PropertiesUtil {
      *         properties contained in conf
      */
     public static Properties combineProperties(Properties props, final Configuration conf) {
+        return combineProperties(props, conf, Collections.<String>emptySet());
+    }
+    
+    public static Properties combineProperties(Properties props, final Configuration conf, Set<String> withoutTheseProps) {
         Iterator<Map.Entry<String, String>> iterator = conf.iterator();
         Properties copy = deepCopy(props);
         if (iterator != null) {
             while (iterator.hasNext()) {
                 Map.Entry<String, String> entry = iterator.next();
                 // set the property from config only if props doesn't have it already
-                if (copy.getProperty(entry.getKey()) == null) {
+                if (copy.getProperty(entry.getKey()) == null && !withoutTheseProps.contains(entry.getKey())) {
                     copy.setProperty(entry.getKey(), entry.getValue());
                 }
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d6e61af8/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java
index 94ccc25..4de9854 100644
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/BasePigIT.java
@@ -29,8 +29,10 @@ import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
 import org.apache.phoenix.end2end.Shadower;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.pig.ExecType;
@@ -62,6 +64,8 @@ public class BasePigIT extends BaseHBaseManagedTimeIT {
     public void setUp() throws Exception {
         conf = getTestClusterConfig();
         conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        // Set CURRENT_SCN to confirm that it's ignored
+        conf.set(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(System.currentTimeMillis()+QueryConstants.MILLIS_IN_DAY));
         pigServer = new PigServer(ExecType.LOCAL, conf);
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         conn = DriverManager.getConnection(getUrl(), props);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d6e61af8/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
index a9f0c8f..e061c1c 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java
@@ -21,6 +21,9 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Arrays;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -43,6 +46,7 @@ import org.apache.phoenix.pig.util.TableSchemaParserFunction;
 import org.apache.phoenix.pig.util.TypeUtil;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.StoreFuncInterface;
@@ -87,14 +91,15 @@ import org.slf4j.LoggerFactory;
 public class PhoenixHBaseStorage implements StoreFuncInterface {
 
     private static final Logger LOG = LoggerFactory.getLogger(PhoenixHBaseStorage.class);
-    
+    private static final Set<String> PROPS_TO_IGNORE = new HashSet<>(Arrays.asList(PhoenixRuntime.CURRENT_SCN_ATTRIB));
+
     private Configuration config;
     private RecordWriter<NullWritable, PhoenixRecordWritable> writer;
     private List<ColumnInfo> columnInfo = null;
     private String contextSignature = null;
     private ResourceSchema schema;  
     private long batchSize;
-    private final PhoenixOutputFormat outputFormat = new PhoenixOutputFormat();
+    private final PhoenixOutputFormat outputFormat = new PhoenixOutputFormat<PhoenixRecordWritable>(PROPS_TO_IGNORE);
     // Set of options permitted
     private final static Options validOptions = new Options();
     private final static CommandLineParser parser = new GnuParser();
@@ -228,5 +233,4 @@ public class PhoenixHBaseStorage implements StoreFuncInterface {
         schema = s;
         getUDFProperties().setProperty(contextSignature + SCHEMA, ObjectSerializer.serialize(schema));
     }
-
-}
\ No newline at end of file
+}

Reply via email to