http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index 54c688a..65a43de 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -26,7 +26,7 @@ import java.sql.SQLException; import java.util.List; import java.util.Set; -import com.google.common.collect.Sets; +import co.cask.tephra.Transaction; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HRegionInfo; @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.TenantCache; +import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; @@ -58,8 +59,10 @@ import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; +import org.apache.phoenix.util.TransactionUtil; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; /** @@ -191,6 +194,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { Region dataRegion = null; IndexMaintainer indexMaintainer = null; byte[][] viewConstants = null; + Transaction tx = null; ColumnReference[] dataColumns = IndexUtil.deserializeDataTableColumnsToJoin(scan); if (dataColumns != null) { tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns); @@ -199,14 +203,16 
@@ public class ScanRegionObserver extends BaseScannerRegionObserver { List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes); indexMaintainer = indexMaintainers.get(0); viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); + byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE); + tx = MutationState.decodeTransaction(txState); } final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); innerScanner = getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan, - dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants, - kvSchema, kvSchemaBitSet, j == null ? p : null, ptr); + dataColumns, tupleProjector, dataRegion, indexMaintainer, tx, + viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr); final ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan); if (j != null) {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java index 2f31b08..9f3bdb4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java @@ -26,18 +26,17 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorException; import org.apache.hadoop.hbase.coprocessor.CoprocessorService; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; - import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.TenantCache; -import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; - import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheRequest; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheResponse; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheRequest; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheResponse; import org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachingService; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.protobuf.ProtobufUtil; +import org.apache.phoenix.util.ByteUtil; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; @@ -66,6 +65,7 @@ public class ServerCachingEndpointImpl extends ServerCachingService implements C ImmutableBytesWritable cachePtr = 
org.apache.phoenix.protobuf.ProtobufUtil .toImmutableBytesWritable(request.getCachePtr()); + byte[] txState = request.hasTxState() ? request.getTxState().toByteArray() : ByteUtil.EMPTY_BYTE_ARRAY; try { @SuppressWarnings("unchecked") @@ -73,7 +73,7 @@ public class ServerCachingEndpointImpl extends ServerCachingService implements C (Class<ServerCacheFactory>) Class.forName(request.getCacheFactory().getClassName()); ServerCacheFactory cacheFactory = serverCacheFactoryClass.newInstance(); tenantCache.addServerCache(new ImmutableBytesPtr(request.getCacheId().toByteArray()), - cachePtr, cacheFactory); + cachePtr, txState, cacheFactory); } catch (Throwable e) { ProtobufUtil.setControllerException(controller, new IOException(e)); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java index 4fdfe99..b201c8e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java @@ -36,18 +36,19 @@ import org.apache.phoenix.memory.MemoryManager.MemoryChunk; */ public interface ServerCachingProtocol { public static interface ServerCacheFactory extends Writable { - public Closeable newCache(ImmutableBytesWritable cachePtr, MemoryChunk chunk) throws SQLException; + public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk) throws SQLException; } /** * Add the cache to the region server cache. 
* @param tenantId the tenantId or null if not applicable * @param cacheId unique identifier of the cache * @param cachePtr pointer to the byte array of the cache + * @param txState encoded transaction state passed through to the cache factory (presumably empty when no transaction applies - confirm against callers) * @param cacheFactory factory that converts from byte array to object representation on the server side * @return true on success and otherwise throws * @throws SQLException */ - public boolean addServerCache(byte[] tenantId, byte[] cacheId, ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws SQLException; + public boolean addServerCache(byte[] tenantId, byte[] cacheId, ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory cacheFactory) throws SQLException; /** * Remove the cache from the region server cache. Called upon completion of * the operation when cache is no longer needed. http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 5133cb2..ffdc05f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -105,6 +105,8 @@ import org.apache.phoenix.util.TimeKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import co.cask.tephra.TxConstants; + import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -403,6 +405,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ Delete delete = new Delete(firstKV.getRowArray(), firstKV.getRowOffset(), firstKV.getRowLength(),ts); mutations.add(delete); + // force tephra to ignore this delete +
delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); } else if (isUpsert) { Arrays.fill(values, null); int i = 0; @@ -462,6 +466,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ results.get(0).getRowOffset(), results.get(0).getRowLength()); delete.deleteColumns(deleteCF, deleteCQ, ts); + // force tephra to ignore this deletes + delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); mutations.add(delete); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java index 25f8271..7e71cd9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java @@ -3218,6 +3218,16 @@ public final class PTableProtos { * <code>optional bool rowKeyOrderOptimizable = 26;</code> */ boolean getRowKeyOrderOptimizable(); + + // optional bool transactional = 27; + /** + * <code>optional bool transactional = 27;</code> + */ + boolean hasTransactional(); + /** + * <code>optional bool transactional = 27;</code> + */ + boolean getTransactional(); } /** * Protobuf type {@code PTable} @@ -3418,6 +3428,11 @@ public final class PTableProtos { rowKeyOrderOptimizable_ = input.readBool(); break; } + case 216: { + bitField0_ |= 0x00400000; + transactional_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -3980,6 +3995,22 @@ public final class PTableProtos { return rowKeyOrderOptimizable_; } + // optional bool transactional = 27; + public static final int TRANSACTIONAL_FIELD_NUMBER = 27; + private 
boolean transactional_; + /** + * <code>optional bool transactional = 27;</code> + */ + public boolean hasTransactional() { + return ((bitField0_ & 0x00400000) == 0x00400000); + } + /** + * <code>optional bool transactional = 27;</code> + */ + public boolean getTransactional() { + return transactional_; + } + private void initFields() { schemaNameBytes_ = com.google.protobuf.ByteString.EMPTY; tableNameBytes_ = com.google.protobuf.ByteString.EMPTY; @@ -4007,6 +4038,7 @@ public final class PTableProtos { storeNulls_ = false; baseColumnCount_ = 0; rowKeyOrderOptimizable_ = false; + transactional_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -4152,6 +4184,9 @@ public final class PTableProtos { if (((bitField0_ & 0x00200000) == 0x00200000)) { output.writeBool(26, rowKeyOrderOptimizable_); } + if (((bitField0_ & 0x00400000) == 0x00400000)) { + output.writeBool(27, transactional_); + } getUnknownFields().writeTo(output); } @@ -4270,6 +4305,10 @@ public final class PTableProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(26, rowKeyOrderOptimizable_); } + if (((bitField0_ & 0x00400000) == 0x00400000)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(27, transactional_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -4411,6 +4450,11 @@ public final class PTableProtos { result = result && (getRowKeyOrderOptimizable() == other.getRowKeyOrderOptimizable()); } + result = result && (hasTransactional() == other.hasTransactional()); + if (hasTransactional()) { + result = result && (getTransactional() + == other.getTransactional()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -4528,6 +4572,10 @@ public final class PTableProtos { hash = (37 * hash) + ROWKEYORDEROPTIMIZABLE_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getRowKeyOrderOptimizable()); } + if (hasTransactional()) { + hash = (37 * 
hash) + TRANSACTIONAL_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getTransactional()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -4704,6 +4752,8 @@ public final class PTableProtos { bitField0_ = (bitField0_ & ~0x01000000); rowKeyOrderOptimizable_ = false; bitField0_ = (bitField0_ & ~0x02000000); + transactional_ = false; + bitField0_ = (bitField0_ & ~0x04000000); return this; } @@ -4852,6 +4902,10 @@ public final class PTableProtos { to_bitField0_ |= 0x00200000; } result.rowKeyOrderOptimizable_ = rowKeyOrderOptimizable_; + if (((from_bitField0_ & 0x04000000) == 0x04000000)) { + to_bitField0_ |= 0x00400000; + } + result.transactional_ = transactional_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -5024,6 +5078,9 @@ public final class PTableProtos { if (other.hasRowKeyOrderOptimizable()) { setRowKeyOrderOptimizable(other.getRowKeyOrderOptimizable()); } + if (other.hasTransactional()) { + setTransactional(other.getTransactional()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -6694,6 +6751,39 @@ public final class PTableProtos { return this; } + // optional bool transactional = 27; + private boolean transactional_ ; + /** + * <code>optional bool transactional = 27;</code> + */ + public boolean hasTransactional() { + return ((bitField0_ & 0x04000000) == 0x04000000); + } + /** + * <code>optional bool transactional = 27;</code> + */ + public boolean getTransactional() { + return transactional_; + } + /** + * <code>optional bool transactional = 27;</code> + */ + public Builder setTransactional(boolean value) { + bitField0_ |= 0x04000000; + transactional_ = value; + onChanged(); + return this; + } + /** + * <code>optional bool transactional = 27;</code> + */ + public Builder clearTransactional() { + bitField0_ = (bitField0_ & ~0x04000000); + transactional_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:PTable) } @@ -6740,7 
+6830,7 @@ public final class PTableProtos { "leStats\022\013\n\003key\030\001 \002(\014\022\016\n\006values\030\002 \003(\014\022\033\n\023" + "guidePostsByteCount\030\003 \001(\003\022\025\n\rkeyBytesCou", "nt\030\004 \001(\003\022\027\n\017guidePostsCount\030\005 \001(\005\022!\n\013pGu" + - "idePosts\030\006 \001(\0132\014.PGuidePosts\"\357\004\n\006PTable\022" + + "idePosts\030\006 \001(\0132\014.PGuidePosts\"\206\005\n\006PTable\022" + "\027\n\017schemaNameBytes\030\001 \002(\014\022\026\n\016tableNameByt" + "es\030\002 \002(\014\022\036\n\ttableType\030\003 \002(\0162\013.PTableType" + "\022\022\n\nindexState\030\004 \001(\t\022\026\n\016sequenceNumber\030\005" + @@ -6756,10 +6846,11 @@ public final class PTableProtos { "wIndexId\030\025 \001(\005\022\021\n\tindexType\030\026 \001(\014\022\026\n\016sta" + "tsTimeStamp\030\027 \001(\003\022\022\n\nstoreNulls\030\030 \001(\010\022\027\n" + "\017baseColumnCount\030\031 \001(\005\022\036\n\026rowKeyOrderOpt" + - "imizable\030\032 \001(\010*A\n\nPTableType\022\n\n\006SYSTEM\020\000" + - "\022\010\n\004USER\020\001\022\010\n\004VIEW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020" + - "\004B@\n(org.apache.phoenix.coprocessor.gene", - "ratedB\014PTableProtosH\001\210\001\001\240\001\001" + "imizable\030\032 \001(\010\022\025\n\rtransactional\030\033 \001(\010*A\n" + + "\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VIE" + + "W\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache.p", + "hoenix.coprocessor.generatedB\014PTableProt" + + "osH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -6783,7 +6874,7 @@ public final class PTableProtos { internal_static_PTable_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_PTable_descriptor, - new java.lang.String[] { 
"SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "GuidePosts", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", }); + new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", "TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", "BucketNum", "Columns", "Indexes", "IsImmutableRows", "GuidePosts", "DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", "ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", "IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", "RowKeyOrderOptimizable", "Transactional", }); return null; } }; http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java index 69db21b..5ee1dfb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java @@ -699,6 +699,16 @@ public final class ServerCachingProtos { * <code>required .ServerCacheFactory cacheFactory = 4;</code> */ org.apache.phoenix.coprocessor.generated.ServerCacheFactoryProtos.ServerCacheFactoryOrBuilder getCacheFactoryOrBuilder(); + + // optional bytes txState = 5; + /** + * <code>optional bytes txState = 5;</code> + */ + boolean hasTxState(); + /** + * <code>optional bytes txState = 5;</code> + */ + 
com.google.protobuf.ByteString getTxState(); } /** * Protobuf type {@code AddServerCacheRequest} @@ -787,6 +797,11 @@ public final class ServerCachingProtos { bitField0_ |= 0x00000008; break; } + case 42: { + bitField0_ |= 0x00000010; + txState_ = input.readBytes(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -903,11 +918,28 @@ public final class ServerCachingProtos { return cacheFactory_; } + // optional bytes txState = 5; + public static final int TXSTATE_FIELD_NUMBER = 5; + private com.google.protobuf.ByteString txState_; + /** + * <code>optional bytes txState = 5;</code> + */ + public boolean hasTxState() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * <code>optional bytes txState = 5;</code> + */ + public com.google.protobuf.ByteString getTxState() { + return txState_; + } + private void initFields() { tenantId_ = com.google.protobuf.ByteString.EMPTY; cacheId_ = com.google.protobuf.ByteString.EMPTY; cachePtr_ = org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ImmutableBytesWritable.getDefaultInstance(); cacheFactory_ = org.apache.phoenix.coprocessor.generated.ServerCacheFactoryProtos.ServerCacheFactory.getDefaultInstance(); + txState_ = com.google.protobuf.ByteString.EMPTY; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -953,6 +985,9 @@ public final class ServerCachingProtos { if (((bitField0_ & 0x00000008) == 0x00000008)) { output.writeMessage(4, cacheFactory_); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeBytes(5, txState_); + } getUnknownFields().writeTo(output); } @@ -978,6 +1013,10 @@ public final class ServerCachingProtos { size += com.google.protobuf.CodedOutputStream .computeMessageSize(4, cacheFactory_); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(5, txState_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; 
return size; @@ -1021,6 +1060,11 @@ public final class ServerCachingProtos { result = result && getCacheFactory() .equals(other.getCacheFactory()); } + result = result && (hasTxState() == other.hasTxState()); + if (hasTxState()) { + result = result && getTxState() + .equals(other.getTxState()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -1050,6 +1094,10 @@ public final class ServerCachingProtos { hash = (37 * hash) + CACHEFACTORY_FIELD_NUMBER; hash = (53 * hash) + getCacheFactory().hashCode(); } + if (hasTxState()) { + hash = (37 * hash) + TXSTATE_FIELD_NUMBER; + hash = (53 * hash) + getTxState().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -1177,6 +1225,8 @@ public final class ServerCachingProtos { cacheFactoryBuilder_.clear(); } bitField0_ = (bitField0_ & ~0x00000008); + txState_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -1229,6 +1279,10 @@ public final class ServerCachingProtos { } else { result.cacheFactory_ = cacheFactoryBuilder_.build(); } + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000010; + } + result.txState_ = txState_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -1257,6 +1311,9 @@ public final class ServerCachingProtos { if (other.hasCacheFactory()) { mergeCacheFactory(other.getCacheFactory()); } + if (other.hasTxState()) { + setTxState(other.getTxState()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -1610,6 +1667,42 @@ public final class ServerCachingProtos { return cacheFactoryBuilder_; } + // optional bytes txState = 5; + private com.google.protobuf.ByteString txState_ = com.google.protobuf.ByteString.EMPTY; + /** + * <code>optional bytes txState = 5;</code> + */ + public boolean hasTxState() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * <code>optional bytes txState = 
5;</code> + */ + public com.google.protobuf.ByteString getTxState() { + return txState_; + } + /** + * <code>optional bytes txState = 5;</code> + */ + public Builder setTxState(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000010; + txState_ = value; + onChanged(); + return this; + } + /** + * <code>optional bytes txState = 5;</code> + */ + public Builder clearTxState() { + bitField0_ = (bitField0_ & ~0x00000010); + txState_ = getDefaultInstance().getTxState(); + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:AddServerCacheRequest) } @@ -3383,20 +3476,21 @@ public final class ServerCachingProtos { "\n\032ServerCachingService.proto\032\030ServerCach" + "eFactory.proto\"K\n\026ImmutableBytesWritable" + "\022\021\n\tbyteArray\030\001 \002(\014\022\016\n\006offset\030\002 \002(\005\022\016\n\006l" + - "ength\030\003 \002(\005\"\220\001\n\025AddServerCacheRequest\022\020\n" + + "ength\030\003 \002(\005\"\241\001\n\025AddServerCacheRequest\022\020\n" + "\010tenantId\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014\022)\n\010cach" + "ePtr\030\003 \002(\0132\027.ImmutableBytesWritable\022)\n\014c" + - "acheFactory\030\004 \002(\0132\023.ServerCacheFactory\"(" + - "\n\026AddServerCacheResponse\022\016\n\006return\030\001 \002(\010" + - "\"=\n\030RemoveServerCacheRequest\022\020\n\010tenantId" + - "\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014\"+\n\031RemoveServerC", - "acheResponse\022\016\n\006return\030\001 \002(\0102\245\001\n\024ServerC" + - "achingService\022A\n\016addServerCache\022\026.AddSer" + - "verCacheRequest\032\027.AddServerCacheResponse" + - "\022J\n\021removeServerCache\022\031.RemoveServerCach" + - "eRequest\032\032.RemoveServerCacheResponseBG\n(" + - "org.apache.phoenix.coprocessor.generated" + - "B\023ServerCachingProtosH\001\210\001\001\240\001\001" + "acheFactory\030\004 \002(\0132\023.ServerCacheFactory\022\017" + + 
"\n\007txState\030\005 \001(\014\"(\n\026AddServerCacheRespons" + + "e\022\016\n\006return\030\001 \002(\010\"=\n\030RemoveServerCacheRe" + + "quest\022\020\n\010tenantId\030\001 \001(\014\022\017\n\007cacheId\030\002 \002(\014", + "\"+\n\031RemoveServerCacheResponse\022\016\n\006return\030" + + "\001 \002(\0102\245\001\n\024ServerCachingService\022A\n\016addSer" + + "verCache\022\026.AddServerCacheRequest\032\027.AddSe" + + "rverCacheResponse\022J\n\021removeServerCache\022\031" + + ".RemoveServerCacheRequest\032\032.RemoveServer" + + "CacheResponseBG\n(org.apache.phoenix.copr" + + "ocessor.generatedB\023ServerCachingProtosH\001" + + "\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -3414,7 +3508,7 @@ public final class ServerCachingProtos { internal_static_AddServerCacheRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_AddServerCacheRequest_descriptor, - new java.lang.String[] { "TenantId", "CacheId", "CachePtr", "CacheFactory", }); + new java.lang.String[] { "TenantId", "CacheId", "CachePtr", "CacheFactory", "TxState", }); internal_static_AddServerCacheResponse_descriptor = getDescriptor().getMessageTypes().get(2); internal_static_AddServerCacheResponse_fieldAccessorTable = new http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index bb76ccb..3613c95 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -159,6 
+159,12 @@ public enum SQLExceptionCode { AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_INDEX(520, "42897", "Aggregate expression not allowed in an index"), NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX(521, "42898", "Non-deterministic expression not allowed in an index"), STATELESS_EXPRESSION_NOT_ALLOWED_IN_INDEX(522, "42899", "Stateless expression not allowed in an index"), + + /** + * Transaction exceptions. + */ + TRANSACTION_CONFLICT_EXCEPTION(523, "42900", "Transaction aborted due to conflict with other mutations"), + TRANSACTION_EXCEPTION(524, "42901", "Transaction aborted due to error"), /** * Union All related errors @@ -258,7 +264,18 @@ public enum SQLExceptionCode { NO_LOCAL_INDEXES(1054, "43A11", "Local secondary indexes are not supported for HBase versions " + MetaDataUtil.decodeHBaseVersionAsString(PhoenixDatabaseMetaData.MIN_LOCAL_SI_VERSION_DISALLOW) + " through " + MetaDataUtil.decodeHBaseVersionAsString(PhoenixDatabaseMetaData.MAX_LOCAL_SI_VERSION_DISALLOW) + " inclusive."), UNALLOWED_LOCAL_INDEXES(1055, "43A12", "Local secondary indexes are configured to not be allowed."), + DESC_VARBINARY_NOT_SUPPORTED(1056, "43A13", "Descending VARBINARY columns not supported"), + + DEFAULT_COLUMN_FAMILY_ON_SHARED_TABLE(1069, "43A13", "Default column family not allowed on VIEW or shared INDEX"), + ONLY_TABLE_MAY_BE_DECLARED_TRANSACTIONAL(1070, "44A01", "Only tables may be declared as transactional"), + TX_MAY_NOT_SWITCH_TO_NON_TX(1071, "44A02", "A transactional table may not be switched to non transactional"), + STORE_NULLS_MUST_BE_TRUE_FOR_TRANSACTIONAL(1072, "44A03", "Store nulls must be true when a table is transactional"), + CANNOT_START_TRANSACTION_WITH_SCN_SET(1073, "44A04", "Cannot start a transaction on a connection with SCN set"), + TX_MAX_VERSIONS_MUST_BE_GREATER_THAN_ONE(1074, "44A05", "A transactional table must define VERSION of greater than one"), + CANNOT_SPECIFY_SCN_FOR_TXN_TABLE(1075, "44A06", "Cannot use a connection with SCN set for a transactional 
table"), + NULL_TRANSACTION_CONTEXT(1076, "44A07", "No Tranasction Context available"), + TRANSACTION_FAILED(1077, "44A08", "Transaction Failure "), /** Sequence related */ SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index e292dd8..0bdb65a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -223,23 +223,26 @@ public abstract class BaseQueryPlan implements QueryPlan { if (table.getType() != PTableType.SYSTEM) { scan.setConsistency(connection.getConsistency()); } - // Get the time range of row_timestamp column - TimeRange rowTimestampRange = context.getScanRanges().getRowTimestampRange(); - // Get the already existing time range on the scan. - TimeRange scanTimeRange = scan.getTimeRange(); - Long scn = connection.getSCN(); - if (scn == null) { - scn = context.getCurrentTime(); - } - try { - TimeRange timeRangeToUse = ScanUtil.intersectTimeRange(rowTimestampRange, scanTimeRange, scn); - if (timeRangeToUse == null) { - return ResultIterator.EMPTY_ITERATOR; - } - scan.setTimeRange(timeRangeToUse.getMin(), timeRangeToUse.getMax()); - } catch (IOException e) { - throw new RuntimeException(e); - } + // TODO fix this in PHOENIX-2415 Support ROW_TIMESTAMP with transactional tables + if (!table.isTransactional()) { + // Get the time range of row_timestamp column + TimeRange rowTimestampRange = context.getScanRanges().getRowTimestampRange(); + // Get the already existing time range on the scan. 
+ TimeRange scanTimeRange = scan.getTimeRange(); + Long scn = connection.getSCN(); + if (scn == null) { + scn = context.getCurrentTime(); + } + try { + TimeRange timeRangeToUse = ScanUtil.intersectTimeRange(rowTimestampRange, scanTimeRange, scn); + if (timeRangeToUse == null) { + return ResultIterator.EMPTY_ITERATOR; + } + scan.setTimeRange(timeRangeToUse.getMin(), timeRangeToUse.getMax()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } byte[] tenantIdBytes; if( table.isMultiTenant() == true ) { tenantIdBytes = connection.getTenantId() == null ? null : @@ -326,6 +329,9 @@ public abstract class BaseQueryPlan implements QueryPlan { ImmutableBytesWritable ptr = new ImmutableBytesWritable(); IndexMaintainer.serialize(dataTable, ptr, indexes, context.getConnection()); scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, ByteUtil.copyKeyBytesIfNecessary(ptr)); + if (dataTable.isTransactional()) { + scan.setAttribute(BaseScannerRegionObserver.TX_STATE, context.getConnection().getMutationState().encodeTransaction()); + } } private void serializeViewConstantsIntoScan(Scan scan, PTable dataTable) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java new file mode 100644 index 0000000..6b3f9ca --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.execute; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; + +import com.google.protobuf.Descriptors.MethodDescriptor; +import com.google.protobuf.Message; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; + +public class DelegateHTable implements HTableInterface { + protected final HTableInterface delegate; + + public 
DelegateHTable(HTableInterface delegate) { + this.delegate = delegate; + } + + @Override + public byte[] getTableName() { + return delegate.getTableName(); + } + + @Override + public TableName getName() { + return delegate.getName(); + } + + @Override + public Configuration getConfiguration() { + return delegate.getConfiguration(); + } + + @Override + public HTableDescriptor getTableDescriptor() throws IOException { + return delegate.getTableDescriptor(); + } + + @Override + public boolean exists(Get get) throws IOException { + return delegate.exists(get); + } + + @Override + public Boolean[] exists(List<Get> gets) throws IOException { + return delegate.exists(gets); + } + + @Override + public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException { + delegate.batch(actions, results); + } + + @SuppressWarnings("deprecation") + @Override + public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException { + return delegate.batch(actions); + } + + @Override + public <R> void batchCallback(List<? extends Row> actions, Object[] results, Callback<R> callback) + throws IOException, InterruptedException { + delegate.batchCallback(actions, results, callback); + } + + @SuppressWarnings("deprecation") + @Override + public <R> Object[] batchCallback(List<? 
extends Row> actions, Callback<R> callback) throws IOException, + InterruptedException { + return delegate.batchCallback(actions, callback); + } + + @Override + public Result get(Get get) throws IOException { + return delegate.get(get); + } + + @Override + public Result[] get(List<Get> gets) throws IOException { + return delegate.get(gets); + } + + @SuppressWarnings("deprecation") + @Override + public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { + return delegate.getRowOrBefore(row, family); + } + + @Override + public ResultScanner getScanner(Scan scan) throws IOException { + return delegate.getScanner(scan); + } + + @Override + public ResultScanner getScanner(byte[] family) throws IOException { + return delegate.getScanner(family); + } + + @Override + public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { + return delegate.getScanner(family, qualifier); + } + + @Override + public void put(Put put) throws IOException { + delegate.put(put); + } + + @Override + public void put(List<Put> puts) throws IOException { + delegate.put(puts); + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException { + return delegate.checkAndPut(row, family, qualifier, value, put); + } + + @Override + public void delete(Delete delete) throws IOException { + delegate.delete(delete); + } + + @Override + public void delete(List<Delete> deletes) throws IOException { + delegate.delete(deletes); + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) + throws IOException { + return delegate.checkAndDelete(row, family, qualifier, value, delete); + } + + @Override + public void mutateRow(RowMutations rm) throws IOException { + delegate.mutateRow(rm); + } + + @Override + public Result append(Append append) throws IOException { + return delegate.append(append); + } + + @Override + public Result 
increment(Increment increment) throws IOException { + return delegate.increment(increment); + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException { + return delegate.incrementColumnValue(row, family, qualifier, amount); + } + + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) + throws IOException { + return delegate.incrementColumnValue(row, family, qualifier, amount, durability); + } + + @SuppressWarnings("deprecation") + @Override + public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL) + throws IOException { + return delegate.incrementColumnValue(row, family, qualifier, amount, writeToWAL); + } + + @Override + public boolean isAutoFlush() { + return delegate.isAutoFlush(); + } + + @Override + public void flushCommits() throws IOException { + delegate.flushCommits(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public CoprocessorRpcChannel coprocessorService(byte[] row) { + return delegate.coprocessorService(row); + } + + @Override + public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, + Call<T, R> callable) throws ServiceException, Throwable { + return delegate.coprocessorService(service, startKey, endKey, callable); + } + + @Override + public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, + Call<T, R> callable, Callback<R> callback) throws ServiceException, Throwable { + delegate.coprocessorService(service, startKey, endKey, callable, callback); + } + + @SuppressWarnings("deprecation") + @Override + public void setAutoFlush(boolean autoFlush) { + delegate.setAutoFlush(autoFlush); + } + + @Override + public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { + 
delegate.setAutoFlush(autoFlush, clearBufferOnFail); + } + + @Override + public void setAutoFlushTo(boolean autoFlush) { + delegate.setAutoFlushTo(autoFlush); + } + + @Override + public long getWriteBufferSize() { + return delegate.getWriteBufferSize(); + } + + @Override + public void setWriteBufferSize(long writeBufferSize) throws IOException { + delegate.setWriteBufferSize(writeBufferSize); + } + + @Override + public <R extends Message> Map<byte[], R> batchCoprocessorService(MethodDescriptor methodDescriptor, + Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable { + return delegate.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype); + } + + @Override + public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor, Message request, + byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback) throws ServiceException, + Throwable { + delegate.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, callback); + } + + @Override + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, byte[] value, + RowMutations mutation) throws IOException { + return delegate.checkAndMutate(row, family, qualifier, compareOp, value, mutation); + } + + @Override + public boolean[] existsAll(List<Get> gets) throws IOException { + return delegate.existsAll(gets); + } + + @Override + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Put put) throws IOException { + return delegate.checkAndPut(row, family, qualifier, compareOp, value, put); /* pass compareOp through; it was previously dropped, which invoked the overload that takes no comparator */ + } + + @Override + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Delete delete) + throws IOException { + return delegate.checkAndDelete(row, family, qualifier, compareOp, value, delete); + } + +} 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index 72920b2..cf89380 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@ -23,6 +23,7 @@ import static org.apache.phoenix.util.LogUtil.addCustomAnnotations; import java.sql.SQLException; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -66,6 +67,7 @@ import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PArrayDataType; import org.apache.phoenix.schema.types.PBoolean; @@ -75,6 +77,7 @@ import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; public class HashJoinPlan extends DelegateQueryPlan { private static final Log LOG = LogFactory.getLog(HashJoinPlan.class); @@ -83,6 +86,7 @@ public class HashJoinPlan extends DelegateQueryPlan { private final HashJoinInfo joinInfo; private final SubPlan[] subPlans; private final boolean recompileWhereClause; + private final Set<TableRef> tableRefs; private final int maxServerCacheTimeToLive; private List<SQLCloseable> dependencies; private HashCacheClient hashClient; @@ -114,11 +118,21 @@ public class 
HashJoinPlan extends DelegateQueryPlan { this.joinInfo = joinInfo; this.subPlans = subPlans; this.recompileWhereClause = recompileWhereClause; + this.tableRefs = Sets.newHashSetWithExpectedSize(subPlans.length + plan.getSourceRefs().size()); + this.tableRefs.addAll(plan.getSourceRefs()); + for (SubPlan subPlan : subPlans) { + tableRefs.addAll(subPlan.getInnerPlan().getSourceRefs()); + } this.maxServerCacheTimeToLive = plan.getContext().getConnection().getQueryServices().getProps().getInt( QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS); } @Override + public Set<TableRef> getSourceRefs() { + return tableRefs; + } + + @Override public ResultIterator iterator() throws SQLException { return iterator(DefaultParallelScanGrouper.getInstance()); } @@ -251,6 +265,7 @@ public class HashJoinPlan extends DelegateQueryPlan { public void postProcess(Object result, HashJoinPlan parent) throws SQLException; public List<String> getPreSteps(HashJoinPlan parent) throws SQLException; public List<String> getPostSteps(HashJoinPlan parent) throws SQLException; + public QueryPlan getInnerPlan(); } public static class WhereClauseSubPlan implements SubPlan { @@ -321,6 +336,11 @@ public class HashJoinPlan extends DelegateQueryPlan { public List<String> getPostSteps(HashJoinPlan parent) throws SQLException { return Collections.<String>emptyList(); } + + @Override + public QueryPlan getInnerPlan() { + return plan; + } } public static class HashSubPlan implements SubPlan { @@ -411,7 +431,12 @@ public class HashJoinPlan extends DelegateQueryPlan { + " IN (" + keyRangeRhsExpression.toString() + ")"; return Collections.<String> singletonList(step); } - + + + @Override + public QueryPlan getInnerPlan() { + return plan; + } } }