PHOENIX-2565 Store data for immutable tables in a single KeyValue (Thomas DSilva)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/920be8fe Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/920be8fe Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/920be8fe Branch: refs/heads/encodecolumns Commit: 920be8fecfcbf2170ba040b3b52d1fcd0a1622fd Parents: cc472a8 Author: Samarth <samarth.j...@salesforce.com> Authored: Mon Sep 19 16:45:50 2016 -0700 Committer: Samarth <samarth.j...@salesforce.com> Committed: Mon Sep 19 16:45:50 2016 -0700 ---------------------------------------------------------------------- .../apache/phoenix/end2end/UpsertValuesIT.java | 5 +- .../end2end/index/IndexExpressionIT.java | 7 +- .../apache/phoenix/end2end/index/IndexIT.java | 24 ++-- .../phoenix/compile/ExpressionCompiler.java | 3 +- .../apache/phoenix/compile/JoinCompiler.java | 2 +- .../apache/phoenix/compile/QueryCompiler.java | 2 +- .../compile/TupleProjectionCompiler.java | 2 +- .../apache/phoenix/compile/WhereCompiler.java | 31 +++++- .../coprocessor/BaseScannerRegionObserver.java | 1 + .../coprocessor/MetaDataEndpointImpl.java | 1 + .../UngroupedAggregateRegionObserver.java | 2 +- .../apache/phoenix/execute/BaseQueryPlan.java | 13 ++- .../apache/phoenix/execute/MutationState.java | 10 +- .../expression/ArrayColumnExpression.java | 109 +++++++++++++++++++ .../expression/ArrayConstructorExpression.java | 1 + .../phoenix/expression/ExpressionType.java | 3 +- .../phoenix/expression/LiteralExpression.java | 11 +- .../visitor/CloneExpressionVisitor.java | 6 + .../expression/visitor/ExpressionVisitor.java | 2 + ...lumnWithKeyValueColumnExpressionVisitor.java | 37 +++++++ .../StatelessTraverseAllExpressionVisitor.java | 7 +- .../StatelessTraverseNoExpressionVisitor.java | 7 +- .../filter/MultiKeyValueComparisonFilter.java | 6 + .../SingleCQKeyValueComparisonFilter.java | 3 +- .../filter/SingleKeyValueComparisonFilter.java | 8 ++ .../apache/phoenix/hbase/index/ValueGetter.java | 1 + 
.../apache/phoenix/index/IndexMaintainer.java | 101 ++++++++++++++--- .../apache/phoenix/index/PhoenixIndexCodec.java | 2 +- .../phoenix/iterate/BaseResultIterators.java | 7 +- .../query/ConnectionlessQueryServicesImpl.java | 1 - .../org/apache/phoenix/schema/ColumnRef.java | 7 +- .../apache/phoenix/schema/MetaDataClient.java | 16 ++- .../java/org/apache/phoenix/schema/PTable.java | 3 +- .../org/apache/phoenix/schema/PTableImpl.java | 77 ++++++++++--- .../apache/phoenix/schema/TableProperty.java | 2 +- .../apache/phoenix/schema/tuple/BaseTuple.java | 30 +++++ .../apache/phoenix/util/EncodedColumnsUtil.java | 19 +++- .../java/org/apache/phoenix/util/IndexUtil.java | 52 +++++---- .../java/org/apache/phoenix/util/ScanUtil.java | 4 +- .../org/apache/phoenix/util/SchemaUtil.java | 18 +++ .../phoenix/index/IndexMaintainerTest.java | 2 +- 41 files changed, 547 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java index 28574ed..24ca24a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java @@ -35,8 +35,11 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Time; import java.sql.Timestamp; +import java.util.Map.Entry; +import java.util.NavigableMap; import java.util.Properties; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -839,7 +842,7 @@ public class UpsertValuesIT extends 
BaseClientManagedTimeIT { assertEquals("KV2", rs.getString(2)); assertFalse(rs.next()); - // Verify now that the data was correctly added to the mutable index too. + // Verify now that the data was correctly added to the immutable index too. stmt = conn.prepareStatement("SELECT KV2 FROM " + tableName + " WHERE PK2 = ? AND KV1 = ?"); stmt.setDate(1, upsertedDate); stmt.setString(2, "KV1"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexExpressionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexExpressionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexExpressionIT.java index 21da43a..bdb5b93 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexExpressionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexExpressionIT.java @@ -30,9 +30,14 @@ import java.sql.SQLException; import java.util.Properties; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.execute.CommitException; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.IndexUtil; @@ -1300,7 +1305,7 @@ public class IndexExpressionIT extends BaseHBaseManagedTimeIT { try { conn.createStatement().execute( "CREATE TABLE t (k VARCHAR NOT NULL PRIMARY KEY, v VARCHAR) " - + (mutable ? "IMMUTABLE_ROWS=true" : "")); + + (!mutable ? 
"IMMUTABLE_ROWS=true" : "")); String query = "SELECT * FROM t"; ResultSet rs = conn.createStatement().executeQuery(query); assertFalse(rs.next()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java index b7537a6..d1bb085 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java @@ -770,23 +770,23 @@ public class IndexIT extends BaseHBaseManagedTimeIT { stmt.execute(); conn.commit(); - // make sure the index is working as expected - query = "SELECT * FROM " + fullIndexName; + query = "SELECT /*+ NO_INDEX */ * FROM " + testTable; rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); - assertEquals("x", rs.getString(1)); - assertEquals("1", rs.getString(2)); - assertEquals("a", rs.getString(3)); + assertEquals("a", rs.getString(1)); + assertEquals("x", rs.getString(2)); + assertEquals("1", rs.getString(3)); assertTrue(rs.next()); - assertEquals("y", rs.getString(1)); - assertEquals("2", rs.getString(2)); - assertEquals("b", rs.getString(3)); + assertEquals("b", rs.getString(1)); + assertEquals("y", rs.getString(2)); + assertEquals("2", rs.getString(3)); assertTrue(rs.next()); - assertEquals("z", rs.getString(1)); - assertEquals("3", rs.getString(2)); - assertEquals("c", rs.getString(3)); + assertEquals("c", rs.getString(1)); + assertEquals("z", rs.getString(2)); + assertEquals("3", rs.getString(3)); assertFalse(rs.next()); + // make sure the index is working as expected query = "SELECT * FROM " + testTable; rs = conn.createStatement().executeQuery("EXPLAIN " + query); if (localIndex) { @@ -855,7 +855,7 @@ public class IndexIT extends 
BaseHBaseManagedTimeIT { } else { assertEquals("CLIENT PARALLEL 1-WAY RANGE SCAN OVER " + fullIndexName + " ['1']", QueryUtil.getExplainPlan(rs)); } - + rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); assertEquals("a",rs.getString(1)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java index 1623cab..cfd0afc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExpressionCompiler.java @@ -116,6 +116,7 @@ import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PDatum; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.StorageScheme; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowKeyValueAccessor; import org.apache.phoenix.schema.SortOrder; @@ -408,7 +409,7 @@ public class ExpressionCompiler extends UnsupportedAllParseNodeVisitor<Expressio return LiteralExpression.newConstant(column.getDataType().toObject(ptr), column.getDataType()); } if (tableRef.equals(context.getCurrentTable()) && !SchemaUtil.isPKColumn(column)) { // project only kv columns - context.getScan().addColumn(column.getFamilyName().getBytes(), EncodedColumnsUtil.getColumnQualifier(column, tableRef.getTable())); + EncodedColumnsUtil.setColumns(column, tableRef.getTable(), context.getScan()); } Expression expression = ref.newColumnExpression(node.isTableNameCaseSensitive(), node.isCaseSensitive()); Expression wrappedExpression = wrapGroupByExpression(expression); 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java index 36c93f7..489b993 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java @@ -716,7 +716,7 @@ public class JoinCompiler { if (columnRef.getTableRef().equals(tableRef) && !SchemaUtil.isPKColumn(columnRef.getColumn()) && !(columnRef instanceof LocalIndexColumnRef)) { - scan.addColumn(columnRef.getColumn().getFamilyName().getBytes(), EncodedColumnsUtil.getColumnQualifier(columnRef.getColumn(), tableRef.getTable())); + EncodedColumnsUtil.setColumns(columnRef.getColumn(), tableRef.getTable(), scan); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index 2258f28..5126c8b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -94,7 +94,7 @@ public class QueryCompiler { */ private static final String LOAD_COLUMN_FAMILIES_ON_DEMAND_ATTR = "_ondemand_"; private final PhoenixStatement statement; - private final Scan scan; + private final Scan scan; private final Scan originalScan; private final ColumnResolver resolver; private final SelectStatement select; 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java index b6bb5da..3898969 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java @@ -141,7 +141,7 @@ public class TupleProjectionCompiler { projectedColumns.add(column); // Wildcard or FamilyWildcard will be handled by ProjectionCompiler. if (!isWildcard && !families.contains(sourceColumn.getFamilyName())) { - context.getScan().addColumn(sourceColumn.getFamilyName().getBytes(), EncodedColumnsUtil.getColumnQualifier(column, table)); + EncodedColumnsUtil.setColumns(column, table, context.getScan()); } } // add LocalIndexDataColumnRef http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java index 5e64209..63ad9c0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.expression.AndExpression; +import org.apache.phoenix.expression.ArrayColumnExpression; +import org.apache.phoenix.expression.ColumnExpression; import 
org.apache.phoenix.expression.Determinism; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; @@ -51,6 +53,7 @@ import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.StorageScheme; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.TypeMismatchException; @@ -169,12 +172,14 @@ public class WhereCompiler { public Expression visit(ColumnParseNode node) throws SQLException { ColumnRef ref = resolveColumn(node); TableRef tableRef = ref.getTableRef(); + ColumnExpression newColumnExpression = ref.newColumnExpression(node.isTableNameCaseSensitive(), node.isCaseSensitive()); if (tableRef.equals(context.getCurrentTable()) && !SchemaUtil.isPKColumn(ref.getColumn())) { + byte[] cq = tableRef.getTable().getStorageScheme() == StorageScheme.COLUMNS_STORED_IN_SINGLE_CELL + ? ref.getColumn().getFamilyName().getBytes() : EncodedColumnsUtil.getColumnQualifier(ref.getColumn(), tableRef.getTable()); // track the where condition columns. Later we need to ensure the Scan in HRS scans these column CFs - byte[] cq = EncodedColumnsUtil.getColumnQualifier(ref.getColumn(), tableRef.getTable()); context.addWhereCoditionColumn(ref.getColumn().getFamilyName().getBytes(), cq); } - return ref.newColumnExpression(node.isTableNameCaseSensitive(), node.isCaseSensitive()); + return newColumnExpression; } @Override @@ -223,6 +228,22 @@ public class WhereCompiler { } } + + public void increment(ArrayColumnExpression column) { + switch (count) { + case NONE: + count = Count.SINGLE; + this.column = column.getArrayExpression(); + break; + case SINGLE: + count = column.getArrayExpression().equals(this.column) ? 
Count.SINGLE : Count.MULTIPLE; + break; + case MULTIPLE: + break; + + } + } + public Count getCount() { return count; } @@ -257,6 +278,12 @@ public class WhereCompiler { counter.increment(expression); return null; } + + @Override + public Void visit(ArrayColumnExpression expression) { + counter.increment(expression); + return null; + } }); switch (counter.getCount()) { case NONE: http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 5922b5d..ee65575 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -86,6 +86,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String LOCAL_INDEX_BUILD = "_LocalIndexBuild"; public static final String LOCAL_INDEX_JOIN_SCHEMA = "_LocalIndexJoinSchema"; public static final String DATA_TABLE_COLUMNS_TO_JOIN = "_DataTableColumnsToJoin"; + public static final String COLUMNS_STORED_IN_SINGLE_CELL = "_ColumnsStoredInSingleCell"; public static final String VIEW_CONSTANTS = "_ViewConstants"; public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey"; public static final String REVERSE_SCAN = "_ReverseScan"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index de68551..d8372ce 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -944,6 +944,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso StorageScheme storageScheme = storageSchemeKv == null ? StorageScheme.NON_ENCODED_COLUMN_NAMES : StorageScheme .fromSerializedValue((byte)PTinyint.INSTANCE.toObject(storageSchemeKv.getValueArray(), storageSchemeKv.getValueOffset(), storageSchemeKv.getValueLength())); + List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount); List<PTable> indexes = new ArrayList<PTable>(); List<PName> physicalTables = new ArrayList<PName>(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index c1ef0b3..3129ef8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -498,7 +498,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver Put put = maintainer.buildUpdateMutation(kvBuilder, valueGetter, ptr, results.get(0).getTimestamp(), env.getRegion().getRegionInfo().getStartKey(), - env.getRegion().getRegionInfo().getEndKey()); + env.getRegion().getRegionInfo().getEndKey(), false); indexMutations.add(put); } } 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index 3453120..0dceaea 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -64,6 +64,7 @@ import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.StorageScheme; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.trace.TracingIterator; @@ -298,6 +299,7 @@ public abstract class BaseQueryPlan implements QueryPlan { // TODO: can have an hint to skip joining back to data table, in that case if any column to // project is not present in the index then we need to skip this plan. if (!dataColumns.isEmpty()) { + // Set data columns to be join back from data table. 
PTable parentTable = context.getCurrentTable().getTable(); String parentSchemaName = parentTable.getParentSchemaName().getString(); String parentTableName = parentTable.getParentTableName().getString(); @@ -420,10 +422,17 @@ public abstract class BaseQueryPlan implements QueryPlan { ByteArrayOutputStream stream = new ByteArrayOutputStream(); try { DataOutputStream output = new DataOutputStream(stream); + boolean storeColsInSingleCell = dataTable.getStorageScheme() == StorageScheme.COLUMNS_STORED_IN_SINGLE_CELL; + if (storeColsInSingleCell) { + // if storeColsInSingleCell is true all columns of a given column family are stored in a single cell + scan.setAttribute(BaseScannerRegionObserver.COLUMNS_STORED_IN_SINGLE_CELL, QueryConstants.EMPTY_COLUMN_VALUE_BYTES); + } WritableUtils.writeVInt(output, dataColumns.size()); for (PColumn column : dataColumns) { - Bytes.writeByteArray(output, column.getFamilyName().getBytes()); - Bytes.writeByteArray(output, EncodedColumnsUtil.getColumnQualifier(column, dataTable)); + byte[] cf = column.getFamilyName().getBytes(); + byte[] cq = EncodedColumnsUtil.getColumnQualifier(column, dataTable); + Bytes.writeByteArray(output, cf); + Bytes.writeByteArray(output, cq); } scan.setAttribute(BaseScannerRegionObserver.DATA_TABLE_COLUMNS_TO_JOIN, stream.toByteArray()); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 51ca59c..34dca46 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -582,7 +582,7 @@ public class MutationState implements SQLCloseable { List<Mutation> indexMutations; try 
{ indexMutations = - IndexUtil.generateIndexData(table, index, mutationsPertainingToIndex, + IndexUtil.generateIndexData(table, index, values, mutationsPertainingToIndex, connection.getKeyValueBuilder(), connection); // we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map if (!sendAll) { @@ -616,6 +616,7 @@ public class MutationState implements SQLCloseable { Iterator<Map.Entry<ImmutableBytesPtr, RowMutationState>> iterator = values.entrySet().iterator(); long timestampToUse = timestamp; + Map<ImmutableBytesPtr, RowMutationState> modifiedValues = Maps.newHashMap(); while (iterator.hasNext()) { Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry = iterator.next(); ImmutableBytesPtr key = rowEntry.getKey(); @@ -623,6 +624,10 @@ public class MutationState implements SQLCloseable { if (tableWithRowTimestampCol) { RowTimestampColInfo rowTsColInfo = state.getRowTimestampColInfo(); if (rowTsColInfo.useServerTimestamp()) { + // since we are about to modify the byte[] stored in key (which changes its hashcode) + // we need to remove the entry from the values map and add a new entry with the modified byte[] + modifiedValues.put(key, state); + iterator.remove(); // regenerate the key with this timestamp. 
key = getNewRowKeyWithRowTimestamp(key, timestampToUse, table); } else { @@ -654,6 +659,7 @@ public class MutationState implements SQLCloseable { if (mutationsPertainingToIndex != null) mutationsPertainingToIndex .addAll(rowMutationsPertainingToIndex); } + values.putAll(modifiedValues); } /** @@ -1383,7 +1389,7 @@ public class MutationState implements SQLCloseable { this.rowTsColInfo = rowTsColInfo; } - Map<PColumn, byte[]> getColumnValues() { + public Map<PColumn, byte[]> getColumnValues() { return columnValues; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayColumnExpression.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayColumnExpression.java new file mode 100644 index 0000000..aaa9f75 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayColumnExpression.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.expression; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.expression.visitor.ExpressionVisitor; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PDatum; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.schema.types.PArrayDataType; +import org.apache.phoenix.schema.types.PVarbinary; +import org.apache.phoenix.schema.types.PVarchar; + +/** + * + * Class to access a column that is stored in a KeyValue that contains all + * columns for a given column family (stored in an array) + * + */ +public class ArrayColumnExpression extends ColumnExpression { + + private String displayName; // client-side only + private int index; + // expression that represents the array (where all cols are stored in a single key value) + private KeyValueColumnExpression arrayExpression; + // expression that represents this column if (it were stored as a regular key value) + private KeyValueColumnExpression origKVExpression; + + public ArrayColumnExpression() { + } + + public ArrayColumnExpression(PDatum column, byte[] cf, int index) { + super(column); + this.index = index; + this.arrayExpression = new KeyValueColumnExpression(column, cf, cf); + } + + public ArrayColumnExpression(PColumn column, String displayName, boolean encodedColumnName) { + super(column); + // array indexes are 1-based + this.index = column.getEncodedColumnQualifier()+1; + byte[] cf = column.getFamilyName().getBytes(); + this.arrayExpression = new KeyValueColumnExpression(column, cf, cf); + this.origKVExpression = new KeyValueColumnExpression(column, displayName, encodedColumnName); + this.displayName = displayName; + } + + @Override + public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) { + return PArrayDataType.positionAtArrayElement(tuple, ptr, index, 
arrayExpression, PVarbinary.INSTANCE, null); + } + + @Override + public <T> T accept(ExpressionVisitor<T> visitor) { + return visitor.visit(this); + } + + @Override + public void readFields(DataInput input) throws IOException { + super.readFields(input); + index = WritableUtils.readVInt(input); + arrayExpression = new KeyValueColumnExpression(); + arrayExpression.readFields(input); + origKVExpression = new KeyValueColumnExpression(); + origKVExpression.readFields(input); + } + + @Override + public void write(DataOutput output) throws IOException { + super.write(output); + WritableUtils.writeVInt(output, index); + arrayExpression.write(output); + origKVExpression.write(output); + } + + public KeyValueColumnExpression getArrayExpression() { + return arrayExpression; + } + + public KeyValueColumnExpression getKeyValueExpression() { + return origKVExpression; + } + + @Override + public String toString() { + return displayName; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java index d8df29a..f513664 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ArrayConstructorExpression.java @@ -115,6 +115,7 @@ public class ArrayConstructorExpression extends BaseCompoundExpression { offsetPos[i] = byteStream.size(); oStream.write(ptr.get(), ptr.getOffset(), ptr.getLength()); oStream.write(PArrayDataType.getSeparatorByte(rowKeyOrderOptimizable, getSortOrder())); + nNulls = 0; } } else { // No nulls for fixed length oStream.write(ptr.get(), ptr.getOffset(), ptr.getLength()); 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java index e750732..64f0ffa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java @@ -280,7 +280,8 @@ public enum ExpressionType { FloorYearExpression(FloorYearExpression.class), CeilWeekExpression(CeilWeekExpression.class), CeilMonthExpression(CeilMonthExpression.class), - CeilYearExpression(CeilYearExpression.class); + CeilYearExpression(CeilYearExpression.class), + ArrayColumnExpression(ArrayColumnExpression.class); ExpressionType(Class<? extends Expression> clazz) { this.clazz = clazz; http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/expression/LiteralExpression.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/LiteralExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/LiteralExpression.java index 90882a2..f20d7e2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/LiteralExpression.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/LiteralExpression.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.sql.SQLException; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.expression.visitor.ExpressionVisitor; import org.apache.phoenix.schema.IllegalDataException; @@ -214,6 +215,11 @@ public class LiteralExpression extends BaseTerminalExpression 
{ public LiteralExpression() { } + + public LiteralExpression(byte[] byteValue) { + this.byteValue = byteValue!=null ? byteValue : ByteUtil.EMPTY_BYTE_ARRAY; + this.determinism = Determinism.ALWAYS; + } private LiteralExpression(PDataType type, Determinism determinism) { this(null, type, ByteUtil.EMPTY_BYTE_ARRAY, determinism); @@ -242,7 +248,10 @@ public class LiteralExpression extends BaseTerminalExpression { @Override public String toString() { - if (value == null) { + if (value == null && byteValue!=null) { + return Bytes.toStringBinary(byteValue); + } + else if (value == null) { return "null"; } // TODO: move into PDataType? http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java index 00ece40..15a9f74 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/CloneExpressionVisitor.java @@ -26,6 +26,7 @@ import org.apache.phoenix.expression.ArrayConstructorExpression; import org.apache.phoenix.expression.CaseExpression; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.ComparisonExpression; +import org.apache.phoenix.expression.ArrayColumnExpression; import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; import org.apache.phoenix.expression.DivideExpression; import org.apache.phoenix.expression.Expression; @@ -80,6 +81,11 @@ public abstract class CloneExpressionVisitor extends TraverseAllExpressionVisito public Expression visit(KeyValueColumnExpression node) { return node; } + + @Override + public Expression 
visit(ArrayColumnExpression node) { + return node; + } @Override public Expression visit(ProjectedColumnExpression node) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java index 31f340d..100f099 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ExpressionVisitor.java @@ -27,6 +27,7 @@ import org.apache.phoenix.expression.ArrayConstructorExpression; import org.apache.phoenix.expression.CaseExpression; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.ComparisonExpression; +import org.apache.phoenix.expression.ArrayColumnExpression; import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; import org.apache.phoenix.expression.DivideExpression; import org.apache.phoenix.expression.Expression; @@ -113,6 +114,7 @@ public interface ExpressionVisitor<E> { public E visit(LiteralExpression node); public E visit(RowKeyColumnExpression node); public E visit(KeyValueColumnExpression node); + public E visit(ArrayColumnExpression node); public E visit(ProjectedColumnExpression node); public E visit(SequenceValueExpression node); http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ReplaceArrayColumnWithKeyValueColumnExpressionVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ReplaceArrayColumnWithKeyValueColumnExpressionVisitor.java 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ReplaceArrayColumnWithKeyValueColumnExpressionVisitor.java new file mode 100644 index 0000000..7ca6d9e --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/ReplaceArrayColumnWithKeyValueColumnExpressionVisitor.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.expression.visitor; + +import java.util.List; + +import org.apache.phoenix.expression.ArrayColumnExpression; +import org.apache.phoenix.expression.Expression; + +public class ReplaceArrayColumnWithKeyValueColumnExpressionVisitor extends CloneExpressionVisitor { + + @Override + public boolean isCloneNode(Expression node, List<Expression> children) { + return !children.equals(node.getChildren()); + } + + @Override + public Expression visit(ArrayColumnExpression node) { + return node.getKeyValueExpression(); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java index 3b7067a..9e50bc4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseAllExpressionVisitor.java @@ -26,9 +26,9 @@ import org.apache.phoenix.expression.ArrayConstructorExpression; import org.apache.phoenix.expression.CaseExpression; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.ComparisonExpression; +import org.apache.phoenix.expression.ArrayColumnExpression; import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; import org.apache.phoenix.expression.DivideExpression; -import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.InListExpression; import org.apache.phoenix.expression.IsNullExpression; import org.apache.phoenix.expression.KeyValueColumnExpression; @@ -121,6 +121,11 @@ public class 
StatelessTraverseAllExpressionVisitor<E> extends TraverseAllExpress } @Override + public E visit(ArrayColumnExpression node) { + return null; + } + + @Override public E visit(ProjectedColumnExpression node) { return null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java index 83b28bd..1a2f2cc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/visitor/StatelessTraverseNoExpressionVisitor.java @@ -26,9 +26,9 @@ import org.apache.phoenix.expression.ArrayConstructorExpression; import org.apache.phoenix.expression.CaseExpression; import org.apache.phoenix.expression.CoerceExpression; import org.apache.phoenix.expression.ComparisonExpression; +import org.apache.phoenix.expression.ArrayColumnExpression; import org.apache.phoenix.expression.CorrelateVariableFieldAccessExpression; import org.apache.phoenix.expression.DivideExpression; -import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.InListExpression; import org.apache.phoenix.expression.IsNullExpression; import org.apache.phoenix.expression.KeyValueColumnExpression; @@ -114,6 +114,11 @@ public class StatelessTraverseNoExpressionVisitor<E> extends TraverseNoExpressio public E visit(RowKeyColumnExpression node) { return null; } + + @Override + public E visit(ArrayColumnExpression node) { + return null; + } @Override public E visit(KeyValueColumnExpression node) { 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java index c3d52a4..5909286 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java @@ -26,6 +26,7 @@ import java.util.TreeSet; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.expression.ArrayColumnExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.visitor.ExpressionVisitor; @@ -187,6 +188,11 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil inputTuple.addColumn(expression.getColumnFamily(), expression.getColumnQualifier()); return null; } + @Override + public Void visit(ArrayColumnExpression expression) { + inputTuple.addColumn(expression.getArrayExpression().getColumnFamily(), expression.getArrayExpression().getColumnQualifier()); + return null; + } }; expression.accept(visitor); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java index 0d904bc..195c89c 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleCQKeyValueComparisonFilter.java @@ -47,7 +47,8 @@ public class SingleCQKeyValueComparisonFilter extends SingleKeyValueComparisonFi public static SingleCQKeyValueComparisonFilter parseFrom(final byte [] pbBytes) throws DeserializationException { try { - return (SingleCQKeyValueComparisonFilter)Writables.getWritable(pbBytes, new SingleCQKeyValueComparisonFilter()); + SingleCQKeyValueComparisonFilter writable = (SingleCQKeyValueComparisonFilter)Writables.getWritable(pbBytes, new SingleCQKeyValueComparisonFilter()); + return writable; } catch (IOException e) { throw new DeserializationException(e); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java index 07f7072..527b948 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java @@ -22,11 +22,13 @@ import java.io.IOException; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.expression.ArrayColumnExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.visitor.StatelessTraverseAllExpressionVisitor; import org.apache.phoenix.expression.visitor.TraverseAllExpressionVisitor; import org.apache.phoenix.schema.tuple.SingleKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; @@ -61,6 +63,12 @@ public abstract 
class SingleKeyValueComparisonFilter extends BooleanExpressionFi cq = expression.getColumnQualifier(); return null; } + @Override + public Void visit(ArrayColumnExpression expression) { + cf = expression.getArrayExpression().getColumnFamily(); + cq = expression.getArrayExpression().getColumnQualifier(); + return null; + } }; expression.accept(visitor); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java index bcadc2b..19797cf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java @@ -35,4 +35,5 @@ public interface ValueGetter { public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException; public byte[] getRowKey(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 2ad0c8d..a3f2a1b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -35,7 +35,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; - import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; @@ -52,10 +51,14 @@ import org.apache.phoenix.compile.ColumnResolver; import org.apache.phoenix.compile.FromCompiler; 
import org.apache.phoenix.compile.IndexExpressionCompiler; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.expression.ArrayColumnExpression; +import org.apache.phoenix.expression.ArrayConstructorExpression; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.ExpressionType; import org.apache.phoenix.expression.KeyValueColumnExpression; +import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor; +import org.apache.phoenix.expression.visitor.ReplaceArrayColumnWithKeyValueColumnExpressionVisitor; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -76,6 +79,7 @@ import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.StorageScheme; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.SaltingUtil; @@ -83,8 +87,11 @@ import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.ValueSchema; import org.apache.phoenix.schema.ValueSchema.Field; +import org.apache.phoenix.schema.tuple.BaseTuple; import org.apache.phoenix.schema.tuple.ValueGetterTuple; import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.util.BitSet; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.EncodedColumnsUtil; @@ -295,6 +302,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private boolean indexWALDisabled; private boolean isLocalIndex; private boolean immutableRows; + 
private boolean storeColsInSingleCell; // Transient state private final boolean isDataTableSalted; @@ -386,6 +394,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // TODO: check whether index is immutable or not. Currently it's always false so checking // data table is with immutable rows or not. this.immutableRows = dataTable.isImmutableRows(); + this.storeColsInSingleCell = index.getStorageScheme() == StorageScheme.COLUMNS_STORED_IN_SINGLE_CELL; int indexColByteSize = 0; ColumnResolver resolver = null; List<ParseNode> parseNodes = new ArrayList<ParseNode>(1); @@ -455,7 +464,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { initCachedState(); } - public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey) { + public byte[] buildRowKey(ValueGetter valueGetter, ImmutableBytesWritable rowKeyPtr, byte[] regionStartKey, byte[] regionEndKey, boolean convertArrayColToKeyValueCol) { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); boolean prependRegionStartKey = isLocalIndex && regionStartKey != null; boolean isIndexSalted = !isLocalIndex && nIndexSaltBuckets > 0; @@ -524,6 +533,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { SortOrder dataSortOrder; if (dataPkPosition[i] == EXPRESSION_NOT_PRESENT) { Expression expression = expressionIterator.next(); + if (convertArrayColToKeyValueCol) { + expression = expression.accept(new ReplaceArrayColumnWithKeyValueColumnExpressionVisitor()); + } dataColumnType = expression.getDataType(); dataSortOrder = expression.getSortOrder(); isNullable = expression.isNullable(); @@ -856,11 +868,11 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { return indexRowKeySchema; } - public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] regionStartKey, byte[] 
regionEndKey) throws IOException { + public Put buildUpdateMutation(KeyValueBuilder kvBuilder, ValueGetter valueGetter, ImmutableBytesWritable dataRowKeyPtr, long ts, byte[] regionStartKey, byte[] regionEndKey, boolean convertArrayColToKeyValueCol) throws IOException { Put put = null; // New row being inserted: add the empty key value if (valueGetter.getLatestValue(dataEmptyKeyValueRef) == null) { - byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey); + byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey, convertArrayColToKeyValueCol); put = new Put(indexRowKey); // add the keyvalue for the empty row put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey), @@ -869,21 +881,72 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { emptyKeyValueQualifierPtr)); put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } - for (ColumnReference ref : this.getCoveredColumns()) { - //FIXME: samarth figure out a backward compatible way to handle this as coveredcolumnsmap won't be availble for older phoenix clients. 
- ColumnReference indexColRef = this.coveredColumnsMap.get(ref); - ImmutableBytesPtr cq = indexColRef.getQualifierWritable(); - ImmutableBytesPtr cf = indexColRef.getFamilyWritable(); - ImmutableBytesWritable value = valueGetter.getLatestValue(ref); - byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey); - ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey); - if (value != null) { + byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey, convertArrayColToKeyValueCol); + ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey); + if (storeColsInSingleCell) { + // map from column family to list of columns (for covered columns) + Map<String, List<ColumnReference>> familyToColListMap = Maps.newHashMap(); + for (ColumnReference ref : this.getCoveredColumns()) { + String cf = Bytes.toString(ref.getFamily()); + if (!familyToColListMap.containsKey(cf)) { + familyToColListMap.put(cf, Lists.<ColumnReference>newArrayList()); + } + familyToColListMap.get(cf).add(ref); + } + // iterate over each column family and create a byte[] containing all the columns + for (Entry<String, List<ColumnReference>> entry : familyToColListMap.entrySet()) { + byte[] columnFamily = entry.getKey().getBytes(); + List<ColumnReference> colRefs = entry.getValue(); + int maxIndex = Integer.MIN_VALUE; + // find the max col qualifier + for (ColumnReference colRef : colRefs) { + byte[] qualifier = this.coveredColumnsMap.get(colRef).getQualifier(); + maxIndex = Math.max(maxIndex, PInteger.INSTANCE.getCodec().decodeInt(qualifier, 0, SortOrder.getDefault())); + } + byte[][] colValues = new byte[maxIndex+1][]; + // set the values of the columns + for (ColumnReference colRef : colRefs) { + ImmutableBytesWritable value = valueGetter.getLatestValue(colRef); + if (value != null) { + byte[] qualifier = this.coveredColumnsMap.get(colRef).getQualifier(); + int index = PInteger.INSTANCE.getCodec().decodeInt(qualifier, 0, 
SortOrder.getDefault()); + colValues[index] = value.get(); + } + } + + List<Expression> children = Lists.newArrayListWithExpectedSize(colRefs.size()); + // create an expression list with all the columns + for (int j=0; j<colValues.length; ++j) { + children.add(new LiteralExpression(colValues[j]==null ? ByteUtil.EMPTY_BYTE_ARRAY : colValues[j] )); + } + // we use ArrayConstructorExpression to serialize multiple columns into a single byte[] + // construct the ArrayConstructorExpression with a variable length data type (PVarchar) since columns can be of fixed or variable length + ArrayConstructorExpression arrayExpression = new ArrayConstructorExpression(children, PVarchar.INSTANCE, rowKeyOrderOptimizable); + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + arrayExpression.evaluate(new BaseTuple() {}, ptr); if (put == null) { put = new Put(indexRowKey); put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } + ImmutableBytesPtr colFamilyPtr = new ImmutableBytesPtr(columnFamily); //this is a little bit of extra work for installations that are running <0.94.14, but that should be rare and is a short-term set of wrappers - it shouldn't kill GC - put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value)); + put.add(kvBuilder.buildPut(rowKey, colFamilyPtr, colFamilyPtr, ts, ptr)); + } + } + else { + for (ColumnReference ref : this.getCoveredColumns()) { + //FIXME: samarth figure out a backward compatible way to handle this as coveredcolumnsmap won't be availble for older phoenix clients. + ColumnReference indexColRef = this.coveredColumnsMap.get(ref); + ImmutableBytesPtr cq = indexColRef.getQualifierWritable(); + ImmutableBytesPtr cf = indexColRef.getFamilyWritable(); + ImmutableBytesWritable value = valueGetter.getLatestValue(ref); + if (value != null) { + if (put == null) { + put = new Put(indexRowKey); + put.setDurability(!indexWALDisabled ? 
Durability.USE_DEFAULT : Durability.SKIP_WAL); + } + put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value)); + } } } return put; @@ -962,7 +1025,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { @SuppressWarnings("deprecation") public Delete buildDeleteMutation(KeyValueBuilder kvBuilder, ValueGetter oldState, ImmutableBytesWritable dataRowKeyPtr, Collection<KeyValue> pendingUpdates, long ts, byte[] regionStartKey, byte[] regionEndKey) throws IOException { - byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey); + byte[] indexRowKey = this.buildRowKey(oldState, dataRowKeyPtr, regionStartKey, regionEndKey, false); // Delete the entire row if any of the indexed columns changed DeleteType deleteType = null; if (oldState == null || (deleteType=getDeleteTypeOrNull(pendingUpdates)) != null || hasIndexedColumnChanged(oldState, pendingUpdates)) { // Deleting the entire row @@ -1264,6 +1327,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } return null; } + @Override + public Void visit(ArrayColumnExpression expression) { + KeyValueColumnExpression colExpression = expression.getArrayExpression(); + if (indexedColumns.add(new ColumnReference(colExpression.getColumnFamily(), colExpression.getColumnQualifier()))) { + indexedColumnTypes.add(colExpression.getDataType()); + } + return null; + } }; expression.accept(visitor); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java index 9d2955b..b1454b7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java @@ -74,7 +74,7 @@ public class PhoenixIndexCodec extends BaseIndexCodec { indexUpdate.setTable(maintainer.isLocalIndex() ? state.getEnvironment().getRegion() .getTableDesc().getName() : maintainer.getIndexTableName()); Put put = maintainer.buildUpdateMutation(KV_BUILDER, valueGetter, ptr, state.getCurrentTimestamp(), env - .getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey()); + .getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey(), false); indexUpdate.setUpdate(put); indexUpdates.add(indexUpdate); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 496b3b0..f6b929e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.hadoop.hbase.HConstants; + import javax.management.Query; import org.apache.hadoop.hbase.HRegionInfo; @@ -87,6 +88,7 @@ import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.StorageScheme; import org.apache.phoenix.schema.PTable.ViewType; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.TableRef; @@ -260,7 +262,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result private 
static Pair<Integer, Integer> getMinMaxQualifiers(Scan scan, StatementContext context) { PTable table = context.getCurrentTable().getTable(); - checkArgument(EncodedColumnsUtil.usesEncodedColumnNames(table), "Method should only be used for tables using encoded column names"); + StorageScheme storageScheme = table.getStorageScheme(); + checkArgument(EncodedColumnsUtil.usesEncodedColumnNames(storageScheme), "Method should only be used for tables using encoded column names"); Integer minQualifier = null; Integer maxQualifier = null; boolean emptyKVProjected = false; @@ -410,7 +413,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result // the ExplicitColumnTracker not to be used, though. if (!statement.isAggregate() && filteredColumnNotInProjection) { ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table), - columnsTracker, conditionOnlyCfs, EncodedColumnsUtil.usesEncodedColumnNames(table))); + columnsTracker, conditionOnlyCfs, EncodedColumnsUtil.usesEncodedColumnNames(table.getStorageScheme()))); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index f373de2..bb3306e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -94,7 +94,6 @@ import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionSystemClient; import org.apache.tephra.inmemory.InMemoryTxSystemClient; - /** * * Implementation of ConnectionQueryServices used in testing where no connection to 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java index 544fb20..ed0c711 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java @@ -18,10 +18,12 @@ package org.apache.phoenix.schema; import org.apache.http.annotation.Immutable; +import org.apache.phoenix.expression.ArrayColumnExpression; import org.apache.phoenix.expression.ColumnExpression; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.ProjectedColumnExpression; import org.apache.phoenix.expression.RowKeyColumnExpression; +import org.apache.phoenix.schema.PTable.StorageScheme; import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.SchemaUtil; @@ -110,7 +112,10 @@ public class ColumnRef { return new ProjectedColumnExpression(column, table, displayName); } - return new KeyValueColumnExpression(column, displayName, EncodedColumnsUtil.usesEncodedColumnNames(table)); + if (table.getStorageScheme() == StorageScheme.COLUMNS_STORED_IN_SINGLE_CELL) { + return new ArrayColumnExpression(column, displayName, EncodedColumnsUtil.usesEncodedColumnNames(table.getStorageScheme())); + } + return new KeyValueColumnExpression(column, displayName, EncodedColumnsUtil.usesEncodedColumnNames(table.getStorageScheme())); } public ColumnRef cloneAtTimestamp(long timestamp) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 95e0f4f..18176b9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -1805,7 +1805,7 @@ public class MetaDataClient { updateCacheFrequency = updateCacheFrequencyProp; } String autoPartitionSeq = (String) TableProperty.AUTO_PARTITION_SEQ.getValue(tableProps); - + Boolean storeNullsProp = (Boolean) TableProperty.STORE_NULLS.getValue(tableProps); if (storeNullsProp == null) { if (parent == null) { @@ -1827,7 +1827,7 @@ public class MetaDataClient { if (transactionalProp == null) { transactional = connection.getQueryServices().getProps().getBoolean( QueryServices.DEFAULT_TABLE_ISTRANSACTIONAL_ATTRIB, - QueryServicesOptions.DEFAULT_TRANSACTIONAL); + QueryServicesOptions.DEFAULT_TABLE_ISTRANSACTIONAL); } else { transactional = transactionalProp; } @@ -2037,7 +2037,7 @@ public class MetaDataClient { */ viewPhysicalTable = connection.getTable(new PTableKey(null, physicalNames.get(0).getString())); storageScheme = viewPhysicalTable.getStorageScheme(); - if (storageScheme == StorageScheme.ENCODED_COLUMN_NAMES) { + if (EncodedColumnsUtil.usesEncodedColumnNames(viewPhysicalTable)) { cqCounter = viewPhysicalTable.getEncodedCQCounter(); } } @@ -2078,10 +2078,15 @@ public class MetaDataClient { storageScheme = parent.getStorageScheme(); } else if (tableExists) { storageScheme = StorageScheme.NON_ENCODED_COLUMN_NAMES; + } else if (isImmutableRows) { +// storageScheme = StorageScheme.NON_ENCODED_COLUMN_NAMES; + storageScheme = StorageScheme.COLUMNS_STORED_IN_SINGLE_CELL; + // since we are storing all columns of a column family in a single key value we can't use deletes to store nulls + storeNulls = true; } else { storageScheme = StorageScheme.ENCODED_COLUMN_NAMES; } - cqCounter = storageScheme == StorageScheme.ENCODED_COLUMN_NAMES ? 
new EncodedCQCounter() : NULL_COUNTER; + cqCounter = storageScheme != StorageScheme.ENCODED_COLUMN_NAMES ? new EncodedCQCounter() : NULL_COUNTER; } Map<String, Integer> changedCqCounters = new HashMap<>(colDefs.size()); @@ -2531,7 +2536,7 @@ public class MetaDataClient { } private static boolean incrementEncodedCQCounter(StorageScheme storageScheme, ColumnDef colDef) { - return storageScheme == StorageScheme.ENCODED_COLUMN_NAMES && !colDef.isPK(); + return storageScheme != StorageScheme.NON_ENCODED_COLUMN_NAMES && !colDef.isPK(); } private byte[][] getSplitKeys(List<HRegionLocation> allTableRegions) { @@ -2951,7 +2956,6 @@ public class MetaDataClient { PName tenantId = connection.getTenantId(); String schemaName = table.getSchemaName().getString(); String tableName = table.getTableName().getString(); - Boolean isImmutableRowsProp = null; Boolean multiTenantProp = null; Boolean disableWALProp = null; http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java index ecef105..b9f3eaf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java @@ -166,7 +166,8 @@ public interface PTable extends PMetaDataEntity { public enum StorageScheme { ENCODED_COLUMN_NAMES((byte)1), - NON_ENCODED_COLUMN_NAMES((byte)2); + NON_ENCODED_COLUMN_NAMES((byte)2), + COLUMNS_STORED_IN_SINGLE_CELL((byte)3); private final byte[] byteValue; private final byte serializedValue; http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index dcc0e08..fee39ce 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -24,6 +24,7 @@ import static org.apache.phoenix.schema.SaltingUtil.SALTING_COLUMN; import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -42,6 +43,9 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.coprocessor.generated.PTableProtos; import org.apache.phoenix.exception.DataExceedsCapacityException; +import org.apache.phoenix.expression.ArrayConstructorExpression; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.index.IndexMaintainer; @@ -49,12 +53,14 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.protobuf.ProtobufUtil; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.RowKeySchema.RowKeySchemaBuilder; +import org.apache.phoenix.schema.tuple.BaseTuple; import org.apache.phoenix.schema.types.PBinary; import org.apache.phoenix.schema.types.PChar; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PDouble; import org.apache.phoenix.schema.types.PFloat; import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.EncodedColumnsUtil; @@ -421,7 +427,7 @@ public class PTableImpl implements 
PTable { PColumn[] allColumns; this.columnsByName = ArrayListMultimap.create(columns.size(), 1); - this.kvColumnsByEncodedColumnNames = (storageScheme == StorageScheme.ENCODED_COLUMN_NAMES ? ArrayListMultimap.<Integer, PColumn>create(columns.size(), 1) : null); + this.kvColumnsByEncodedColumnNames = (EncodedColumnsUtil.usesEncodedColumnNames(storageScheme) ? ArrayListMultimap.<Integer, PColumn>create(columns.size(), 1) : null); int numPKColumns = 0; if (bucketNum != null) { // Add salt column to allColumns and pkColumns, but don't add to @@ -527,7 +533,7 @@ public class PTableImpl implements PTable { .orderedBy(Bytes.BYTES_COMPARATOR); for (int i = 0; i < families.length; i++) { Map.Entry<PName,List<PColumn>> entry = iterator.next(); - PColumnFamily family = new PColumnFamilyImpl(entry.getKey(), entry.getValue(), storageScheme == StorageScheme.ENCODED_COLUMN_NAMES); + PColumnFamily family = new PColumnFamilyImpl(entry.getKey(), entry.getValue(), EncodedColumnsUtil.usesEncodedColumnNames(storageScheme)); families[i] = family; familyByString.put(family.getName().getString(), family); familyByBytes.put(family.getName().getBytes(), family); @@ -688,7 +694,7 @@ public class PTableImpl implements PTable { } private PRow newRow(KeyValueBuilder builder, long ts, ImmutableBytesWritable key, int i, byte[]... 
values) { - PRow row = new PRowImpl(builder, key, ts, getBucketNum()); + PRow row = new PRowImpl(builder, key, ts, getBucketNum(), values.length); if (i < values.length) { for (PColumnFamily family : getColumnFamilies()) { for (PColumn column : family.getColumns()) { @@ -781,8 +787,10 @@ public class PTableImpl implements PTable { private Delete unsetValues; private Mutation deleteRow; private final long ts; + // map from column name to value + private Map<PColumn, byte[]> columnToValueMap; - public PRowImpl(KeyValueBuilder kvBuilder, ImmutableBytesWritable key, long ts, Integer bucketNum) { + public PRowImpl(KeyValueBuilder kvBuilder, ImmutableBytesWritable key, long ts, Integer bucketNum, int numColumns) { this.kvBuilder = kvBuilder; this.ts = ts; if (bucketNum != null) { @@ -792,7 +800,7 @@ public class PTableImpl implements PTable { this.keyPtr = new ImmutableBytesPtr(key); this.key = ByteUtil.copyKeyBytesIfNecessary(key); } - + this.columnToValueMap = Maps.newHashMapWithExpectedSize(numColumns); newMutations(); } @@ -814,8 +822,43 @@ public class PTableImpl implements PTable { // Include only deleteRow mutation if present because it takes precedence over all others mutations.add(deleteRow); } else { + // store all columns for a given column family in a single cell instead of one column per cell in order to improve write performance + if (storageScheme == StorageScheme.COLUMNS_STORED_IN_SINGLE_CELL) { + Put put = new Put(this.key); + if (isWALDisabled()) { + put.setDurability(Durability.SKIP_WAL); + } + // the setValues Put contains one cell per column, we need to convert it to a Put that contains a cell with all columns for a given column family + for (PColumnFamily family : families) { + byte[] columnFamily = family.getName().getBytes(); + Collection<PColumn> columns = family.getColumns(); + int maxEncodedColumnQualifier = Integer.MIN_VALUE; + for (PColumn column : columns) { + maxEncodedColumnQualifier = Math.max(maxEncodedColumnQualifier, 
column.getEncodedColumnQualifier()); + } + byte[][] colValues = new byte[maxEncodedColumnQualifier+1][]; + for (PColumn column : columns) { + colValues[column.getEncodedColumnQualifier()] = columnToValueMap.get(column); + } + + List<Expression> children = Lists.newArrayListWithExpectedSize(columns.size()); + // create an expression list with all the columns + for (int i=0; i<colValues.length; ++i) { + children.add(new LiteralExpression(colValues[i]==null ? ByteUtil.EMPTY_BYTE_ARRAY : colValues[i] )); + } + // we use ArrayConstructorExpression to serialize multiple columns into a single byte[] + // construct the ArrayConstructorExpression with a variable length data type since columns can be of fixed or variable length + ArrayConstructorExpression arrayExpression = new ArrayConstructorExpression(children, PVarbinary.INSTANCE, rowKeyOrderOptimizable); + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + arrayExpression.evaluate(new BaseTuple() {}, ptr); + ImmutableBytesPtr colFamilyPtr = new ImmutableBytesPtr(columnFamily); + addQuietly(put, kvBuilder, kvBuilder.buildPut(keyPtr, + colFamilyPtr, colFamilyPtr, ts, ptr)); + } + setValues = put; + } // Because we cannot enforce a not null constraint on a KV column (since we don't know if the row exists when - // we upsert it), se instead add a KV that is always emtpy. This allows us to imitate SQL semantics given the + // we upsert it), so instead add a KV that is always empty. This allows us to imitate SQL semantics given the // way HBase works. Pair<byte[], byte[]> emptyKvInfo = EncodedColumnsUtil.getEmptyKeyValueInfo(PTableImpl.this); addQuietly(setValues, kvBuilder, kvBuilder.buildPut(keyPtr, @@ -870,18 +913,18 @@ public class PTableImpl implements PTable { } else { ImmutableBytesWritable ptr = new ImmutableBytesWritable(byteValue == null ? 
HConstants.EMPTY_BYTE_ARRAY : byteValue); - Integer maxLength = column.getMaxLength(); - if (!isNull && type.isFixedWidth() && maxLength != null) { - if (ptr.getLength() < maxLength) { - type.pad(ptr, maxLength, column.getSortOrder()); - } else if (ptr.getLength() > maxLength) { - throw new DataExceedsCapacityException(name.getString() + "." + column.getName().getString() + " may not exceed " + maxLength + " bytes (" + type.toObject(byteValue) + ")"); - } + SchemaUtil.padData(name.getString(), column, ptr); + // store all columns for a given column family in a single cell instead of one column per cell in order to improve write performance + // we don't need to do anything with unsetValues as it is only used when storeNulls is false, storeNulls is always true when storeColsInSingleCell is true + if (storageScheme == StorageScheme.COLUMNS_STORED_IN_SINGLE_CELL) { + columnToValueMap.put(column, ptr.get()); + } + else { + removeIfPresent(unsetValues, family, qualifier); + addQuietly(setValues, kvBuilder, kvBuilder.buildPut(keyPtr, + column.getFamilyName().getBytesPtr(), qualifierPtr, + ts, ptr)); } - removeIfPresent(unsetValues, family, qualifier); - addQuietly(setValues, kvBuilder, kvBuilder.buildPut(keyPtr, - column.getFamilyName().getBytesPtr(), qualifierPtr, - ts, ptr)); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java index d5d0b84..6b55756 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java @@ -51,7 +51,7 @@ public enum TableProperty { STORE_NULLS(PhoenixDatabaseMetaData.STORE_NULLS, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, true, false), 
TRANSACTIONAL(PhoenixDatabaseMetaData.TRANSACTIONAL, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, true, false), - + UPDATE_CACHE_FREQUENCY(PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY, true, true) { @Override public Object getValue(Object value) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/920be8fe/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BaseTuple.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BaseTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BaseTuple.java index 92371e7..8028eb2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BaseTuple.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/BaseTuple.java @@ -20,9 +20,39 @@ package org.apache.phoenix.schema.tuple; import java.util.List; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; public abstract class BaseTuple implements Tuple { + @Override + public int size() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isImmutable() { + throw new UnsupportedOperationException(); + } + + @Override + public void getKey(ImmutableBytesWritable ptr) { + throw new UnsupportedOperationException(); + } + + @Override + public Cell getValue(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public Cell getValue(byte [] family, byte [] qualifier) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean getValue(byte [] family, byte [] qualifier, ImmutableBytesWritable ptr) { + throw new UnsupportedOperationException(); + } @Override public long getSequenceValue(int index) {