PHOENIX-3539 Fix bulkload for StorageScheme - ONE_CELL_PER_KEYVALUE_COLUMN (Samarth Jain and Ankit Singhal)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5f5662b2 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5f5662b2 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5f5662b2 Branch: refs/heads/calcite Commit: 5f5662b24dad478c9cb0917f20e2af9e6a539266 Parents: c387260 Author: Samarth <samarth.j...@salesforce.com> Authored: Tue Feb 28 18:00:58 2017 -0800 Committer: Samarth <samarth.j...@salesforce.com> Committed: Tue Feb 28 18:00:58 2017 -0800 ---------------------------------------------------------------------- .../phoenix/end2end/CsvBulkLoadToolIT.java | 36 ++++++++++++++ .../mapreduce/FormatToBytesWritableMapper.java | 51 ++++++++++++-------- .../mapreduce/FormatToKeyValueReducer.java | 44 +++++++++++------ .../flume/serializer/CsvEventSerializer.java | 2 +- 4 files changed, 97 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/5f5662b2/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java index 9103bd8..5a186a0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java @@ -372,4 +372,40 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterIT { assertTrue(ex instanceof FileAlreadyExistsException); } } + + @Test + public void testImportInImmutableTable() throws Exception { + Statement stmt = conn.createStatement(); + stmt.execute("CREATE IMMUTABLE TABLE S.TABLE10 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE, CF1.T2 DATE, CF2.T3 DATE) "); + + FileSystem fs = FileSystem.get(getUtility().getConfiguration()); + FSDataOutputStream outputStream = fs.create(new Path("/tmp/input10.csv")); + PrintWriter printWriter = new PrintWriter(outputStream); + printWriter.println("1,Name 1,1970/01/01,1970/02/01,1970/03/01"); + printWriter.println("2,Name 2,1970/01/02,1970/02/02,1970/03/02"); + printWriter.close(); + CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); + csvBulkLoadTool.setConf(new Configuration(getUtility().getConfiguration())); + csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB, "yyyy/MM/dd"); + int exitCode = csvBulkLoadTool.run(new String[] { "--input", "/tmp/input10.csv", "--table", "table10", + "--schema", "s", "--zookeeper", zkQuorum }); + assertEquals(0, exitCode); + ResultSet rs = stmt.executeQuery("SELECT id, name, t, CF1.T2, CF2.T3 FROM s.table10 ORDER BY id"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Name 1", rs.getString(2)); + assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3)); + assertEquals(DateUtil.parseDate("1970-02-01"), rs.getDate(4)); + assertEquals(DateUtil.parseDate("1970-03-01"), rs.getDate(5)); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals("Name 2", rs.getString(2)); + assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3)); + assertEquals(DateUtil.parseDate("1970-02-02"), rs.getDate(4)); + assertEquals(DateUtil.parseDate("1970-03-02"), rs.getDate(5)); + assertFalse(rs.next()); + + rs.close(); + stmt.close(); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5f5662b2/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java index 278489d..1dae981 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java @@ -47,7 +47,9 @@ import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.PhoenixRuntime; @@ -211,30 +213,41 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri private void initColumnIndexes() throws SQLException { columnIndexes = new TreeMap<>(Bytes.BYTES_COMPARATOR); int columnIndex = 0; - for(int index = 0; index < logicalNames.size(); index++) { + for (int index = 0; index < logicalNames.size(); index++) { PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index)); - List<PColumn> cls = table.getColumns(); - for (int i = 0; i < cls.size(); i++) { - PColumn c = cls.get(i); - byte[] family = new byte[0]; - byte[] cq; - if (!SchemaUtil.isPKColumn(c)) { - family = c.getFamilyName().getBytes(); - cq = c.getColumnQualifierBytes(); - } else { - cq = c.getName().getBytes(); - } - byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq); - if (!columnIndexes.containsKey(cfn)) { + if (!table.getImmutableStorageScheme().equals(ImmutableStorageScheme.ONE_CELL_PER_COLUMN)) { + List<PColumnFamily> cfs = table.getColumnFamilies(); + for (int i = 0; i < cfs.size(); i++) { + byte[] family = cfs.get(i).getName().getBytes(); + byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, + QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES); columnIndexes.put(cfn, new Integer(columnIndex)); columnIndex++; } + } else { + List<PColumn> cls = table.getColumns(); + for (int i = 0; i < cls.size(); i++) { + PColumn c = cls.get(i); + byte[] family = new byte[0]; + byte[] cq; + if (!SchemaUtil.isPKColumn(c)) { + family = c.getFamilyName().getBytes(); + cq = c.getColumnQualifierBytes(); + } else { + cq = c.getName().getBytes(); + } + byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq); + if (!columnIndexes.containsKey(cfn)) { + columnIndexes.put(cfn, new Integer(columnIndex)); + columnIndex++; + } + } + byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table); + byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst(); + byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, emptyKeyValue); + columnIndexes.put(cfn, new Integer(columnIndex)); + columnIndex++; } - byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table); - byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst(); - byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, emptyKeyValue); - columnIndexes.put(cfn, new Integer(columnIndex)); - columnIndex++; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/5f5662b2/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java index c529afe..07cf285 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java @@ -42,7 +42,9 @@ import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair; import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.PhoenixRuntime; @@ -95,25 +97,35 @@ public class FormatToKeyValueReducer int columnIndex = 0; for (int index = 0; index < logicalNames.size(); index++) { PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index)); - List<PColumn> cls = table.getColumns(); - for (int i = 0; i < cls.size(); i++) { - PColumn c = cls.get(i); - byte[] family = new byte[0]; - byte[] cq; - if (!SchemaUtil.isPKColumn(c)) { - family = c.getFamilyName().getBytes(); - cq = c.getColumnQualifierBytes(); - } else { - // TODO: samarth verify if this is the right thing to do here. - cq = c.getName().getBytes(); - } - byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq); - Pair<byte[], byte[]> pair = new Pair<>(family, cq); - if (!indexMap.containsKey(cfn)) { - indexMap.put(cfn, new Integer(columnIndex)); + if (!table.getImmutableStorageScheme().equals(ImmutableStorageScheme.ONE_CELL_PER_COLUMN)) { + List<PColumnFamily> cfs = table.getColumnFamilies(); + for (int i = 0; i < cfs.size(); i++) { + byte[] family = cfs.get(i).getName().getBytes(); + Pair<byte[], byte[]> pair = new Pair<>(family, + QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES); columnIndexes.put(new Integer(columnIndex), pair); columnIndex++; } + } else { + List<PColumn> cls = table.getColumns(); + for (int i = 0; i < cls.size(); i++) { + PColumn c = cls.get(i); + byte[] family = new byte[0]; + byte[] cq; + if (!SchemaUtil.isPKColumn(c)) { + family = c.getFamilyName().getBytes(); + cq = c.getColumnQualifierBytes(); + } else { + cq = c.getName().getBytes(); + } + byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq); + Pair<byte[], byte[]> pair = new Pair<>(family, cq); + if (!indexMap.containsKey(cfn)) { + indexMap.put(cfn, new Integer(columnIndex)); + columnIndexes.put(new Integer(columnIndex), pair); + columnIndex++; + } + } } byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table); byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/5f5662b2/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java ---------------------------------------------------------------------- diff --git a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java index 1521084..a856c3e 100644 --- a/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java +++ b/phoenix-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java @@ -189,7 +189,7 @@ public class CsvEventSerializer extends BaseEventSerializer { public CSVRecord parse(String input) throws IOException { CSVParser csvParser = new CSVParser(new StringReader(input), this.csvFormat); - return ((CSVRecord) Iterables.getFirst(csvParser, null)); + return Iterables.getFirst(csvParser, null); } }