http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java index c5065e0..59c10ad 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java @@ -19,120 +19,7 @@ package org.apache.phoenix.expression; import java.util.Map; -import org.apache.phoenix.expression.function.AbsFunction; -import org.apache.phoenix.expression.function.ArrayAllComparisonExpression; -import org.apache.phoenix.expression.function.ArrayAnyComparisonExpression; -import org.apache.phoenix.expression.function.ArrayAppendFunction; -import org.apache.phoenix.expression.function.ArrayConcatFunction; -import org.apache.phoenix.expression.function.ArrayElemRefExpression; -import org.apache.phoenix.expression.function.ArrayFillFunction; -import org.apache.phoenix.expression.function.ArrayIndexFunction; -import org.apache.phoenix.expression.function.ArrayLengthFunction; -import org.apache.phoenix.expression.function.ArrayPrependFunction; -import org.apache.phoenix.expression.function.ArrayRemoveFunction; -import org.apache.phoenix.expression.function.ArrayToStringFunction; -import org.apache.phoenix.expression.function.ByteBasedRegexpReplaceFunction; -import org.apache.phoenix.expression.function.ByteBasedRegexpSplitFunction; -import org.apache.phoenix.expression.function.ByteBasedRegexpSubstrFunction; -import org.apache.phoenix.expression.function.CbrtFunction; -import org.apache.phoenix.expression.function.CeilDateExpression; -import org.apache.phoenix.expression.function.CeilDecimalExpression; -import org.apache.phoenix.expression.function.CeilFunction; -import 
org.apache.phoenix.expression.function.CeilMonthExpression; -import org.apache.phoenix.expression.function.CeilTimestampExpression; -import org.apache.phoenix.expression.function.CeilWeekExpression; -import org.apache.phoenix.expression.function.CeilYearExpression; -import org.apache.phoenix.expression.function.CoalesceFunction; -import org.apache.phoenix.expression.function.CollationKeyFunction; -import org.apache.phoenix.expression.function.ConvertTimezoneFunction; -import org.apache.phoenix.expression.function.CountAggregateFunction; -import org.apache.phoenix.expression.function.DayOfMonthFunction; -import org.apache.phoenix.expression.function.DayOfWeekFunction; -import org.apache.phoenix.expression.function.DayOfYearFunction; -import org.apache.phoenix.expression.function.DecodeFunction; -import org.apache.phoenix.expression.function.DefaultValueExpression; -import org.apache.phoenix.expression.function.DistinctCountAggregateFunction; -import org.apache.phoenix.expression.function.DistinctCountHyperLogLogAggregateFunction; -import org.apache.phoenix.expression.function.DistinctValueAggregateFunction; -import org.apache.phoenix.expression.function.EncodeFunction; -import org.apache.phoenix.expression.function.ExpFunction; -import org.apache.phoenix.expression.function.ExternalSqlTypeIdFunction; -import org.apache.phoenix.expression.function.FirstValueFunction; -import org.apache.phoenix.expression.function.FirstValuesFunction; -import org.apache.phoenix.expression.function.FloorDateExpression; -import org.apache.phoenix.expression.function.FloorDecimalExpression; -import org.apache.phoenix.expression.function.FloorFunction; -import org.apache.phoenix.expression.function.FloorMonthExpression; -import org.apache.phoenix.expression.function.FloorWeekExpression; -import org.apache.phoenix.expression.function.FloorYearExpression; -import org.apache.phoenix.expression.function.GetBitFunction; -import org.apache.phoenix.expression.function.GetByteFunction; -import 
org.apache.phoenix.expression.function.HourFunction; -import org.apache.phoenix.expression.function.IndexStateNameFunction; -import org.apache.phoenix.expression.function.InstrFunction; -import org.apache.phoenix.expression.function.InvertFunction; -import org.apache.phoenix.expression.function.LTrimFunction; -import org.apache.phoenix.expression.function.LastValueFunction; -import org.apache.phoenix.expression.function.LastValuesFunction; -import org.apache.phoenix.expression.function.LengthFunction; -import org.apache.phoenix.expression.function.LnFunction; -import org.apache.phoenix.expression.function.LogFunction; -import org.apache.phoenix.expression.function.LowerFunction; -import org.apache.phoenix.expression.function.LpadFunction; -import org.apache.phoenix.expression.function.MD5Function; -import org.apache.phoenix.expression.function.MaxAggregateFunction; -import org.apache.phoenix.expression.function.MinAggregateFunction; -import org.apache.phoenix.expression.function.MinuteFunction; -import org.apache.phoenix.expression.function.MonthFunction; -import org.apache.phoenix.expression.function.NowFunction; -import org.apache.phoenix.expression.function.NthValueFunction; -import org.apache.phoenix.expression.function.OctetLengthFunction; -import org.apache.phoenix.expression.function.PercentRankAggregateFunction; -import org.apache.phoenix.expression.function.PercentileContAggregateFunction; -import org.apache.phoenix.expression.function.PercentileDiscAggregateFunction; -import org.apache.phoenix.expression.function.PowerFunction; -import org.apache.phoenix.expression.function.RTrimFunction; -import org.apache.phoenix.expression.function.RandomFunction; -import org.apache.phoenix.expression.function.RegexpReplaceFunction; -import org.apache.phoenix.expression.function.RegexpSplitFunction; -import org.apache.phoenix.expression.function.RegexpSubstrFunction; -import org.apache.phoenix.expression.function.ReverseFunction; -import 
org.apache.phoenix.expression.function.RoundDateExpression; -import org.apache.phoenix.expression.function.RoundDecimalExpression; -import org.apache.phoenix.expression.function.RoundFunction; -import org.apache.phoenix.expression.function.RoundMonthExpression; -import org.apache.phoenix.expression.function.RoundTimestampExpression; -import org.apache.phoenix.expression.function.RoundWeekExpression; -import org.apache.phoenix.expression.function.RoundYearExpression; -import org.apache.phoenix.expression.function.SQLIndexTypeFunction; -import org.apache.phoenix.expression.function.SQLTableTypeFunction; -import org.apache.phoenix.expression.function.SQLViewTypeFunction; -import org.apache.phoenix.expression.function.SecondFunction; -import org.apache.phoenix.expression.function.SetBitFunction; -import org.apache.phoenix.expression.function.SetByteFunction; -import org.apache.phoenix.expression.function.SignFunction; -import org.apache.phoenix.expression.function.SqlTypeNameFunction; -import org.apache.phoenix.expression.function.SqrtFunction; -import org.apache.phoenix.expression.function.StddevPopFunction; -import org.apache.phoenix.expression.function.StddevSampFunction; -import org.apache.phoenix.expression.function.StringBasedRegexpReplaceFunction; -import org.apache.phoenix.expression.function.StringBasedRegexpSplitFunction; -import org.apache.phoenix.expression.function.StringBasedRegexpSubstrFunction; -import org.apache.phoenix.expression.function.StringToArrayFunction; -import org.apache.phoenix.expression.function.SubstrFunction; -import org.apache.phoenix.expression.function.SumAggregateFunction; -import org.apache.phoenix.expression.function.TimezoneOffsetFunction; -import org.apache.phoenix.expression.function.ToCharFunction; -import org.apache.phoenix.expression.function.ToDateFunction; -import org.apache.phoenix.expression.function.ToNumberFunction; -import org.apache.phoenix.expression.function.ToTimeFunction; -import 
org.apache.phoenix.expression.function.ToTimestampFunction; -import org.apache.phoenix.expression.function.TrimFunction; -import org.apache.phoenix.expression.function.TruncFunction; -import org.apache.phoenix.expression.function.UDFExpression; -import org.apache.phoenix.expression.function.UpperFunction; -import org.apache.phoenix.expression.function.WeekFunction; -import org.apache.phoenix.expression.function.YearFunction; +import org.apache.phoenix.expression.function.*; import com.google.common.collect.Maps; @@ -298,7 +185,9 @@ public enum ExpressionType { LastValuesFunction(LastValuesFunction.class), DistinctCountHyperLogLogAggregateFunction(DistinctCountHyperLogLogAggregateFunction.class), CollationKeyFunction(CollationKeyFunction.class), - ArrayRemoveFunction(ArrayRemoveFunction.class); + ArrayRemoveFunction(ArrayRemoveFunction.class), + TransactionProviderNameFunction(TransactionProviderNameFunction.class), + ; ExpressionType(Class<? extends Expression> clazz) { this.clazz = clazz;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/expression/function/TransactionProviderNameFunction.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/TransactionProviderNameFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/TransactionProviderNameFunction.java new file mode 100644 index 0000000..0117c1f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/TransactionProviderNameFunction.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.expression.function; + +import java.sql.SQLException; +import java.util.List; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.parse.FunctionParseNode.Argument; +import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.schema.types.PTinyint; +import org.apache.phoenix.schema.types.PVarchar; +import org.apache.phoenix.transaction.TransactionFactory; + + +/** + * + * Function used to get the transaction provider name from the serialized provider code + * Usage: + * TransactionProviderName(code) + * will return the name() of the TransactionFactory.Provider that fromCode(code) yields; an empty value is passed through unchanged + * NOTE(review): the argument is declared as PInteger but is decoded with the PTinyint codec - confirm the intended type + * + * @since 2.1 + */ +@BuiltInFunction(name=TransactionProviderNameFunction.NAME, args= { + @Argument(allowedTypes= PInteger.class)} ) +public class TransactionProviderNameFunction extends ScalarFunction { + public static final String NAME = "TransactionProviderName"; + + public TransactionProviderNameFunction() { + } + + public TransactionProviderNameFunction(List<Expression> children) throws SQLException { + super(children); + } + + @Override + public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) { + Expression child = children.get(0); + if (!child.evaluate(tuple, ptr)) { + return false; + } + if (ptr.getLength() == 0) { + return true; + } + int code = PTinyint.INSTANCE.getCodec().decodeByte(ptr, child.getSortOrder()); + TransactionFactory.Provider provider = TransactionFactory.Provider.fromCode(code); + ptr.set(PVarchar.INSTANCE.toBytes(provider.name())); + return true; + } + + @Override + public PDataType getDataType() { + return PVarchar.INSTANCE; + } + + @Override + public String getName() { + return NAME; + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 15d8ac3..2f41dc3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -101,7 +101,6 @@ import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.tuple.BaseTuple; import org.apache.phoenix.schema.tuple.ValueGetterTuple; import org.apache.phoenix.schema.types.PDataType; -import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.BitSet; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.EncodedColumnsUtil; @@ -109,6 +108,7 @@ import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TransactionUtil; import org.apache.phoenix.util.TrustedByteArrayOutputStream; import com.google.common.base.Preconditions; @@ -1068,7 +1068,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } else if (kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() // Since we don't include the index rows in the change set for txn tables, we need to detect row deletes that have transformed by TransactionProcessor - || (CellUtil.matchingQualifier(kv, TransactionFactory.getTransactionProvider().getTransactionContext().getFamilyDeleteMarker()) && CellUtil.matchingValue(kv, HConstants.EMPTY_BYTE_ARRAY))) { + || TransactionUtil.isDeleteFamily(kv)) { nDeleteCF++; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java index 94fbd0d..778401e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheFactory.java @@ -52,7 +52,7 @@ public class IndexMetaDataCacheFactory implements ServerCacheFactory { IndexMaintainer.deserialize(cachePtr, GenericKeyValueBuilder.INSTANCE, useProtoForIndexMaintainer); final PhoenixTransactionContext txnContext; try { - txnContext = txState.length != 0 ? TransactionFactory.getTransactionProvider().getTransactionContext(txState) : null; + txnContext = TransactionFactory.getTransactionContext(txState, clientVersion); } catch (IOException e) { throw new SQLException(e); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java index c5233d3..d33e3fe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexCodec.java @@ -44,7 +44,6 @@ public class PhoenixIndexCodec extends BaseIndexCodec { public static final String INDEX_PROTO_MD = "IdxProtoMD"; public static final String INDEX_UUID = "IdxUUID"; public static final String INDEX_MAINTAINERS = "IndexMaintainers"; - public static final String CLIENT_VERSION = "_ClientVersion"; public static KeyValueBuilder KV_BUILDER = GenericKeyValueBuilder.INSTANCE; private byte[] regionStartKey; 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java index 5e6f756..949e6ed 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaDataBuilder.java @@ -37,6 +37,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.transaction.PhoenixTransactionContext; import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; public class PhoenixIndexMetaDataBuilder { @@ -63,9 +64,9 @@ public class PhoenixIndexMetaDataBuilder { boolean useProto = md != null; byte[] txState = attributes.get(BaseScannerRegionObserver.TX_STATE); final List<IndexMaintainer> indexMaintainers = IndexMaintainer.deserialize(md, useProto); - final PhoenixTransactionContext txnContext = TransactionFactory.getTransactionProvider().getTransactionContext(txState); - byte[] clientVersionBytes = attributes.get(PhoenixIndexCodec.CLIENT_VERSION); - final int clientVersion = clientVersionBytes == null ? IndexMetaDataCache.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes); + byte[] clientVersionBytes = attributes.get(BaseScannerRegionObserver.CLIENT_VERSION); + final int clientVersion = clientVersionBytes == null ? 
ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes); + final PhoenixTransactionContext txnContext = TransactionFactory.getTransactionContext(txState, clientVersion); return new IndexMetaDataCache() { @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java index ba6a08f..cc7221e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java @@ -42,7 +42,6 @@ import org.apache.phoenix.cache.TenantCache; import org.apache.phoenix.coprocessor.BaseRegionScanner; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.HashJoinRegionScanner; -import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; @@ -63,6 +62,7 @@ import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; @@ -128,12 +128,13 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory { if (localIndexBytes == null) { localIndexBytes = scan.getAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD); } + int clientVersion = 
ScanUtil.getClientVersion(scan); List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes, useProto); indexMaintainer = indexMaintainers.get(0); viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE); - tx = MutationState.decodeTransaction(txState); + tx = TransactionFactory.getTransactionContext(txState, clientVersion); } final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index 9caf7fb..add0628 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -47,6 +47,7 @@ import org.apache.phoenix.expression.function.SQLIndexTypeFunction; import org.apache.phoenix.expression.function.SQLTableTypeFunction; import org.apache.phoenix.expression.function.SQLViewTypeFunction; import org.apache.phoenix.expression.function.SqlTypeNameFunction; +import org.apache.phoenix.expression.function.TransactionProviderNameFunction; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.VersionUtil; import org.apache.phoenix.iterate.DelegateResultIterator; @@ -297,6 +298,9 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { public static final String TRANSACTIONAL = "TRANSACTIONAL"; public static final byte[] TRANSACTIONAL_BYTES = Bytes.toBytes(TRANSACTIONAL); + public static final String TRANSACTION_PROVIDER = "TRANSACTION_PROVIDER"; + public static 
final byte[] TRANSACTION_PROVIDER_BYTES = Bytes.toBytes(TRANSACTION_PROVIDER); + public static final String UPDATE_CACHE_FREQUENCY = "UPDATE_CACHE_FREQUENCY"; public static final byte[] UPDATE_CACHE_FREQUENCY_BYTES = Bytes.toBytes(UPDATE_CACHE_FREQUENCY); @@ -1133,9 +1137,10 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { VIEW_STATEMENT + "," + SQLViewTypeFunction.NAME + "(" + VIEW_TYPE + ") AS " + VIEW_TYPE + "," + SQLIndexTypeFunction.NAME + "(" + INDEX_TYPE + ") AS " + INDEX_TYPE + "," + - TRANSACTIONAL + "," + + TRANSACTION_PROVIDER + " IS NOT NULL AS " + TRANSACTIONAL + "," + IS_NAMESPACE_MAPPED + "," + - GUIDE_POSTS_WIDTH + + GUIDE_POSTS_WIDTH + "," + + TransactionProviderNameFunction.NAME + "(" + TRANSACTION_PROVIDER + ") AS TRANSACTION_PROVIDER" + " from " + SYSTEM_CATALOG + " " + SYSTEM_CATALOG_ALIAS + " where " + COLUMN_NAME + " is null" + " and " + COLUMN_FAMILY + " is null" + @@ -1175,7 +1180,8 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { "'' " + INDEX_TYPE + "," + "CAST(null AS BOOLEAN) " + TRANSACTIONAL + "," + "CAST(null AS BOOLEAN) " + IS_NAMESPACE_MAPPED + "," + - "CAST(null AS BIGINT) " + GUIDE_POSTS_WIDTH + "\n"); + "CAST(null AS BIGINT) " + GUIDE_POSTS_WIDTH + "," + + "CAST(null AS VARCHAR) " + TRANSACTION_PROVIDER + "\n"); buf.append( " from " + SYSTEM_SEQUENCE + "\n"); StringBuilder whereClause = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index f526419..25b9fb0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -405,7 +405,7 @@ public class 
PhoenixStatement implements Statement, SQLCloseable { MutationState state = connection.getMutationState(); MutationPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE); if (plan.getTargetRef() != null && plan.getTargetRef().getTable() != null && plan.getTargetRef().getTable().isTransactional()) { - state.startTransaction(); + state.startTransaction(plan.getTargetRef().getTable().getTransactionProvider()); } Iterator<TableRef> tableRefs = plan.getSourceRefs().iterator(); state.sendUncommitted(tableRefs); http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java index 7c154f0..f4ecac2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java @@ -116,7 +116,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr put = new Put(CellUtil.cloneRow(cell)); put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); put.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue); - put.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersion); + put.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersion); put.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES); mutations.add(put); } @@ -126,7 +126,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr del = new Delete(CellUtil.cloneRow(cell)); del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); 
del.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue); - del.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersion); + del.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, clientVersion); del.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES); mutations.add(del); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index 0b72ada..b75119b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -48,6 +48,8 @@ import org.apache.phoenix.schema.SequenceAllocation; import org.apache.phoenix.schema.SequenceKey; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.GuidePostsKey; +import org.apache.phoenix.transaction.PhoenixTransactionClient; +import org.apache.phoenix.transaction.TransactionFactory; public interface ConnectionQueryServices extends QueryServices, MetaDataMutated { @@ -152,4 +154,6 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated public User getUser(); public QueryLoggerDisruptor getQueryDisruptor(); -} \ No newline at end of file + + public PhoenixTransactionClient initTransactionClient(TransactionFactory.Provider provider); +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 6627a84..5cb14d6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -116,6 +116,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; @@ -138,10 +139,10 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; import org.apache.phoenix.coprocessor.MetaDataRegionObserver; -import org.apache.phoenix.coprocessor.PhoenixTransactionalProcessor; import org.apache.phoenix.coprocessor.ScanRegionObserver; import org.apache.phoenix.coprocessor.SequenceRegionObserver; import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl; +import org.apache.phoenix.coprocessor.TephraTransactionalProcessor; import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; import org.apache.phoenix.coprocessor.generated.MetaDataProtos; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.AddColumnRequest; @@ -226,8 +227,10 @@ import org.apache.phoenix.schema.types.PTinyint; import org.apache.phoenix.schema.types.PUnsignedTinyint; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; +import org.apache.phoenix.transaction.PhoenixTransactionClient; import org.apache.phoenix.transaction.PhoenixTransactionContext; import 
org.apache.phoenix.transaction.TransactionFactory; +import org.apache.phoenix.transaction.TransactionFactory.Provider; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.ConfigUtil; @@ -264,7 +267,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private static final int DEFAULT_OUT_OF_ORDER_MUTATIONS_WAIT_TIME_MS = 1000; private static final int TTL_FOR_MUTEX = 15 * 60; // 15min protected final Configuration config; - private final ConnectionInfo connectionInfo; + protected final ConnectionInfo connectionInfo; // Copy of config.getProps(), but read-only to prevent synchronization that we // don't need. private final ReadOnlyProps props; @@ -306,6 +309,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // List of queues instead of a single queue to provide reduced contention via lock striping private final List<LinkedBlockingQueue<WeakReference<PhoenixConnection>>> connectionQueues; private ScheduledExecutorService renewLeaseExecutor; + private PhoenixTransactionClient[] txClients = new PhoenixTransactionClient[TransactionFactory.Provider.values().length];; /* * We can have multiple instances of ConnectionQueryServices. By making the thread factory * static, renew lease thread names will be unique across them. 
@@ -410,23 +414,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } - private void initTxServiceClient() { - txZKClientService = TransactionFactory.getTransactionProvider().getTransactionContext().setTransactionClient(config, props, connectionInfo); - } - private void openConnection() throws SQLException { try { - boolean transactionsEnabled = props.getBoolean( - QueryServices.TRANSACTIONS_ENABLED, - QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED); this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config); GLOBAL_HCONNECTIONS_COUNTER.increment(); logger.info("HConnection established. Stacktrace for informational purposes: " + connection + " " + LogUtil.getCallerStackTrace()); - // only initialize the tx service client if needed and if we succeeded in getting a connection - // to HBase - if (transactionsEnabled) { - initTxServiceClient(); - } } catch (IOException e) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION) .setRootCause(e).build().buildException(); @@ -517,7 +509,11 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement renewLeaseExecutor.shutdownNow(); } // shut down the tx client service if we created one to support transactions - if (this.txZKClientService != null) this.txZKClientService.stopAndWait(); + for (PhoenixTransactionClient client : txClients) { + if (client != null) { + client.close(); + } + } } } catch (IOException e) { if (sqlE == null) { @@ -858,9 +854,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement if (!descriptor.hasCoprocessor(ServerCachingEndpointImpl.class.getName())) { descriptor.addCoprocessor(ServerCachingEndpointImpl.class.getName(), null, priority, null); } + // For ALTER TABLE + boolean nonTxToTx = Boolean.TRUE.equals(tableProps.get(PhoenixTransactionContext.READ_NON_TX_DATA)); boolean isTransactional = - 
Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) || - Boolean.TRUE.equals(tableProps.get(PhoenixTransactionContext.READ_NON_TX_DATA)); // For ALTER TABLE + Boolean.TRUE.equals(tableProps.get(TableProperty.TRANSACTIONAL.name())) || nonTxToTx; // TODO: better encapsulation for this // Since indexes can't have indexes, don't install our indexing coprocessor for indexes. // Also don't install on the SYSTEM.CATALOG and SYSTEM.STATS table because we use @@ -923,13 +920,22 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } if (isTransactional) { - if (!descriptor.hasCoprocessor(PhoenixTransactionalProcessor.class.getName())) { - descriptor.addCoprocessor(PhoenixTransactionalProcessor.class.getName(), null, priority - 10, null); + TransactionFactory.Provider provider = (TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(tableProps); + if (provider == null) { + String providerValue = this.props.get(QueryServices.DEFAULT_TRANSACTION_PROVIDER_ATTRIB, QueryServicesOptions.DEFAULT_TRANSACTION_PROVIDER); + provider = (TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue(providerValue); + } + Class<? extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor(); + if (!descriptor.hasCoprocessor(coprocessorClass.getName())) { + descriptor.addCoprocessor(coprocessorClass.getName(), null, priority - 10, null); } } else { - // If exception on alter table to transition back to non transactional - if (descriptor.hasCoprocessor(PhoenixTransactionalProcessor.class.getName())) { - descriptor.removeCoprocessor(PhoenixTransactionalProcessor.class.getName()); + // Remove all potential transactional coprocessors + for (TransactionFactory.Provider provider : TransactionFactory.Provider.values()) { + Class<? 
extends RegionObserver> coprocessorClass = provider.getTransactionProvider().getCoprocessor(); + if (coprocessorClass != null && descriptor.hasCoprocessor(coprocessorClass.getName())) { + descriptor.removeCoprocessor(coprocessorClass.getName()); + } } } } catch (IOException e) { @@ -1126,7 +1132,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } else { // If we think we're creating a non transactional table when it's already // transactional, don't allow. - if (existingDesc.hasCoprocessor(PhoenixTransactionalProcessor.class.getName())) { + if (existingDesc.hasCoprocessor(TephraTransactionalProcessor.class.getName())) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX) .setSchemaName(SchemaUtil.getSchemaNameFromFullName(physicalTableName)) .setTableName(SchemaUtil.getTableNameFromFullName(physicalTableName)).build().buildException(); @@ -2895,6 +2901,14 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement + PBoolean.INSTANCE.getSqlTypeName()); addParentToChildLinks(metaConnection); } + if (currentServerSideTableTimeStamp < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0) { + metaConnection = addColumnsIfNotExists( + metaConnection, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0, + PhoenixDatabaseMetaData.TRANSACTION_PROVIDER + " " + + PTinyint.INSTANCE.getSqlTypeName()); + } } @@ -4080,7 +4094,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public String getUserName() { return userName; } - + @Override public User getUser() { return user; @@ -4515,4 +4529,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public QueryLoggerDisruptor getQueryDisruptor() { return this.queryDisruptor; } + + @Override + public synchronized PhoenixTransactionClient initTransactionClient(Provider provider) { + PhoenixTransactionClient client = 
txClients[provider.ordinal()]; + if (client == null) { + client = txClients[provider.ordinal()] = provider.getTransactionProvider().getTransactionClient(config, connectionInfo); + } + return client; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index ad354d1..aa8209d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -82,7 +82,8 @@ import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.GuidePostsKey; -import org.apache.phoenix.transaction.TransactionFactory; +import org.apache.phoenix.transaction.PhoenixTransactionClient; +import org.apache.phoenix.transaction.TransactionFactory.Provider; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.JDBCUtil; import org.apache.phoenix.util.MetaDataUtil; @@ -116,7 +117,7 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple private final Configuration config; private User user; - + public ConnectionlessQueryServicesImpl(QueryServices services, ConnectionInfo connInfo, Properties info) { super(services); userName = connInfo.getPrincipal(); @@ -141,7 +142,6 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple // Without making a copy of the configuration we cons up, we lose some of our properties // on the server side during testing. 
this.config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(config); - TransactionFactory.getTransactionProvider().getTransactionContext().setInMemoryTransactionClient(config); this.guidePostsCache = new GuidePostsCache(this, config); } @@ -682,4 +682,9 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple public QueryLoggerDisruptor getQueryDisruptor() { return null; } + + @Override + public PhoenixTransactionClient initTransactionClient(Provider provider) { + return null; // Client is not necessary + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index f5c8a59..ed9b9da 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -49,6 +49,8 @@ import org.apache.phoenix.schema.SequenceAllocation; import org.apache.phoenix.schema.SequenceKey; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.GuidePostsKey; +import org.apache.phoenix.transaction.PhoenixTransactionClient; +import org.apache.phoenix.transaction.TransactionFactory.Provider; public class DelegateConnectionQueryServices extends DelegateQueryServices implements ConnectionQueryServices { @@ -363,6 +365,11 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple public QueryLoggerDisruptor getQueryDisruptor() { return getDelegate().getQueryDisruptor(); } - - -} \ No newline at end of file + + + + @Override + public PhoenixTransactionClient initTransactionClient(Provider provider) { + return 
getDelegate().initTransactionClient(provider); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index ae12e01..d181fc8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -100,6 +100,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTION_PROVIDER; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_SEQUENCE; @@ -339,6 +340,7 @@ public interface QueryConstants { ENCODING_SCHEME + " TINYINT, " + COLUMN_QUALIFIER_COUNTER + " INTEGER, " + USE_STATS_FOR_PARALLELIZATION + " BOOLEAN, " + + TRANSACTION_PROVIDER + " TINYINT, " + "CONSTRAINT " + SYSTEM_TABLE_PK_NAME + " PRIMARY KEY (" + TENANT_ID + "," + TABLE_SCHEM + "," + TABLE_NAME + "," + COLUMN_NAME + "," + COLUMN_FAMILY + "))\n" + HConstants.VERSIONS + "=" + MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS + ",\n" + http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 21f043c..29d18d9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -207,6 +207,7 @@ public interface QueryServices extends SQLCloseable { public static final String DEFAULT_KEEP_DELETED_CELLS_ATTRIB = "phoenix.table.default.keep.deleted.cells"; public static final String DEFAULT_STORE_NULLS_ATTRIB = "phoenix.table.default.store.nulls"; public static final String DEFAULT_TABLE_ISTRANSACTIONAL_ATTRIB = "phoenix.table.istransactional.default"; + public static final String DEFAULT_TRANSACTION_PROVIDER_ATTRIB = "phoenix.table.transaction.provider.default"; public static final String GLOBAL_METRICS_ENABLED = "phoenix.query.global.metrics.enabled"; // Transaction related configs http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 58c9812..70ac11b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -114,6 +114,7 @@ import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.PTableRefFactory; import org.apache.phoenix.trace.util.Tracing; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.ReadOnlyProps; @@ -263,6 +264,7 @@ public class QueryServicesOptions { // We'll also need this for transactions to work correctly public static final boolean 
DEFAULT_AUTO_COMMIT = false; public static final boolean DEFAULT_TABLE_ISTRANSACTIONAL = false; + public static final String DEFAULT_TRANSACTION_PROVIDER = TransactionFactory.Provider.getDefault().name(); public static final boolean DEFAULT_TRANSACTIONS_ENABLED = false; public static final boolean DEFAULT_IS_GLOBAL_METRICS_ENABLED = true; @@ -554,22 +556,22 @@ public class QueryServicesOptions { return set(GROUPBY_SPILL_FILES_ATTRIB, num); } - private QueryServicesOptions set(String name, boolean value) { + QueryServicesOptions set(String name, boolean value) { config.set(name, Boolean.toString(value)); return this; } - private QueryServicesOptions set(String name, int value) { + QueryServicesOptions set(String name, int value) { config.set(name, Integer.toString(value)); return this; } - private QueryServicesOptions set(String name, String value) { + QueryServicesOptions set(String name, String value) { config.set(name, value); return this; } - private QueryServicesOptions set(String name, long value) { + QueryServicesOptions set(String name, long value) { config.set(name, Long.toString(value)); return this; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java index 8f15c5e..d1b8f1e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.transaction.TransactionFactory; public class DelegateTable 
implements PTable { @Override @@ -237,7 +238,12 @@ public class DelegateTable implements PTable { } @Override - public boolean isTransactional() { + public TransactionFactory.Provider getTransactionProvider() { + return delegate.getTransactionProvider(); + } + + @Override + public final boolean isTransactional() { return delegate.isTransactional(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index d252879..1fb668e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -83,6 +83,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TENANT_ID; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTION_PROVIDER; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USE_STATS_FOR_PARALLELIZATION; @@ -222,6 +223,9 @@ import org.apache.phoenix.schema.types.PUnsignedLong; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.transaction.PhoenixTransactionProvider; +import org.apache.phoenix.transaction.TransactionFactory; +import org.apache.phoenix.transaction.TransactionFactory.Provider; import 
org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.CursorUtil; import org.apache.phoenix.util.EncodedColumnsUtil; @@ -280,7 +284,7 @@ public class MetaDataClient { INDEX_TYPE + "," + STORE_NULLS + "," + BASE_COLUMN_COUNT + "," + - TRANSACTIONAL + "," + + TRANSACTION_PROVIDER + "," + UPDATE_CACHE_FREQUENCY + "," + IS_NAMESPACE_MAPPED + "," + AUTO_PARTITION_SEQ + "," + @@ -572,14 +576,11 @@ public class MetaDataClient { } catch (TableNotFoundException e) { } - boolean defaultTransactional = connection.getQueryServices().getProps().getBoolean( - QueryServices.DEFAULT_TABLE_ISTRANSACTIONAL_ATTRIB, - QueryServicesOptions.DEFAULT_TRANSACTIONAL); // start a txn if all table are transactional by default or if we found the table in the cache and it is transactional // TODO if system tables become transactional remove the check - boolean isTransactional = defaultTransactional || (table!=null && table.isTransactional()); - if (!systemTable && isTransactional && !connection.getMutationState().isTransactionStarted()) { - connection.getMutationState().startTransaction(); + boolean isTransactional = (table!=null && table.isTransactional()); + if (isTransactional) { + connection.getMutationState().startTransaction(table.getTransactionProvider()); } resolvedTimestamp = resolvedTimestamp==null ? 
TransactionUtil.getResolvedTimestamp(connection, isTransactional, HConstants.LATEST_TIMESTAMP) : resolvedTimestamp; // Do not make rpc to getTable if @@ -632,16 +633,20 @@ public class MetaDataClient { result = queryServices.getTable(tenantId, schemaBytes, tableBytes, tableTimestamp, resolvedTimestamp); - // if the table was assumed to be transactional, but is actually not transactional - // then re-resolve as of the right timestamp (and vice versa) - if (table == null && result.getTable() != null - && result.getTable().isTransactional() != isTransactional) { - result = - queryServices.getTable(tenantId, schemaBytes, tableBytes, - tableTimestamp, - TransactionUtil.getResolvedTimestamp(connection, - result.getTable().isTransactional(), - HConstants.LATEST_TIMESTAMP)); + // if the table was assumed to be non transactional, but is actually transactional + // then re-resolve as of the right timestamp + if (result.getTable() != null + && result.getTable().isTransactional() + && !isTransactional) { + long resolveTimestamp = TransactionUtil.getResolvedTimestamp(connection, + result.getTable().isTransactional(), + HConstants.LATEST_TIMESTAMP); + // Reresolve if table timestamp is past timestamp as of which we should see data + if (result.getTable().getTimeStamp() >= resolveTimestamp) { + result = + queryServices.getTable(tenantId, schemaBytes, tableBytes, + tableTimestamp, resolveTimestamp); + } } if (SYSTEM_CATALOG_SCHEMA.equals(schemaName)) { @@ -1236,8 +1241,8 @@ public class MetaDataClient { //view all the data belonging to the table PTable nonTxnLogicalTable = new DelegateTable(logicalTable) { @Override - public boolean isTransactional() { - return false; + public TransactionFactory.Provider getTransactionProvider() { + return null; } }; TableRef tableRef = new TableRef(null, nonTxnLogicalTable, clientTimeStamp, false); @@ -1861,7 +1866,7 @@ public class MetaDataClient { long clientTimeStamp = scn == null ? 
HConstants.LATEST_TIMESTAMP : scn; boolean multiTenant = false; boolean storeNulls = false; - boolean transactional = (parent!= null) ? parent.isTransactional() : false; + TransactionFactory.Provider transactionProvider = (parent!= null) ? parent.getTransactionProvider() : null; Integer saltBucketNum = null; String defaultFamilyName = null; boolean isImmutableRows = false; @@ -1877,7 +1882,7 @@ public class MetaDataClient { QualifierEncodingScheme encodingScheme = NON_ENCODED_QUALIFIERS; ImmutableStorageScheme immutableStorageScheme = ONE_CELL_PER_COLUMN; if (parent != null && tableType == PTableType.INDEX) { - timestamp = TransactionUtil.getTableTimestamp(connection, transactional); + timestamp = TransactionUtil.getTableTimestamp(connection, transactionProvider != null, transactionProvider); storeNulls = parent.getStoreNulls(); isImmutableRows = parent.isImmutableRows(); isAppendOnlySchema = parent.isAppendOnlySchema(); @@ -2018,31 +2023,45 @@ public class MetaDataClient { storeNulls = storeNullsProp; } Boolean transactionalProp = (Boolean) TableProperty.TRANSACTIONAL.getValue(tableProps); - if (transactionalProp != null && parent != null) { + TransactionFactory.Provider transactionProviderProp = (TransactionFactory.Provider) TableProperty.TRANSACTION_PROVIDER.getValue(tableProps); + if ((transactionalProp != null || transactionProviderProp != null) && parent != null) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.ONLY_TABLE_MAY_BE_DECLARED_TRANSACTIONAL) .setSchemaName(schemaName).setTableName(tableName) .build().buildException(); } if (parent == null) { - if (transactionalProp == null) { + boolean transactional; + if (transactionProviderProp != null) { + transactional = true; + } else if (transactionalProp == null) { transactional = connection.getQueryServices().getProps().getBoolean( QueryServices.DEFAULT_TABLE_ISTRANSACTIONAL_ATTRIB, QueryServicesOptions.DEFAULT_TABLE_ISTRANSACTIONAL); } else { transactional = transactionalProp; } + if (transactional) { 
+ if (transactionProviderProp == null) { + transactionProvider = (TransactionFactory.Provider)TableProperty.TRANSACTION_PROVIDER.getValue( + connection.getQueryServices().getProps().get( + QueryServices.DEFAULT_TRANSACTION_PROVIDER_ATTRIB, + QueryServicesOptions.DEFAULT_TRANSACTION_PROVIDER)); + } else { + transactionProvider = transactionProviderProp; + } + } } boolean transactionsEnabled = connection.getQueryServices().getProps().getBoolean( QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED); // can't create a transactional table if transactions are not enabled - if (!transactionsEnabled && transactional) { + if (!transactionsEnabled && transactionProvider != null) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_IF_TXNS_DISABLED) .setSchemaName(schemaName).setTableName(tableName) .build().buildException(); } // can't create a transactional table if it has a row timestamp column - if (pkConstraint.getNumColumnsWithRowTimestamp() > 0 && transactional) { + if (pkConstraint.getNumColumnsWithRowTimestamp() > 0 && transactionProvider != null) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_TXN_TABLE_WITH_ROW_TIMESTAMP) .setSchemaName(schemaName).setTableName(tableName) .build().buildException(); @@ -2050,8 +2069,9 @@ public class MetaDataClient { // Put potentially inferred value into tableProps as it's used by the createTable call below // to determine which coprocessors to install on the new table. 
- tableProps.put(PhoenixDatabaseMetaData.TRANSACTIONAL, transactional); - if (transactional) { + tableProps.put(PhoenixDatabaseMetaData.TRANSACTIONAL, transactionProvider != null); + if (transactionProvider != null) { + // TODO: for Omid // If TTL set, use Tephra TTL property name instead Object ttl = commonFamilyProps.remove(HColumnDescriptor.TTL); if (ttl != null) { @@ -2063,7 +2083,7 @@ public class MetaDataClient { (Boolean) TableProperty.USE_STATS_FOR_PARALLELIZATION.getValue(tableProps); boolean sharedTable = statement.getTableType() == PTableType.VIEW || allocateIndexId; - if (transactional) { + if (transactionProvider != null) { // Tephra uses an empty value cell as its delete marker, so we need to turn on // storeNulls for transactional tables. // If we use regular column delete markers (which is what non transactional tables @@ -2098,7 +2118,7 @@ public class MetaDataClient { } } } - timestamp = timestamp==null ? TransactionUtil.getTableTimestamp(connection, transactional) : timestamp; + timestamp = timestamp==null ? TransactionUtil.getTableTimestamp(connection, transactionProvider != null, transactionProvider) : timestamp; // Delay this check as it is supported to have IMMUTABLE_ROWS and SALT_BUCKETS defined on views if (sharedTable) { @@ -2481,7 +2501,7 @@ public class MetaDataClient { Collections.<PTable>emptyList(), isImmutableRows, Collections.<PName>emptyList(), defaultFamilyName == null ? 
null : PNameFactory.newName(defaultFamilyName), null, - Boolean.TRUE.equals(disableWAL), false, false, null, null, indexType, true, false, 0, 0L, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER, true); + Boolean.TRUE.equals(disableWAL), false, false, null, null, indexType, true, null, 0, 0L, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, PTable.EncodedCQCounter.NULL_COUNTER, true); connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP); } @@ -2621,7 +2641,11 @@ public class MetaDataClient { } else { tableUpsert.setInt(19, BASE_TABLE_BASE_COLUMN_COUNT); } - tableUpsert.setBoolean(20, transactional); + if (transactionProvider == null) { + tableUpsert.setNull(20, Types.TINYINT); + } else { + tableUpsert.setByte(20, transactionProvider.getCode()); + } tableUpsert.setLong(21, updateCacheFrequency); tableUpsert.setBoolean(22, isNamespaceMapped); if (autoPartitionSeq == null) { @@ -2746,7 +2770,7 @@ public class MetaDataClient { PTable.INITIAL_SEQ_NUM, pkName == null ? null : PNameFactory.newName(pkName), saltBucketNum, columns.values(), parent == null ? null : parent.getSchemaName(), parent == null ? null : parent.getTableName(), Collections.<PTable>emptyList(), isImmutableRows, physicalNames, defaultFamilyName == null ? 
null : PNameFactory.newName(defaultFamilyName), viewStatement, Boolean.TRUE.equals(disableWAL), multiTenant, storeNulls, viewType, - result.getViewIndexId(), indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, 0L, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, immutableStorageScheme, encodingScheme, cqCounterToBe, useStatsForParallelizationProp); + result.getViewIndexId(), indexType, rowKeyOrderOptimizable, transactionProvider, updateCacheFrequency, 0L, isNamespaceMapped, autoPartitionSeq, isAppendOnlySchema, immutableStorageScheme, encodingScheme, cqCounterToBe, useStatsForParallelizationProp); result = new MetaDataMutationResult(code, result.getMutationTime(), table, true); addTableToCache(result); return table; @@ -3251,8 +3275,8 @@ public class MetaDataClient { changingPhoenixTableProperty = evaluateStmtProperties(metaProperties,metaPropertiesEvaluated,table,schemaName,tableName); // If changing isImmutableRows to true or it's not being changed and is already true boolean willBeImmutableRows = Boolean.TRUE.equals(metaPropertiesEvaluated.getIsImmutableRows()) || (metaPropertiesEvaluated.getIsImmutableRows() == null && table.isImmutableRows()); - - Long timeStamp = TransactionUtil.getTableTimestamp(connection, table.isTransactional() || metaProperties.getNonTxToTx()); + boolean willBeTxnl = metaProperties.getNonTxToTx(); + Long timeStamp = TransactionUtil.getTableTimestamp(connection, table.isTransactional() || willBeTxnl, table.isTransactional() ? 
table.getTransactionProvider() : metaPropertiesEvaluated.getTransactionProvider()); int numPkColumnsAdded = 0; List<PColumn> columns = Lists.newArrayListWithExpectedSize(numCols); Set<String> colFamiliesForPColumnsToBeAdded = new LinkedHashSet<>(); @@ -4247,6 +4271,8 @@ public class MetaDataClient { metaProperties.setStoreNullsProp((Boolean)value); } else if (propName.equals(TRANSACTIONAL)) { metaProperties.setIsTransactionalProp((Boolean)value); + } else if (propName.equals(TRANSACTION_PROVIDER)) { + metaProperties.setTransactionProviderProp((TransactionFactory.Provider) value); } else if (propName.equals(UPDATE_CACHE_FREQUENCY)) { metaProperties.setUpdateCacheFrequencyProp((Long)value); } else if (propName.equals(GUIDE_POSTS_WIDTH)) { @@ -4369,6 +4395,22 @@ public class MetaDataClient { .setSchemaName(schemaName).setTableName(tableName) .build().buildException(); } + TransactionFactory.Provider provider = metaProperties.getTransactionProviderProp(); + if (provider == null) { + provider = (Provider) + TableProperty.TRANSACTION_PROVIDER.getValue( + connection.getQueryServices().getProps().get( + QueryServices.DEFAULT_TRANSACTION_PROVIDER_ATTRIB, + QueryServicesOptions.DEFAULT_TRANSACTION_PROVIDER)); + metaPropertiesEvaluated.setTransactionProvider(provider); + } + if (provider.getTransactionProvider().isUnsupported(PhoenixTransactionProvider.Feature.ALTER_NONTX_TO_TX)) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL) + .setMessage(provider.name() + ". 
") + .setSchemaName(schemaName) + .setTableName(tableName) + .build().buildException(); + } changingPhoenixTableProperty = true; metaProperties.setNonTxToTx(true); } @@ -4381,6 +4423,7 @@ public class MetaDataClient { private Boolean multiTenantProp = null; private Boolean disableWALProp = null; private Boolean storeNullsProp = null; + private TransactionFactory.Provider transactionProviderProp = null; private Boolean isTransactionalProp = null; private Long updateCacheFrequencyProp = null; private Boolean appendOnlySchemaProp = null; @@ -4421,6 +4464,14 @@ public class MetaDataClient { this.storeNullsProp = storeNullsProp; } + public TransactionFactory.Provider getTransactionProviderProp() { + return transactionProviderProp; + } + + public void setTransactionProviderProp(TransactionFactory.Provider transactionProviderProp) { + this.transactionProviderProp = transactionProviderProp; + } + public Boolean getIsTransactionalProp() { return isTransactionalProp; } @@ -4490,6 +4541,7 @@ public class MetaDataClient { private Boolean storeNulls = null; private Boolean useStatsForParallelization = null; private Boolean isTransactional = null; + private TransactionFactory.Provider transactionProvider = null; public Boolean getIsImmutableRows() { return isImmutableRows; @@ -4570,5 +4622,14 @@ public class MetaDataClient { public void setIsTransactional(Boolean isTransactional) { this.isTransactional = isTransactional; } + + public TransactionFactory.Provider getTransactionProvider() { + return transactionProvider; + } + + public void setTransactionProvider(TransactionFactory.Provider transactionProvider) { + this.transactionProvider = transactionProvider; + } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/8eaca121/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java index 7e186ad..af78612 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java @@ -40,6 +40,7 @@ import org.apache.phoenix.schema.types.PArrayDataTypeDecoder; import org.apache.phoenix.schema.types.PArrayDataTypeEncoder; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PVarbinary; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.TrustedByteArrayOutputStream; import com.google.common.annotations.VisibleForTesting; @@ -680,6 +681,7 @@ public interface PTable extends PMetaDataEntity { boolean isMultiTenant(); boolean getStoreNulls(); boolean isTransactional(); + TransactionFactory.Provider getTransactionProvider(); ViewType getViewType(); String getViewStatement();