http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index ff44d2e..3453120 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -69,6 +69,7 @@ import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.trace.TracingIterator; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.SQLCloseable; @@ -297,11 +298,6 @@ public abstract class BaseQueryPlan implements QueryPlan { // TODO: can have an hint to skip joining back to data table, in that case if any column to // project is not present in the index then we need to skip this plan. if (!dataColumns.isEmpty()) { - // Set data columns to be join back from data table. - serializeDataTableColumnsToJoin(scan, dataColumns); - KeyValueSchema schema = ProjectedColumnExpression.buildSchema(dataColumns); - // Set key value schema of the data columns. - serializeSchemaIntoScan(scan, schema); PTable parentTable = context.getCurrentTable().getTable(); String parentSchemaName = parentTable.getParentSchemaName().getString(); String parentTableName = parentTable.getParentTableName().getString(); @@ -312,6 +308,12 @@ public abstract class BaseQueryPlan implements QueryPlan { FACTORY.namedTable(null, TableName.create(parentSchemaName, parentTableName)), context.getConnection()).resolveTable(parentSchemaName, parentTableName); PTable dataTable = dataTableRef.getTable(); + // Set data columns to be join back from data table. + serializeDataTableColumnsToJoin(scan, dataColumns, dataTable); + KeyValueSchema schema = ProjectedColumnExpression.buildSchema(dataColumns); + // Set key value schema of the data columns. + serializeSchemaIntoScan(scan, schema); + // Set index maintainer of the local index. serializeIndexMaintainerIntoScan(scan, dataTable); // Set view constants if exists. @@ -414,14 +416,14 @@ public abstract class BaseQueryPlan implements QueryPlan { } } - private void serializeDataTableColumnsToJoin(Scan scan, Set<PColumn> dataColumns) { + private void serializeDataTableColumnsToJoin(Scan scan, Set<PColumn> dataColumns, PTable dataTable) { ByteArrayOutputStream stream = new ByteArrayOutputStream(); try { DataOutputStream output = new DataOutputStream(stream); WritableUtils.writeVInt(output, dataColumns.size()); for (PColumn column : dataColumns) { Bytes.writeByteArray(output, column.getFamilyName().getBytes()); - Bytes.writeByteArray(output, column.getName().getBytes()); + Bytes.writeByteArray(output, EncodedColumnsUtil.getColumnQualifier(column, dataTable)); } scan.setAttribute(BaseScannerRegionObserver.DATA_TABLE_COLUMNS_TO_JOIN, stream.toByteArray()); } catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index ae78e97..51ca59c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -770,7 +770,7 @@ public class MutationState implements SQLCloseable { } for (PColumn column : columns) { if (column != null) { - resolvedTable.getColumnFamily(column.getFamilyName().getString()).getColumn(column.getName().getString()); + resolvedTable.getColumnFamily(column.getFamilyName().getString()).getPColumnForColumnName(column.getName().getString()); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java index f4ff289..087257f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java @@ -656,6 +656,7 @@ public class SortMergeJoinPlan implements QueryPlan { byte[] b = new byte[length]; buffer.get(b); Result result = ResultUtil.toResult(new ImmutableBytesWritable(b)); + //TODO: samarth make joins work with position based look up. return new ResultTuple(result); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java index a884949..b6e1de2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjector.java @@ -17,6 +17,9 @@ */ package org.apache.phoenix.execute; +import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY; +import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_QUALIFIER; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -50,9 +53,6 @@ import org.apache.phoenix.util.SchemaUtil; import com.google.common.base.Preconditions; public class TupleProjector { - public static final byte[] VALUE_COLUMN_FAMILY = Bytes.toBytes("_v"); - public static final byte[] VALUE_COLUMN_QUALIFIER = new byte[0]; - private static final String SCAN_PROJECTOR = "scanProjector"; private final KeyValueSchema schema; http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java index 4b5fdbb..35862c4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/KeyValueColumnExpression.java @@ -28,6 +28,7 @@ import org.apache.phoenix.expression.visitor.ExpressionVisitor; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PDatum; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.SchemaUtil; @@ -41,13 +42,13 @@ import org.apache.phoenix.util.SchemaUtil; public class KeyValueColumnExpression extends ColumnExpression { private byte[] cf; private byte[] cq; - private String displayName; // client-side only + private String displayName; // client-side only. TODO: samarth see what can you do for encoded column names. public KeyValueColumnExpression() { } - public KeyValueColumnExpression(PColumn column) { - this(column, null); + public KeyValueColumnExpression(PColumn column, boolean encodedColumnName) { + this(column, null, encodedColumnName); } public KeyValueColumnExpression(PDatum column, byte[] cf, byte[] cq) { @@ -56,18 +57,19 @@ public class KeyValueColumnExpression extends ColumnExpression { this.cq = cq; } - public KeyValueColumnExpression(PColumn column, String displayName) { + public KeyValueColumnExpression(PColumn column, String displayName, boolean encodedColumnName) { super(column); this.cf = column.getFamilyName().getBytes(); - this.cq = column.getName().getBytes(); + this.cq = EncodedColumnsUtil.getColumnQualifier(column, encodedColumnName); this.displayName = displayName; } public byte[] getColumnFamily() { return cf; } - - public byte[] getColumnName() { + + //TODO: samarth look for the callers of this. + public byte[] getColumnQualifier() { return cq; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java index 3a38dee..2744f35 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ProjectedColumnExpression.java @@ -154,6 +154,7 @@ public class ProjectedColumnExpression extends ColumnExpression { return Determinism.PER_INVOCATION; } + @Override public ProjectedColumnExpression clone() { return new ProjectedColumnExpression(this.column, this.columns, this.position, this.displayName); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java index b8b0350..d1f6211 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/ColumnProjectionFilter.java @@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; -import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.util.EncodedColumnsUtil; /** * When selecting specific columns in a SELECT query, this filter passes only selected columns @@ -53,6 +53,8 @@ public class ColumnProjectionFilter extends FilterBase implements Writable { private byte[] emptyCFName; private Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker; private Set<byte[]> conditionOnlyCfs; + private boolean usesEncodedColumnNames; + private byte[] emptyKVQualifier; public ColumnProjectionFilter() { @@ -60,10 +62,12 @@ public class ColumnProjectionFilter extends FilterBase implements Writable { public ColumnProjectionFilter(byte[] emptyCFName, Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker, - Set<byte[]> conditionOnlyCfs) { + Set<byte[]> conditionOnlyCfs, boolean usesEncodedColumnNames) { this.emptyCFName = emptyCFName; this.columnsTracker = columnsTracker; this.conditionOnlyCfs = conditionOnlyCfs; + this.usesEncodedColumnNames = usesEncodedColumnNames; + this.emptyKVQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst(); } @Override @@ -87,6 +91,9 @@ public class ColumnProjectionFilter extends FilterBase implements Writable { familyMapSize--; } int conditionOnlyCfsSize = WritableUtils.readVInt(input); + usesEncodedColumnNames = conditionOnlyCfsSize > 0; + emptyKVQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst(); + conditionOnlyCfsSize = Math.abs(conditionOnlyCfsSize) - 1; // restore to the actual value. this.conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); while (conditionOnlyCfsSize > 0) { this.conditionOnlyCfs.add(WritableUtils.readCompressedByteArray(input)); @@ -110,12 +117,13 @@ public class ColumnProjectionFilter extends FilterBase implements Writable { } } } - // Write conditionOnlyCfs - WritableUtils.writeVInt(output, this.conditionOnlyCfs.size()); + // Encode usesEncodedColumnNames in conditionOnlyCfs size. + WritableUtils.writeVInt(output, (this.conditionOnlyCfs.size() + 1) * (usesEncodedColumnNames ? 1 : -1)); for (byte[] f : this.conditionOnlyCfs) { WritableUtils.writeCompressedByteArray(output, f); } - } + +} @Override public byte[] toByteArray() throws IOException { @@ -153,9 +161,9 @@ public class ColumnProjectionFilter extends FilterBase implements Writable { // make sure we're not holding to any of the byte[]'s ptr.set(HConstants.EMPTY_BYTE_ARRAY); if (kvs.isEmpty()) { - kvs.add(new KeyValue(firstKV.getRowArray(), firstKV.getRowOffset(),firstKV.getRowLength(), this.emptyCFName, - 0, this.emptyCFName.length, QueryConstants.EMPTY_COLUMN_BYTES, 0, - QueryConstants.EMPTY_COLUMN_BYTES.length, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0)); + kvs.add(new KeyValue(firstKV.getRowArray(), firstKV.getRowOffset(), firstKV.getRowLength(), + this.emptyCFName, 0, this.emptyCFName.length, emptyKVQualifier, 0, + emptyKVQualifier.length, HConstants.LATEST_TIMESTAMP, Type.Maximum, null, 0, 0)); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java index dba700b..c3d52a4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiKeyValueComparisonFilter.java @@ -94,7 +94,7 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil refCount = foundColumns.size(); } - public ReturnCode resolveColumn(Cell value) { + private ReturnCode resolveColumn(Cell value) { // Always set key, in case we never find a key value column of interest, // and our expression uses row key columns. setKey(value); @@ -184,7 +184,7 @@ public abstract class MultiKeyValueComparisonFilter extends BooleanExpressionFil ExpressionVisitor<Void> visitor = new StatelessTraverseAllExpressionVisitor<Void>() { @Override public Void visit(KeyValueColumnExpression expression) { - inputTuple.addColumn(expression.getColumnFamily(), expression.getColumnName()); + inputTuple.addColumn(expression.getColumnFamily(), expression.getColumnQualifier()); return null; } }; http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java index eaf8d35..07f7072 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/SingleKeyValueComparisonFilter.java @@ -58,7 +58,7 @@ public abstract class SingleKeyValueComparisonFilter extends BooleanExpressionFi @Override public Void visit(KeyValueColumnExpression expression) { cf = expression.getColumnFamily(); - cq = expression.getColumnName(); + cq = expression.getColumnQualifier(); return null; } }; http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java index 6f9caa6..0f960e4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java @@ -32,7 +32,6 @@ import org.apache.phoenix.hbase.index.covered.TableState; import org.apache.phoenix.hbase.index.scanner.Scanner; import com.google.common.collect.Lists; -import com.google.common.collect.Lists; /** * http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index ee67ca2..2ad0c8d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -32,8 +32,10 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; @@ -85,6 +87,7 @@ import org.apache.phoenix.schema.tuple.ValueGetterTuple; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.BitSet; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.SchemaUtil; @@ -275,8 +278,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { // columns required to evaluate all expressions in indexedExpressions (this does not include columns in the data row key) private Set<ColumnReference> indexedColumns; private Set<ColumnReference> coveredColumns; - // Map used to cache column family of data table and the corresponding column family for the local index - private Map<ImmutableBytesPtr, ImmutableBytesWritable> dataTableLocalIndexFamilyMap; + // Map of covered columns where a key is column reference for a column in the data table + // and value is column reference for corresponding column in the index table. + // TODO: samarth confirm that we don't need a separate map for tracking column families of local indexes. + private Map<ColumnReference, ColumnReference> coveredColumnsMap; // columns required to create index row i.e. indexedColumns + coveredColumns (this does not include columns in the data row key) private Set<ColumnReference> allColumns; // TODO remove this in the next major release @@ -295,13 +300,14 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private final boolean isDataTableSalted; private final RowKeySchema dataRowKeySchema; - private List<ImmutableBytesPtr> indexQualifiers; private int estimatedIndexRowKeyBytes; private int estimatedExpressionSize; private int[] dataPkPosition; private int maxTrailingNulls; private ColumnReference dataEmptyKeyValueRef; private boolean rowKeyOrderOptimizable; + private boolean usesEncodedColumnNames; + private ImmutableBytesPtr emptyKeyValueQualifierPtr; private IndexMaintainer(RowKeySchema dataRowKeySchema, boolean isDataTableSalted) { this.dataRowKeySchema = dataRowKeySchema; @@ -315,14 +321,19 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { this.isMultiTenant = dataTable.isMultiTenant(); this.viewIndexId = index.getViewIndexId() == null ? null : MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId()); this.isLocalIndex = index.getIndexType() == IndexType.LOCAL; - + /* + * There is nothing to prevent new indexes on existing tables to have encoded column names. + * Except, due to backward compatibility reasons, we aren't able to change IndexMaintainer and the state + * that is serialized in it. Because of this we are forced to have the indexes inherit the + * storage scheme of the parent data tables. + */ + this.usesEncodedColumnNames = EncodedColumnsUtil.usesEncodedColumnNames(dataTable); byte[] indexTableName = index.getPhysicalName().getBytes(); // Use this for the nDataSaltBuckets as we need this for local indexes // TODO: persist nDataSaltBuckets separately, but maintain b/w compat. Integer nIndexSaltBuckets = isLocalIndex ? dataTable.getBucketNum() : index.getBucketNum(); boolean indexWALDisabled = index.isWALDisabled(); int indexPosOffset = (index.getBucketNum() == null ? 0 : 1) + (this.isMultiTenant ? 1 : 0) + (this.viewIndexId == null ? 0 : 1); -// int indexPosOffset = !isLocalIndex && nIndexSaltBuckets > 0 ? 1 : 0; int nIndexColumns = index.getColumns().size() - indexPosOffset; int nIndexPKColumns = index.getPKColumns().size() - indexPosOffset; // number of expressions that are indexed that are not present in the row key of the data table @@ -333,7 +344,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { String dataFamilyName = IndexUtil.getDataColumnFamilyName(indexColumnName); String dataColumnName = IndexUtil.getDataColumnName(indexColumnName); try { - PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getColumn(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getColumn(dataColumnName); + PColumn dataColumn = dataFamilyName.equals("") ? dataTable.getPColumnForColumnName(dataColumnName) : dataTable.getColumnFamily(dataFamilyName).getPColumnForColumnName(dataColumnName); if (SchemaUtil.isPKColumn(dataColumn)) continue; } catch (ColumnNotFoundException e) { @@ -366,7 +377,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { this.indexedColumnTypes = Lists.<PDataType>newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns); this.indexedExpressions = Lists.newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns); this.coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nIndexColumns-nIndexPKColumns); - this.dataTableLocalIndexFamilyMap = Maps.newHashMapWithExpectedSize(nIndexColumns-nIndexPKColumns); + this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nIndexColumns - nIndexPKColumns); this.nIndexSaltBuckets = nIndexSaltBuckets == null ? 0 : nIndexSaltBuckets; this.dataEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(dataTable); this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(index); @@ -408,7 +419,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { throw new RuntimeException(e); // Impossible } if ( expressionIndexCompiler.getColumnRef()!=null ) { - // get the column of the data table that corresponds to this index column + // get the column of the data column that corresponds to this index column PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString()); boolean isPKColumn = SchemaUtil.isPKColumn(column); if (isPKColumn) { @@ -432,12 +443,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { for (int i = 0; i < index.getColumnFamilies().size(); i++) { PColumnFamily family = index.getColumnFamilies().get(i); for (PColumn indexColumn : family.getColumns()) { - PColumn column = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString()); - PName dataTableFamily = column.getFamilyName(); - this.coveredColumns.add(new ColumnReference(dataTableFamily.getBytes(), column.getName().getBytes())); - if(isLocalIndex) { - this.dataTableLocalIndexFamilyMap.put(new ImmutableBytesPtr(dataTableFamily.getBytes()), new ImmutableBytesWritable(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(dataTableFamily.getString())))); - } + PColumn dataColumn = IndexUtil.getDataColumn(dataTable, indexColumn.getName().getString()); + byte[] dataColumnCq = EncodedColumnsUtil.getColumnQualifier(dataColumn, dataTable); + byte[] indexColumnCq = EncodedColumnsUtil.getColumnQualifier(indexColumn, index); + this.coveredColumns.add(new ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq)); + this.coveredColumnsMap.put(new ColumnReference(dataColumn.getFamilyName().getBytes(), dataColumnCq), + new ColumnReference(indexColumn.getFamilyName().getBytes(), indexColumnCq)); } } this.estimatedIndexRowKeyBytes = estimateIndexRowKeyByteSize(indexColByteSize); @@ -853,14 +864,16 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { put = new Put(indexRowKey); // add the keyvalue for the empty row put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey), - this.getEmptyKeyValueFamily(), QueryConstants.EMPTY_COLUMN_BYTES_PTR, ts, + this.getEmptyKeyValueFamily(), emptyKeyValueQualifierPtr, ts, // set the value to the empty column name - QueryConstants.EMPTY_COLUMN_BYTES_PTR)); + emptyKeyValueQualifierPtr)); put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } - int i = 0; for (ColumnReference ref : this.getCoveredColumns()) { - ImmutableBytesPtr cq = this.indexQualifiers.get(i++); + //FIXME: samarth figure out a backward compatible way to handle this as coveredcolumnsmap won't be availble for older phoenix clients. + ColumnReference indexColRef = this.coveredColumnsMap.get(ref); + ImmutableBytesPtr cq = indexColRef.getQualifierWritable(); + ImmutableBytesPtr cf = indexColRef.getFamilyWritable(); ImmutableBytesWritable value = valueGetter.getLatestValue(ref); byte[] indexRowKey = this.buildRowKey(valueGetter, dataRowKeyPtr, regionStartKey, regionEndKey); ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexRowKey); @@ -870,12 +883,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } //this is a little bit of extra work for installations that are running <0.94.14, but that should be rare and is a short-term set of wrappers - it shouldn't kill GC - if(this.isLocalIndex) { - ImmutableBytesWritable localIndexColFamily = this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()); - put.add(kvBuilder.buildPut(rowKey, localIndexColFamily, cq, ts, value)); - } else { - put.add(kvBuilder.buildPut(rowKey, ref.getFamilyWritable(), cq, ts, value)); - } + put.add(kvBuilder.buildPut(rowKey, cf, cq, ts, value)); } } return put; @@ -963,14 +971,12 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { for (ColumnReference ref : getCoveredColumns()) { byte[] family = ref.getFamily(); - if (this.isLocalIndex) { - family = this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()).get(); - } + ColumnReference indexColumn = coveredColumnsMap.get(ref); // If table delete was single version, then index delete should be as well if (deleteType == DeleteType.SINGLE_VERSION) { - delete.deleteFamilyVersion(family, ts); + delete.deleteFamilyVersion(indexColumn.getFamily(), ts); } else { - delete.deleteFamily(family, ts); + delete.deleteFamily(indexColumn.getFamily(), ts); } } if (deleteType == DeleteType.SINGLE_VERSION) { @@ -991,12 +997,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { delete = new Delete(indexRowKey); delete.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } - byte[] family = this.isLocalIndex ? this.dataTableLocalIndexFamilyMap.get(ref.getFamilyWritable()).get() : ref.getFamily(); + ColumnReference indexColumn = coveredColumnsMap.get(ref); // If point delete for data table, then use point delete for index as well - if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { - delete.deleteColumn(family, IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); + if (kv.getTypeByte() == KeyValue.Type.Delete.getCode()) { + //FIXME: samarth change this. Index column qualifiers are not derivable from data table cqs. + // Figure out a backward compatible way of going this since coveredColumnsMap won't be available + // for older clients. + delete.deleteColumn(indexColumn.getFamily(), indexColumn.getQualifier(), ts); } else { - delete.deleteColumns(family, IndexUtil.getIndexColumnName(ref.getFamily(), ref.getQualifier()), ts); + delete.deleteColumns(indexColumn.getFamily(), indexColumn.getQualifier(), ts); } } } @@ -1051,15 +1060,16 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { isLocalIndex = encodedCoveredolumnsAndLocalIndex < 0; int nCoveredColumns = Math.abs(encodedCoveredolumnsAndLocalIndex) - 1; coveredColumns = Sets.newLinkedHashSetWithExpectedSize(nCoveredColumns); - dataTableLocalIndexFamilyMap = Maps.newHashMapWithExpectedSize(nCoveredColumns); + coveredColumnsMap = Maps.newHashMapWithExpectedSize(nCoveredColumns); for (int i = 0; i < nCoveredColumns; i++) { - byte[] cf = Bytes.readByteArray(input); - byte[] cq = Bytes.readByteArray(input); - ColumnReference ref = new ColumnReference(cf,cq); - coveredColumns.add(ref); - if(isLocalIndex) { - dataTableLocalIndexFamilyMap.put(ref.getFamilyWritable(), new ImmutableBytesWritable(Bytes.toBytes(IndexUtil.getLocalIndexColumnFamily(Bytes.toString(cf))))); - } + byte[] dataTableCf = Bytes.readByteArray(input); + byte[] dataTableCq = Bytes.readByteArray(input); + byte[] indexTableCf = Bytes.readByteArray(input); + byte[] indexTableCq = Bytes.readByteArray(input); + ColumnReference dataColumn = new ColumnReference(dataTableCf, dataTableCq); + coveredColumns.add(dataColumn); + ColumnReference indexColumn = new ColumnReference(indexTableCf, indexTableCq); + coveredColumnsMap.put(dataColumn, indexColumn); } // Hack to serialize whether the index row key is optimizable int len = WritableUtils.readVInt(input); @@ -1085,6 +1095,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { if (isNewClient) { int numIndexedExpressions = WritableUtils.readVInt(input); + usesEncodedColumnNames = numIndexedExpressions > 0; + numIndexedExpressions = Math.abs(numIndexedExpressions) - 1; indexedExpressions = Lists.newArrayListWithExpectedSize(numIndexedExpressions); for (int i = 0; i < numIndexedExpressions; i++) { Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance(); @@ -1161,9 +1173,13 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } // Encode coveredColumns.size() and whether or not this is a local index WritableUtils.writeVInt(output, (coveredColumns.size() + 1) * (isLocalIndex ? -1 : 1)); - for (ColumnReference ref : coveredColumns) { - Bytes.writeByteArray(output, ref.getFamily()); - Bytes.writeByteArray(output, ref.getQualifier()); + for (Entry<ColumnReference, ColumnReference> ref : coveredColumnsMap.entrySet()) { + ColumnReference dataColumn = ref.getKey(); + ColumnReference indexColumn = ref.getValue(); + Bytes.writeByteArray(output, dataColumn.getFamily()); + Bytes.writeByteArray(output, dataColumn.getQualifier()); + Bytes.writeByteArray(output, indexColumn.getFamily()); + Bytes.writeByteArray(output, indexColumn.getQualifier()); } // TODO: remove when rowKeyOrderOptimizable hack no longer needed WritableUtils.writeVInt(output,indexTableName.length * (rowKeyOrderOptimizable ? 1 : -1)); @@ -1174,7 +1190,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { WritableUtils.writeVInt(output,-emptyKeyValueCFPtr.getLength()); output.write(emptyKeyValueCFPtr.get(),emptyKeyValueCFPtr.getOffset(), emptyKeyValueCFPtr.getLength()); - WritableUtils.writeVInt(output, indexedExpressions.size()); + // Hack to encode usesEncodedColumnNames in indexedExpressions size. + int indexedExpressionsSize = (indexedExpressions.size() + 1) * (usesEncodedColumnNames ? 1 : -1); + WritableUtils.writeVInt(output, indexedExpressionsSize); for (Expression expression : indexedExpressions) { WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal()); expression.write(output); @@ -1231,16 +1249,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { * Init calculated state reading/creating */ private void initCachedState() { - dataEmptyKeyValueRef = - new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), - QueryConstants.EMPTY_COLUMN_BYTES); - - indexQualifiers = Lists.newArrayListWithExpectedSize(this.coveredColumns.size()); - for (ColumnReference ref : coveredColumns) { - indexQualifiers.add(new ImmutableBytesPtr(IndexUtil.getIndexColumnName( - ref.getFamily(), ref.getQualifier()))); - } - + byte[] emptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(usesEncodedColumnNames).getFirst(); + dataEmptyKeyValueRef = new ColumnReference(emptyKeyValueCFPtr.copyBytesIfNecessary(), emptyKvQualifier); + emptyKeyValueQualifierPtr = new ImmutableBytesPtr(emptyKvQualifier); this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumns.size()); // columns that are required to evaluate all expressions in indexedExpressions (not including columns in data row key) this.indexedColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size()); @@ -1248,7 +1259,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { KeyValueExpressionVisitor visitor = new KeyValueExpressionVisitor() { @Override public Void visit(KeyValueColumnExpression expression) { - if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnName()))) { + if (indexedColumns.add(new ColumnReference(expression.getColumnFamily(), expression.getColumnQualifier()))) { indexedColumnTypes.add(expression.getDataType()); } return null; @@ -1513,4 +1524,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { return udfParseNodes; } } + + public byte[] getEmptyKeyValueQualifier() { + return emptyKeyValueQualifierPtr.copyBytes(); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index c67da6e..9ee5ea7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -67,7 +67,6 @@ import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.trace.TracingUtils; import org.apache.phoenix.trace.util.NullSpan; @@ -304,8 +303,16 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { for (ColumnReference ref : mutableColumns) { scan.addColumn(ref.getFamily(), ref.getQualifier()); } + /* + * Indexes inherit the storage scheme of the data table which means all the indexes have the same + * storage scheme and empty key value qualifier. Note that this assumption would be broken if we start + * supporting new indexes over existing data tables to have a different storage scheme than the data + * table. + */ + byte[] emptyKeyValueQualifier = indexMaintainers.get(0).getEmptyKeyValueQualifier(); + // Project empty key value column - scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); + scan.addColumn(indexMaintainers.get(0).getDataEmptyKeyValueCF(), emptyKeyValueQualifier); ScanRanges scanRanges = ScanRanges.create(SchemaUtil.VAR_BINARY_SCHEMA, Collections.singletonList(keys), ScanUtil.SINGLE_COLUMN_SLOT_SPAN, KeyRange.EVERYTHING_RANGE, null, true, -1); scanRanges.initializeScan(scan); TableName tableName = env.getRegion().getRegionInfo().getTable(); @@ -356,7 +363,8 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { Map<ImmutableBytesPtr, MultiMutation> mutationsToFindPreviousValue) throws IOException { if (scanner != null) { Result result; - ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); + ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0) + .getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier()); // Process existing data table rows by removing the old index row and adding the new index row while ((result = scanner.next()) != null) { Mutation m = mutationsToFindPreviousValue.remove(new ImmutableBytesPtr(result.getRow())); @@ -384,7 +392,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { // to generate point delete markers for all index rows that were added. We don't have Tephra // manage index rows in change sets because we don't want to be hit with the additional // memory hit and do not need to do conflict detection on index rows. - ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), QueryConstants.EMPTY_COLUMN_BYTES); + ColumnReference emptyColRef = new ColumnReference(indexMetaData.getIndexMaintainers().get(0).getDataEmptyKeyValueCF(), indexMetaData.getIndexMaintainers().get(0).getEmptyKeyValueQualifier()); while ((result = scanner.next()) != null) { Mutation m = mutations.remove(new ImmutableBytesPtr(result.getRow())); // Sort by timestamp, type, cf, cq so we can process in time batches from oldest to newest http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index ceba000..496b3b0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -17,12 +17,16 @@ */ package org.apache.phoenix.iterate; +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER; +import static org.apache.phoenix.query.QueryConstants.ENCODED_EMPTY_COLUMN_NAME; import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY; +import static org.apache.phoenix.util.ScanUtil.setMinMaxQualifiersOnScan; import java.io.ByteArrayInputStream; import java.io.DataInput; @@ -48,6 +52,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.hadoop.hbase.HConstants; +import javax.management.Query; + import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; @@ -87,8 +93,10 @@ import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.PTableStats; import org.apache.phoenix.schema.stats.StatisticsUtil; +import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.PrefixByteCodec; import org.apache.phoenix.util.PrefixByteDecoder; @@ -207,7 +215,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result // Project empty key value unless the column family containing it has // been projected in its entirety. if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) { - scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES); + scan.addColumn(ecf, EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst()); } } } @@ -225,7 +233,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result if(offset!=null){ ScanUtil.addOffsetAttribute(scan, offset); } - int cols = plan.getGroupBy().getOrderPreservingColumnCount(); if (cols > 0 && context.getWhereConditionColumns().size() == 0 && !plan.getStatement().getHint().hasHint(HintNode.Hint.RANGE_SCAN) && @@ -237,13 +244,76 @@ public abstract class BaseResultIterators extends ExplainTable implements Result new DistinctPrefixFilter(plan.getTableRef().getTable().getRowKeySchema(), cols)); } - + //TODO: samarth add condition to not do position based look ups in case of joins so that we won't need to do the hacky check inside co-processors. + if (setMinMaxQualifiersOnScan(table)) { + Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiers(scan, context); + if (minMaxQualifiers != null) { + scan.setAttribute(BaseScannerRegionObserver.MIN_QUALIFIER, PInteger.INSTANCE.toBytes(minMaxQualifiers.getFirst())); + scan.setAttribute(BaseScannerRegionObserver.MAX_QUALIFIER, PInteger.INSTANCE.toBytes(minMaxQualifiers.getSecond())); + } + } if (optimizeProjection) { optimizeProjection(context, scan, table, statement); } } } - + + private static Pair<Integer, Integer> getMinMaxQualifiers(Scan scan, StatementContext context) { + PTable table = context.getCurrentTable().getTable(); + checkArgument(EncodedColumnsUtil.usesEncodedColumnNames(table), "Method should only be used for tables using encoded column names"); + Integer minQualifier = null; + Integer maxQualifier = null; + boolean emptyKVProjected = false; + for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) { + byte[] cq = whereCol.getSecond(); + if (cq != null) { + int qualifier = (Integer)PInteger.INSTANCE.toObject(cq); + if (qualifier == ENCODED_EMPTY_COLUMN_NAME) { + emptyKVProjected = true; + continue; + } + if (minQualifier == null && maxQualifier == null) { + minQualifier = maxQualifier = qualifier; + } else { + if (qualifier < minQualifier) { + minQualifier = qualifier; + } else if (qualifier > maxQualifier) { + maxQualifier = qualifier; + } + } + } + } + Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap(); + for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) { + if (entry.getValue() != null) { + for (byte[] cq : entry.getValue()) { + if (cq != null) { + int qualifier = (Integer)PInteger.INSTANCE.toObject(cq); + if (qualifier == ENCODED_EMPTY_COLUMN_NAME) { + emptyKVProjected = true; + continue; + } + if (minQualifier == null && maxQualifier == null) { + minQualifier = maxQualifier = qualifier; + } else { + if (qualifier < minQualifier) { + minQualifier = qualifier; + } else if (qualifier > maxQualifier) { + maxQualifier = qualifier; + } + } + } + } + } + } + if (minQualifier == null && emptyKVProjected) { + return new Pair<>(ENCODED_EMPTY_COLUMN_NAME, ENCODED_EMPTY_COLUMN_NAME); + } else if (minQualifier == null) { + return null; + } + return new Pair<>(minQualifier, maxQualifier); + } + private static void optimizeProjection(StatementContext context, Scan scan, PTable table, FilterableStatement statement) { Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap(); // columnsTracker contain cf -> qualifiers which should get returned. @@ -340,7 +410,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result // the ExplicitColumnTracker not to be used, though. if (!statement.isAggregate() && filteredColumnNotInProjection) { ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table), - columnsTracker, conditionOnlyCfs)); + columnsTracker, conditionOnlyCfs, EncodedColumnsUtil.usesEncodedColumnNames(table))); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java index 3293f65..1e5f09e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LookAheadResultIterator.java @@ -49,7 +49,7 @@ abstract public class LookAheadResultIterator implements PeekingResultIterator { }; } - private final static Tuple UNINITIALIZED = new ResultTuple(); + private final static Tuple UNINITIALIZED = ResultTuple.EMPTY_TUPLE; private Tuple next = UNINITIALIZED; abstract protected Tuple advance() throws SQLException; http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java index 8ada952..135ab26 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java @@ -180,6 +180,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> { return this.index; } + @Override public int size() { if (flushBuffer) return flushedCount; http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java index 8dcb2e8..e4c52c0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java @@ -32,6 +32,7 @@ import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.OrderByExpression; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.SizedUtil; import com.google.common.base.Function; @@ -264,7 +265,7 @@ public class OrderedResultIterator implements PeekingResultIterator { } this.byteSize = queueEntries.getByteSize(); } catch (IOException e) { - throw new SQLException("", e); + ServerUtil.createIOException(e.getMessage(), e); } finally { delegate.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java index 88e141a..531bbe7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java @@ -24,16 +24,24 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; public class RegionScannerResultIterator extends BaseResultIterator { private final RegionScanner scanner; + private final Pair<Integer, Integer> minMaxQualifiers; + private final boolean useQualifierAsIndex; - public RegionScannerResultIterator(RegionScanner scanner) { + public RegionScannerResultIterator(RegionScanner scanner, Pair<Integer, Integer> minMaxQualifiers, boolean isJoin) { this.scanner = scanner; + this.useQualifierAsIndex = ScanUtil.useQualifierAsIndex(minMaxQualifiers, isJoin); + this.minMaxQualifiers = minMaxQualifiers; } @Override @@ -43,7 +51,7 @@ public class RegionScannerResultIterator extends BaseResultIterator { synchronized (scanner) { try { // TODO: size - List<Cell> results = new ArrayList<Cell>(); + List<Cell> results = useQualifierAsIndex ? new EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>(); // Results are potentially returned even when the return value of s.next is false // since this is an indication of whether or not there are more values after the // ones returned @@ -53,7 +61,7 @@ public class RegionScannerResultIterator extends BaseResultIterator { } // We instantiate a new tuple because in all cases currently we hang on to it // (i.e. to compute and hold onto the TopN). - MultiKeyValueTuple tuple = new MultiKeyValueTuple(); + Tuple tuple = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); tuple.setKeyValues(results); return tuple; } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index 2927de1..7da41c7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -108,9 +108,9 @@ import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; +import org.apache.phoenix.util.SchemaUtil; import org.cloudera.htrace.Sampler; import org.cloudera.htrace.TraceScope; -import org.apache.phoenix.util.SchemaUtil; import org.apache.tephra.TransactionContext; http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index 4fd4485..0e7db1c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -205,7 +205,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { public static final byte[] BASE_COLUMN_COUNT_BYTES = Bytes.toBytes(BASE_COLUMN_COUNT); public static final String IS_ROW_TIMESTAMP = "IS_ROW_TIMESTAMP"; public static final byte[] IS_ROW_TIMESTAMP_BYTES = Bytes.toBytes(IS_ROW_TIMESTAMP); - + public static final String TABLE_FAMILY = QueryConstants.DEFAULT_COLUMN_FAMILY; public static final byte[] TABLE_FAMILY_BYTES = QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES; @@ -315,6 +315,13 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { /** Version below which we fall back on the generic KeyValueBuilder */ public static final int CLIENT_KEY_VALUE_BUILDER_THRESHOLD = VersionUtil.encodeVersion("0", "94", "14"); + public static final String STORAGE_SCHEME = "STORAGE_SCHEME"; + public static final byte[] STORAGE_SCHEME_BYTES = Bytes.toBytes(STORAGE_SCHEME); + public static final String ENCODED_COLUMN_QUALIFIER = "COLUMN_QUALIFIER"; + public static final byte[] ENCODED_COLUMN_QUALIFIER_BYTES = Bytes.toBytes(ENCODED_COLUMN_QUALIFIER); + public static final String COLUMN_QUALIFIER_COUNTER = "QUALIFIER_COUNTER"; + public static final byte[] COLUMN_QUALIFIER_COUNTER_BYTES = Bytes.toBytes(COLUMN_QUALIFIER_COUNTER); + PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException { this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false)); this.connection = connection; @@ -588,9 +595,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { newCells.addAll(cells); newCells.add(kv); Collections.sort(newCells, KeyValue.COMPARATOR); - resultTuple.setResult(Result.create(newCells)); + tuple = new ResultTuple(Result.create(newCells)); } - return tuple; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java index 47c17ae..3ca48a1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java @@ -107,7 +107,7 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable { private final static String STRING_FALSE = "0"; private final static BigDecimal BIG_DECIMAL_FALSE = BigDecimal.valueOf(0); private final static Integer INTEGER_FALSE = Integer.valueOf(0); - private final static Tuple BEFORE_FIRST = new ResultTuple(); + private final static Tuple BEFORE_FIRST = ResultTuple.EMPTY_TUPLE; private final ResultIterator scanner; private final RowProjector rowProjector; http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java index 908a117..2d7550a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/join/HashCacheFactory.java @@ -122,6 +122,7 @@ public class HashCacheFactory implements ServerCacheFactory { int resultSize = (int)Bytes.readVLong(hashCacheByteArray, offset); offset += WritableUtils.decodeVIntSize(hashCacheByteArray[offset]); ImmutableBytesWritable value = new ImmutableBytesWritable(hashCacheByteArray,offset,resultSize); + //TODO: samarth make joins work with position look up. Tuple result = new ResultTuple(ResultUtil.toResult(value)); ImmutableBytesPtr key = TupleUtil.getConcatenatedValue(result, onExpressions); List<Tuple> tuples = hashCacheMap.get(key); http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java index cacbce7..d94fa42 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.TreeMap; + import javax.annotation.Nullable; import org.apache.hadoop.conf.Configuration; @@ -48,6 +49,7 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; @@ -207,7 +209,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri not care about it */ private void initColumnIndexes() throws SQLException { - columnIndexes = new TreeMap(Bytes.BYTES_COMPARATOR); + columnIndexes = new TreeMap<>(Bytes.BYTES_COMPARATOR); int columnIndex = 0; for(int index = 0; index < logicalNames.size(); index++) { PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index)); @@ -215,18 +217,23 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri for (int i = 0; i < cls.size(); i++) { PColumn c = cls.get(i); byte[] family = new byte[0]; - if (c.getFamilyName() != null) // Skip PK column + byte[] cq; + if (!SchemaUtil.isPKColumn(c)) { family = c.getFamilyName().getBytes(); - byte[] name = c.getName().getBytes(); - byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name); + cq = EncodedColumnsUtil.getColumnQualifier(c, table); + } else { + // TODO: samarth verify if this is the right thing to do here. + cq = c.getName().getBytes(); + } + byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq); if (!columnIndexes.containsKey(cfn)) { columnIndexes.put(cfn, new Integer(columnIndex)); columnIndex++; } } byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table); - byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, - QueryConstants.EMPTY_COLUMN_BYTES); + byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst(); + byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES, emptyKeyValue); columnIndexes.put(cfn, new Integer(columnIndex)); columnIndex++; } @@ -242,9 +249,9 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri private int findIndex(Cell cell) throws IOException { byte[] familyName = Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()); - byte[] name = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), + byte[] cq = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); - byte[] cfn = Bytes.add(familyName, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name); + byte[] cfn = Bytes.add(familyName, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq); if(columnIndexes.containsKey(cfn)) { return columnIndexes.get(cfn); } @@ -397,4 +404,4 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri return keyValues; } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java index 15d6d2f..c5f690b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java @@ -44,6 +44,7 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.Closeables; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; @@ -89,7 +90,7 @@ public class FormatToKeyValueReducer } private void initColumnsMap(PhoenixConnection conn) throws SQLException { - Map<byte[], Integer> indexMap = new TreeMap(Bytes.BYTES_COMPARATOR); + Map<byte[], Integer> indexMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); columnIndexes = new HashMap<>(); int columnIndex = 0; for (int index = 0; index < logicalNames.size(); index++) { @@ -98,12 +99,16 @@ public class FormatToKeyValueReducer for (int i = 0; i < cls.size(); i++) { PColumn c = cls.get(i); byte[] family = new byte[0]; - if (c.getFamilyName() != null) { + byte[] cq; + if (!SchemaUtil.isPKColumn(c)) { family = c.getFamilyName().getBytes(); + cq = EncodedColumnsUtil.getColumnQualifier(c, table); + } else { + // TODO: samarth verify if this is the right thing to do here. + cq = c.getName().getBytes(); } - byte[] name = c.getName().getBytes(); - byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name); - Pair<byte[], byte[]> pair = new Pair(family, name); + byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, cq); + Pair<byte[], byte[]> pair = new Pair<>(family, cq); if (!indexMap.containsKey(cfn)) { indexMap.put(cfn, new Integer(columnIndex)); columnIndexes.put(new Integer(columnIndex), pair); @@ -111,8 +116,8 @@ public class FormatToKeyValueReducer } } byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table); - Pair<byte[], byte[]> pair = new Pair(emptyColumnFamily, QueryConstants - .EMPTY_COLUMN_BYTES); + byte[] emptyKeyValue = EncodedColumnsUtil.getEmptyKeyValueInfo(table).getFirst(); + Pair<byte[], byte[]> pair = new Pair<>(emptyColumnFamily, emptyKeyValue); columnIndexes.put(new Integer(columnIndex), pair); columnIndex++; } @@ -123,18 +128,17 @@ public class FormatToKeyValueReducer Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context) throws IOException, InterruptedException { TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR); - ImmutableBytesWritable rowKey = key.getRowkey(); for (ImmutableBytesWritable aggregatedArray : values) { DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get())); while (input.available() != 0) { byte type = input.readByte(); int index = WritableUtils.readVInt(input); ImmutableBytesWritable family; - ImmutableBytesWritable name; + ImmutableBytesWritable cq; ImmutableBytesWritable value = QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR; Pair<byte[], byte[]> pair = columnIndexes.get(index); family = new ImmutableBytesWritable(pair.getFirst()); - name = new ImmutableBytesWritable(pair.getSecond()); + cq = new ImmutableBytesWritable(pair.getSecond()); int len = WritableUtils.readVInt(input); if (len > 0) { byte[] array = new byte[len]; @@ -145,10 +149,10 @@ public class FormatToKeyValueReducer KeyValue.Type kvType = KeyValue.Type.codeToType(type); switch (kvType) { case Put: // not null value - kv = builder.buildPut(key.getRowkey(), family, name, value); + kv = builder.buildPut(key.getRowkey(), family, cq, value); break; case DeleteColumn: // null value - kv = builder.buildDeleteColumns(key.getRowkey(), family, name); + kv = builder.buildDeleteColumns(key.getRowkey(), family, cq); break; default: throw new IOException("Unsupported KeyValue type " + kvType); @@ -164,4 +168,4 @@ public class FormatToKeyValueReducer if (++index % 100 == 0) context.setStatus("Wrote " + index); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index f14371d..bdb2432 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -32,8 +32,6 @@ import static org.apache.phoenix.util.UpgradeUtil.upgradeTo4_5_0; import java.io.IOException; import java.lang.ref.WeakReference; -import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; @@ -85,7 +83,6 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.regionserver.IndexHalfStoreFileReaderGenerator; -import org.apache.hadoop.hbase.regionserver.LocalIndexSplitter; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; @@ -161,7 +158,6 @@ import org.apache.phoenix.schema.PMetaDataImpl; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.ReadOnlyTableException; @@ -182,7 +178,6 @@ import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.ConfigUtil; -import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.JDBCUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixContextExecutor; @@ -570,6 +565,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } + @Override public PMetaData updateResolvedTimestamp(PTable table, long resolvedTime) throws SQLException { synchronized (latestMetaDataLock) { throwConnectionClosedIfNullMetaData(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index 9f8f58c..b1733f0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -31,6 +31,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CURRENT_VALUE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CYCLE_FLAG; @@ -40,6 +41,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DECIMAL_DIGITS; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODED_COLUMN_QUALIFIER; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FUNCTION_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_ROW_COUNT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.GUIDE_POSTS_WIDTH; @@ -85,6 +87,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SOURCE_DATA_TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATA_TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SQL_DATETIME_SUB; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE; @@ -118,7 +121,7 @@ import org.apache.phoenix.schema.MetaDataSplitPolicy; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.SortOrder; -import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.schema.types.PInteger; /** @@ -148,23 +151,30 @@ public interface QueryConstants { public final static byte[] OFFSET_ROW_KEY_BYTES = Bytes.toBytes(OFFSET_ROW_KEY); public final static ImmutableBytesPtr OFFSET_ROW_KEY_PTR = new ImmutableBytesPtr(OFFSET_ROW_KEY_BYTES); - public final static PName SINGLE_COLUMN_NAME = PNameFactory.newNormalizedName("s"); - public final static PName SINGLE_COLUMN_FAMILY_NAME = PNameFactory.newNormalizedName("s"); - public final static byte[] SINGLE_COLUMN = SINGLE_COLUMN_NAME.getBytes(); - public final static byte[] SINGLE_COLUMN_FAMILY = SINGLE_COLUMN_FAMILY_NAME.getBytes(); - public static final long AGG_TIMESTAMP = HConstants.LATEST_TIMESTAMP; /** * Key used for a single row aggregation where there is no group by */ public final static byte[] UNGROUPED_AGG_ROW_KEY = Bytes.toBytes("a"); - public final static PName AGG_COLUMN_NAME = SINGLE_COLUMN_NAME; - public final static PName AGG_COLUMN_FAMILY_NAME = SINGLE_COLUMN_FAMILY_NAME; - - public static final byte[] ARRAY_VALUE_COLUMN_FAMILY = Bytes.toBytes("a"); - // Use empty byte array for column qualifier so as not to accidentally conflict with any other columns - public static final byte[] ARRAY_VALUE_COLUMN_QUALIFIER = ByteUtil.EMPTY_BYTE_ARRAY; + + /** BEGIN Set of reserved column qualifiers **/ + + public static final String RESERVED_COLUMN_FAMILY = "_r"; + public static final byte[] RESERVED_COLUMN_FAMILY_BYTES = Bytes.toBytes(RESERVED_COLUMN_FAMILY); + + public static final byte[] VALUE_COLUMN_FAMILY = RESERVED_COLUMN_FAMILY_BYTES; + public static final byte[] VALUE_COLUMN_QUALIFIER = PInteger.INSTANCE.toBytes(1); + + public static final byte[] ARRAY_VALUE_COLUMN_FAMILY = RESERVED_COLUMN_FAMILY_BYTES; + public static final byte[] ARRAY_VALUE_COLUMN_QUALIFIER = PInteger.INSTANCE.toBytes(2); + + public final static PName SINGLE_COLUMN_NAME = PNameFactory.newNormalizedName("s"); + public final static PName SINGLE_COLUMN_FAMILY_NAME = PNameFactory.newNormalizedName("s"); + public final static byte[] SINGLE_COLUMN = PInteger.INSTANCE.toBytes(3); + public final static byte[] SINGLE_COLUMN_FAMILY = RESERVED_COLUMN_FAMILY_BYTES; + /** END Set of reserved column qualifiers **/ + public static final byte[] TRUE = new byte[] {1}; @@ -186,11 +196,18 @@ public interface QueryConstants { public static final byte[] EMPTY_COLUMN_BYTES = Bytes.toBytes(EMPTY_COLUMN_NAME); public static final ImmutableBytesPtr EMPTY_COLUMN_BYTES_PTR = new ImmutableBytesPtr( EMPTY_COLUMN_BYTES); + public static final Integer ENCODED_EMPTY_COLUMN_NAME = 0; + public static final byte[] ENCODED_EMPTY_COLUMN_BYTES = PInteger.INSTANCE.toBytes(ENCODED_EMPTY_COLUMN_NAME); + public static final ImmutableBytesPtr ENCODED_EMPTY_COLUMN_BYTES_PTR = new ImmutableBytesPtr( + ENCODED_EMPTY_COLUMN_BYTES); public final static String EMPTY_COLUMN_VALUE = "x"; public final static byte[] EMPTY_COLUMN_VALUE_BYTES = Bytes.toBytes(EMPTY_COLUMN_VALUE); public static final ImmutableBytesPtr EMPTY_COLUMN_VALUE_BYTES_PTR = new ImmutableBytesPtr( EMPTY_COLUMN_VALUE_BYTES); - + public static final String ENCODED_EMPTY_COLUMN_VALUE = EMPTY_COLUMN_VALUE; + public final static byte[] ENCODED_EMPTY_COLUMN_VALUE_BYTES = Bytes.toBytes(EMPTY_COLUMN_VALUE); + public static final ImmutableBytesPtr ENCODED_EMPTY_COLUMN_VALUE_BYTES_PTR = new ImmutableBytesPtr( + ENCODED_EMPTY_COLUMN_VALUE_BYTES); public static final String DEFAULT_COLUMN_FAMILY = "0"; public static final byte[] DEFAULT_COLUMN_FAMILY_BYTES = Bytes.toBytes(DEFAULT_COLUMN_FAMILY); public static final ImmutableBytesPtr DEFAULT_COLUMN_FAMILY_BYTES_PTR = new ImmutableBytesPtr( @@ -216,6 +233,13 @@ public interface QueryConstants { public static final int NANOS_IN_SECOND = BigDecimal.valueOf(Math.pow(10, 9)).intValue(); public static final int DIVERGED_VIEW_BASE_COLUMN_COUNT = -100; public static final int BASE_TABLE_BASE_COLUMN_COUNT = -1; + + //TODO: samarth think about this more. + /** + * We mark counter values 0 to 10 as reserved. Value 0 is used by {@link #ENCODED_EMPTY_COLUMN_NAME}. Values 1-10 + * are reserved for special column qualifiers returned by Phoenix co-processors. + */ + public static final int ENCODED_CQ_COUNTER_INITIAL_VALUE = 11; public static final String CREATE_TABLE_METADATA = // Do not use IF NOT EXISTS as we sometimes catch the TableAlreadyExists // exception and add columns to the SYSTEM.TABLE dynamically. @@ -282,6 +306,9 @@ public interface QueryConstants { IS_NAMESPACE_MAPPED + " BOOLEAN," + AUTO_PARTITION_SEQ + " VARCHAR," + APPEND_ONLY_SCHEMA + " BOOLEAN," + + ENCODED_COLUMN_QUALIFIER + " INTEGER," + + STORAGE_SCHEME + " TINYINT, " + + COLUMN_QUALIFIER_COUNTER + " INTEGER, " + "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + "," + TABLE_SCHEM + "," + TABLE_NAME + "," + COLUMN_NAME + "," + COLUMN_FAMILY + "))\n" + HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" + http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 892482d..0761b73 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -238,7 +238,7 @@ public class QueryServicesOptions { // doesn't depend on phoenix-core. public static final String DEFAULT_QUERY_SERVER_SERIALIZATION = "PROTOBUF"; public static final int DEFAULT_QUERY_SERVER_HTTP_PORT = 8765; - public static final boolean DEFAULT_RENEW_LEASE_ENABLED = true; + public static final boolean DEFAULT_RENEW_LEASE_ENABLED = false; public static final int DEFAULT_RUN_RENEW_LEASE_FREQUENCY_INTERVAL_MILLISECONDS = DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD / 2; public static final int DEFAULT_RENEW_LEASE_THRESHOLD_MILLISECONDS = http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java index 76f6218..544fb20 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnRef.java @@ -22,6 +22,7 @@ import org.apache.phoenix.expression.ColumnExpression; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.ProjectedColumnExpression; import org.apache.phoenix.expression.RowKeyColumnExpression; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.SchemaUtil; @@ -45,7 +46,7 @@ public class ColumnRef { } public ColumnRef(TableRef tableRef, String familyName, String columnName) throws MetaDataEntityNotFoundException { - this(tableRef, tableRef.getTable().getColumnFamily(familyName).getColumn(columnName).getPosition()); + this(tableRef, tableRef.getTable().getColumnFamily(familyName).getPColumnForColumnName(columnName).getPosition()); } public ColumnRef(TableRef tableRef, int columnPosition) { @@ -109,7 +110,7 @@ public class ColumnRef { return new ProjectedColumnExpression(column, table, displayName); } - return new KeyValueColumnExpression(column, displayName); + return new KeyValueColumnExpression(column, displayName, EncodedColumnsUtil.usesEncodedColumnNames(table)); } public ColumnRef cloneAtTimestamp(long timestamp) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java index a60229e..4ac8f46 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateColumn.java @@ -90,4 +90,9 @@ public class DelegateColumn extends DelegateDatum implements PColumn { public boolean isDynamic() { return getDelegate().isDynamic(); } + + @Override + public Integer getEncodedColumnQualifier() { + return getDelegate().getEncodedColumnQualifier(); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc472a8e/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java index 3ee012f..c7547c3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java @@ -96,8 +96,8 @@ public class DelegateTable implements PTable { } @Override - public PColumn getColumn(String name) throws ColumnNotFoundException, AmbiguousColumnException { - return delegate.getColumn(name); + public PColumn getPColumnForColumnName(String name) throws ColumnNotFoundException, AmbiguousColumnException { + return delegate.getPColumnForColumnName(name); } @Override @@ -280,4 +280,20 @@ public class DelegateTable implements PTable { public boolean isAppendOnlySchema() { return delegate.isAppendOnlySchema(); } + + @Override + public StorageScheme getStorageScheme() { + return delegate.getStorageScheme(); + } + + @Override + public PColumn getPColumnForColumnQualifier(byte[] cq) throws ColumnNotFoundException, AmbiguousColumnException { + return delegate.getPColumnForColumnQualifier(cq); + } + + @Override + public EncodedCQCounter getEncodedCQCounter() { + return delegate.getEncodedCQCounter(); + + } }