http://git-wip-us.apache.org/repos/asf/phoenix/blob/02715adc/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index a7b31e8..1a11427 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -69,6 +69,7 @@ import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PDouble; import org.apache.phoenix.schema.types.PFloat; import org.apache.phoenix.schema.types.PVarchar; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.PhoenixRuntime; @@ -134,7 +135,7 @@ public class PTableImpl implements PTable { private boolean disableWAL; private boolean multiTenant; private boolean storeNulls; - private boolean isTransactional; + private TransactionFactory.Provider transactionProvider; private ViewType viewType; private Short viewIndexId; private int estimatedSize; @@ -227,7 +228,7 @@ public class PTableImpl implements PTable { init(tenantId, this.schemaName, this.tableName, PTableType.INDEX, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, this.schemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, null, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable, - isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMpped, null, false, storageScheme, qualifierEncodingScheme, encodedCQCounter, useStatsForParallelization); + transactionProvider, updateCacheFrequency, indexDisableTimestamp, isNamespaceMpped, null, false, storageScheme, qualifierEncodingScheme, 
encodedCQCounter, useStatsForParallelization); } public PTableImpl(long timeStamp) { // For delete marker @@ -270,7 +271,7 @@ public class PTableImpl implements PTable { table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table), table.getParentSchemaName(), table.getParentTableName(), indexes, table.isImmutableRows(), physicalNames, table.getDefaultFamilyName(), viewStatement, table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), - table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), updateCacheFrequency, + table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), updateCacheFrequency, table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization()); } @@ -280,7 +281,7 @@ public class PTableImpl implements PTable { table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), getColumnsToClone(table), parentSchemaName, table.getParentTableName(), indexes, table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), viewStatement, table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), - table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), + table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization()); } @@ -290,7 +291,7 @@ public 
class PTableImpl implements PTable { table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), - table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), + table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization()); } @@ -300,7 +301,7 @@ public class PTableImpl implements PTable { table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), - table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), + table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization()); } @@ -310,7 +311,7 @@ public class PTableImpl implements PTable { table.getSequenceNumber(), 
table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), defaultFamily, table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), - table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), + table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization()); } @@ -320,7 +321,7 @@ public class PTableImpl implements PTable { sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), - table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), + table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization()); } @@ -330,18 +331,18 @@ public class PTableImpl implements PTable { sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), 
table.getParentTableName(), table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), - table.getIndexType(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), + table.getIndexType(), table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization()); } public static PTableImpl makePTable(PTable table, long timeStamp, long sequenceNumber, Collection<PColumn> columns, boolean isImmutableRows, boolean isWalDisabled, - boolean isMultitenant, boolean storeNulls, boolean isTransactional, long updateCacheFrequency, boolean isNamespaceMapped) throws SQLException { + boolean isMultitenant, boolean storeNulls, TransactionFactory.Provider transactionProvider, long updateCacheFrequency, boolean isNamespaceMapped) throws SQLException { return new PTableImpl( table.getTenantId(), table.getSchemaName(), table.getTableName(), table.getType(), table.getIndexState(), timeStamp, sequenceNumber, table.getPKName(), table.getBucketNum(), columns, table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), isImmutableRows, table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), isWalDisabled, isMultitenant, storeNulls, table.getViewType(), table.getViewIndexId(), table.getIndexType(), - table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), isTransactional, updateCacheFrequency, table.getIndexDisableTimestamp(), + table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), transactionProvider, updateCacheFrequency, 
table.getIndexDisableTimestamp(), isNamespaceMapped, table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization()); } @@ -352,7 +353,7 @@ public class PTableImpl implements PTable { table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), - table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), + table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization()); } @@ -363,7 +364,7 @@ public class PTableImpl implements PTable { table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), - table.getBaseColumnCount(), rowKeyOrderOptimizable, table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), + table.getBaseColumnCount(), rowKeyOrderOptimizable, table.getTransactionProvider(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), 
table.getEncodedCQCounter(), table.useStatsForParallelization()); } @@ -374,7 +375,7 @@ public class PTableImpl implements PTable { table.getParentSchemaName(), table.getParentTableName(), table.getIndexes(), table.isImmutableRows(), table.getPhysicalNames(), table.getDefaultFamilyName(), table.getViewStatement(), table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), table.getIndexType(), - table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), + table.getBaseColumnCount(), table.rowKeyOrderOptimizable(), table.getTransactionProvider(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getAutoPartitionSeqName(), table.isAppendOnlySchema(), table.getImmutableStorageScheme(), table.getEncodingScheme(), table.getEncodedCQCounter(), table.useStatsForParallelization()); } @@ -383,12 +384,12 @@ public class PTableImpl implements PTable { Collection<PColumn> columns, PName dataSchemaName, PName dataTableName, List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, - IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, + IndexType indexType, boolean rowKeyOrderOptimizable, TransactionFactory.Provider transactionProvider, long updateCacheFrequency, long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter, Boolean useStatsForParallelization) throws SQLException { return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, 
dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, - indexType, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, rowKeyOrderOptimizable, isTransactional, + indexType, QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT, rowKeyOrderOptimizable, transactionProvider, updateCacheFrequency,indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema, storageScheme, qualifierEncodingScheme, encodedCQCounter, useStatsForParallelization); } @@ -397,7 +398,7 @@ public class PTableImpl implements PTable { Collection<PColumn> columns, PName dataSchemaName, PName dataTableName, List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, - IndexType indexType, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, + IndexType indexType, boolean rowKeyOrderOptimizable, TransactionFactory.Provider transactionProvider, long updateCacheFrequency, int baseColumnCount, long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter, Boolean useStatsForParallelization) @@ -405,7 +406,7 @@ public class PTableImpl implements PTable { return new PTableImpl(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, dataSchemaName, dataTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, - indexType, baseColumnCount, rowKeyOrderOptimizable, isTransactional, updateCacheFrequency, + indexType, baseColumnCount, rowKeyOrderOptimizable, transactionProvider, updateCacheFrequency, 
indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema, storageScheme, qualifierEncodingScheme, encodedCQCounter, useStatsForParallelization); } @@ -414,13 +415,13 @@ public class PTableImpl implements PTable { PName parentSchemaName, PName parentTableName, List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, IndexType indexType, - int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, + int baseColumnCount, boolean rowKeyOrderOptimizable, TransactionFactory.Provider transactionProvider, long updateCacheFrequency, long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter, Boolean useStatsForParallelization) throws SQLException { init(tenantId, schemaName, tableName, type, state, timeStamp, sequenceNumber, pkName, bucketNum, columns, parentSchemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewExpression, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable, - isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema, storageScheme, + transactionProvider, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, autoPartitionSeqName, isAppendOnlySchema, storageScheme, qualifierEncodingScheme, encodedCQCounter, useStatsForParallelization); } @@ -454,7 +455,7 @@ public class PTableImpl implements PTable { PName pkName, Integer bucketNum, Collection<PColumn> columns, PName parentSchemaName, PName parentTableName, List<PTable> indexes, boolean isImmutableRows, List<PName> physicalNames, PName 
defaultFamilyName, String viewExpression, boolean disableWAL, boolean multiTenant, boolean storeNulls, ViewType viewType, Short viewIndexId, - IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable, boolean isTransactional, long updateCacheFrequency, long indexDisableTimestamp, + IndexType indexType , int baseColumnCount, boolean rowKeyOrderOptimizable, TransactionFactory.Provider transactionProvider, long updateCacheFrequency, long indexDisableTimestamp, boolean isNamespaceMapped, String autoPartitionSeqName, boolean isAppendOnlySchema, ImmutableStorageScheme storageScheme, QualifierEncodingScheme qualifierEncodingScheme, EncodedCQCounter encodedCQCounter, Boolean useStatsForParallelization) throws SQLException { Preconditions.checkNotNull(schemaName); @@ -486,7 +487,7 @@ public class PTableImpl implements PTable { this.viewType = viewType; this.viewIndexId = viewIndexId; this.indexType = indexType; - this.isTransactional = isTransactional; + this.transactionProvider = transactionProvider; this.rowKeyOrderOptimizable = rowKeyOrderOptimizable; this.updateCacheFrequency = updateCacheFrequency; this.isNamespaceMapped = isNamespaceMapped; @@ -1279,7 +1280,13 @@ public class PTableImpl implements PTable { boolean disableWAL = table.getDisableWAL(); boolean multiTenant = table.getMultiTenant(); boolean storeNulls = table.getStoreNulls(); - boolean isTransactional = table.getTransactional(); + TransactionFactory.Provider transactionProvider = null; + if (table.hasTransactionProvider()) { + transactionProvider = TransactionFactory.Provider.fromCode(table.getTransactionProvider()); + } else if (table.hasTransactional()) { + // For backward compatibility prior to transactionProvider field + transactionProvider = TransactionFactory.Provider.TEPHRA; + } ViewType viewType = null; String viewStatement = null; List<PName> physicalNames = Collections.emptyList(); @@ -1352,7 +1359,7 @@ public class PTableImpl implements PTable { (bucketNum == NO_SALTING) ? 
null : bucketNum, columns, parentSchemaName, parentTableName, indexes, isImmutableRows, physicalNames, defaultFamilyName, viewStatement, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, baseColumnCount, rowKeyOrderOptimizable, - isTransactional, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, autoParititonSeqName, + transactionProvider, updateCacheFrequency, indexDisableTimestamp, isNamespaceMapped, autoParititonSeqName, isAppendOnlySchema, storageScheme, qualifierEncodingScheme, encodedColumnQualifierCounter, useStatsForParallelization); return result; } catch (SQLException e) { @@ -1418,7 +1425,9 @@ public class PTableImpl implements PTable { builder.setDisableWAL(table.isWALDisabled()); builder.setMultiTenant(table.isMultiTenant()); builder.setStoreNulls(table.getStoreNulls()); - builder.setTransactional(table.isTransactional()); + if (table.getTransactionProvider() != null) { + builder.setTransactionProvider(table.getTransactionProvider().getCode()); + } if(table.getType() == PTableType.VIEW){ builder.setViewType(ByteStringer.wrap(new byte[]{table.getViewType().getSerializedValue()})); } @@ -1473,8 +1482,13 @@ public class PTableImpl implements PTable { } @Override - public boolean isTransactional() { - return isTransactional; + public TransactionFactory.Provider getTransactionProvider() { + return transactionProvider; + } + + @Override + public final boolean isTransactional() { + return transactionProvider != null; } @Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/02715adc/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java index c500b2e..78b9beb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java @@ -33,6 +33,7 @@ import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.SchemaUtil; public enum TableProperty { @@ -94,6 +95,23 @@ public enum TableProperty { } }, + TRANSACTION_PROVIDER(PhoenixDatabaseMetaData.TRANSACTION_PROVIDER, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, true, false, false) { + @Override + public Object getPTableValue(PTable table) { + return table.getTransactionProvider(); + } + @Override + public Object getValue(Object value) { + try { + return value == null ? 
null : TransactionFactory.Provider.valueOf(value.toString()); + } catch (IllegalArgumentException e) { + throw new RuntimeException(new SQLExceptionInfo.Builder(SQLExceptionCode.UNKNOWN_TRANSACTION_PROVIDER) + .setMessage(value.toString()) + .build().buildException()); + } + } + }, + UPDATE_CACHE_FREQUENCY(PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY, true, true, true) { @Override public Object getValue(Object value) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/02715adc/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java index 110868e..543eda1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java @@ -17,17 +17,11 @@ */ package org.apache.phoenix.transaction; -import java.io.IOException; import java.sql.SQLException; -import java.util.concurrent.TimeoutException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; -import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.util.ReadOnlyProps; -import org.apache.twill.zookeeper.ZKClientService; -import org.slf4j.Logger; +import org.apache.phoenix.transaction.TransactionFactory.Provider; public class OmidTransactionContext implements PhoenixTransactionContext { @@ -56,7 +50,7 @@ public class OmidTransactionContext implements PhoenixTransactionContext { } @Override - public void commitDDLFence(PTable dataTable, Logger logger) throws SQLException { + public void commitDDLFence(PTable dataTable) 
throws SQLException { // TODO Auto-generated method stub } @@ -116,59 +110,24 @@ public class OmidTransactionContext implements PhoenixTransactionContext { } @Override - public long getMaxTransactionsPerSecond() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public boolean isPreExistingVersion(long version) { - // TODO Auto-generated method stub - return false; + public Provider getProvider() { + return Provider.OMID; } @Override - public BaseRegionObserver getCoprocessor() { - // TODO Auto-generated method stub + public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext contex, boolean subTask) { return null; } @Override - public void setInMemoryTransactionClient(Configuration config) { + public void markDMLFence(PTable dataTable) { // TODO Auto-generated method stub } @Override - public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props, - ConnectionInfo connectionInfo) { - // TODO Auto-generated method stub - - return null; - - } - - @Override - public byte[] getFamilyDeleteMarker() { + public HTableInterface getTransactionalTable(HTableInterface htable, boolean isImmutable) { // TODO Auto-generated method stub return null; } - - @Override - public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException { - // TODO Auto-generated method stub - - } - - @Override - public void setupTxManager(Configuration config, String url) throws SQLException { - // TODO Auto-generated method stub - - } - - @Override - public void tearDownTxManager() { - // TODO Auto-generated method stub - - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/02715adc/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java index b0c1bfe..c211661 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java @@ -19,34 +19,28 @@ package org.apache.phoenix.transaction; import java.io.IOException; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; +import org.apache.phoenix.transaction.TransactionFactory.Provider; -public class OmidTransactionProvider implements TransactionProvider { +public class OmidTransactionProvider implements PhoenixTransactionProvider { private static final OmidTransactionProvider INSTANCE = new OmidTransactionProvider(); - + public static final OmidTransactionProvider getInstance() { return INSTANCE; } - + private OmidTransactionProvider() { } - - @Override - public PhoenixTransactionContext getTransactionContext() { - return new OmidTransactionContext(); - } @Override public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException { //return new OmidTransactionContext(txnBytes); return null; } - + @Override public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection) { //return new OmidTransactionContext(connection); @@ -54,25 +48,37 @@ public class OmidTransactionProvider implements TransactionProvider { } @Override - public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask) { - //return new OmidTransactionContext(contex, 
connection, subTask); + public PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo) { + // TODO Auto-generated method stub return null; } @Override - public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable) { - //return new OmidTransactionTable(ctx, htable); + public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connectionInfo) { + // TODO Auto-generated method stub return null; } - + @Override - public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp) { - return CellUtil.createCell(row, family, HConstants.EMPTY_BYTE_ARRAY, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY); + public Class<? extends RegionObserver> getCoprocessor() { + // TODO Auto-generated method stub + return null; } - + @Override - public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp) { - return CellUtil.createCell(row, family, qualifier, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY); + public Provider getProvider() { + return TransactionFactory.Provider.OMID; } + @Override + public boolean isUnsupported(Feature feature) { + // FIXME: if we initialize a Set with the unsupported features + // and check for containment, we run into a test failure + // in SetPropertyOnEncodedTableIT.testSpecifyingColumnFamilyForTTLFails() + // due to TableProperty.colFamSpecifiedException being null + // (though it's set in the constructor). I suspect some + // mysterious class loader issue. The below works fine + // as a workaround. 
+ return (feature == Feature.ALTER_NONTX_TO_TX); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/02715adc/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java deleted file mode 100644 index 0662555..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java +++ /dev/null @@ -1,363 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.phoenix.transaction; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Append; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.client.RowMutations; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; -import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; - -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.Service; -import com.google.protobuf.ServiceException; - -public class OmidTransactionTable implements PhoenixTransactionalTable { - - public OmidTransactionTable(PhoenixTransactionContext ctx, HTableInterface hTable) { - // TODO Auto-generated constructor stub - } - - @Override - public Result get(Get get) throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public void put(Put put) throws IOException { - // TODO Auto-generated method stub - - } - - @Override - public void delete(Delete delete) throws IOException { - // TODO Auto-generated method stub - - } - - @Override - public ResultScanner getScanner(Scan scan) throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public byte[] 
getTableName() { - // TODO Auto-generated method stub - return null; - } - - @Override - public Configuration getConfiguration() { - // TODO Auto-generated method stub - return null; - } - - @Override - public HTableDescriptor getTableDescriptor() throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public boolean exists(Get get) throws IOException { - // TODO Auto-generated method stub - return false; - } - - @Override - public Result[] get(List<Get> gets) throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public ResultScanner getScanner(byte[] family) throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public ResultScanner getScanner(byte[] family, byte[] qualifier) - throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public void put(List<Put> puts) throws IOException { - // TODO Auto-generated method stub - } - - @Override - public void delete(List<Delete> deletes) throws IOException { - // TODO Auto-generated method stub - } - - @Override - public void setAutoFlush(boolean autoFlush) { - // TODO Auto-generated method stub - } - - @Override - public boolean isAutoFlush() { - // TODO Auto-generated method stub - return false; - } - - @Override - public long getWriteBufferSize() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public void setWriteBufferSize(long writeBufferSize) throws IOException { - // TODO Auto-generated method stub - } - - @Override - public void flushCommits() throws IOException { - // TODO Auto-generated method stub - } - - @Override - public void close() throws IOException { - // TODO Auto-generated method stub - } - - @Override - public long incrementColumnValue(byte[] row, byte[] family, - byte[] qualifier, long amount, boolean writeToWAL) - throws IOException { - // TODO Auto-generated method stub - return 0; - } - - @Override - public Boolean[] 
exists(List<Get> gets) throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { - // TODO Auto-generated method stub - } - - @Override - public void setAutoFlushTo(boolean autoFlush) { - // TODO Auto-generated method stub - } - - @Override - public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public TableName getName() { - // TODO Auto-generated method stub - return null; - } - - @Override - public boolean[] existsAll(List<Get> gets) throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public void batch(List<? extends Row> actions, Object[] results) - throws IOException, InterruptedException { - // TODO Auto-generated method stub - } - - @Override - public Object[] batch(List<? extends Row> actions) throws IOException, - InterruptedException { - // TODO Auto-generated method stub - return null; - } - - @Override - public <R> void batchCallback(List<? extends Row> actions, - Object[] results, Callback<R> callback) throws IOException, - InterruptedException { - // TODO Auto-generated method stub - } - - @Override - public <R> Object[] batchCallback(List<? 
extends Row> actions, - Callback<R> callback) throws IOException, InterruptedException { - // TODO Auto-generated method stub - return null; - } - - @Override - public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, - byte[] value, Put put) throws IOException { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, Put put) throws IOException { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - byte[] value, Delete delete) throws IOException { - // TODO Auto-generated method stub - return false; - } - - @Override - public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, Delete delete) - throws IOException { - // TODO Auto-generated method stub - return false; - } - - @Override - public void mutateRow(RowMutations rm) throws IOException { - // TODO Auto-generated method stub - } - - @Override - public Result append(Append append) throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public Result increment(Increment increment) throws IOException { - // TODO Auto-generated method stub - return null; - } - - @Override - public long incrementColumnValue(byte[] row, byte[] family, - byte[] qualifier, long amount) throws IOException { - // TODO Auto-generated method stub - return 0; - } - - @Override - public long incrementColumnValue(byte[] row, byte[] family, - byte[] qualifier, long amount, Durability durability) - throws IOException { - // TODO Auto-generated method stub - return 0; - } - - @Override - public CoprocessorRpcChannel coprocessorService(byte[] row) { - // TODO Auto-generated method stub - return null; - } - - @Override - public <T extends Service, R> Map<byte[], R> coprocessorService( - Class<T> service, byte[] startKey, 
byte[] endKey, - Call<T, R> callable) throws ServiceException, Throwable { - // TODO Auto-generated method stub - return null; - } - - @Override - public <T extends Service, R> void coprocessorService(Class<T> service, - byte[] startKey, byte[] endKey, Call<T, R> callable, - Callback<R> callback) throws ServiceException, Throwable { - // TODO Auto-generated method stub - } - - @Override - public <R extends Message> Map<byte[], R> batchCoprocessorService( - MethodDescriptor methodDescriptor, Message request, - byte[] startKey, byte[] endKey, R responsePrototype) - throws ServiceException, Throwable { - // TODO Auto-generated method stub - return null; - } - - @Override - public <R extends Message> void batchCoprocessorService( - MethodDescriptor methodDescriptor, Message request, - byte[] startKey, byte[] endKey, R responsePrototype, - Callback<R> callback) throws ServiceException, Throwable { - // TODO Auto-generated method stub - } - - @Override - public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, RowMutations mutation) - throws IOException { - // TODO Auto-generated method stub - return false; - } - - @Override - public int getOperationTimeout() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public int getRpcTimeout() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public void setOperationTimeout(int arg0) { - // TODO Auto-generated method stub - - } - - @Override - public void setRpcTimeout(int arg0) { - // TODO Auto-generated method stub - - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/02715adc/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionClient.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionClient.java new 
file mode 100644 index 0000000..f12f818 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionClient.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.transaction; + +import java.io.Closeable; + +public interface PhoenixTransactionClient extends Closeable { +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/02715adc/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java index 52ff2f9..f3ad42f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java @@ -17,20 +17,98 @@ */ package org.apache.phoenix.transaction; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; -import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; 
-import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.util.ReadOnlyProps; -import org.apache.twill.zookeeper.ZKClientService; -import org.slf4j.Logger; - -import java.io.IOException; import java.sql.SQLException; import java.util.concurrent.TimeoutException; -public interface PhoenixTransactionContext { +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.transaction.TransactionFactory.Provider; +public interface PhoenixTransactionContext { + public static PhoenixTransactionContext NULL_CONTEXT = new PhoenixTransactionContext() { + + @Override + public void begin() throws SQLException { + } + + @Override + public void commit() throws SQLException { + } + + @Override + public void abort() throws SQLException { + } + + @Override + public void checkpoint(boolean hasUncommittedData) throws SQLException { + } + + @Override + public void commitDDLFence(PTable dataTable) throws SQLException { + } + + @Override + public void join(PhoenixTransactionContext ctx) { + } + + @Override + public boolean isTransactionRunning() { + return false; + } + + @Override + public void reset() { + } + + @Override + public long getTransactionId() { + return 0; + } + + @Override + public long getReadPointer() { + return 0; + } + + @Override + public long getWritePointer() { + return 0; + } + + @Override + public void setVisibilityLevel(PhoenixVisibilityLevel visibilityLevel) { + } + + @Override + public PhoenixVisibilityLevel getVisibilityLevel() { + return null; + } + + @Override + public byte[] encodeTransaction() throws SQLException { + return null; + } + + @Override + public Provider getProvider() { + return null; + } + + @Override + public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext contex, boolean subTask) { + return NULL_CONTEXT; + } + + @Override + public void markDMLFence(PTable dataTable) { + + } + + @Override + public HTableInterface 
getTransactionalTable(HTableInterface htable, boolean isImmutable) { + return null; + } + }; /** * * Visibility levels needed for checkpointing and @@ -49,22 +127,6 @@ public interface PhoenixTransactionContext { public static final String READ_NON_TX_DATA = "data.tx.read.pre.existing"; /** - * Set the in memory client connection to the transaction manager (for testing purpose) - * - * @param config - */ - public void setInMemoryTransactionClient(Configuration config); - - /** - * Set the client connection to the transaction manager - * - * @param config - * @param props - * @param connectionInfo - */ - public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props, ConnectionInfo connectionInfo); - - /** * Starts a transaction * * @throws SQLException @@ -84,7 +146,7 @@ public interface PhoenixTransactionContext { * @throws SQLException */ public void abort() throws SQLException; - + /** * Create a checkpoint in a transaction as defined in [TEPHRA-96] * @throws SQLException @@ -100,9 +162,17 @@ public interface PhoenixTransactionContext { * @throws InterruptedException * @throws TimeoutException */ - public void commitDDLFence(PTable dataTable, Logger logger) + public void commitDDLFence(PTable dataTable) throws SQLException; + + /** + * Mark the start of DML to ensure that updates to indexed rows are not + * missed. + * @param dataTable the table on which DML command is working + */ + public void markDMLFence(PTable dataTable); + /** * Augment the current context with ctx modified keys * @@ -121,7 +191,8 @@ public interface PhoenixTransactionContext { public void reset(); /** - * Returns transaction unique identifier + * Returns transaction unique identifier which is also + * assumed to be the earliest write pointer. 
*/ public long getTransactionId(); @@ -150,42 +221,8 @@ public interface PhoenixTransactionContext { */ public byte[] encodeTransaction() throws SQLException; - /** - * - * @return max transactions per second - */ - public long getMaxTransactionsPerSecond(); + public Provider getProvider(); + public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext contex, boolean subTask); - /** - * - * @param version - */ - public boolean isPreExistingVersion(long version); - - /** - * - * @return the coprocessor - */ - public BaseRegionObserver getCoprocessor(); - - /** - * - * @return the family delete marker - */ - public byte[] getFamilyDeleteMarker(); - - /** - * Setup transaction manager's configuration for testing - */ - public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException; - - /** - * Setup transaction manager for testing - */ - public void setupTxManager(Configuration config, String url) throws SQLException; - - /** - * Tear down transaction manager for testing - */ - public void tearDownTxManager(); + public HTableInterface getTransactionalTable(HTableInterface htable, boolean isImmutable); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/02715adc/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java new file mode 100644 index 0000000..cdc6058 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.transaction; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; + +public interface PhoenixTransactionProvider { + public enum Feature { + ALTER_NONTX_TO_TX(SQLExceptionCode.CANNOT_ALTER_TABLE_FROM_NON_TXN_TO_TXNL); + + private final SQLExceptionCode code; + + Feature(SQLExceptionCode code) { + this.code = code; + } + + public SQLExceptionCode getCode() { + return code; + } + } + public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException; + public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection); + + public PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo); + public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connectionInfo); + public Class<? 
extends RegionObserver> getCoprocessor(); + + public TransactionFactory.Provider getProvider(); + public boolean isUnsupported(Feature feature); +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/02715adc/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionService.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionService.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionService.java new file mode 100644 index 0000000..10c46e1 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionService.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.transaction; + +import java.io.Closeable; + +public interface PhoenixTransactionService extends Closeable { + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/02715adc/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java deleted file mode 100644 index 7af1c08..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.phoenix.transaction; - -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HTableDescriptor; - -import java.io.IOException; -import java.util.List; - -public interface PhoenixTransactionalTable extends HTableInterface { - - /** - * Transaction version of {@link HTableInterface#get(Get get)} - * @param get - * @throws IOException - */ - public Result get(Get get) throws IOException; - - /** - * Transactional version of {@link HTableInterface#put(Put put)} - * @param put - * @throws IOException - */ - public void put(Put put) throws IOException; - - /** - * Transactional version of {@link HTableInterface#delete(Delete delete)} - * - * @param delete - * @throws IOException - */ - public void delete(Delete delete) throws IOException; - - /** - * Transactional version of {@link HTableInterface#getScanner(Scan scan)} - * - * @param scan - * @return ResultScanner - * @throws IOException - */ - public ResultScanner getScanner(Scan scan) throws IOException; - - /** - * Returns Htable name - */ - public byte[] getTableName(); - - /** - * Returns Htable configuration object - */ - public Configuration getConfiguration(); - - /** - * Returns HTableDescriptor of Htable - * @throws IOException - */ - public HTableDescriptor getTableDescriptor() throws IOException; - - /** - * Checks if cell exists - * @throws IOException - */ - public boolean exists(Get get) throws IOException; - - /** - * Transactional version of {@link HTableInterface#get(List gets)} - * @throws IOException - */ - public Result[] get(List<Get> gets) throws IOException; - - /** - * 
Transactional version of {@link HTableInterface#getScanner(byte[] family)} - * @throws IOException - */ - public ResultScanner getScanner(byte[] family) throws IOException; - - /** - * Transactional version of {@link HTableInterface#getScanner(byte[] family, byte[] qualifier)} - * @throws IOException - */ - public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException; - - /** - * Transactional version of {@link HTableInterface#put(List puts)} - * @throws IOException - */ - public void put(List<Put> puts) throws IOException; - - /** - * Transactional version of {@link HTableInterface#delete(List deletes)} - * @throws IOException - */ - public void delete(List<Delete> deletes) throws IOException; - - /** - * Delegates to {@link HTable#setAutoFlush(boolean autoFlush)} - */ - public void setAutoFlush(boolean autoFlush); - - /** - * Delegates to {@link HTable#isAutoFlush()} - */ - public boolean isAutoFlush(); - - /** - * Delegates to see HTable.getWriteBufferSize() - */ - public long getWriteBufferSize(); - - /** - * Delegates to see HTable.setWriteBufferSize() - */ - public void setWriteBufferSize(long writeBufferSize) throws IOException; - - /** - * Delegates to see HTable.flushCommits() - */ - public void flushCommits() throws IOException; - - /** - * Releases resources - * @throws IOException - */ - public void close() throws IOException; -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/02715adc/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java index 77c3ab6..8b16210 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java @@ -19,71 +19,46 @@ package org.apache.phoenix.transaction; import java.io.IOException; import java.sql.SQLException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.conf.Configuration; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.transaction.TephraTransactionProvider.TephraTransactionClient; import org.apache.tephra.Transaction; +import org.apache.tephra.Transaction.VisibilityLevel; import org.apache.tephra.TransactionAware; import org.apache.tephra.TransactionCodec; import org.apache.tephra.TransactionConflictException; import org.apache.tephra.TransactionContext; import org.apache.tephra.TransactionFailureException; -import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionSystemClient; -import org.apache.tephra.Transaction.VisibilityLevel; import org.apache.tephra.TxConstants; -import org.apache.tephra.distributed.PooledClientProvider; -import org.apache.tephra.distributed.TransactionServiceClient; -import org.apache.tephra.hbase.coprocessor.TransactionProcessor; -import org.apache.tephra.inmemory.InMemoryTxSystemClient; -import org.apache.tephra.util.TxUtils; +import org.apache.tephra.hbase.TransactionAwareHTable; import org.apache.tephra.visibility.FenceWait; import 
org.apache.tephra.visibility.VisibilityFence; -import org.apache.tephra.zookeeper.TephraZKClientService; -import org.apache.tephra.distributed.TransactionService; -import org.apache.tephra.metrics.TxMetricsCollector; -import org.apache.tephra.persist.HDFSTransactionStateStorage; -import org.apache.tephra.snapshot.SnapshotCodecProvider; -import org.apache.twill.discovery.DiscoveryService; -import org.apache.twill.discovery.ZKDiscoveryService; -import org.apache.twill.internal.utils.Networks; -import org.apache.twill.zookeeper.RetryStrategies; -import org.apache.twill.zookeeper.ZKClientService; -import org.apache.twill.zookeeper.ZKClientServices; -import org.apache.twill.zookeeper.ZKClients; - -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Lists; -import com.google.inject.util.Providers; - import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class TephraTransactionContext implements PhoenixTransactionContext { +import com.google.common.collect.Lists; +public class TephraTransactionContext implements PhoenixTransactionContext { + private static final Logger logger = LoggerFactory.getLogger(TephraTransactionContext.class); private static final TransactionCodec CODEC = new TransactionCodec(); - private static TransactionSystemClient txClient = null; - private static ZKClientService zkClient = null; - private static TransactionService txService = null; - private static TransactionManager txManager = null; - private final List<TransactionAware> txAwares; private final TransactionContext txContext; private Transaction tx; private TransactionSystemClient txServiceClient; - private TransactionFailureException e; public TephraTransactionContext() { this.txServiceClient = null; @@ -93,21 +68,21 @@ public class TephraTransactionContext implements PhoenixTransactionContext { public TephraTransactionContext(byte[] txnBytes) throws IOException { this(); - this.tx = (txnBytes != null && txnBytes.length > 0) ? 
CODEC - .decode(txnBytes) : null; + this.tx = CODEC.decode(txnBytes); } public TephraTransactionContext(PhoenixConnection connection) { - this.txServiceClient = txClient; + PhoenixTransactionClient client = connection.getQueryServices().initTransactionClient(getProvider()); + assert (client instanceof TephraTransactionClient); + this.txServiceClient = ((TephraTransactionClient)client).getTransactionClient(); this.txAwares = Collections.emptyList(); this.txContext = new TransactionContext(txServiceClient); } - public TephraTransactionContext(PhoenixTransactionContext ctx, - PhoenixConnection connection, boolean subTask) { - this.txServiceClient = txClient; + private TephraTransactionContext(PhoenixTransactionContext ctx, boolean subTask) { assert (ctx instanceof TephraTransactionContext); TephraTransactionContext tephraTransactionContext = (TephraTransactionContext) ctx; + this.txServiceClient = tephraTransactionContext.txServiceClient; if (subTask) { this.tx = tephraTransactionContext.getTransaction(); @@ -117,42 +92,13 @@ public class TephraTransactionContext implements PhoenixTransactionContext { this.txAwares = Collections.emptyList(); this.txContext = tephraTransactionContext.getContext(); } - - this.e = null; - } - - @Override - public void setInMemoryTransactionClient(Configuration config) { - TransactionManager txnManager = new TransactionManager(config); - txClient = this.txServiceClient = new InMemoryTxSystemClient(txnManager); } @Override - public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props, ConnectionInfo connectionInfo) { - String zkQuorumServersString = props.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM); - if (zkQuorumServersString==null) { - zkQuorumServersString = connectionInfo.getZookeeperQuorum()+":"+connectionInfo.getPort(); - } - - int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); - // Create instance of the tephra zookeeper client - ZKClientService 
txZKClientService = ZKClientServices.delegate( - ZKClients.reWatchOnExpire( - ZKClients.retryOnFailure( - new TephraZKClientService(zkQuorumServersString, timeOut, null, - ArrayListMultimap.<String, byte[]>create()), - RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)) - ) - ); - txZKClientService.startAndWait(); - ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService); - PooledClientProvider pooledClientProvider = new PooledClientProvider( - config, zkDiscoveryService); - txClient = this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider); - - return txZKClientService; + public TransactionFactory.Provider getProvider() { + return TransactionFactory.Provider.TEPHRA; } - + @Override public void begin() throws SQLException { if (txContext == null) { @@ -181,8 +127,6 @@ public class TephraTransactionContext implements PhoenixTransactionContext { try { txContext.finish(); } catch (TransactionFailureException e) { - this.e = e; - if (e instanceof TransactionConflictException) { throw new SQLExceptionInfo.Builder( SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION) @@ -204,14 +148,8 @@ public class TephraTransactionContext implements PhoenixTransactionContext { } try { - if (e != null) { - txContext.abort(e); - e = null; - } else { - txContext.abort(); - } + txContext.abort(); } catch (TransactionFailureException e) { - this.e = null; throw new SQLExceptionInfo.Builder( SQLExceptionCode.TRANSACTION_FAILED) .setMessage(e.getMessage()).setRootCause(e).build() @@ -249,7 +187,7 @@ public class TephraTransactionContext implements PhoenixTransactionContext { } @Override - public void commitDDLFence(PTable dataTable, Logger logger) + public void commitDDLFence(PTable dataTable) throws SQLException { byte[] key = dataTable.getName().getBytes(); @@ -276,7 +214,12 @@ public class TephraTransactionContext implements PhoenixTransactionContext { } } + @Override public void markDMLFence(PTable table) { + if 
(table.getType() == PTableType.INDEX) { + return; + } + byte[] logicalKey = table.getName().getBytes(); TransactionAware logicalTxAware = VisibilityFence.create(logicalKey); @@ -300,6 +243,9 @@ public class TephraTransactionContext implements PhoenixTransactionContext { @Override public void join(PhoenixTransactionContext ctx) { + if (ctx == PhoenixTransactionContext.NULL_CONTEXT) { + return; + } assert (ctx instanceof TephraTransactionContext); TephraTransactionContext tephraContext = (TephraTransactionContext) ctx; @@ -325,7 +271,6 @@ public class TephraTransactionContext implements PhoenixTransactionContext { public void reset() { tx = null; txAwares.clear(); - this.e = null; } @Override @@ -406,86 +351,15 @@ public class TephraTransactionContext implements PhoenixTransactionContext { assert (tx != null); try { - return CODEC.encode(tx); + byte[] encodedTxBytes = CODEC.encode(tx); + encodedTxBytes = Arrays.copyOf(encodedTxBytes, encodedTxBytes.length + 1); + encodedTxBytes[encodedTxBytes.length - 1] = getProvider().getCode(); + return encodedTxBytes; } catch (IOException e) { throw new SQLException(e); } } - @Override - public long getMaxTransactionsPerSecond() { - return TxConstants.MAX_TX_PER_MS; - } - - @Override - public boolean isPreExistingVersion(long version) { - return TxUtils.isPreExistingVersion(version); - } - - @Override - public BaseRegionObserver getCoprocessor() { - return new TransactionProcessor(); - } - - @Override - public byte[] getFamilyDeleteMarker() { - return TxConstants.FAMILY_DELETE_QUALIFIER; - } - - @Override - public void setTxnConfigs(Configuration config, String tmpFolder, int defaultTxnTimeoutSeconds) throws IOException { - config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false); - config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times"); - config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1); - config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, Networks.getRandomPort()); - 
config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder); - config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, defaultTxnTimeoutSeconds); - config.unset(TxConstants.Manager.CFG_TX_HDFS_USER); - config.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L); - } - - @Override - public void setupTxManager(Configuration config, String url) throws SQLException { - - if (txService != null) { - return; - } - - ConnectionInfo connInfo = ConnectionInfo.create(url); - zkClient = ZKClientServices.delegate( - ZKClients.reWatchOnExpire( - ZKClients.retryOnFailure( - ZKClientService.Builder.of(connInfo.getZookeeperConnectionString()) - .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, - HConstants.DEFAULT_ZK_SESSION_TIMEOUT)) - .build(), - RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS) - ) - ) - ); - zkClient.startAndWait(); - - DiscoveryService discovery = new ZKDiscoveryService(zkClient); - txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector()), new TxMetricsCollector()); - txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager)); - txService.startAndWait(); - } - - @Override - public void tearDownTxManager() { - try { - if (txService != null) txService.stopAndWait(); - } finally { - try { - if (zkClient != null) zkClient.stopAndWait(); - } finally { - txService = null; - zkClient = null; - txManager = null; - } - } - } - /** * TephraTransactionContext specific functions */ @@ -511,4 +385,17 @@ public class TephraTransactionContext implements PhoenixTransactionContext { txAware.startTx(tx); } } + + @Override + public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext context, boolean subTask) { + return new TephraTransactionContext(context, subTask); + } + + @Override + public HTableInterface getTransactionalTable(HTableInterface htable, boolean isImmutable) { + TransactionAwareHTable 
transactionAwareHTable = new TransactionAwareHTable(htable, isImmutable ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW); + this.addTransactionAware(transactionAwareHTable); + return transactionAwareHTable; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/02715adc/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java index 795be9f..2e52efa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java @@ -18,16 +18,37 @@ package org.apache.phoenix.transaction; import java.io.IOException; +import java.util.concurrent.TimeUnit; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.phoenix.coprocessor.TephraTransactionalProcessor; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; +import org.apache.phoenix.transaction.TransactionFactory.Provider; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TransactionSystemClient; import org.apache.tephra.TxConstants; +import org.apache.tephra.distributed.PooledClientProvider; +import org.apache.tephra.distributed.TransactionService; +import org.apache.tephra.distributed.TransactionServiceClient; +import org.apache.tephra.inmemory.InMemoryTxSystemClient; +import 
org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.HDFSTransactionStateStorage; +import org.apache.tephra.snapshot.SnapshotCodecProvider; +import org.apache.tephra.zookeeper.TephraZKClientService; +import org.apache.twill.discovery.DiscoveryService; +import org.apache.twill.discovery.ZKDiscoveryService; +import org.apache.twill.zookeeper.RetryStrategies; +import org.apache.twill.zookeeper.ZKClientService; +import org.apache.twill.zookeeper.ZKClientServices; +import org.apache.twill.zookeeper.ZKClients; -public class TephraTransactionProvider implements TransactionProvider { +import com.google.common.collect.ArrayListMultimap; +import com.google.inject.util.Providers; + +public class TephraTransactionProvider implements PhoenixTransactionProvider { private static final TephraTransactionProvider INSTANCE = new TephraTransactionProvider(); public static final TephraTransactionProvider getInstance() { @@ -37,12 +58,6 @@ public class TephraTransactionProvider implements TransactionProvider { private TephraTransactionProvider() { } - - @Override - public PhoenixTransactionContext getTransactionContext() { - return new TephraTransactionContext(); - } - @Override public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException { return new TephraTransactionContext(txnBytes); @@ -54,23 +69,129 @@ public class TephraTransactionProvider implements TransactionProvider { } @Override - public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask) { - return new TephraTransactionContext(contex, connection, subTask); + public PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo) { + if (connectionInfo.isConnectionless()) { + TransactionManager txnManager = new TransactionManager(config); + TransactionSystemClient txClient = new InMemoryTxSystemClient(txnManager); + return new TephraTransactionClient(txClient); 
+ + } + String zkQuorumServersString = config.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM); + if (zkQuorumServersString==null) { + zkQuorumServersString = connectionInfo.getZookeeperConnectionString(); + } + + int timeOut = config.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); + // Create instance of the tephra zookeeper client + ZKClientService zkClientService = ZKClientServices.delegate( + ZKClients.reWatchOnExpire( + ZKClients.retryOnFailure( + new TephraZKClientService(zkQuorumServersString, timeOut, null, + ArrayListMultimap.<String, byte[]>create()), + RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)) + ) + ); + //txZKClientService.startAndWait(); + ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService); + PooledClientProvider pooledClientProvider = new PooledClientProvider( + config, zkDiscoveryService); + TransactionServiceClient txClient = new TransactionServiceClient(config,pooledClientProvider); + TephraTransactionClient client = new TephraTransactionClient(zkClientService, txClient); + client.start(); + + return client; } @Override - public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, HTableInterface htable) { - return new TephraTransactionTable(ctx, htable); + public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connInfo) { + ZKClientService zkClient = ZKClientServices.delegate( + ZKClients.reWatchOnExpire( + ZKClients.retryOnFailure( + ZKClientService.Builder.of(connInfo.getZookeeperConnectionString()) + .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, + HConstants.DEFAULT_ZK_SESSION_TIMEOUT)) + .build(), + RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS) + ) + ) + ); + + //zkClient.startAndWait(); + DiscoveryService discovery = new ZKDiscoveryService(zkClient); + TransactionManager txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config, 
new SnapshotCodecProvider(config), new TxMetricsCollector()), new TxMetricsCollector()); + TransactionService txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager)); + TephraTransactionService service = new TephraTransactionService(zkClient, txService); + //txService.startAndWait(); + service.start(); + return service; + } + + static class TephraTransactionService implements PhoenixTransactionService { + private final ZKClientService zkClient; + private final TransactionService txService; + + public TephraTransactionService(ZKClientService zkClient, TransactionService txService) { + this.zkClient = zkClient; + this.txService = txService; + } + + public void start() { + zkClient.startAndWait(); + txService.startAndWait(); + } + + @Override + public void close() throws IOException { + try { + if (txService != null) txService.stopAndWait(); + } finally { + if (zkClient != null) zkClient.stopAndWait(); + } + } + } - @Override - public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp) { - return CellUtil.createCell(row, family, TxConstants.FAMILY_DELETE_QUALIFIER, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY); + static class TephraTransactionClient implements PhoenixTransactionClient { + private final ZKClientService zkClient; + private final TransactionSystemClient txClient; + + public TephraTransactionClient(TransactionSystemClient txClient) { + this(null, txClient); + } + + public TephraTransactionClient(ZKClientService zkClient, TransactionSystemClient txClient) { + this.zkClient = zkClient; + this.txClient = txClient; + } + + public void start() { + zkClient.startAndWait(); + } + + public TransactionSystemClient getTransactionClient() { + return txClient; + } + + @Override + public void close() throws IOException { + zkClient.stopAndWait(); + } + } @Override - public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp) { - return CellUtil.createCell(row, 
family, qualifier, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY); + public Class<? extends RegionObserver> getCoprocessor() { + return TephraTransactionalProcessor.class; + } + + @Override + public Provider getProvider() { + return TransactionFactory.Provider.TEPHRA; + } + + @Override + public boolean isUnsupported(Feature feature) { + return false; } }