http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 54c688a..65a43de 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -26,7 +26,7 @@ import java.sql.SQLException;
 import java.util.List;
 import java.util.Set;
 
-import com.google.common.collect.Sets;
+import co.cask.tephra.Transaction;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -58,8 +59,10 @@ import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.TransactionUtil;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 
 /**
@@ -191,6 +194,7 @@ public class ScanRegionObserver extends 
BaseScannerRegionObserver {
         Region dataRegion = null;
         IndexMaintainer indexMaintainer = null;
         byte[][] viewConstants = null;
+        Transaction tx = null;
         ColumnReference[] dataColumns = 
IndexUtil.deserializeDataTableColumnsToJoin(scan);
         if (dataColumns != null) {
             tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns);
@@ -199,14 +203,16 @@ public class ScanRegionObserver extends 
BaseScannerRegionObserver {
             List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? 
null : IndexMaintainer.deserialize(localIndexBytes);
             indexMaintainer = indexMaintainers.get(0);
             viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan);
+            byte[] txState = 
scan.getAttribute(BaseScannerRegionObserver.TX_STATE);
+            tx = MutationState.decodeTransaction(txState);
         }
 
         final TupleProjector p = 
TupleProjector.deserializeProjectorFromScan(scan);
         final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
         innerScanner =
                 getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs, 
offset, scan,
-                    dataColumns, tupleProjector, dataRegion, indexMaintainer, 
viewConstants,
-                    kvSchema, kvSchemaBitSet, j == null ? p : null, ptr);
+                    dataColumns, tupleProjector, dataRegion, indexMaintainer, 
tx,
+                    viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : 
null, ptr);
 
         final ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan);
         if (j != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
index 2f31b08..9f3bdb4 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
@@ -26,18 +26,17 @@ import 
org.apache.hadoop.hbase.coprocessor.CoprocessorException;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
 import 
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheRequest;
 import 
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.AddServerCacheResponse;
 import 
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheRequest;
 import 
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.RemoveServerCacheResponse;
 import 
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ServerCachingService;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.protobuf.ProtobufUtil;
+import org.apache.phoenix.util.ByteUtil;
 
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
@@ -66,6 +65,7 @@ public class ServerCachingEndpointImpl extends 
ServerCachingService implements C
     ImmutableBytesWritable cachePtr =
         org.apache.phoenix.protobuf.ProtobufUtil
             .toImmutableBytesWritable(request.getCachePtr());
+    byte[] txState = request.hasTxState() ? request.getTxState().toByteArray() 
: ByteUtil.EMPTY_BYTE_ARRAY;
 
     try {
       @SuppressWarnings("unchecked")
@@ -73,7 +73,7 @@ public class ServerCachingEndpointImpl extends 
ServerCachingService implements C
           (Class<ServerCacheFactory>) 
Class.forName(request.getCacheFactory().getClassName());
       ServerCacheFactory cacheFactory = serverCacheFactoryClass.newInstance();
       tenantCache.addServerCache(new 
ImmutableBytesPtr(request.getCacheId().toByteArray()),
-        cachePtr, cacheFactory);
+        cachePtr, txState, cacheFactory);
     } catch (Throwable e) {
       ProtobufUtil.setControllerException(controller, new IOException(e));
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
index 4fdfe99..b201c8e 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingProtocol.java
@@ -36,18 +36,19 @@ import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
  */
 public interface ServerCachingProtocol {
     public static interface ServerCacheFactory extends Writable {
-        public Closeable newCache(ImmutableBytesWritable cachePtr, MemoryChunk 
chunk) throws SQLException;
+        public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] 
txState, MemoryChunk chunk) throws SQLException;
     }
     /**
      * Add the cache to the region server cache.  
      * @param tenantId the tenantId or null if not applicable
      * @param cacheId unique identifier of the cache
      * @param cachePtr pointer to the byte array of the cache
+     * @param txState serialized transaction state from the client, or an empty byte array when the cache is not transactional
      * @param cacheFactory factory that converts from byte array to object 
representation on the server side
      * @return true on success and otherwise throws
      * @throws SQLException 
      */
-    public boolean addServerCache(byte[] tenantId, byte[] cacheId, 
ImmutableBytesWritable cachePtr, ServerCacheFactory cacheFactory) throws 
SQLException;
+    public boolean addServerCache(byte[] tenantId, byte[] cacheId, 
ImmutableBytesWritable cachePtr, byte[] txState, ServerCacheFactory 
cacheFactory) throws SQLException;
     /**
      * Remove the cache from the region server cache.  Called upon completion 
of
      * the operation when cache is no longer needed.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 5133cb2..ffdc05f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -105,6 +105,8 @@ import org.apache.phoenix.util.TimeKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import co.cask.tephra.TxConstants;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -403,6 +405,8 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver{
                               Delete delete = new Delete(firstKV.getRowArray(),
                                   firstKV.getRowOffset(), 
firstKV.getRowLength(),ts);
                               mutations.add(delete);
+                              // force tephra to ignore this delete
+                              
delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
                             } else if (isUpsert) {
                                 Arrays.fill(values, null);
                                 int i = 0;
@@ -462,6 +466,8 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver{
                                         results.get(0).getRowOffset(),
                                         results.get(0).getRowLength());
                                     delete.deleteColumns(deleteCF,  deleteCQ, 
ts);
+                                    // force tephra to ignore this delete
+                                    
delete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
                                     mutations.add(delete);
                                 }
                             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
index 25f8271..7e71cd9 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/PTableProtos.java
@@ -3218,6 +3218,16 @@ public final class PTableProtos {
      * <code>optional bool rowKeyOrderOptimizable = 26;</code>
      */
     boolean getRowKeyOrderOptimizable();
+
+    // optional bool transactional = 27;
+    /**
+     * <code>optional bool transactional = 27;</code>
+     */
+    boolean hasTransactional();
+    /**
+     * <code>optional bool transactional = 27;</code>
+     */
+    boolean getTransactional();
   }
   /**
    * Protobuf type {@code PTable}
@@ -3418,6 +3428,11 @@ public final class PTableProtos {
               rowKeyOrderOptimizable_ = input.readBool();
               break;
             }
+            case 216: {
+              bitField0_ |= 0x00400000;
+              transactional_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -3980,6 +3995,22 @@ public final class PTableProtos {
       return rowKeyOrderOptimizable_;
     }
 
+    // optional bool transactional = 27;
+    public static final int TRANSACTIONAL_FIELD_NUMBER = 27;
+    private boolean transactional_;
+    /**
+     * <code>optional bool transactional = 27;</code>
+     */
+    public boolean hasTransactional() {
+      return ((bitField0_ & 0x00400000) == 0x00400000);
+    }
+    /**
+     * <code>optional bool transactional = 27;</code>
+     */
+    public boolean getTransactional() {
+      return transactional_;
+    }
+
     private void initFields() {
       schemaNameBytes_ = com.google.protobuf.ByteString.EMPTY;
       tableNameBytes_ = com.google.protobuf.ByteString.EMPTY;
@@ -4007,6 +4038,7 @@ public final class PTableProtos {
       storeNulls_ = false;
       baseColumnCount_ = 0;
       rowKeyOrderOptimizable_ = false;
+      transactional_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4152,6 +4184,9 @@ public final class PTableProtos {
       if (((bitField0_ & 0x00200000) == 0x00200000)) {
         output.writeBool(26, rowKeyOrderOptimizable_);
       }
+      if (((bitField0_ & 0x00400000) == 0x00400000)) {
+        output.writeBool(27, transactional_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -4270,6 +4305,10 @@ public final class PTableProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(26, rowKeyOrderOptimizable_);
       }
+      if (((bitField0_ & 0x00400000) == 0x00400000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(27, transactional_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -4411,6 +4450,11 @@ public final class PTableProtos {
         result = result && (getRowKeyOrderOptimizable()
             == other.getRowKeyOrderOptimizable());
       }
+      result = result && (hasTransactional() == other.hasTransactional());
+      if (hasTransactional()) {
+        result = result && (getTransactional()
+            == other.getTransactional());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -4528,6 +4572,10 @@ public final class PTableProtos {
         hash = (37 * hash) + ROWKEYORDEROPTIMIZABLE_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getRowKeyOrderOptimizable());
       }
+      if (hasTransactional()) {
+        hash = (37 * hash) + TRANSACTIONAL_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getTransactional());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -4704,6 +4752,8 @@ public final class PTableProtos {
         bitField0_ = (bitField0_ & ~0x01000000);
         rowKeyOrderOptimizable_ = false;
         bitField0_ = (bitField0_ & ~0x02000000);
+        transactional_ = false;
+        bitField0_ = (bitField0_ & ~0x04000000);
         return this;
       }
 
@@ -4852,6 +4902,10 @@ public final class PTableProtos {
           to_bitField0_ |= 0x00200000;
         }
         result.rowKeyOrderOptimizable_ = rowKeyOrderOptimizable_;
+        if (((from_bitField0_ & 0x04000000) == 0x04000000)) {
+          to_bitField0_ |= 0x00400000;
+        }
+        result.transactional_ = transactional_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -5024,6 +5078,9 @@ public final class PTableProtos {
         if (other.hasRowKeyOrderOptimizable()) {
           setRowKeyOrderOptimizable(other.getRowKeyOrderOptimizable());
         }
+        if (other.hasTransactional()) {
+          setTransactional(other.getTransactional());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -6694,6 +6751,39 @@ public final class PTableProtos {
         return this;
       }
 
+      // optional bool transactional = 27;
+      private boolean transactional_ ;
+      /**
+       * <code>optional bool transactional = 27;</code>
+       */
+      public boolean hasTransactional() {
+        return ((bitField0_ & 0x04000000) == 0x04000000);
+      }
+      /**
+       * <code>optional bool transactional = 27;</code>
+       */
+      public boolean getTransactional() {
+        return transactional_;
+      }
+      /**
+       * <code>optional bool transactional = 27;</code>
+       */
+      public Builder setTransactional(boolean value) {
+        bitField0_ |= 0x04000000;
+        transactional_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool transactional = 27;</code>
+       */
+      public Builder clearTransactional() {
+        bitField0_ = (bitField0_ & ~0x04000000);
+        transactional_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:PTable)
     }
 
@@ -6740,7 +6830,7 @@ public final class PTableProtos {
       "leStats\022\013\n\003key\030\001 \002(\014\022\016\n\006values\030\002 
\003(\014\022\033\n\023" +
       "guidePostsByteCount\030\003 \001(\003\022\025\n\rkeyBytesCou",
       "nt\030\004 \001(\003\022\027\n\017guidePostsCount\030\005 
\001(\005\022!\n\013pGu" +
-      "idePosts\030\006 \001(\0132\014.PGuidePosts\"\357\004\n\006PTable\022" +
+      "idePosts\030\006 \001(\0132\014.PGuidePosts\"\206\005\n\006PTable\022" +
       "\027\n\017schemaNameBytes\030\001 \002(\014\022\026\n\016tableNameByt" +
       "es\030\002 \002(\014\022\036\n\ttableType\030\003 
\002(\0162\013.PTableType" +
       "\022\022\n\nindexState\030\004 
\001(\t\022\026\n\016sequenceNumber\030\005" +
@@ -6756,10 +6846,11 @@ public final class PTableProtos {
       "wIndexId\030\025 \001(\005\022\021\n\tindexType\030\026 
\001(\014\022\026\n\016sta" +
       "tsTimeStamp\030\027 \001(\003\022\022\n\nstoreNulls\030\030 
\001(\010\022\027\n" +
       "\017baseColumnCount\030\031 \001(\005\022\036\n\026rowKeyOrderOpt" +
-      "imizable\030\032 \001(\010*A\n\nPTableType\022\n\n\006SYSTEM\020\000" +
-      
"\022\010\n\004USER\020\001\022\010\n\004VIEW\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020"
 +
-      "\004B@\n(org.apache.phoenix.coprocessor.gene",
-      "ratedB\014PTableProtosH\001\210\001\001\240\001\001"
+      "imizable\030\032 \001(\010\022\025\n\rtransactional\030\033 
\001(\010*A\n" +
+      
"\nPTableType\022\n\n\006SYSTEM\020\000\022\010\n\004USER\020\001\022\010\n\004VIE"
 +
+      
"W\020\002\022\t\n\005INDEX\020\003\022\010\n\004JOIN\020\004B@\n(org.apache.p",
+      "hoenix.coprocessor.generatedB\014PTableProt" +
+      "osH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner 
assigner =
       new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -6783,7 +6874,7 @@ public final class PTableProtos {
           internal_static_PTable_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_PTable_descriptor,
-              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", 
"TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", 
"BucketNum", "Columns", "Indexes", "IsImmutableRows", "GuidePosts", 
"DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", 
"ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", 
"IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", 
"RowKeyOrderOptimizable", });
+              new java.lang.String[] { "SchemaNameBytes", "TableNameBytes", 
"TableType", "IndexState", "SequenceNumber", "TimeStamp", "PkNameBytes", 
"BucketNum", "Columns", "Indexes", "IsImmutableRows", "GuidePosts", 
"DataTableNameBytes", "DefaultFamilyName", "DisableWAL", "MultiTenant", 
"ViewType", "ViewStatement", "PhysicalNames", "TenantId", "ViewIndexId", 
"IndexType", "StatsTimeStamp", "StoreNulls", "BaseColumnCount", 
"RowKeyOrderOptimizable", "Transactional", });
           return null;
         }
       };

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
index 69db21b..5ee1dfb 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/ServerCachingProtos.java
@@ -699,6 +699,16 @@ public final class ServerCachingProtos {
      * <code>required .ServerCacheFactory cacheFactory = 4;</code>
      */
     
org.apache.phoenix.coprocessor.generated.ServerCacheFactoryProtos.ServerCacheFactoryOrBuilder
 getCacheFactoryOrBuilder();
+
+    // optional bytes txState = 5;
+    /**
+     * <code>optional bytes txState = 5;</code>
+     */
+    boolean hasTxState();
+    /**
+     * <code>optional bytes txState = 5;</code>
+     */
+    com.google.protobuf.ByteString getTxState();
   }
   /**
    * Protobuf type {@code AddServerCacheRequest}
@@ -787,6 +797,11 @@ public final class ServerCachingProtos {
               bitField0_ |= 0x00000008;
               break;
             }
+            case 42: {
+              bitField0_ |= 0x00000010;
+              txState_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -903,11 +918,28 @@ public final class ServerCachingProtos {
       return cacheFactory_;
     }
 
+    // optional bytes txState = 5;
+    public static final int TXSTATE_FIELD_NUMBER = 5;
+    private com.google.protobuf.ByteString txState_;
+    /**
+     * <code>optional bytes txState = 5;</code>
+     */
+    public boolean hasTxState() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional bytes txState = 5;</code>
+     */
+    public com.google.protobuf.ByteString getTxState() {
+      return txState_;
+    }
+
     private void initFields() {
       tenantId_ = com.google.protobuf.ByteString.EMPTY;
       cacheId_ = com.google.protobuf.ByteString.EMPTY;
       cachePtr_ = 
org.apache.phoenix.coprocessor.generated.ServerCachingProtos.ImmutableBytesWritable.getDefaultInstance();
       cacheFactory_ = 
org.apache.phoenix.coprocessor.generated.ServerCacheFactoryProtos.ServerCacheFactory.getDefaultInstance();
+      txState_ = com.google.protobuf.ByteString.EMPTY;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -953,6 +985,9 @@ public final class ServerCachingProtos {
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
         output.writeMessage(4, cacheFactory_);
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeBytes(5, txState_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -978,6 +1013,10 @@ public final class ServerCachingProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(4, cacheFactory_);
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(5, txState_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -1021,6 +1060,11 @@ public final class ServerCachingProtos {
         result = result && getCacheFactory()
             .equals(other.getCacheFactory());
       }
+      result = result && (hasTxState() == other.hasTxState());
+      if (hasTxState()) {
+        result = result && getTxState()
+            .equals(other.getTxState());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -1050,6 +1094,10 @@ public final class ServerCachingProtos {
         hash = (37 * hash) + CACHEFACTORY_FIELD_NUMBER;
         hash = (53 * hash) + getCacheFactory().hashCode();
       }
+      if (hasTxState()) {
+        hash = (37 * hash) + TXSTATE_FIELD_NUMBER;
+        hash = (53 * hash) + getTxState().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -1177,6 +1225,8 @@ public final class ServerCachingProtos {
           cacheFactoryBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000008);
+        txState_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000010);
         return this;
       }
 
@@ -1229,6 +1279,10 @@ public final class ServerCachingProtos {
         } else {
           result.cacheFactory_ = cacheFactoryBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.txState_ = txState_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -1257,6 +1311,9 @@ public final class ServerCachingProtos {
         if (other.hasCacheFactory()) {
           mergeCacheFactory(other.getCacheFactory());
         }
+        if (other.hasTxState()) {
+          setTxState(other.getTxState());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1610,6 +1667,42 @@ public final class ServerCachingProtos {
         return cacheFactoryBuilder_;
       }
 
+      // optional bytes txState = 5;
+      private com.google.protobuf.ByteString txState_ = 
com.google.protobuf.ByteString.EMPTY;
+      /**
+       * <code>optional bytes txState = 5;</code>
+       */
+      public boolean hasTxState() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      /**
+       * <code>optional bytes txState = 5;</code>
+       */
+      public com.google.protobuf.ByteString getTxState() {
+        return txState_;
+      }
+      /**
+       * <code>optional bytes txState = 5;</code>
+       */
+      public Builder setTxState(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000010;
+        txState_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bytes txState = 5;</code>
+       */
+      public Builder clearTxState() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        txState_ = getDefaultInstance().getTxState();
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:AddServerCacheRequest)
     }
 
@@ -3383,20 +3476,21 @@ public final class ServerCachingProtos {
       "\n\032ServerCachingService.proto\032\030ServerCach" +
       "eFactory.proto\"K\n\026ImmutableBytesWritable" +
       "\022\021\n\tbyteArray\030\001 \002(\014\022\016\n\006offset\030\002 
\002(\005\022\016\n\006l" +
-      "ength\030\003 \002(\005\"\220\001\n\025AddServerCacheRequest\022\020\n" 
+
+      "ength\030\003 \002(\005\"\241\001\n\025AddServerCacheRequest\022\020\n" 
+
       "\010tenantId\030\001 \001(\014\022\017\n\007cacheId\030\002 
\002(\014\022)\n\010cach" +
       "ePtr\030\003 \002(\0132\027.ImmutableBytesWritable\022)\n\014c" +
-      "acheFactory\030\004 \002(\0132\023.ServerCacheFactory\"(" +
-      "\n\026AddServerCacheResponse\022\016\n\006return\030\001 \002(\010" +
-      "\"=\n\030RemoveServerCacheRequest\022\020\n\010tenantId" +
-      "\030\001 \001(\014\022\017\n\007cacheId\030\002 
\002(\014\"+\n\031RemoveServerC",
-      "acheResponse\022\016\n\006return\030\001 
\002(\0102\245\001\n\024ServerC" +
-      "achingService\022A\n\016addServerCache\022\026.AddSer" +
-      "verCacheRequest\032\027.AddServerCacheResponse" +
-      "\022J\n\021removeServerCache\022\031.RemoveServerCach" +
-      "eRequest\032\032.RemoveServerCacheResponseBG\n(" +
-      "org.apache.phoenix.coprocessor.generated" +
-      "B\023ServerCachingProtosH\001\210\001\001\240\001\001"
+      "acheFactory\030\004 \002(\0132\023.ServerCacheFactory\022\017" +
+      "\n\007txState\030\005 \001(\014\"(\n\026AddServerCacheRespons" +
+      "e\022\016\n\006return\030\001 \002(\010\"=\n\030RemoveServerCacheRe" +
+      "quest\022\020\n\010tenantId\030\001 
\001(\014\022\017\n\007cacheId\030\002 \002(\014",
+      "\"+\n\031RemoveServerCacheResponse\022\016\n\006return\030" +
+      "\001 \002(\0102\245\001\n\024ServerCachingService\022A\n\016addSer" +
+      "verCache\022\026.AddServerCacheRequest\032\027.AddSe" +
+      "rverCacheResponse\022J\n\021removeServerCache\022\031" +
+      ".RemoveServerCacheRequest\032\032.RemoveServer" +
+      "CacheResponseBG\n(org.apache.phoenix.copr" +
+      "ocessor.generatedB\023ServerCachingProtosH\001" +
+      "\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner 
assigner =
       new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3414,7 +3508,7 @@ public final class ServerCachingProtos {
           internal_static_AddServerCacheRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_AddServerCacheRequest_descriptor,
-              new java.lang.String[] { "TenantId", "CacheId", "CachePtr", 
"CacheFactory", });
+              new java.lang.String[] { "TenantId", "CacheId", "CachePtr", 
"CacheFactory", "TxState", });
           internal_static_AddServerCacheResponse_descriptor =
             getDescriptor().getMessageTypes().get(2);
           internal_static_AddServerCacheResponse_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index bb76ccb..3613c95 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -159,6 +159,12 @@ public enum SQLExceptionCode {
      AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_INDEX(520, "42897", "Aggregate 
expression not allowed in an index"),
      NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX(521, "42898", 
"Non-deterministic expression not allowed in an index"),
      STATELESS_EXPRESSION_NOT_ALLOWED_IN_INDEX(522, "42899", "Stateless 
expression not allowed in an index"),
+     
+     /**
+      *  Transaction exceptions.
+      */
+     TRANSACTION_CONFLICT_EXCEPTION(523, "42900", "Transaction aborted due to 
conflict with other mutations"),
+     TRANSACTION_EXCEPTION(524, "42901", "Transaction aborted due to error"),
 
      /**
       * Union All related errors
@@ -258,7 +264,18 @@ public enum SQLExceptionCode {
     NO_LOCAL_INDEXES(1054, "43A11", "Local secondary indexes are not supported 
for HBase versions " +
         
MetaDataUtil.decodeHBaseVersionAsString(PhoenixDatabaseMetaData.MIN_LOCAL_SI_VERSION_DISALLOW)
 + " through " + 
MetaDataUtil.decodeHBaseVersionAsString(PhoenixDatabaseMetaData.MAX_LOCAL_SI_VERSION_DISALLOW)
 + " inclusive."),
     UNALLOWED_LOCAL_INDEXES(1055, "43A12", "Local secondary indexes are 
configured to not be allowed."),
+    
     DESC_VARBINARY_NOT_SUPPORTED(1056, "43A13", "Descending VARBINARY columns 
not supported"),
+    
+    DEFAULT_COLUMN_FAMILY_ON_SHARED_TABLE(1069, "43A14", "Default column 
family not allowed on VIEW or shared INDEX"),
+    ONLY_TABLE_MAY_BE_DECLARED_TRANSACTIONAL(1070, "44A01", "Only tables may 
be declared as transactional"),
+    TX_MAY_NOT_SWITCH_TO_NON_TX(1071, "44A02", "A transactional table may not 
be switched to non transactional"),
+       STORE_NULLS_MUST_BE_TRUE_FOR_TRANSACTIONAL(1072, "44A03", "Store nulls 
must be true when a table is transactional"),
+    CANNOT_START_TRANSACTION_WITH_SCN_SET(1073, "44A04", "Cannot start a 
transaction on a connection with SCN set"),
+    TX_MAX_VERSIONS_MUST_BE_GREATER_THAN_ONE(1074, "44A05", "A transactional 
table must define VERSION of greater than one"),
+    CANNOT_SPECIFY_SCN_FOR_TXN_TABLE(1075, "44A06", "Cannot use a connection 
with SCN set for a transactional table"),
+    NULL_TRANSACTION_CONTEXT(1076, "44A07", "No Transaction Context 
available"),
+    TRANSACTION_FAILED(1077, "44A08", "Transaction failure"),
 
     /** Sequence related */
     SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new 
Factory() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index e292dd8..0bdb65a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -223,23 +223,26 @@ public abstract class BaseQueryPlan implements QueryPlan {
         if (table.getType() != PTableType.SYSTEM) {
             scan.setConsistency(connection.getConsistency());
         }
-        // Get the time range of row_timestamp column
-        TimeRange rowTimestampRange = 
context.getScanRanges().getRowTimestampRange();
-        // Get the already existing time range on the scan.
-        TimeRange scanTimeRange = scan.getTimeRange();
-        Long scn = connection.getSCN();
-        if (scn == null) {
-            scn = context.getCurrentTime();
-        }
-        try {
-            TimeRange timeRangeToUse = 
ScanUtil.intersectTimeRange(rowTimestampRange, scanTimeRange, scn);
-            if (timeRangeToUse == null) {
-                return ResultIterator.EMPTY_ITERATOR;
-            }
-            scan.setTimeRange(timeRangeToUse.getMin(), 
timeRangeToUse.getMax());
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
+        // TODO fix this in PHOENIX-2415 Support ROW_TIMESTAMP with 
transactional tables
+        if (!table.isTransactional()) {
+                       // Get the time range of row_timestamp column
+               TimeRange rowTimestampRange = 
context.getScanRanges().getRowTimestampRange();
+               // Get the already existing time range on the scan.
+               TimeRange scanTimeRange = scan.getTimeRange();
+               Long scn = connection.getSCN();
+               if (scn == null) {
+                   scn = context.getCurrentTime();
+               }
+               try {
+                   TimeRange timeRangeToUse = 
ScanUtil.intersectTimeRange(rowTimestampRange, scanTimeRange, scn);
+                   if (timeRangeToUse == null) {
+                       return ResultIterator.EMPTY_ITERATOR;
+                   }
+                   scan.setTimeRange(timeRangeToUse.getMin(), 
timeRangeToUse.getMax());
+               } catch (IOException e) {
+                   throw new RuntimeException(e);
+               }
+           }
         byte[] tenantIdBytes;
         if( table.isMultiTenant() == true ) {
             tenantIdBytes = connection.getTenantId() == null ? null :
@@ -326,6 +329,9 @@ public abstract class BaseQueryPlan implements QueryPlan {
         ImmutableBytesWritable ptr = new ImmutableBytesWritable();
         IndexMaintainer.serialize(dataTable, ptr, indexes, 
context.getConnection());
         scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD, 
ByteUtil.copyKeyBytesIfNecessary(ptr));
+        if (dataTable.isTransactional()) {
+            scan.setAttribute(BaseScannerRegionObserver.TX_STATE, 
context.getConnection().getMutationState().encodeTransaction());
+        }
     }
 
     private void serializeViewConstantsIntoScan(Scan scan, PTable dataTable) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java
new file mode 100644
index 0000000..6b3f9ca
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
+public class DelegateHTable implements HTableInterface {
+    protected final HTableInterface delegate;
+
+    public DelegateHTable(HTableInterface delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public byte[] getTableName() {
+        return delegate.getTableName();
+    }
+
+    @Override
+    public TableName getName() {
+        return delegate.getName();
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+        return delegate.getConfiguration();
+    }
+
+    @Override
+    public HTableDescriptor getTableDescriptor() throws IOException {
+        return delegate.getTableDescriptor();
+    }
+
+    @Override
+    public boolean exists(Get get) throws IOException {
+        return delegate.exists(get);
+    }
+
+    @Override
+    public Boolean[] exists(List<Get> gets) throws IOException {
+        return delegate.exists(gets);
+    }
+
+    @Override
+    public void batch(List<? extends Row> actions, Object[] results) throws 
IOException, InterruptedException {
+        delegate.batch(actions, results);
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public Object[] batch(List<? extends Row> actions) throws IOException, 
InterruptedException {
+        return delegate.batch(actions);
+    }
+
+    @Override
+    public <R> void batchCallback(List<? extends Row> actions, Object[] 
results, Callback<R> callback)
+            throws IOException, InterruptedException {
+        delegate.batchCallback(actions, results, callback);
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public <R> Object[] batchCallback(List<? extends Row> actions, Callback<R> 
callback) throws IOException,
+            InterruptedException {
+        return delegate.batchCallback(actions, callback);
+    }
+
+    @Override
+    public Result get(Get get) throws IOException {
+        return delegate.get(get);
+    }
+
+    @Override
+    public Result[] get(List<Get> gets) throws IOException {
+        return delegate.get(gets);
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public Result getRowOrBefore(byte[] row, byte[] family) throws IOException 
{
+        return delegate.getRowOrBefore(row, family);
+    }
+
+    @Override
+    public ResultScanner getScanner(Scan scan) throws IOException {
+        return delegate.getScanner(scan);
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family) throws IOException {
+        return delegate.getScanner(family);
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family, byte[] qualifier) throws 
IOException {
+        return delegate.getScanner(family, qualifier);
+    }
+
+    @Override
+    public void put(Put put) throws IOException {
+        delegate.put(put);
+    }
+
+    @Override
+    public void put(List<Put> puts) throws IOException {
+        delegate.put(puts);
+    }
+
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, 
byte[] value, Put put) throws IOException {
+        return delegate.checkAndPut(row, family, qualifier, value, put);
+    }
+
+    @Override
+    public void delete(Delete delete) throws IOException {
+        delegate.delete(delete);
+    }
+
+    @Override
+    public void delete(List<Delete> deletes) throws IOException {
+        delegate.delete(deletes);
+    }
+
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, 
byte[] value, Delete delete)
+            throws IOException {
+        return delegate.checkAndDelete(row, family, qualifier, value, delete);
+    }
+
+    @Override
+    public void mutateRow(RowMutations rm) throws IOException {
+        delegate.mutateRow(rm);
+    }
+
+    @Override
+    public Result append(Append append) throws IOException {
+        return delegate.append(append);
+    }
+
+    @Override
+    public Result increment(Increment increment) throws IOException {
+        return delegate.increment(increment);
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family, byte[] 
qualifier, long amount) throws IOException {
+        return delegate.incrementColumnValue(row, family, qualifier, amount);
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family, byte[] 
qualifier, long amount, Durability durability)
+            throws IOException {
+        return delegate.incrementColumnValue(row, family, qualifier, amount, 
durability);
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family, byte[] 
qualifier, long amount, boolean writeToWAL)
+            throws IOException {
+        return delegate.incrementColumnValue(row, family, qualifier, amount, 
writeToWAL);
+    }
+
+    @Override
+    public boolean isAutoFlush() {
+        return delegate.isAutoFlush();
+    }
+
+    @Override
+    public void flushCommits() throws IOException {
+        delegate.flushCommits();
+    }
+
+    @Override
+    public void close() throws IOException {
+        delegate.close();
+    }
+
+    @Override
+    public CoprocessorRpcChannel coprocessorService(byte[] row) {
+        return delegate.coprocessorService(row);
+    }
+
+    @Override
+    public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> 
service, byte[] startKey, byte[] endKey,
+            Call<T, R> callable) throws ServiceException, Throwable {
+        return delegate.coprocessorService(service, startKey, endKey, 
callable);
+    }
+
+    @Override
+    public <T extends Service, R> void coprocessorService(Class<T> service, 
byte[] startKey, byte[] endKey,
+            Call<T, R> callable, Callback<R> callback) throws 
ServiceException, Throwable {
+        delegate.coprocessorService(service, startKey, endKey, callable, 
callback);
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public void setAutoFlush(boolean autoFlush) {
+        delegate.setAutoFlush(autoFlush);
+    }
+
+    @Override
+    public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
+        delegate.setAutoFlush(autoFlush, clearBufferOnFail);
+    }
+
+    @Override
+    public void setAutoFlushTo(boolean autoFlush) {
+        delegate.setAutoFlushTo(autoFlush);
+    }
+
+    @Override
+    public long getWriteBufferSize() {
+        return delegate.getWriteBufferSize();
+    }
+
+    @Override
+    public void setWriteBufferSize(long writeBufferSize) throws IOException {
+        delegate.setWriteBufferSize(writeBufferSize);
+    }
+
+    @Override
+    public <R extends Message> Map<byte[], R> 
batchCoprocessorService(MethodDescriptor methodDescriptor,
+            Message request, byte[] startKey, byte[] endKey, R 
responsePrototype) throws ServiceException, Throwable {
+        return delegate.batchCoprocessorService(methodDescriptor, request, 
startKey, endKey, responsePrototype);
+    }
+
+    @Override
+    public <R extends Message> void batchCoprocessorService(MethodDescriptor 
methodDescriptor, Message request,
+            byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> 
callback) throws ServiceException,
+            Throwable {
+        delegate.batchCoprocessorService(methodDescriptor, request, startKey, 
endKey, responsePrototype, callback);
+    }
+
+    @Override
+    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, 
CompareOp compareOp, byte[] value,
+            RowMutations mutation) throws IOException {
+        return delegate.checkAndMutate(row, family, qualifier, compareOp, 
value, mutation);
+    }
+
+       @Override
+       public boolean[] existsAll(List<Get> gets) throws IOException {
+               return delegate.existsAll(gets);
+       }
+
+       @Override
+       public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+                       CompareOp compareOp, byte[] value, Put put) throws 
IOException {
+               return delegate.checkAndPut(row, family, qualifier, value, put);
+       }
+
+       @Override
+       public boolean checkAndDelete(byte[] row, byte[] family, byte[] 
qualifier,
+                       CompareOp compareOp, byte[] value, Delete delete)
+                       throws IOException {
+               return delegate.checkAndDelete(row, family, qualifier, 
compareOp, value, delete);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 72920b2..cf89380 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -23,6 +23,7 @@ import static 
org.apache.phoenix.util.LogUtil.addCustomAnnotations;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -66,6 +67,7 @@ import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PArrayDataType;
 import org.apache.phoenix.schema.types.PBoolean;
@@ -75,6 +77,7 @@ import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 public class HashJoinPlan extends DelegateQueryPlan {
     private static final Log LOG = LogFactory.getLog(HashJoinPlan.class);
@@ -83,6 +86,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
     private final HashJoinInfo joinInfo;
     private final SubPlan[] subPlans;
     private final boolean recompileWhereClause;
+    private final Set<TableRef> tableRefs;
     private final int maxServerCacheTimeToLive;
     private List<SQLCloseable> dependencies;
     private HashCacheClient hashClient;
@@ -114,11 +118,21 @@ public class HashJoinPlan extends DelegateQueryPlan {
         this.joinInfo = joinInfo;
         this.subPlans = subPlans;
         this.recompileWhereClause = recompileWhereClause;
+        this.tableRefs = Sets.newHashSetWithExpectedSize(subPlans.length + 
plan.getSourceRefs().size());
+        this.tableRefs.addAll(plan.getSourceRefs());
+        for (SubPlan subPlan : subPlans) {
+            tableRefs.addAll(subPlan.getInnerPlan().getSourceRefs());
+        }
         this.maxServerCacheTimeToLive = 
plan.getContext().getConnection().getQueryServices().getProps().getInt(
                 QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, 
QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
     }
     
     @Override
+    public Set<TableRef> getSourceRefs() {
+        return tableRefs;
+    }
+    
+    @Override
     public ResultIterator iterator() throws SQLException {
        return iterator(DefaultParallelScanGrouper.getInstance());
     }
@@ -251,6 +265,7 @@ public class HashJoinPlan extends DelegateQueryPlan {
         public void postProcess(Object result, HashJoinPlan parent) throws 
SQLException;
         public List<String> getPreSteps(HashJoinPlan parent) throws 
SQLException;
         public List<String> getPostSteps(HashJoinPlan parent) throws 
SQLException;
+        public QueryPlan getInnerPlan();
     }
     
     public static class WhereClauseSubPlan implements SubPlan {
@@ -321,6 +336,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
         public List<String> getPostSteps(HashJoinPlan parent) throws 
SQLException {
             return Collections.<String>emptyList();
         }
+
+        @Override
+        public QueryPlan getInnerPlan() {
+            return plan;
+        }
     }
     
     public static class HashSubPlan implements SubPlan {        
@@ -411,7 +431,12 @@ public class HashJoinPlan extends DelegateQueryPlan {
                     + " IN (" + keyRangeRhsExpression.toString() + ")";
             return Collections.<String> singletonList(step);
         }
-        
+
+
+        @Override
+        public QueryPlan getInnerPlan() {
+            return plan;
+        }
     }
 }
 

Reply via email to