This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch 4.x-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.3 by this push:
     new baa7080  PHOENIX-5128 Add ability to skip header in CsvBulkLoadTool
baa7080 is described below

commit baa708063de33c3a0b303f3121dd52ee6247702d
Author: Josh Elser <els...@apache.org>
AuthorDate: Fri Feb 8 11:13:01 2019 -0500

    PHOENIX-5128 Add ability to skip header in CsvBulkLoadTool
---
 .../apache/phoenix/end2end/CsvBulkLoadToolIT.java  | 33 ++++++++
 .../phoenix/mapreduce/AbstractBulkLoadTool.java    |  9 +-
 .../phoenix/mapreduce/PhoenixTextInputFormat.java  | 97 ++++++++++++++++++++++
 3 files changed, 137 insertions(+), 2 deletions(-)
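
For context, a minimal sketch of driving the tool with the new option, modeled on the integration test in the patch below (the input path, table name, and ZooKeeper quorum here are placeholders, not values from the commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.mapreduce.CsvBulkLoadTool;

    public class SkipHeaderExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            CsvBulkLoadTool tool = new CsvBulkLoadTool();
            tool.setConf(conf);
            // --skip-header tells the tool to ignore the first line of each CSV file
            int exitCode = tool.run(new String[] {
                    "--input", "/tmp/data.csv",       // placeholder path
                    "--table", "MY_TABLE",            // placeholder table
                    "--zookeeper", "localhost:2181",  // placeholder quorum
                    "--skip-header"});
            System.exit(exitCode);
        }
    }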

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 7e4226d..699b469 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -497,4 +497,37 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT {
         stmt.close();
 
     }
+
+    @Test
+    public void testIgnoreCsvHeader() throws Exception {
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute("CREATE TABLE S.TABLE13 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR)");
+
+            final Configuration conf = new Configuration(getUtility().getConfiguration());
+            FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+            FSDataOutputStream outputStream = fs.create(new Path("/tmp/input13.csv"));
+            try (PrintWriter printWriter = new PrintWriter(outputStream)) {
+                printWriter.println("id,name");
+                printWriter.println("1,Name 1");
+                printWriter.println("2,Name 2");
+                printWriter.println("3,Name 3");
+            }
+
+            CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
+            csvBulkLoadTool.setConf(conf);
+            int exitCode = csvBulkLoadTool.run(new String[] {
+                    "--input", "/tmp/input13.csv",
+                    "--table", "table13",
+                    "--schema", "s",
+                    "--zookeeper", zkQuorum,
+                    "--skip-header"});
+            assertEquals(0, exitCode);
+
+            try (ResultSet rs = stmt.executeQuery("SELECT COUNT(1) FROM S.TABLE13")) {
+                assertTrue(rs.next());
+                assertEquals(3, rs.getInt(1));
+                assertFalse(rs.next());
+            }
+        }
+    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index f717647..5252afb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.phoenix.index.IndexMaintainer;
@@ -91,6 +90,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
     static final Option IMPORT_COLUMNS_OPT = new Option("c", "import-columns", true, "Comma-separated list of columns to be imported");
     static final Option IGNORE_ERRORS_OPT = new Option("g", "ignore-errors", false, "Ignore input errors");
     static final Option HELP_OPT = new Option("h", "help", false, "Show this help and quit");
+    static final Option SKIP_HEADER_OPT = new Option("k", "skip-header", false, "Skip the first line of CSV files (the header)");
 
     /**
      * Set configuration values based on parsed command line options.
@@ -114,6 +114,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         options.addOption(IMPORT_COLUMNS_OPT);
         options.addOption(IGNORE_ERRORS_OPT);
         options.addOption(HELP_OPT);
+        options.addOption(SKIP_HEADER_OPT);
         return options;
     }
 
@@ -205,6 +206,10 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
                 conf.set(entry.getKey(), entry.getValue());
             }
         }
+        // Skip the first line of the CSV file(s)?
+        if (cmdLine.hasOption(SKIP_HEADER_OPT.getOpt())) {
+            PhoenixTextInputFormat.setSkipHeader(conf);
+        }
 
         final Connection conn = QueryUtil.getConnection(conf);
         if (LOG.isDebugEnabled()) {
@@ -282,7 +287,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         FileInputFormat.addInputPaths(job, inputPaths);
         FileOutputFormat.setOutputPath(job, outputPath);
 
-        job.setInputFormatClass(TextInputFormat.class);
+        job.setInputFormatClass(PhoenixTextInputFormat.class);
         job.setMapOutputKeyClass(TableRowkeyPair.class);
         job.setMapOutputValueClass(ImmutableBytesWritable.class);
         job.setOutputKeyClass(TableRowkeyPair.class);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTextInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTextInputFormat.java
new file mode 100644
index 0000000..cc170f5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTextInputFormat.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wrapper around TextInputFormat which can ignore the first line in the first InputSplit
+ * for a file.
+ */
+public class PhoenixTextInputFormat extends TextInputFormat {
+  public static final String SKIP_HEADER_KEY = "phoenix.input.format.skip.header";
+
+  public static void setSkipHeader(Configuration conf) {
+    conf.setBoolean(SKIP_HEADER_KEY, true);
+  }
+
+  @Override
+  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
+    RecordReader<LongWritable,Text> rr = super.createRecordReader(split, context);
+    
+    return new PhoenixLineRecordReader((LineRecordReader) rr);
+  }
+
+  public static class PhoenixLineRecordReader extends RecordReader<LongWritable,Text> {
+    private static final Logger LOG = LoggerFactory.getLogger(PhoenixLineRecordReader.class);
+    private final LineRecordReader rr;
+    private PhoenixLineRecordReader(LineRecordReader rr) {
+      this.rr = rr;
+    }
+
+    @Override
+    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+      rr.initialize(genericSplit, context);
+      final Configuration conf = context.getConfiguration();
+      final FileSplit split = (FileSplit) genericSplit;
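+      // A split starting at offset 0 is the first split of its file, so its first record is the header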
+      if (conf.getBoolean(SKIP_HEADER_KEY, false) && split.getStart() == 0) {
+        LOG.trace("Consuming first key-value from {}", genericSplit);
+        nextKeyValue();
+      } else {
+        LOG.trace("Not configured to skip header or not the first input split: 
{}", split);
+      }
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      return rr.nextKeyValue();
+    }
+
+    @Override
+    public LongWritable getCurrentKey() throws IOException {
+      return rr.getCurrentKey();
+    }
+
+    @Override
+    public Text getCurrentValue() throws IOException {
+      return rr.getCurrentValue();
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return rr.getProgress();
+    }
+
+    @Override
+    public void close() throws IOException {
+      rr.close();
+    }
+  }
+}
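
The new input format can also be wired into a MapReduce job directly, as AbstractBulkLoadTool now does; a minimal sketch, assuming an otherwise-configured job (the class and job names are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.phoenix.mapreduce.PhoenixTextInputFormat;

    public class SkipHeaderJobSetup {
        public static Job createJob() throws Exception {
            Configuration conf = new Configuration();
            // Sets phoenix.input.format.skip.header=true in the job configuration
            PhoenixTextInputFormat.setSkipHeader(conf);
            Job job = Job.getInstance(conf, "csv-import");
            // Same wiring AbstractBulkLoadTool now uses instead of plain TextInputFormat
            job.setInputFormatClass(PhoenixTextInputFormat.class);
            return job;
        }
    }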
