Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 09747fc42 -> 9569cb521 refs/heads/4.x-HBase-1.0 4d07528a7 -> ec44a57cc refs/heads/4.x-HBase-1.1 e36c9c62e -> 05dc0e320 refs/heads/master 789e66560 -> 18da4a046
PHOENIX-2925 CsvBulkloadTool not working properly if there are multiple local indexes to the same table(After PHOENIX-1973) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/18da4a04 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/18da4a04 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/18da4a04 Branch: refs/heads/master Commit: 18da4a0460967ce28fcdeaad4a1b90440ff36f00 Parents: 789e665 Author: Sergey Soldatov <sergey.solda...@gmail.com> Authored: Mon May 23 02:22:45 2016 -0700 Committer: Sergey Soldatov <sergey.solda...@gmail.com> Committed: Mon May 23 10:38:09 2016 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/CsvBulkLoadToolIT.java | 4 + .../mapreduce/FormatToBytesWritableMapper.java | 111 +++++++++---------- .../mapreduce/FormatToKeyValueReducer.java | 75 +++++++------ 3 files changed, 100 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/18da4a04/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java index 1e9c1d9..8968555 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java @@ -277,6 +277,10 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterHBaseManagedTimeIT { assertEquals(2, rs.getInt(1)); assertEquals("FirstName 2", rs.getString(2)); + rs = stmt.executeQuery("SELECT LAST_NAME FROM TABLE6 where last_name='LastName 1'"); + assertTrue(rs.next()); + assertEquals("LastName 1", rs.getString(1)); + rs.close(); stmt.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/18da4a04/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java index ff21d4f..eb0e3ed 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java @@ -44,7 +44,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair; import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; -import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.ColumnInfo; @@ -108,7 +107,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri /* lookup table for column index. Index in the List matches to the index in tableNames List */ - protected List<Map<byte[], Map<byte[], Integer>>> columnIndexes; + protected Map<byte[], Integer> columnIndexes; protected abstract UpsertExecutor<RECORD,?> buildUpsertExecutor(Configuration conf); protected abstract LineParser<RECORD> getLineParser(); @@ -135,7 +134,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf); logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf); - columnIndexes = initColumnIndexes(); + initColumnIndexes(); } catch (SQLException | ClassNotFoundException e) { throw new RuntimeException(e); } @@ -193,7 +192,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri int tableIndex = rowEntry.getKey(); List<KeyValue> lkv = rowEntry.getValue(); // All KV values combines to a single byte array - writeAggregatedRow(context, tableIndex, lkv); + writeAggregatedRow(context, tableNames.get(tableIndex), lkv); } conn.rollback(); } catch (Exception e) { @@ -201,99 +200,99 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri } } - private List<Map<byte[], Map<byte[], Integer>>> initColumnIndexes() throws SQLException { - List<Map<byte[], Map<byte[], Integer>>> tableMap = new ArrayList<>(); - int tableIndex; - for (tableIndex = 0; tableIndex < tableNames.size(); tableIndex++) { - PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(tableIndex)); - Map<byte[], Map<byte[], Integer>> columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + /* + Map all unique pairs <family, name> to index. Table name is part of TableRowkey, so we do + not care about it + */ + private void initColumnIndexes() throws SQLException { + columnIndexes = new TreeMap(Bytes.BYTES_COMPARATOR); + int columnIndex = 0; + for(int index = 0; index < logicalNames.size(); index++) { + PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index)); List<PColumn> cls = table.getColumns(); for (int i = 0; i < cls.size(); i++) { PColumn c = cls.get(i); - if (c.getFamilyName() == null) continue; // Skip PK column - byte[] family = c.getFamilyName().getBytes(); + byte[] family = new byte[0]; + if (c.getFamilyName() != null) // Skip PK column + family = c.getFamilyName().getBytes(); byte[] name = c.getName().getBytes(); - if (!columnMap.containsKey(family)) { - columnMap.put(family, new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR)); + byte[] cfn = Bytes.add(family,":".getBytes(), name); + if (!columnIndexes.containsKey(cfn)) { + columnIndexes.put(cfn, new Integer(columnIndex)); + columnIndex++; } - Map<byte[], Integer> qualifier = columnMap.get(family); - qualifier.put(name, i); } - tableMap.add(columnMap); } - return tableMap; } /** * Find the column index which will replace the column name in * the aggregated array and will be restored in Reducer * - * @param tableIndex Table index in tableNames list * @param cell KeyValue for the column * @return column index for the specified cell or -1 if was not found */ - private int findIndex(int tableIndex, Cell cell) { - Map<byte[], Map<byte[], Integer>> columnMap = columnIndexes.get(tableIndex); - Map<byte[], Integer> qualifiers = columnMap.get(Bytes.copy(cell.getFamilyArray(), - cell.getFamilyOffset(), cell.getFamilyLength())); - if (qualifiers != null) { - Integer result = qualifiers.get(Bytes.copy(cell.getQualifierArray(), - cell.getQualifierOffset(), cell.getQualifierLength())); - if (result != null) { - return result; - } + private int findIndex(Cell cell) { + byte[] familyName = Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), + cell.getFamilyLength()); + byte[] name = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength()); + byte[] cfn = Bytes.add(familyName, ":".getBytes(), name); + if(columnIndexes.containsKey(cfn)) { + return columnIndexes.get(cfn); } return -1; } /** - * Collect all column values for the same rowKey + * Collect all column values for the same Row. RowKey may be different if indexes are involved, + * so it writes a separate record for each unique RowKey * * @param context Current mapper context - * @param tableIndex Table index in tableNames list + * @param tableName Table index in tableNames list * @param lkv List of KV values that will be combined in a single ImmutableBytesWritable * @throws IOException * @throws InterruptedException */ - private void writeAggregatedRow(Context context, int tableIndex, List<KeyValue> lkv) + private void writeAggregatedRow(Context context, String tableName, List<KeyValue> lkv) throws IOException, InterruptedException { ByteArrayOutputStream bos = new ByteArrayOutputStream(1024); DataOutputStream outputStream = new DataOutputStream(bos); - ImmutableBytesWritable outputKey = new ImmutableBytesWritable(); + ImmutableBytesWritable outputKey =null; if (!lkv.isEmpty()) { - // All Key Values for the same row are supposed to be the same, so init rowKey only once - Cell first = lkv.get(0); - outputKey.set(first.getRowArray(), first.getRowOffset(), first.getRowLength()); for (KeyValue cell : lkv) { - if (isEmptyCell(cell)) { - continue; - } - int i = findIndex(tableIndex, cell); - if (i == -1) { - throw new IOException("No column found for KeyValue"); + if (outputKey == null || Bytes.compareTo(outputKey.get(), outputKey.getOffset(), + outputKey.getLength(), cell.getRowArray(), cell.getRowOffset(), cell + .getRowLength()) != 0) { + // This a the first RowKey or a different from previous + if (outputKey != null) { //It's a different RowKey, so we need to write it + ImmutableBytesWritable aggregatedArray = + new ImmutableBytesWritable(bos.toByteArray()); + outputStream.close(); + context.write(new TableRowkeyPair(tableName, outputKey), aggregatedArray); + } + outputKey = new ImmutableBytesWritable(cell.getRowArray(), cell.getRowOffset() + , cell.getRowLength()); + bos = new ByteArrayOutputStream(1024); + outputStream = new DataOutputStream(bos); } - WritableUtils.writeVInt(outputStream, i); + /* + The order of aggregation: type, index of column, length of value, value itself + */ outputStream.writeByte(cell.getTypeByte()); + int i = findIndex(cell); + WritableUtils.writeVInt(outputStream, i); WritableUtils.writeVInt(outputStream, cell.getValueLength()); outputStream.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + } + ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray()); + outputStream.close(); + context.write(new TableRowkeyPair(tableName, outputKey), aggregatedArray); } - ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray()); - outputStream.close(); - context.write(new TableRowkeyPair(tableNames.get(tableIndex), outputKey), aggregatedArray); } - protected boolean isEmptyCell(KeyValue cell) { - if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), - cell.getQualifierLength(), QueryConstants.EMPTY_COLUMN_BYTES, 0, - QueryConstants.EMPTY_COLUMN_BYTES.length) != 0) - return false; - else - return true; - } - - @Override protected void cleanup(Context context) throws IOException, InterruptedException { try { http://git-wip-us.apache.org/repos/asf/phoenix/blob/18da4a04/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java index 799b3dc..aa807c4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java @@ -21,16 +21,18 @@ import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; import java.sql.SQLException; -import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.TreeMap; import java.util.TreeSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.Reducer; @@ -42,7 +44,6 @@ import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; @@ -63,8 +64,8 @@ public class FormatToKeyValueReducer protected List<String> tableNames; protected List<String> logicalNames; protected KeyValueBuilder builder; - List<List<Pair<byte[], byte[]>>> columnIndexes; - List<ImmutableBytesPtr> emptyFamilyName; + private Map<Integer, Pair<byte[], byte[]>> columnIndexes; + private Map<String, ImmutableBytesPtr> emptyFamilyName; @Override @@ -76,7 +77,6 @@ public class FormatToKeyValueReducer for (Map.Entry<String, String> entry : conf) { clientInfos.setProperty(entry.getKey(), entry.getValue()); } - try { PhoenixConnection conn = (PhoenixConnection) QueryUtil.getConnectionOnServer(clientInfos, conf); builder = conn.getKeyValueBuilder(); @@ -84,9 +84,6 @@ public class FormatToKeyValueReducer final String logicalNamesConf = conf.get(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY); tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf); logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf); - - columnIndexes = new ArrayList<>(tableNames.size()); - emptyFamilyName = new ArrayList<>(); initColumnsMap(conn); } catch (SQLException | ClassNotFoundException e) { throw new RuntimeException(e); @@ -94,24 +91,30 @@ public class FormatToKeyValueReducer } private void initColumnsMap(PhoenixConnection conn) throws SQLException { - for (String tableName : logicalNames) { - PTable table = PhoenixRuntime.getTable(conn, tableName); - emptyFamilyName.add(SchemaUtil.getEmptyColumnFamilyPtr(table)); + Map <byte[], Integer> indexMap = new TreeMap(Bytes.BYTES_COMPARATOR); + emptyFamilyName = new HashMap<>(); + columnIndexes = new HashMap<>(); + int columnIndex = 0; + for(int index = 0; index < logicalNames.size(); index++) { + PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index)); + emptyFamilyName.put(tableNames.get(index), SchemaUtil.getEmptyColumnFamilyPtr(table)); List<PColumn> cls = table.getColumns(); - List<Pair<byte[], byte[]>> list = new ArrayList(cls.size()); for (int i = 0; i < cls.size(); i++) { PColumn c = cls.get(i); - if (c.getFamilyName() == null) { - list.add(null); // Skip PK column - continue; + byte[] family = new byte[0]; + if (c.getFamilyName() != null) { + family = c.getFamilyName().getBytes(); } - byte[] family = c.getFamilyName().getBytes(); byte[] name = c.getName().getBytes(); - list.add(new Pair(family, name)); + byte[] cfn = Bytes.add(family,":".getBytes(), name); + Pair<byte[], byte[]> pair = new Pair(family, name); + if (!indexMap.containsKey(cfn)) { + indexMap.put(cfn, new Integer(columnIndex)); + columnIndexes.put(new Integer(columnIndex), pair); + columnIndex++; + } } - columnIndexes.add(list); } - } @Override @@ -119,15 +122,27 @@ public class FormatToKeyValueReducer Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context) throws IOException, InterruptedException { TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR); - int tableIndex = tableNames.indexOf(key.getTableName()); - List<Pair<byte[], byte[]>> columns = columnIndexes.get(tableIndex); + ImmutableBytesWritable rowKey = key.getRowkey(); for (ImmutableBytesWritable aggregatedArray : values) { DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get())); while (input.available() != 0) { - int index = WritableUtils.readVInt(input); - Pair<byte[], byte[]> pair = columns.get(index); byte type = input.readByte(); - ImmutableBytesWritable value = null; + int index = WritableUtils.readVInt(input); + ImmutableBytesWritable family; + ImmutableBytesWritable name; + ImmutableBytesWritable value = QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR; + if (index == -1) { + family = emptyFamilyName.get(key.getTableName()); + name = QueryConstants.EMPTY_COLUMN_BYTES_PTR; + } else { + Pair<byte[], byte[]> pair = columnIndexes.get(index); + if(pair.getFirst() != null) { + family = new ImmutableBytesWritable(pair.getFirst()); + } else { + family = emptyFamilyName.get(key.getTableName()); + } + name = new ImmutableBytesWritable(pair.getSecond()); + } int len = WritableUtils.readVInt(input); if (len > 0) { byte[] array = new byte[len]; @@ -138,24 +153,16 @@ public class FormatToKeyValueReducer KeyValue.Type kvType = KeyValue.Type.codeToType(type); switch (kvType) { case Put: // not null value - kv = builder.buildPut(key.getRowkey(), - new ImmutableBytesWritable(pair.getFirst()), - new ImmutableBytesWritable(pair.getSecond()), value); + kv = builder.buildPut(key.getRowkey(), family, name, value); break; case DeleteColumn: // null value - kv = builder.buildDeleteColumns(key.getRowkey(), - new ImmutableBytesWritable(pair.getFirst()), - new ImmutableBytesWritable(pair.getSecond())); + kv = builder.buildDeleteColumns(key.getRowkey(), family, name); break; default: throw new IOException("Unsupported KeyValue type " + kvType); } map.add(kv); } - KeyValue empty = builder.buildPut(key.getRowkey(), - emptyFamilyName.get(tableIndex), - QueryConstants.EMPTY_COLUMN_BYTES_PTR, ByteUtil.EMPTY_BYTE_ARRAY_PTR); - map.add(empty); Closeables.closeQuietly(input); } context.setStatus("Read " + map.getClass());