http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
index eae58ad..05fbab2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java
@@ -66,9 +66,9 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
-import org.apache.phoenix.mapreduce.CsvBulkLoadTool.TargetTableRef;
-import org.apache.phoenix.mapreduce.CsvBulkLoadTool.TargetTableRefFunctions;
-import org.apache.phoenix.mapreduce.bulkload.CsvTableRowkeyPair;
+import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
+import org.apache.phoenix.mapreduce.bulkload.TargetTableRef;
+import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,7 +81,7 @@ import com.google.common.collect.Sets;
 * It has been adapted from {@link HFileOutputFormat2} but differs in that it creates
 * HFiles for multiple tables.
 */
-public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair, Cell> {
+public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Cell> {
 
    private static final Logger LOG = LoggerFactory.getLogger(MultiHfileOutputFormat.class);
 
@@ -101,7 +101,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair,
     private static final String AT_DELIMITER = "@";
     
     @Override
-    public RecordWriter<CsvTableRowkeyPair, Cell> getRecordWriter(TaskAttemptContext context)
+    public RecordWriter<TableRowkeyPair, Cell> getRecordWriter(TaskAttemptContext context)
             throws IOException, InterruptedException {
         return createRecordWriter(context);
     }
@@ -112,7 +112,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair,
      * @return
      * @throws IOException 
      */
-    static <V extends Cell> RecordWriter<CsvTableRowkeyPair, V> createRecordWriter(final TaskAttemptContext context) 
+    static <V extends Cell> RecordWriter<TableRowkeyPair, V> createRecordWriter(final TaskAttemptContext context)
             throws IOException {
         // Get the path of the temporary output file
         final Path outputPath = FileOutputFormat.getOutputPath(context);
@@ -130,7 +130,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair,
         final boolean compactionExclude = conf.getBoolean(
             "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
 
-        return new RecordWriter<CsvTableRowkeyPair, V>() {
+        return new RecordWriter<TableRowkeyPair, V>() {
          // Map of families to writers and how much has been output on the writer.
             private final Map<byte [], WriterLength> writers =
                     new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
@@ -139,7 +139,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair,
             private boolean rollRequested = false;
 
             @Override
-            public void write(CsvTableRowkeyPair row, V cell)
+            public void write(TableRowkeyPair row, V cell)
                     throws IOException {
                 KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
                 // null input == user explicitly wants to flush
@@ -450,7 +450,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair,
     * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
     * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
      */
-    static void configurePartitioner(Job job, Set<CsvTableRowkeyPair> tablesStartKeys) 
+    static void configurePartitioner(Job job, Set<TableRowkeyPair> tablesStartKeys)
             throws IOException {
         
         Configuration conf = job.getConfiguration();
@@ -467,7 +467,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair,
     }
 
    private static void writePartitions(Configuration conf, Path partitionsPath,
-            Set<CsvTableRowkeyPair> tablesStartKeys) throws IOException {
+            Set<TableRowkeyPair> tablesStartKeys) throws IOException {
         
         LOG.info("Writing partition information to " + partitionsPath);
         if (tablesStartKeys.isEmpty()) {
@@ -478,9 +478,9 @@ public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair,
         // have keys < the first region (which has an empty start key)
         // so we need to remove it. Otherwise we would end up with an
         // empty reducer with index 0
-        TreeSet<CsvTableRowkeyPair> sorted = new TreeSet<CsvTableRowkeyPair>(tablesStartKeys);
+        TreeSet<TableRowkeyPair> sorted = new TreeSet<TableRowkeyPair>(tablesStartKeys);
 
-        CsvTableRowkeyPair first = sorted.first();
+        TableRowkeyPair first = sorted.first();
         if (!first.getRowkey().equals(HConstants.EMPTY_BYTE_ARRAY)) {
          throw new IllegalArgumentException(
              "First region of table should have empty start key. Instead has: "
@@ -491,11 +491,11 @@ public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair,
         // Write the actual file
         FileSystem fs = partitionsPath.getFileSystem(conf);
         SequenceFile.Writer writer = SequenceFile.createWriter(
-          fs, conf, partitionsPath, CsvTableRowkeyPair.class,
+          fs, conf, partitionsPath, TableRowkeyPair.class,
           NullWritable.class);
 
         try {
-          for (CsvTableRowkeyPair startKey : sorted) {
+          for (TableRowkeyPair startKey : sorted) {
             writer.append(startKey, NullWritable.get());
           }
         } finally {
@@ -658,11 +658,11 @@ public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair,
                 KeyValueSerialization.class.getName());
 
         // tableStartKeys for all tables.
-        Set<CsvTableRowkeyPair> tablesStartKeys = Sets.newTreeSet();
+        Set<TableRowkeyPair> tablesStartKeys = Sets.newTreeSet();
         for(TargetTableRef table : tablesToBeLoaded) {
            final String tableName = table.getPhysicalName();
            try(HTable htable = new HTable(conf,tableName);){
-               Set<CsvTableRowkeyPair> startKeys = getRegionStartKeys(tableName , htable.getRegionLocator());
+               Set<TableRowkeyPair> startKeys = getRegionStartKeys(tableName , htable.getRegionLocator());
               tablesStartKeys.addAll(startKeys);
               String compressionConfig = configureCompression(htable.getTableDescriptor());
               String bloomTypeConfig = configureBloomType(htable.getTableDescriptor());
@@ -704,12 +704,12 @@ public class MultiHfileOutputFormat extends FileOutputFormat<CsvTableRowkeyPair,
      * Return the start keys of all of the regions in this table,
      * as a list of ImmutableBytesWritable.
      */
-    private static Set<CsvTableRowkeyPair> getRegionStartKeys(String tableName , RegionLocator table) throws IOException {
+    private static Set<TableRowkeyPair> getRegionStartKeys(String tableName , RegionLocator table) throws IOException {
       byte[][] byteKeys = table.getStartKeys();
-      Set<CsvTableRowkeyPair> ret = new TreeSet<CsvTableRowkeyPair>();
+      Set<TableRowkeyPair> ret = new TreeSet<TableRowkeyPair>();
       for (byte[] byteKey : byteKeys) {
           // phoenix-2216: start : passing the table name and startkey  
-        ret.add(new CsvTableRowkeyPair(tableName, new ImmutableBytesWritable(byteKey)));
+        ret.add(new TableRowkeyPair(tableName, new ImmutableBytesWritable(byteKey)));
       }
       return ret;
     }

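As a reading aid (not part of the commit), here is a minimal sketch of the data this format partitions on: one TableRowkeyPair per region start key, per table, ordered first by table name and then by rowkey. The table name "T1" and the key bytes are hypothetical placeholders.

    import java.util.Set;
    import java.util.TreeSet;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;

    public class StartKeySketch {
        public static void main(String[] args) {
            Set<TableRowkeyPair> tablesStartKeys = new TreeSet<TableRowkeyPair>();
            // Every table's first region has an empty start key; writePartitions()
            // above drops it so reducer 0 is not left empty.
            tablesStartKeys.add(new TableRowkeyPair("T1",
                    new ImmutableBytesWritable(HConstants.EMPTY_BYTE_ARRAY)));
            tablesStartKeys.add(new TableRowkeyPair("T1",
                    new ImmutableBytesWritable(Bytes.toBytes("row500"))));
            for (TableRowkeyPair pair : tablesStartKeys) {
                System.out.println(pair.getTableName() + " @ "
                        + Bytes.toString(pair.getRowkey().get()));
            }
        }
    }
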
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/CsvTableRowkeyPair.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/CsvTableRowkeyPair.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/CsvTableRowkeyPair.java
deleted file mode 100644
index 3ae74b6..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/CsvTableRowkeyPair.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.mapreduce.bulkload;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableUtils;
-
-import com.google.common.base.Preconditions;
-
-
-/**
- * A WritableComparable to hold the table name and the rowkey. 
- *
- */
-public class CsvTableRowkeyPair implements WritableComparable<CsvTableRowkeyPair> {
-
-    /* The qualified table name */
-    private String tableName;
-    
-    /* The rowkey for the record */
-    private ImmutableBytesWritable rowkey;
-    
-    /**
-     * Default constructor
-     */
-    public CsvTableRowkeyPair() {
-        super();
-    }
-    
-    /**
-     * @param tableName
-     * @param rowkey
-     */
-    public CsvTableRowkeyPair(String tableName, ImmutableBytesWritable rowkey) {
-        super();
-        Preconditions.checkNotNull(tableName);
-        Preconditions.checkNotNull(rowkey);
-        this.tableName = tableName;
-        this.rowkey = rowkey;
-    }
-
-    public String getTableName() {
-        return tableName;
-    }
-
-    public void setTableName(String tableName) {
-        this.tableName = tableName;
-    }
-
-    public ImmutableBytesWritable getRowkey() {
-        return rowkey;
-    }
-
-    public void setRowkey(ImmutableBytesWritable rowkey) {
-        this.rowkey = rowkey;
-    }
-
-    @Override
-    public void readFields(DataInput input) throws IOException {
-        tableName = WritableUtils.readString(input);
-        rowkey = new ImmutableBytesWritable();
-        rowkey.readFields(input);
-   }
-
-    @Override
-    public void write(DataOutput output) throws IOException {
-        WritableUtils.writeString(output,tableName);
-        rowkey.write(output);
-    }
-
-    @Override
-    public int compareTo(CsvTableRowkeyPair other) {
-        String otherTableName = other.getTableName();
-        if(this.tableName.equals(otherTableName)) {
-            return this.rowkey.compareTo(other.getRowkey());
-        } else {
-            return this.tableName.compareTo(otherTableName);
-        }
-    }
-    
-    /** Comparator optimized for <code>CsvTableRowkeyPair</code>. */
-    public static class Comparator extends WritableComparator {
-        private BytesWritable.Comparator comparator = new BytesWritable.Comparator();
-        
-        public Comparator() {
-            super(CsvTableRowkeyPair.class);
-        }
-
-        @Override
-        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-            try {
-                int vintL1 = WritableUtils.decodeVIntSize(b1[s1]);
-                int vintL2 = WritableUtils.decodeVIntSize(b2[s2]);
-                int strL1 = readVInt(b1, s1);
-                int strL2 = readVInt(b2, s2);
-                int cmp = compareBytes(b1, s1 + vintL1, strL1, b2, s2 + vintL2, strL2);
-                if (cmp != 0) {
-                  return cmp;
-                }
-                int vintL3 = WritableUtils.decodeVIntSize(b1[s1 + vintL1 + strL1]);
-                int vintL4 = WritableUtils.decodeVIntSize(b2[s2 + vintL2 + strL2]);
-                int strL3 = readVInt(b1, s1 + vintL1 + strL1);
-                int strL4 = readVInt(b2, s2 + vintL2 + strL2);
-                return comparator.compare(b1, s1 + vintL1 + strL1 + vintL3, strL3, b2, s2
-                    + vintL2 + strL2 + vintL4, strL4);
-                
-            } catch(Exception ex) {
-                throw new IllegalArgumentException(ex);
-            }
-        }
-    }
- 
-    static { 
-        WritableComparator.define(CsvTableRowkeyPair.class, new Comparator());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TableRowkeyPair.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TableRowkeyPair.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TableRowkeyPair.java
new file mode 100644
index 0000000..412226f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TableRowkeyPair.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.bulkload;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * A WritableComparable to hold the table name and the rowkey.
+ */
+public class TableRowkeyPair implements WritableComparable<TableRowkeyPair> {
+
+    /* The qualified table name */
+    private String tableName;
+
+    /* The rowkey for the record */
+    private ImmutableBytesWritable rowkey;
+
+    /**
+     * Default constructor
+     */
+    public TableRowkeyPair() {
+        super();
+    }
+
+    public TableRowkeyPair(String tableName, ImmutableBytesWritable rowkey) {
+        super();
+        Preconditions.checkNotNull(tableName);
+        Preconditions.checkNotNull(rowkey);
+        this.tableName = tableName;
+        this.rowkey = rowkey;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    public ImmutableBytesWritable getRowkey() {
+        return rowkey;
+    }
+
+    public void setRowkey(ImmutableBytesWritable rowkey) {
+        this.rowkey = rowkey;
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        tableName = WritableUtils.readString(input);
+        rowkey = new ImmutableBytesWritable();
+        rowkey.readFields(input);
+   }
+
+    @Override
+    public void write(DataOutput output) throws IOException {
+        WritableUtils.writeString(output,tableName);
+        rowkey.write(output);
+    }
+
+    @Override
+    public int compareTo(TableRowkeyPair other) {
+        String otherTableName = other.getTableName();
+        if(this.tableName.equals(otherTableName)) {
+            return this.rowkey.compareTo(other.getRowkey());
+        } else {
+            return this.tableName.compareTo(otherTableName);
+        }
+    }
+    
+    /** Comparator optimized for <code>TableRowkeyPair</code>. */
+    public static class Comparator extends WritableComparator {
+        private BytesWritable.Comparator comparator = new BytesWritable.Comparator();
+        
+        public Comparator() {
+            super(TableRowkeyPair.class);
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            try {
+                int vintL1 = WritableUtils.decodeVIntSize(b1[s1]);
+                int vintL2 = WritableUtils.decodeVIntSize(b2[s2]);
+                int strL1 = readVInt(b1, s1);
+                int strL2 = readVInt(b2, s2);
+                int cmp = compareBytes(b1, s1 + vintL1, strL1, b2, s2 + vintL2, strL2);
+                if (cmp != 0) {
+                  return cmp;
+                }
+                int vintL3 = WritableUtils.decodeVIntSize(b1[s1 + vintL1 + strL1]);
+                int vintL4 = WritableUtils.decodeVIntSize(b2[s2 + vintL2 + strL2]);
+                int strL3 = readVInt(b1, s1 + vintL1 + strL1);
+                int strL4 = readVInt(b2, s2 + vintL2 + strL2);
+                return comparator.compare(b1, s1 + vintL1 + strL1 + vintL3, strL3, b2, s2
+                    + vintL2 + strL2 + vintL4, strL4);
+                
+            } catch(Exception ex) {
+                throw new IllegalArgumentException(ex);
+            }
+        }
+    }
+ 
+    static { 
+        WritableComparator.define(TableRowkeyPair.class, new Comparator());
+    }
+
+}

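A minimal round-trip sketch (not part of the commit) of the Writable contract above: the table name is written as a vint-length-prefixed string, followed by the serialized rowkey, and compareTo() orders by table name first, then rowkey. The table and rowkey values are hypothetical.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;

    public class TableRowkeyPairRoundTrip {
        public static void main(String[] args) throws Exception {
            TableRowkeyPair in = new TableRowkeyPair("SCHEMA.MY_TABLE",
                    new ImmutableBytesWritable(Bytes.toBytes("rowkey-1")));

            // Serialize, then deserialize into a fresh instance.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            in.write(new DataOutputStream(bytes));
            TableRowkeyPair out = new TableRowkeyPair();
            out.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));

            // compareTo() returns 0: table name and rowkey both survived the trip.
            System.out.println(out.getTableName() + " cmp=" + in.compareTo(out));
        }
    }
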
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java
new file mode 100644
index 0000000..1a846f9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRef.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.bulkload;
+
+import java.util.Map;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Represents the logical and physical name of a single table to which data is to be loaded.
+ *
+ * This class exists to allow for the difference between HBase physical table names and
+ * Phoenix logical table names.
+ */
+public class TargetTableRef {
+
+    @JsonProperty
+    private final String logicalName;
+
+    @JsonProperty
+    private final String physicalName;
+
+    @JsonProperty
+    private Map<String,String> configuration = Maps.newHashMap();
+
+    public TargetTableRef(String name) {
+        this(name, name);
+    }
+
+    @JsonCreator
+    private TargetTableRef(@JsonProperty("logicalName") String logicalName,
+        @JsonProperty("physicalName") String physicalName) {
+        this.logicalName = logicalName;
+        this.physicalName = physicalName;
+    }
+
+    public String getLogicalName() {
+        return logicalName;
+    }
+
+    public String getPhysicalName() {
+        return physicalName;
+    }
+
+    public Map<String, String> getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(Map<String, String> configuration) {
+        this.configuration = configuration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
new file mode 100644
index 0000000..d786842
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.bulkload;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
+/**
+ * Utility functions to convert {@link TargetTableRef} instances to and from JSON.
+ */
+public class TargetTableRefFunctions {
+
+     public static Function<TargetTableRef,String> TO_JSON =  new Function<TargetTableRef,String>() {
+
+         @Override
+         public String apply(TargetTableRef input) {
+             try {
+                 ObjectMapper mapper = new ObjectMapper();
+                 return mapper.writeValueAsString(input);
+             } catch (IOException e) {
+                 throw new RuntimeException(e);
+             }
+
+         }
+     };
+
+     public static Function<String,TargetTableRef> FROM_JSON =  new Function<String,TargetTableRef>() {
+
+         @Override
+         public TargetTableRef apply(String json) {
+             try {
+                 ObjectMapper mapper = new ObjectMapper();
+                 return mapper.readValue(json, TargetTableRef.class);
+             } catch (IOException e) {
+                 throw new RuntimeException(e);
+             }
+
+         }
+     };
+
+     public static Function<List<TargetTableRef>,String> NAMES_TO_JSON =  new Function<List<TargetTableRef>,String>() {
+
+         @Override
+         public String apply(List<TargetTableRef> input) {
+             try {
+                 List<String> tableNames = Lists.newArrayListWithCapacity(input.size());
+                 for(TargetTableRef table : input) {
+                     tableNames.add(table.getPhysicalName());
+                 }
+                 ObjectMapper mapper = new ObjectMapper();
+                 return mapper.writeValueAsString(tableNames);
+             } catch (IOException e) {
+                 throw new RuntimeException(e);
+             }
+
+         }
+     };
+
+     public static Function<String,List<String>> NAMES_FROM_JSON =  new Function<String,List<String>>() {
+
+         @SuppressWarnings("unchecked")
+         @Override
+         public List<String> apply(String json) {
+             try {
+                 ObjectMapper mapper = new ObjectMapper();
+                 return mapper.readValue(json, ArrayList.class);
+             } catch (IOException e) {
+                 throw new RuntimeException(e);
+             }
+
+         }
+     };
+ }

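A small usage sketch (not part of the commit) of how these functions can shuttle table metadata through a Hadoop Configuration between job setup and tasks; the configuration key "phoenix.sketch.tables" and the map entry are hypothetical placeholders, not values the commit defines.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.mapreduce.bulkload.TargetTableRef;
    import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;

    public class TargetTableRefJsonSketch {
        public static void main(String[] args) {
            // Logical and physical names coincide when using the single-arg constructor.
            TargetTableRef table = new TargetTableRef("MY_TABLE");
            table.getConfiguration().put("hfile.compression", "SNAPPY"); // hypothetical entry

            Configuration conf = new Configuration();
            conf.set("phoenix.sketch.tables", TargetTableRefFunctions.TO_JSON.apply(table));

            TargetTableRef restored =
                    TargetTableRefFunctions.FROM_JSON.apply(conf.get("phoenix.sketch.tables"));
            System.out.println(restored.getLogicalName() + " -> " + restored.getPhysicalName());
        }
    }
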
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 8bf786b..9e29fba 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -23,13 +23,6 @@ import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -41,13 +34,17 @@ import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.mapreduce.CsvToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor;
+import org.apache.phoenix.mapreduce.FormatToKeyValueMapper;
 import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
 import org.apache.phoenix.mapreduce.PhoenixInputFormat;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
 import static org.apache.commons.lang.StringUtils.isNotEmpty;
 
 /**
@@ -419,7 +416,7 @@ public final class PhoenixConfigurationUtil {
         Class<? extends ImportPreUpsertKeyValueProcessor> processorClass = null;
         try {
             processorClass = conf.getClass(
-                    UPSERT_HOOK_CLASS_CONFKEY, DefaultImportPreUpsertKeyValueProcessor.class,
+                    UPSERT_HOOK_CLASS_CONFKEY, FormatToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor.class,
                     ImportPreUpsertKeyValueProcessor.class);
         } catch (Exception e) {
             throw new IllegalStateException("Couldn't load upsert hook class", e);

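A hedged sketch (not part of the commit) of the behavior this hunk changes; it assumes the surrounding method is PhoenixConfigurationUtil's public loadPreUpsertProcessor(Configuration) helper. With no override configured, the default that gets resolved is now FormatToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor rather than the CsvToKeyValueMapper inner class.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
    import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;

    public class UpsertHookSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // No hook configured: falls back to the new default shown in the hunk.
            ImportPreUpsertKeyValueProcessor processor =
                    PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
            System.out.println(processor.getClass().getName());
        }
    }
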
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java b/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java
index b8b284a..cdd9d7b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/CSVCommonsLoader.java
@@ -17,36 +17,25 @@
  */
 package org.apache.phoenix.util;
 
-import com.google.common.base.Charsets;
-import com.google.common.base.Joiner;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.File;
+import java.io.Reader;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVRecord;
-import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.csv.CsvUpsertExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.Reader;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
 
 /***
  * Upserts CSV data using Phoenix JDBC connection
@@ -222,7 +211,7 @@ public class CSVCommonsLoader {
             long start = System.currentTimeMillis();
             CsvUpsertListener upsertListener = new CsvUpsertListener(conn,
                     conn.getMutateBatchSize(), isStrict);
-            CsvUpsertExecutor csvUpsertExecutor = CsvUpsertExecutor.create(conn, tableName,
+            CsvUpsertExecutor csvUpsertExecutor = new CsvUpsertExecutor(conn, tableName,
                     columnInfoList, upsertListener, arrayElementSeparator);
 
             csvUpsertExecutor.execute(csvParser);
@@ -267,131 +256,10 @@ public class CSVCommonsLoader {
         default:
             throw new IllegalStateException("parser has unknown column source.");
         }
-        return generateColumnInfo(conn, tableName, columns, isStrict);
-    }
-
-    /**
-     * Get list of ColumnInfos that contain Column Name and its associated
-     * PDataType for an import. The supplied list of columns can be null -- if it is non-null,
-     * it represents a user-supplied list of columns to be imported.
-     *
-     * @param conn Phoenix connection from which metadata will be read
-     * @param tableName Phoenix table name whose columns are to be checked. Can include a schema
-     *                  name
-     * @param columns user-supplied list of import columns, can be null
-     * @param strict if true, an exception will be thrown if unknown columns are supplied
-     */
-    public static List<ColumnInfo> generateColumnInfo(Connection conn,
-            String tableName, List<String> columns, boolean strict)
-            throws SQLException {
-        Map<String, Integer> columnNameToTypeMap = Maps.newLinkedHashMap();
-        Set<String> ambiguousColumnNames = new HashSet<String>();
-        Map<String, Integer> fullColumnNameToTypeMap = Maps.newLinkedHashMap();
-        DatabaseMetaData dbmd = conn.getMetaData();
-        int unfoundColumnCount = 0;
-        // TODO: escape wildcard characters here because we don't want that
-        // behavior here
-        String escapedTableName = StringUtil.escapeLike(tableName);
-        String[] schemaAndTable = escapedTableName.split("\\.");
-        ResultSet rs = null;
-        try {
-            rs = dbmd.getColumns(null, (schemaAndTable.length == 1 ? ""
-                    : schemaAndTable[0]),
-                    (schemaAndTable.length == 1 ? escapedTableName
-                            : schemaAndTable[1]), null);
-            while (rs.next()) {
-                String colName = rs.getString(QueryUtil.COLUMN_NAME_POSITION);
-                String colFam = rs.getString(QueryUtil.COLUMN_FAMILY_POSITION);
-
-                // use family qualifier, if available, otherwise, use column name
-                String fullColumn = (colFam==null?colName:String.format("%s.%s",colFam,colName));
-                String sqlTypeName = rs.getString(QueryUtil.DATA_TYPE_NAME_POSITION);
-
-                // allow for both bare and family qualified names.
-                if (columnNameToTypeMap.keySet().contains(colName)) {
-                    ambiguousColumnNames.add(colName);
-                }
-                columnNameToTypeMap.put(
-                        colName,
-                        PDataType.fromSqlTypeName(sqlTypeName).getSqlType());
-                fullColumnNameToTypeMap.put(
-                        fullColumn,
-                        PDataType.fromSqlTypeName(sqlTypeName).getSqlType());
-            }
-            if (columnNameToTypeMap.isEmpty()) {
-                throw new IllegalArgumentException("Table " + tableName + " not found");
-            }
-        } finally {
-            if (rs != null) {
-                rs.close();
-            }
-        }
-        List<ColumnInfo> columnInfoList = Lists.newArrayList();
-        Set<String> unresolvedColumnNames = new TreeSet<String>();
-        if (columns == null) {
-            // use family qualified names by default, if no columns are specified.
-            for (Map.Entry<String, Integer> entry : fullColumnNameToTypeMap
-                    .entrySet()) {
-                columnInfoList.add(new ColumnInfo(entry.getKey(), entry.getValue()));
-            }
-        } else {
-            // Leave "null" as indication to skip b/c it doesn't exist
-            for (int i = 0; i < columns.size(); i++) {
-                String columnName = columns.get(i).trim();
-                Integer sqlType = null;
-                if (fullColumnNameToTypeMap.containsKey(columnName)) {
-                    sqlType = fullColumnNameToTypeMap.get(columnName);
-                } else if (columnNameToTypeMap.containsKey(columnName)) {
-                    if (ambiguousColumnNames.contains(columnName)) {
-                        unresolvedColumnNames.add(columnName);
-                    }
-                    // fall back to bare column name.
-                    sqlType = columnNameToTypeMap.get(columnName);
-                }
-                if (unresolvedColumnNames.size()>0) {
-                    StringBuilder exceptionMessage = new StringBuilder();
-                    boolean first = true;
-                    exceptionMessage.append("Unable to resolve these column names to a single column family:\n");
-                    for (String col : unresolvedColumnNames) {
-                        if (first) first = false;
-                        else exceptionMessage.append(",");
-                        exceptionMessage.append(col);
-                    }
-                    exceptionMessage.append("\nAvailable columns with column families:\n");
-                    first = true;
-                    for (String col : fullColumnNameToTypeMap.keySet()) {
-                        if (first) first = false;
-                        else exceptionMessage.append(",");
-                        exceptionMessage.append(col);
-                    }
-                    throw new SQLException(exceptionMessage.toString());
-                }
-
-                if (sqlType == null) {
-                    if (strict) {
-                        throw new SQLExceptionInfo.Builder(
-                                SQLExceptionCode.COLUMN_NOT_FOUND)
-                                .setColumnName(columnName)
-                                .setTableName(tableName).build()
-                                .buildException();
-                    }
-                    unfoundColumnCount++;
-                } else {
-                    columnInfoList.add(new ColumnInfo(columnName, sqlType));
-                }
-            }
-            if (unfoundColumnCount == columns.size()) {
-                throw new SQLExceptionInfo.Builder(
-                        SQLExceptionCode.COLUMN_NOT_FOUND)
-                        .setColumnName(
-                                Arrays.toString(columns.toArray(new String[0]))
-                        .setTableName(tableName).build().buildException();
-            }
-        }
-        return columnInfoList;
+        return SchemaUtil.generateColumnInfo(conn, tableName, columns, isStrict);
     }
 
-    static class CsvUpsertListener implements CsvUpsertExecutor.UpsertListener {
+    static class CsvUpsertListener implements UpsertExecutor.UpsertListener<CSVRecord> {
 
         private final PhoenixConnection conn;
         private final int upsertBatchSize;

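A minimal sketch (not part of the commit) of the new call path shown above: CsvUpsertExecutor is now constructed directly rather than through a static create(...) factory, and the listener type is the generic UpsertExecutor.UpsertListener<CSVRecord>. The JDBC URL, table name, and CSV payload here are hypothetical.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.List;
    import org.apache.commons.csv.CSVFormat;
    import org.apache.commons.csv.CSVParser;
    import org.apache.commons.csv.CSVRecord;
    import org.apache.phoenix.util.ColumnInfo;
    import org.apache.phoenix.util.SchemaUtil;
    import org.apache.phoenix.util.UpsertExecutor;
    import org.apache.phoenix.util.csv.CsvUpsertExecutor;

    public class CsvLoadSketch {
        public static void main(String[] args) throws Exception {
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            List<ColumnInfo> columns =
                    SchemaUtil.generateColumnInfo(conn, "MY_TABLE", null, true);
            UpsertExecutor.UpsertListener<CSVRecord> listener =
                    new UpsertExecutor.UpsertListener<CSVRecord>() {
                @Override public void upsertDone(long upsertCount) { }
                @Override public void errorOnRecord(CSVRecord record, Throwable t) {
                    throw new RuntimeException(t);
                }
            };
            try (CSVParser parser = CSVParser.parse("1,hello\n2,world", CSVFormat.DEFAULT);
                 CsvUpsertExecutor executor =
                         new CsvUpsertExecutor(conn, "MY_TABLE", columns, listener, ":")) {
                executor.execute(parser);
            }
            conn.commit();
        }
    }
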
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 1693600..cb8aced 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -17,21 +17,20 @@
  */
 package org.apache.phoenix.util;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES;
-
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-
+import java.util.Set;
+import java.util.TreeSet;
 import javax.annotation.Nullable;
 
 import org.apache.hadoop.hbase.KeyValue;
@@ -64,6 +63,15 @@ import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES;
 
 /**
  * 
@@ -757,4 +765,125 @@ public class SchemaUtil {
     public static byte getSeparatorByte(boolean rowKeyOrderOptimizable, boolean isNullValue, Expression e) {
         return getSeparatorByte(rowKeyOrderOptimizable, isNullValue, e.getSortOrder());
     }
+
+    /**
+     * Get list of ColumnInfos that contain Column Name and its associated
+     * PDataType for an import. The supplied list of columns can be null -- if it is non-null,
+     * it represents a user-supplied list of columns to be imported.
+     *
+     * @param conn Phoenix connection from which metadata will be read
+     * @param tableName Phoenix table name whose columns are to be checked. Can include a schema
+     *                  name
+     * @param columns user-supplied list of import columns, can be null
+     * @param strict if true, an exception will be thrown if unknown columns are supplied
+     */
+    public static List<ColumnInfo> generateColumnInfo(Connection conn,
+                                                      String tableName, List<String> columns, boolean strict)
+            throws SQLException {
+        Map<String, Integer> columnNameToTypeMap = Maps.newLinkedHashMap();
+        Set<String> ambiguousColumnNames = new HashSet<String>();
+        Map<String, Integer> fullColumnNameToTypeMap = Maps.newLinkedHashMap();
+        DatabaseMetaData dbmd = conn.getMetaData();
+        int unfoundColumnCount = 0;
+        // TODO: escape wildcard characters here because we don't want that
+        // behavior here
+        String escapedTableName = StringUtil.escapeLike(tableName);
+        String[] schemaAndTable = escapedTableName.split("\\.");
+        ResultSet rs = null;
+        try {
+            rs = dbmd.getColumns(null, (schemaAndTable.length == 1 ? ""
+                            : schemaAndTable[0]),
+                    (schemaAndTable.length == 1 ? escapedTableName
+                            : schemaAndTable[1]), null);
+            while (rs.next()) {
+                String colName = rs.getString(QueryUtil.COLUMN_NAME_POSITION);
+                String colFam = rs.getString(QueryUtil.COLUMN_FAMILY_POSITION);
+
+                // use family qualifier, if available, otherwise, use column name
+                String fullColumn = (colFam==null?colName:String.format("%s.%s",colFam,colName));
+                String sqlTypeName = rs.getString(QueryUtil.DATA_TYPE_NAME_POSITION);
+
+                // allow for both bare and family qualified names.
+                if (columnNameToTypeMap.keySet().contains(colName)) {
+                    ambiguousColumnNames.add(colName);
+                }
+                columnNameToTypeMap.put(
+                        colName,
+                        PDataType.fromSqlTypeName(sqlTypeName).getSqlType());
+                fullColumnNameToTypeMap.put(
+                        fullColumn,
+                        PDataType.fromSqlTypeName(sqlTypeName).getSqlType());
+            }
+            if (columnNameToTypeMap.isEmpty()) {
+                throw new IllegalArgumentException("Table " + tableName + " not found");
+            }
+        } finally {
+            if (rs != null) {
+                rs.close();
+            }
+        }
+        List<ColumnInfo> columnInfoList = Lists.newArrayList();
+        Set<String> unresolvedColumnNames = new TreeSet<String>();
+        if (columns == null) {
+            // use family qualified names by default, if no columns are specified.
+            for (Map.Entry<String, Integer> entry : fullColumnNameToTypeMap
+                    .entrySet()) {
+                columnInfoList.add(new ColumnInfo(entry.getKey(), entry.getValue()));
+            }
+        } else {
+            // Leave "null" as indication to skip b/c it doesn't exist
+            for (int i = 0; i < columns.size(); i++) {
+                String columnName = columns.get(i).trim();
+                Integer sqlType = null;
+                if (fullColumnNameToTypeMap.containsKey(columnName)) {
+                    sqlType = fullColumnNameToTypeMap.get(columnName);
+                } else if (columnNameToTypeMap.containsKey(columnName)) {
+                    if (ambiguousColumnNames.contains(columnName)) {
+                        unresolvedColumnNames.add(columnName);
+                    }
+                    // fall back to bare column name.
+                    sqlType = columnNameToTypeMap.get(columnName);
+                }
+                if (unresolvedColumnNames.size()>0) {
+                    StringBuilder exceptionMessage = new StringBuilder();
+                    boolean first = true;
+                    exceptionMessage.append("Unable to resolve these column names to a single column family:\n");
+                    for (String col : unresolvedColumnNames) {
+                        if (first) first = false;
+                        else exceptionMessage.append(",");
+                        exceptionMessage.append(col);
+                    }
+                    exceptionMessage.append("\nAvailable columns with column families:\n");
+                    first = true;
+                    for (String col : fullColumnNameToTypeMap.keySet()) {
+                        if (first) first = false;
+                        else exceptionMessage.append(",");
+                        exceptionMessage.append(col);
+                    }
+                    throw new SQLException(exceptionMessage.toString());
+                }
+
+                if (sqlType == null) {
+                    if (strict) {
+                        throw new SQLExceptionInfo.Builder(
+                                SQLExceptionCode.COLUMN_NOT_FOUND)
+                                .setColumnName(columnName)
+                                .setTableName(tableName).build()
+                                .buildException();
+                    }
+                    unfoundColumnCount++;
+                } else {
+                    columnInfoList.add(new ColumnInfo(columnName, sqlType));
+                }
+            }
+            if (unfoundColumnCount == columns.size()) {
+                throw new SQLExceptionInfo.Builder(
+                        SQLExceptionCode.COLUMN_NOT_FOUND)
+                        .setColumnName(
+                                Arrays.toString(columns.toArray(new String[0]))
+                        .setTableName(tableName).build().buildException();
+            }
+        }
+        return columnInfoList;
+    }
 }

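A small usage sketch (not part of the commit) of the relocated helper; the JDBC URL and table/column names are hypothetical.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.phoenix.util.ColumnInfo;
    import org.apache.phoenix.util.SchemaUtil;

    public class GenerateColumnInfoSketch {
        public static void main(String[] args) throws Exception {
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            // Null column list: import every column, keyed by family-qualified name.
            List<ColumnInfo> all =
                    SchemaUtil.generateColumnInfo(conn, "MY_SCHEMA.MY_TABLE", null, true);
            // Explicit list with strict=true: an unknown column raises COLUMN_NOT_FOUND.
            List<ColumnInfo> some = SchemaUtil.generateColumnInfo(conn,
                    "MY_SCHEMA.MY_TABLE", Arrays.asList("ID", "CF.NAME"), true);
            System.out.println(all.size() + " / " + some.size());
        }
    }
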
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/util/UpsertExecutor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpsertExecutor.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpsertExecutor.java
new file mode 100644
index 0000000..d9ce5f2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpsertExecutor.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.schema.types.PDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+
+/**
+ * Executes upsert statements on a provided {@code PreparedStatement} based on incoming
+ * {@code RECORDS}. An {@link UpsertListener} is notified each time the prepared statement
+ * is executed.
+ */
+public abstract class UpsertExecutor<RECORD, FIELD> implements Closeable {
+
+    /**
+     * A listener that is called for events based on incoming records.
+     */
+    public interface UpsertListener<RECORD> {
+
+        /**
+         * Called when an upsert has been successfully completed. The given upsertCount is the total number of upserts
+         * completed on the caller up to this point.
+         *
+         * @param upsertCount total number of upserts that have been completed
+         */
+        void upsertDone(long upsertCount);
+
+
+        /**
+         * Called when executing a prepared statement has failed on a given record.
+         *
+         * @param record the record that was being upserted when the error occurred
+         */
+        void errorOnRecord(RECORD record, Throwable throwable);
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(UpsertExecutor.class);
+
+    protected final Connection conn;
+    protected final List<ColumnInfo> columnInfos;
+    protected final List<PDataType> dataTypes;
+    protected final List<Function<FIELD, Object>> conversionFunctions;
+    protected final PreparedStatement preparedStatement;
+    protected final UpsertListener<RECORD> upsertListener;
+    protected long upsertCount = 0L;
+    protected boolean initFinished = false; // allow subclasses to finish initialization
+
+    private static PreparedStatement createStatement(Connection conn, String tableName,
+            List<ColumnInfo> columnInfoList) {
+        PreparedStatement preparedStatement;
+        try {
+            String upsertSql = QueryUtil.constructUpsertStatement(tableName, columnInfoList);
+            LOG.info("Upserting SQL data with {}", upsertSql);
+            preparedStatement = conn.prepareStatement(upsertSql);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+        return preparedStatement;
+    }
+
+    /**
+     * Construct with the definition of incoming columns, and the statement upon which upsert
+     * statements are to be performed.
+     */
+    public UpsertExecutor(Connection conn, String tableName,
+            List<ColumnInfo> columnInfoList, UpsertListener<RECORD> upsertListener) {
+        this(conn, columnInfoList, createStatement(conn, tableName, columnInfoList), upsertListener);
+    }
+
+    /** Testing constructor. Do not use in prod. */
+    @VisibleForTesting
+    protected UpsertExecutor(Connection conn, List<ColumnInfo> columnInfoList,
+            PreparedStatement preparedStatement, UpsertListener<RECORD> upsertListener) {
+        this.conn = conn;
+        this.upsertListener = upsertListener;
+        this.columnInfos = columnInfoList;
+        this.preparedStatement = preparedStatement;
+        this.dataTypes = Lists.newArrayList();
+        this.conversionFunctions = Lists.newArrayList();
+    }
+
+    /**
+     * Awkward protocol allows subclass constructors to finish initializing context before
+     * proceeding to record processing.
+     */
+    protected void finishInit() {
+        for (ColumnInfo columnInfo : columnInfos) {
+            PDataType dataType = PDataType.fromTypeId(columnInfo.getSqlType());
+            dataTypes.add(dataType);
+            conversionFunctions.add(createConversionFunction(dataType));
+        }
+        this.initFinished = true;
+    }
+
+    /**
+     * Execute upserts for each record contained in the given iterable, notifying this instance's
+     * {@code UpsertListener} for each completed upsert.
+     *
+     * @param records iterable of records to be upserted
+     */
+    public void execute(Iterable<RECORD> records) {
+        if (!initFinished) {
+            finishInit();
+        }
+        for (RECORD record : records) {
+            execute(record);
+        }
+    }
+
+    /**
+     * Upsert a single record.
+     *
+     * @param record the record containing the data to be upserted
+     */
+    protected abstract void execute(RECORD record);
+
+    @Override
+    public void close() throws IOException {
+        try {
+            preparedStatement.close();
+        } catch (SQLException e) {
+            // An exception while closing the prepared statement is most likely a sign of a real problem, so we don't
+            // want to hide it with closeQuietly or something similar
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected abstract Function<FIELD, Object> createConversionFunction(PDataType dataType);
+}

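A hedged sketch (not part of the commit) of the smallest concrete subclass: one that upserts String[] records. It mirrors the pattern the CsvUpsertExecutor diff below follows: call finishInit() at the end of the constructor, convert each field, bind parameters, and report to the listener. The class itself is hypothetical.

    import java.sql.Connection;
    import java.util.List;
    import org.apache.phoenix.schema.types.PDataType;
    import org.apache.phoenix.util.ColumnInfo;
    import org.apache.phoenix.util.UpsertExecutor;
    import com.google.common.base.Function;

    public class ArrayUpsertExecutor extends UpsertExecutor<String[], String> {

        public ArrayUpsertExecutor(Connection conn, String tableName,
                List<ColumnInfo> columnInfoList, UpsertListener<String[]> upsertListener) {
            super(conn, tableName, columnInfoList, upsertListener);
            finishInit();
        }

        @Override
        protected void execute(String[] record) {
            try {
                // Convert each field and bind it to the prepared upsert statement.
                for (int i = 0; i < conversionFunctions.size(); i++) {
                    Object value = conversionFunctions.get(i).apply(record[i]);
                    preparedStatement.setObject(i + 1, value);
                }
                preparedStatement.execute();
                upsertListener.upsertDone(++upsertCount);
            } catch (Exception e) {
                upsertListener.errorOnRecord(record, e);
            }
        }

        @Override
        protected Function<String, Object> createConversionFunction(final PDataType dataType) {
            return new Function<String, Object>() {
                @Override
                public Object apply(String input) {
                    return input == null ? null : dataType.toObject(input);
                }
            };
        }
    }
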
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
index e680f5c..4a3af21 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
@@ -17,132 +17,55 @@
  */
 package org.apache.phoenix.util.csv;
 
-import java.io.Closeable;
-import java.io.IOException;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Types;
 import java.util.List;
 import java.util.Properties;
-
 import javax.annotation.Nullable;
 
 import org.apache.commons.csv.CSVRecord;
-import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PTimestamp;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.DateUtil;
-import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.UpsertExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
-import com.google.common.collect.Lists;
 
-/**
- * Executes upsert statements on a provided {@code PreparedStatement} based on incoming CSV records, notifying a
- * listener each time the prepared statement is executed.
- */
-public class CsvUpsertExecutor implements Closeable {
+/** {@link UpsertExecutor} over {@link CSVRecord}s. */
+public class CsvUpsertExecutor extends UpsertExecutor<CSVRecord, String> {
 
    private static final Logger LOG = LoggerFactory.getLogger(CsvUpsertExecutor.class);
 
-    private final String arrayElementSeparator;
-    private final Connection conn;
-    private final List<PDataType> dataTypes;
-    private final List<Function<String,Object>> conversionFunctions;
-    private final PreparedStatement preparedStatement;
-    private final UpsertListener upsertListener;
-    private long upsertCount = 0L;
-
-    /**
-     * A listener that is called for events based on incoming CSV data.
-     */
-    public static interface UpsertListener {
-
-        /**
-         * Called when an upsert has been sucessfully completed. The given upsertCount is the total number of upserts
-         * completed on the caller up to this point.
-         *
-         * @param upsertCount total number of upserts that have been completed
-         */
-        void upsertDone(long upsertCount);
-
-
-        /**
-         * Called when executing a prepared statement has failed on a given record.
-         *
-         * @param csvRecord the CSV record that was being upserted when the error occurred
-         */
-        void errorOnRecord(CSVRecord csvRecord, Throwable throwable);
-    }
-
-
-    /**
-     * Static constructor method for creating a CsvUpsertExecutor.
-     *
-     * @param conn Phoenix connection upon which upserts are to be performed
-     * @param tableName name of the table in which upserts are to be performed
-     * @param columnInfoList description of the columns to be upserted to, in the same order as in the CSV input
-     * @param upsertListener listener that will be notified of upserts, can be null
-     * @param arrayElementSeparator separator string to delimit string representations of arrays
-     * @return the created CsvUpsertExecutor
-     */
-    public static CsvUpsertExecutor create(PhoenixConnection conn, String tableName, List<ColumnInfo> columnInfoList,
-            UpsertListener upsertListener, String arrayElementSeparator) {
-        PreparedStatement preparedStatement = null;
-        try {
-            String upsertSql = QueryUtil.constructUpsertStatement(tableName, columnInfoList);
-            LOG.info("Upserting SQL data with {}", upsertSql);
-            preparedStatement = conn.prepareStatement(upsertSql);
-        } catch (SQLException e) {
-            throw new RuntimeException(e);
-        }
-        return new CsvUpsertExecutor(conn, columnInfoList, preparedStatement, 
upsertListener,
-                arrayElementSeparator);
-    }
+    protected final String arrayElementSeparator;
 
-    /**
-     * Construct with the definition of incoming columns, and the statement 
upon which upsert statements
-     * are to be performed.
-     */
-    CsvUpsertExecutor(Connection conn, List<ColumnInfo> columnInfoList, 
PreparedStatement preparedStatement,
-            UpsertListener upsertListener, String arrayElementSeparator) {
-        this.conn = conn;
-        this.preparedStatement = preparedStatement;
-        this.upsertListener = upsertListener;
+    /** Testing constructor. Do not use in prod. */
+    @VisibleForTesting
+    protected CsvUpsertExecutor(Connection conn, List<ColumnInfo> 
columnInfoList,
+            PreparedStatement stmt, UpsertListener<CSVRecord> upsertListener,
+            String arrayElementSeparator) {
+        super(conn, columnInfoList, stmt, upsertListener);
         this.arrayElementSeparator = arrayElementSeparator;
-        this.dataTypes = Lists.newArrayList();
-        this.conversionFunctions = Lists.newArrayList();
-        for (ColumnInfo columnInfo : columnInfoList) {
-            PDataType dataType = PDataType.fromTypeId(columnInfo.getSqlType());
-            dataTypes.add(dataType);
-            conversionFunctions.add(createConversionFunction(dataType));
-        }
+        finishInit();
     }
 
-    /**
-     * Execute upserts for each CSV record contained in the given iterable, 
notifying this instance's
-     * {@code UpsertListener} for each completed upsert.
-     *
-     * @param csvRecords iterable of CSV records to be upserted
-     */
-    public void execute(Iterable<CSVRecord> csvRecords) {
-        for (CSVRecord csvRecord : csvRecords) {
-            execute(csvRecord);
-        }
+    public CsvUpsertExecutor(Connection conn, String tableName,
+            List<ColumnInfo> columnInfoList, UpsertListener<CSVRecord> 
upsertListener,
+            String arrayElementSeparator) {
+        super(conn, tableName, columnInfoList, upsertListener);
+        this.arrayElementSeparator = arrayElementSeparator;
+        finishInit();
     }
 
-    /**
-     * Upsert a single record.
-     *
-     * @param csvRecord CSV record containing the data to be upserted
-     */
-    void execute(CSVRecord csvRecord) {
+    @Override
+    protected void execute(CSVRecord csvRecord) {
         try {
             if (csvRecord.size() < conversionFunctions.size()) {
                 String message = String.format("CSV record does not have 
enough values (has %d, but needs %d)",
@@ -170,17 +93,7 @@ public class CsvUpsertExecutor implements Closeable {
     }
 
     @Override
-    public void close() throws IOException {
-        try {
-            preparedStatement.close();
-        } catch (SQLException e) {
-            // An exception while closing the prepared statement is most 
likely a sign of a real problem, so we don't
-            // want to hide it with closeQuietly or something similar
-            throw new RuntimeException(e);
-        }
-    }
-
-    private Function<String, Object> createConversionFunction(PDataType 
dataType) {
+    protected Function<String, Object> createConversionFunction(PDataType 
dataType) {
         if (dataType.isArrayType()) {
             return new ArrayDatatypeConversionFunction(
                     new StringToArrayConverter(
@@ -201,7 +114,7 @@ public class CsvUpsertExecutor implements Closeable {
         private final DateUtil.DateTimeParser dateTimeParser;
 
         SimpleDatatypeConversionFunction(PDataType dataType, Connection conn) {
-            Properties props = null;
+            Properties props;
             try {
                 props = conn.getClientInfo();
             } catch (SQLException e) {
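
The refactor above reduces CsvUpsertExecutor to a thin subclass of the new
generic UpsertExecutor: per-record value conversion stays here, while the
upsert loop and statement handling move to the base class. A minimal usage
sketch, assuming the base class keeps the execute(Iterable) loop and
Closeable contract of the old standalone class; the connection URL, table
name, and columns are illustrative, not taken from this patch:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.List;

    import org.apache.commons.csv.CSVFormat;
    import org.apache.commons.csv.CSVParser;
    import org.apache.commons.csv.CSVRecord;
    import org.apache.phoenix.schema.types.PInteger;
    import org.apache.phoenix.schema.types.PVarchar;
    import org.apache.phoenix.util.ColumnInfo;
    import org.apache.phoenix.util.UpsertExecutor;
    import org.apache.phoenix.util.csv.CsvUpsertExecutor;

    import com.google.common.collect.ImmutableList;

    public class CsvUpsertExample {
        public static void main(String[] args) throws Exception {
            // Illustrative connection URL; adjust for a real cluster.
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            List<ColumnInfo> columns = ImmutableList.of(
                    new ColumnInfo("ID", PInteger.INSTANCE.getSqlType()),
                    new ColumnInfo("NAME", PVarchar.INSTANCE.getSqlType()));
            UpsertExecutor.UpsertListener<CSVRecord> listener =
                    new UpsertExecutor.UpsertListener<CSVRecord>() {
                        @Override
                        public void upsertDone(long upsertCount) {
                            System.out.println("Upserts completed: " + upsertCount);
                        }
                        @Override
                        public void errorOnRecord(CSVRecord record, Throwable t) {
                            System.err.println("Failed on " + record + ": " + t);
                        }
                    };
            // Public constructor shown in the diff: conn, table, columns,
            // listener, array element separator.
            CsvUpsertExecutor executor =
                    new CsvUpsertExecutor(conn, "MYTABLE", columns, listener, ":");
            try (CSVParser parser = CSVParser.parse("1,hello", CSVFormat.DEFAULT)) {
                executor.execute(parser); // upserts each CSVRecord
            }
            executor.close();
            conn.commit();
        }
    }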

http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java
new file mode 100644
index 0000000..bbe0e30
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util.json;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import javax.annotation.Nullable;
+
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PTimestamp;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.DateUtil;
+import org.apache.phoenix.util.UpsertExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.CaseFormat;
+import com.google.common.base.Function;
+
+/** {@link UpsertExecutor} over {@link Map} objects, as parsed from JSON. */
+public class JsonUpsertExecutor extends UpsertExecutor<Map<?, ?>, Object> {
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(JsonUpsertExecutor.class);
+
+    /** Testing constructor. Do not use in prod. */
+    @VisibleForTesting
+    protected JsonUpsertExecutor(Connection conn, List<ColumnInfo> 
columnInfoList,
+            PreparedStatement stmt, UpsertListener<Map<?, ?>> upsertListener) {
+        super(conn, columnInfoList, stmt, upsertListener);
+        finishInit();
+    }
+
+    public JsonUpsertExecutor(Connection conn, String tableName, 
List<ColumnInfo> columnInfoList,
+            UpsertExecutor.UpsertListener<Map<?, ?>> upsertListener) {
+        super(conn, tableName, columnInfoList, upsertListener);
+        finishInit();
+    }
+
+    @Override
+    protected void execute(Map<?, ?> record) {
+        int fieldIndex = 0;
+        String colName = null;
+        try {
+            if (record.size() < conversionFunctions.size()) {
+                String message = String.format("JSON record does not have 
enough values (has %d, but needs %d)",
+                        record.size(), conversionFunctions.size());
+                throw new IllegalArgumentException(message);
+            }
+            for (fieldIndex = 0; fieldIndex < conversionFunctions.size(); 
fieldIndex++) {
+                colName = CaseFormat.UPPER_UNDERSCORE.to(
+                        CaseFormat.LOWER_UNDERSCORE, 
columnInfos.get(fieldIndex).getColumnName());
+                if (colName.contains(".")) {
+                    StringBuilder sb = new StringBuilder();
+                    String[] parts = colName.split("\\.");
+                    // assume first part is the column family name; omit it
+                    for (int i = 1; i < parts.length; i++) {
+                        sb.append(parts[i]);
+                        if (i != parts.length - 1) {
+                            sb.append(".");
+                        }
+                    }
+                    colName = sb.toString();
+                }
+                if (colName.contains("\"")) {
+                    colName = colName.replace("\"", "");
+                }
+                Object sqlValue = 
conversionFunctions.get(fieldIndex).apply(record.get(colName));
+                if (sqlValue != null) {
+                    preparedStatement.setObject(fieldIndex + 1, sqlValue);
+                } else {
+                    preparedStatement.setNull(fieldIndex + 1, 
dataTypes.get(fieldIndex).getSqlType());
+                }
+            }
+            preparedStatement.execute();
+            upsertListener.upsertDone(++upsertCount);
+        } catch (Exception e) {
+            if (LOG.isDebugEnabled()) {
+                // Even though this is an error we only log it with debug 
logging because we're notifying the
+                // listener, and it can do its own logging if needed
+                LOG.debug("Error on record " + record + ", fieldIndex " + 
fieldIndex + ", colName " + colName, e);
+            }
+            upsertListener.errorOnRecord(record, new Exception("fieldIndex: " 
+ fieldIndex + ", colName " + colName, e));
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            preparedStatement.close();
+        } catch (SQLException e) {
+            // An exception while closing the prepared statement is most 
likely a sign of a real problem, so we don't
+            // want to hide it with closeQuietly or something similar
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected Function<Object, Object> createConversionFunction(PDataType 
dataType) {
+        if (dataType.isArrayType()) {
+            return new ArrayDatatypeConversionFunction(
+                    new ObjectToArrayConverter(
+                            conn,
+                            PDataType.fromTypeId(dataType.getSqlType() - 
PDataType.ARRAY_TYPE_BASE)));
+        } else {
+            return new SimpleDatatypeConversionFunction(dataType, this.conn);
+        }
+    }
+
+    /**
+     * Performs typed conversion from Object values to a given column value type.
+     */
+    static class SimpleDatatypeConversionFunction implements Function<Object, 
Object> {
+
+        private final PDataType dataType;
+        private final DateUtil.DateTimeParser dateTimeParser;
+
+        SimpleDatatypeConversionFunction(PDataType dataType, Connection conn) {
+            Properties props;
+            try {
+                props = conn.getClientInfo();
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            }
+            this.dataType = dataType;
+            if (dataType.isCoercibleTo(PTimestamp.INSTANCE)) {
+                // TODO: move to DateUtil
+                String dateFormat;
+                int dateSqlType = dataType.getResultSetSqlType();
+                if (dateSqlType == Types.DATE) {
+                    dateFormat = 
props.getProperty(QueryServices.DATE_FORMAT_ATTRIB,
+                            DateUtil.DEFAULT_DATE_FORMAT);
+                } else if (dateSqlType == Types.TIME) {
+                    dateFormat = 
props.getProperty(QueryServices.TIME_FORMAT_ATTRIB,
+                            DateUtil.DEFAULT_TIME_FORMAT);
+                } else {
+                    dateFormat = 
props.getProperty(QueryServices.TIMESTAMP_FORMAT_ATTRIB,
+                            DateUtil.DEFAULT_TIMESTAMP_FORMAT);
+                }
+                String timeZoneId = 
props.getProperty(QueryServices.DATE_FORMAT_TIMEZONE_ATTRIB,
+                        QueryServicesOptions.DEFAULT_DATE_FORMAT_TIMEZONE);
+                this.dateTimeParser = DateUtil.getDateTimeParser(dateFormat, 
dataType, timeZoneId);
+            } else {
+                this.dateTimeParser = null;
+            }
+        }
+
+        @Nullable
+        @Override
+        public Object apply(@Nullable Object input) {
+            if (input == null) {
+                return null;
+            }
+            if (dateTimeParser != null && input instanceof String) {
+                final String s = (String) input;
+                long epochTime = dateTimeParser.parseDateTime(s);
+                byte[] byteValue = new byte[dataType.getByteSize()];
+                dataType.getCodec().encodeLong(epochTime, byteValue, 0);
+                return dataType.toObject(byteValue);
+            }
+            return dataType.toObject(input, dataType);
+        }
+    }
+
+    /**
+     * Converts Object values (typically Lists) into Phoenix arrays of the correct type.
+     */
+    private static class ArrayDatatypeConversionFunction implements 
Function<Object, Object> {
+
+        private final ObjectToArrayConverter arrayConverter;
+
+        private ArrayDatatypeConversionFunction(ObjectToArrayConverter 
arrayConverter) {
+            this.arrayConverter = arrayConverter;
+        }
+
+        @Nullable
+        @Override
+        public Object apply(@Nullable Object input) {
+            try {
+                return arrayConverter.toArray(input);
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
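
One subtlety in execute() above: the JSON key for each column is derived
from the column metadata rather than used verbatim. The UPPER_UNDERSCORE
column name is lower-cased via CaseFormat, a leading column-family segment
is stripped, and embedded quotes are removed, so a column declared as
FAM.MY_COL is looked up under the key my_col. A self-contained sketch of
that mapping (the helper name jsonKeyFor is invented for illustration):

    import com.google.common.base.CaseFormat;

    public class JsonKeyMappingExample {

        // Mirrors the key derivation in JsonUpsertExecutor.execute():
        // "FAM.MY_COL" -> "fam.my_col" -> "my_col"
        static String jsonKeyFor(String columnName) {
            String colName =
                    CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.LOWER_UNDERSCORE, columnName);
            if (colName.contains(".")) {
                // Drop the leading column-family segment, keep the rest intact.
                colName = colName.substring(colName.indexOf('.') + 1);
            }
            return colName.replace("\"", "");
        }

        public static void main(String[] args) {
            System.out.println(jsonKeyFor("MY_COL"));     // my_col
            System.out.println(jsonKeyFor("FAM.MY_COL")); // my_col
        }
    }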

http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/util/json/ObjectToArrayConverter.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/json/ObjectToArrayConverter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/util/json/ObjectToArrayConverter.java
new file mode 100644
index 0000000..16bef15
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/util/json/ObjectToArrayConverter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util.json;
+
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.schema.types.PDataType;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * Converts Objects (presumably lists) into Phoenix arrays.
+ */
+class ObjectToArrayConverter {
+
+    private final Connection conn;
+    private final PDataType elementDataType;
+    private final JsonUpsertExecutor.SimpleDatatypeConversionFunction 
elementConvertFunction;
+
+    /**
+     * Instantiate with the target connection and the element data type.
+     *
+     * @param conn Phoenix connection to target database
+     * @param elementDataType datatype of the elements of arrays to be created
+     */
+    public ObjectToArrayConverter(Connection conn, PDataType elementDataType) {
+        this.conn = conn;
+        this.elementDataType = elementDataType;
+        this.elementConvertFunction =
+            new 
JsonUpsertExecutor.SimpleDatatypeConversionFunction(elementDataType, this.conn);
+    }
+
+    /**
+     * Convert an input Object (typically a List) into a Phoenix array of the configured element type.
+     *
+     * @param input object containing the array element values
+     * @return the array containing the values represented in the input object
+     */
+    public Array toArray(Object input) throws SQLException {
+        if (input == null) {
+            return conn.createArrayOf(elementDataType.getSqlTypeName(), new 
Object[0]);
+        }
+        List<?> list = (List<?>) input;
+        if (list.isEmpty()) {
+            return conn.createArrayOf(elementDataType.getSqlTypeName(), new 
Object[0]);
+        }
+        return conn.createArrayOf(elementDataType.getSqlTypeName(),
+            Lists.newArrayList(Iterables.transform(list, 
elementConvertFunction)).toArray());
+    }
+}
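
ObjectToArrayConverter is package-private, so it is only reachable from
org.apache.phoenix.util.json, but the behaviour is straightforward: list
elements are run through SimpleDatatypeConversionFunction and handed to
Connection.createArrayOf(), while null or empty input yields an empty array
of the element type. A minimal sketch from within the same package; the
connection URL is illustrative:

    import java.sql.Array;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Arrays;

    import org.apache.phoenix.schema.types.PInteger;

    public class ObjectToArrayExample {
        public static void main(String[] args) throws Exception {
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            ObjectToArrayConverter converter =
                    new ObjectToArrayConverter(conn, PInteger.INSTANCE);
            // List input: each element is converted before array creation.
            Array values = converter.toArray(Arrays.asList(1, 2, 3));
            // Null input: an empty INTEGER array, not null.
            Array empty = converter.toArray(null);
            System.out.println(values.getBaseTypeName()); // INTEGER
        }
    }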

http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/BulkLoadToolTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/BulkLoadToolTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/BulkLoadToolTest.java
new file mode 100644
index 0000000..95e9b43
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/BulkLoadToolTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.commons.cli.CommandLine;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class BulkLoadToolTest {
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> params() {
+        return Arrays.asList(new Object[][]{
+                { new CsvBulkLoadTool() },
+                { new JsonBulkLoadTool() },
+        });
+    }
+
+    @Parameterized.Parameter(value = 0)
+    public AbstractBulkLoadTool bulkLoadTool;
+
+    @Test
+    public void testParseOptions() {
+        CommandLine cmdLine = bulkLoadTool.parseOptions(new String[] { 
"--input", "/input",
+                "--table", "mytable" });
+
+        assertEquals("mytable", 
cmdLine.getOptionValue(CsvBulkLoadTool.TABLE_NAME_OPT.getOpt()));
+        assertEquals("/input", 
cmdLine.getOptionValue(CsvBulkLoadTool.INPUT_PATH_OPT.getOpt()));
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testParseOptions_ExtraArguments() {
+        bulkLoadTool.parseOptions(new String[] { "--input", "/input",
+                "--table", "mytable", "these", "shouldnt", "be", "here" });
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testParseOptions_NoInput() {
+        bulkLoadTool.parseOptions(new String[] { "--table", "mytable" });
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testParseOptions_NoTable() {
+        bulkLoadTool.parseOptions(new String[] { "--input", "/input" });
+    }
+
+    @Test
+    public void testGetQualifiedTableName() {
+        assertEquals("MYSCHEMA.MYTABLE", 
CsvBulkLoadTool.getQualifiedTableName("mySchema", "myTable"));
+    }
+
+    @Test
+    public void testGetQualifiedTableName_NullSchema() {
+        assertEquals("MYTABLE", CsvBulkLoadTool.getQualifiedTableName(null, 
"myTable"));
+    }
+}
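
The parameterized test above runs the same option-parsing assertions against
both CsvBulkLoadTool and JsonBulkLoadTool. Both are typically launched as
Hadoop Tools, so the asserted options map directly onto an invocation like
the following sketch, which assumes AbstractBulkLoadTool follows the usual
Configured/Tool pattern; the input path and table name are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.mapreduce.CsvBulkLoadTool;

    public class RunCsvBulkLoad {
        public static void main(String[] args) throws Exception {
            // --input and --table are the two required options asserted above.
            int exitCode = ToolRunner.run(new Configuration(),
                    new CsvBulkLoadTool(),
                    new String[] { "--input", "/data/input.csv", "--table", "MYTABLE" });
            System.exit(exitCode);
        }
    }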

http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
index f52a837..3c6271a 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
@@ -17,9 +17,6 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -29,11 +26,10 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.schema.types.PInteger;
-import org.apache.phoenix.util.ColumnInfo;
 import org.junit.Test;
 
-import com.google.common.collect.ImmutableList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 public class CsvBulkImportUtilTest {
 
@@ -41,16 +37,11 @@ public class CsvBulkImportUtilTest {
     public void testInitCsvImportJob() throws IOException {
         Configuration conf = new Configuration();
 
-        String tableName = "SCHEMANAME.TABLENAME";
         char delimiter = '\001';
         char quote = '\002';
         char escape = '!';
 
-        List<ColumnInfo> columnInfoList = ImmutableList.of(
-                new ColumnInfo("MYCOL", PInteger.INSTANCE.getSqlType()));
-
-        CsvBulkImportUtil.initCsvImportJob(
-                conf, tableName, delimiter, quote, escape, null, 
columnInfoList, true);
+        CsvBulkImportUtil.initCsvImportJob(conf, delimiter, quote, escape, 
null);
 
         // Serialize and deserialize the config to ensure that there aren't 
any issues
         // with non-printable characters as delimiters
@@ -61,7 +52,6 @@ public class CsvBulkImportUtilTest {
         Configuration deserialized = new Configuration();
         deserialized.addResource(new FileInputStream(tempFile));
 
-        assertEquals(tableName, 
deserialized.get(CsvToKeyValueMapper.TABLE_NAME_CONFKEY));
         assertEquals(Character.valueOf('\001'),
                 CsvBulkImportUtil.getCharacter(deserialized, 
CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY));
         assertEquals(Character.valueOf('\002'),
@@ -69,8 +59,6 @@ public class CsvBulkImportUtilTest {
         assertEquals(Character.valueOf('!'),
                 CsvBulkImportUtil.getCharacter(deserialized, 
CsvToKeyValueMapper.ESCAPE_CHAR_CONFKEY));
         
assertNull(deserialized.get(CsvToKeyValueMapper.ARRAY_DELIMITER_CONFKEY));
-        assertEquals(columnInfoList, 
CsvToKeyValueMapper.buildColumnInfoList(deserialized));
-        assertEquals(true, 
deserialized.getBoolean(CsvToKeyValueMapper.IGNORE_INVALID_ROW_CONFKEY, false));
 
         tempFile.delete();
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java
deleted file mode 100644
index 33bb976..0000000
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.mapreduce;
-
-import org.apache.commons.cli.CommandLine;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class CsvBulkLoadToolTest {
-
-    private CsvBulkLoadTool bulkLoadTool;
-
-    @Before
-    public void setUp() {
-        bulkLoadTool = new CsvBulkLoadTool();
-    }
-
-    @Test
-    public void testParseOptions() {
-        CommandLine cmdLine = bulkLoadTool.parseOptions(new String[] { 
"--input", "/input",
-                "--table", "mytable" });
-
-        assertEquals("mytable", 
cmdLine.getOptionValue(CsvBulkLoadTool.TABLE_NAME_OPT.getOpt()));
-        assertEquals("/input", 
cmdLine.getOptionValue(CsvBulkLoadTool.INPUT_PATH_OPT.getOpt()));
-    }
-
-    @Test(expected=IllegalStateException.class)
-    public void testParseOptions_ExtraArguments() {
-        bulkLoadTool.parseOptions(new String[] { "--input", "/input",
-                "--table", "mytable", "these", "shouldnt", "be", "here" });
-    }
-
-    @Test(expected=IllegalStateException.class)
-    public void testParseOptions_NoInput() {
-        bulkLoadTool.parseOptions(new String[] { "--table", "mytable" });
-    }
-
-    @Test(expected=IllegalStateException.class)
-    public void testParseOptions_NoTable() {
-        bulkLoadTool.parseOptions(new String[] { "--input", "/input" });
-    }
-
-    @Test
-    public void testGetQualifiedTableName() {
-        assertEquals("MYSCHEMA.MYTABLE", 
CsvBulkLoadTool.getQualifiedTableName("mySchema", "myTable"));
-    }
-
-    @Test
-    public void testGetQualifiedTableName_NullSchema() {
-        assertEquals("MYTABLE", CsvBulkLoadTool.getQualifiedTableName(null, 
"myTable"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
index dc6f497..fe4e068 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
@@ -17,25 +17,13 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 import java.io.IOException;
-import java.util.List;
 
 import org.apache.commons.csv.CSVRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.schema.types.PInteger;
-import org.apache.phoenix.schema.types.PIntegerArray;
-import org.apache.phoenix.schema.types.PUnsignedInt;
-import org.apache.phoenix.util.ColumnInfo;
 import org.junit.Test;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class CsvToKeyValueMapperTest {
 
@@ -62,72 +50,4 @@ public class CsvToKeyValueMapperTest {
         assertTrue(parsed.isConsistent());
         assertEquals(1, parsed.getRecordNumber());
     }
-
-
-    @Test
-    public void testBuildColumnInfoList() {
-        List<ColumnInfo> columnInfoList = ImmutableList.of(
-                new ColumnInfo("idCol", PInteger.INSTANCE.getSqlType()),
-                new ColumnInfo("unsignedIntCol", 
PUnsignedInt.INSTANCE.getSqlType()),
-                new ColumnInfo("stringArrayCol", 
PIntegerArray.INSTANCE.getSqlType()));
-
-        Configuration conf = new Configuration();
-        CsvToKeyValueMapper.configureColumnInfoList(conf, columnInfoList);
-        List<ColumnInfo> fromConfig = 
CsvToKeyValueMapper.buildColumnInfoList(conf);
-
-        assertEquals(columnInfoList, fromConfig);
-    }
-
-    @Test
-    public void testBuildColumnInfoList_ContainingNulls() {
-        // A null value in the column info list means "skip that column in the 
input"
-        List<ColumnInfo> columnInfoListWithNull = Lists.newArrayList(
-                new ColumnInfo("idCol", PInteger.INSTANCE.getSqlType()),
-                null,
-                new ColumnInfo("unsignedIntCol", 
PUnsignedInt.INSTANCE.getSqlType()),
-                new ColumnInfo("stringArrayCol", 
PIntegerArray.INSTANCE.getSqlType()));
-
-        Configuration conf = new Configuration();
-        CsvToKeyValueMapper.configureColumnInfoList(conf, 
columnInfoListWithNull);
-        List<ColumnInfo> fromConfig = 
CsvToKeyValueMapper.buildColumnInfoList(conf);
-
-        assertEquals(columnInfoListWithNull, fromConfig);
-    }
-
-    @Test
-    public void testLoadPreUpdateProcessor() {
-        Configuration conf = new Configuration();
-        conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, 
MockUpsertProcessor.class,
-                ImportPreUpsertKeyValueProcessor.class);
-
-        ImportPreUpsertKeyValueProcessor processor = 
PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
-        assertEquals(MockUpsertProcessor.class, processor.getClass());
-    }
-
-    @Test
-    public void testLoadPreUpdateProcessor_NotConfigured() {
-
-        Configuration conf = new Configuration();
-        ImportPreUpsertKeyValueProcessor processor = 
PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
-
-        
assertEquals(CsvToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor.class,
-                processor.getClass());
-    }
-
-    @Test(expected=IllegalStateException.class)
-    public void testLoadPreUpdateProcessor_ClassNotFound() {
-        Configuration conf = new Configuration();
-        conf.set(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, 
"MyUndefinedClass");
-
-        PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
-    }
-
-
-    static class MockUpsertProcessor implements 
ImportPreUpsertKeyValueProcessor {
-
-        @Override
-        public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> 
keyValues) {
-            throw new UnsupportedOperationException("Not yet implemented");
-        }
-    }
 }
