PHOENIX-2967 CSV BulkLoad should properly handle empty family for logical tables.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b538c1a9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b538c1a9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b538c1a9

Branch: refs/heads/4.x-HBase-1.0
Commit: b538c1a95c7b8c589aa1e42cab9e7a7967d6f3e2
Parents: 9b8f274
Author: Sergey Soldatov <s...@apache.org>
Authored: Mon Jun 6 14:30:04 2016 -0700
Committer: Sergey Soldatov <s...@apache.org>
Committed: Mon Jun 6 23:39:54 2016 -0700

----------------------------------------------------------------------
 .../mapreduce/FormatToBytesWritableMapper.java  | 15 +++++++---
 .../mapreduce/FormatToKeyValueReducer.java      | 30 +++++++-------------
 2 files changed, 22 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b538c1a9/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index eb0e3ed..a736fc4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -44,11 +44,13 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
 import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.UpsertExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -216,12 +218,17 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
                 if (c.getFamilyName() != null)  // Skip PK column
                     family = c.getFamilyName().getBytes();
                 byte[] name = c.getName().getBytes();
-                byte[] cfn = Bytes.add(family,":".getBytes(), name);
+                byte[] cfn = Bytes.add(family, 
QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
                 if (!columnIndexes.containsKey(cfn)) {
                     columnIndexes.put(cfn, new Integer(columnIndex));
                     columnIndex++;
                 }
             }
+            byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
+            byte[] cfn = Bytes.add(emptyColumnFamily, 
QueryConstants.NAMESPACE_SEPARATOR_BYTES,
+                    QueryConstants.EMPTY_COLUMN_BYTES);
+            columnIndexes.put(cfn, new Integer(columnIndex));
+            columnIndex++;
         }
     }
 
@@ -232,16 +239,16 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
      * @param cell       KeyValue for the column
      * @return column index for the specified cell or -1 if was not found
      */
-    private int findIndex(Cell cell) {
+    private int findIndex(Cell cell) throws IOException {
         byte[] familyName = Bytes.copy(cell.getFamilyArray(), 
cell.getFamilyOffset(),
                 cell.getFamilyLength());
         byte[] name = Bytes.copy(cell.getQualifierArray(), 
cell.getQualifierOffset(),
                 cell.getQualifierLength());
-        byte[] cfn = Bytes.add(familyName, ":".getBytes(), name);
+        byte[] cfn = Bytes.add(familyName, 
QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
         if(columnIndexes.containsKey(cfn)) {
             return columnIndexes.get(cfn);
         }
-        return -1;
+        throw new IOException("Unable to map cell to column index");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b538c1a9/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index aa807c4..15d6d2f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
@@ -65,7 +64,6 @@ public class FormatToKeyValueReducer
     protected List<String> logicalNames;
     protected KeyValueBuilder builder;
     private Map<Integer, Pair<byte[], byte[]>> columnIndexes;
-    private Map<String, ImmutableBytesPtr> emptyFamilyName;
 
 
     @Override
@@ -91,13 +89,11 @@ public class FormatToKeyValueReducer
     }
 
     private void initColumnsMap(PhoenixConnection conn) throws SQLException {
-        Map <byte[], Integer> indexMap = new TreeMap(Bytes.BYTES_COMPARATOR);
-        emptyFamilyName = new HashMap<>();
+        Map<byte[], Integer> indexMap = new TreeMap(Bytes.BYTES_COMPARATOR);
         columnIndexes = new HashMap<>();
         int columnIndex = 0;
-        for(int index = 0; index < logicalNames.size(); index++) {
+        for (int index = 0; index < logicalNames.size(); index++) {
             PTable table = PhoenixRuntime.getTable(conn, 
logicalNames.get(index));
-            emptyFamilyName.put(tableNames.get(index), 
SchemaUtil.getEmptyColumnFamilyPtr(table));
             List<PColumn> cls = table.getColumns();
             for (int i = 0; i < cls.size(); i++) {
                 PColumn c = cls.get(i);
@@ -106,7 +102,7 @@ public class FormatToKeyValueReducer
                     family = c.getFamilyName().getBytes();
                 }
                 byte[] name = c.getName().getBytes();
-                byte[] cfn = Bytes.add(family,":".getBytes(), name);
+                byte[] cfn = Bytes.add(family, 
QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
                 Pair<byte[], byte[]> pair = new Pair(family, name);
                 if (!indexMap.containsKey(cfn)) {
                     indexMap.put(cfn, new Integer(columnIndex));
@@ -114,6 +110,11 @@ public class FormatToKeyValueReducer
                     columnIndex++;
                 }
             }
+            byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
+            Pair<byte[], byte[]> pair = new Pair(emptyColumnFamily, 
QueryConstants
+                    .EMPTY_COLUMN_BYTES);
+            columnIndexes.put(new Integer(columnIndex), pair);
+            columnIndex++;
         }
     }
 
@@ -131,18 +132,9 @@ public class FormatToKeyValueReducer
                 ImmutableBytesWritable family;
                 ImmutableBytesWritable name;
                 ImmutableBytesWritable value = 
QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR;
-                if (index == -1) {
-                    family = emptyFamilyName.get(key.getTableName());
-                    name = QueryConstants.EMPTY_COLUMN_BYTES_PTR;
-                } else {
-                    Pair<byte[], byte[]> pair = columnIndexes.get(index);
-                    if(pair.getFirst() != null) {
-                        family = new ImmutableBytesWritable(pair.getFirst());
-                    } else {
-                        family = emptyFamilyName.get(key.getTableName());
-                    }
-                    name = new ImmutableBytesWritable(pair.getSecond());
-                }
+                Pair<byte[], byte[]> pair = columnIndexes.get(index);
+                family = new ImmutableBytesWritable(pair.getFirst());
+                name = new ImmutableBytesWritable(pair.getSecond());
                 int len = WritableUtils.readVInt(input);
                 if (len > 0) {
                     byte[] array = new byte[len];

Reply via email to