http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
deleted file mode 100644
index deceac6..0000000
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java
+++ /dev/null
@@ -1,798 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.transaction;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CompareOperator;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableBuilder;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.filter.CompareFilter;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
-public interface PhoenixTransactionalTable extends Table {
-    /**
-     * Gets the fully qualified table name instance of this table.
-     */
-    TableName getName();
-
-    /**
-     * Returns the {@link org.apache.hadoop.conf.Configuration} object used by 
this instance.
-     * <p>
-     * The reference returned is not a copy, so any change made to it will
-     * affect this instance.
-     */
-    Configuration getConfiguration();
-
-    /**
-     * Gets the {@link org.apache.hadoop.hbase.HTableDescriptor table 
descriptor} for this table.
-     * @throws java.io.IOException if a remote or network exception occurs.
-     * @deprecated since 2.0 version and will be removed in 3.0 version.
-     *             use {@link #getDescriptor()}
-     */
-    @Deprecated
-    HTableDescriptor getTableDescriptor() throws IOException;
-
-    /**
-     * Gets the {@link org.apache.hadoop.hbase.client.TableDescriptor table 
descriptor} for this table.
-     * @throws java.io.IOException if a remote or network exception occurs.
-     */
-    TableDescriptor getDescriptor() throws IOException;
-
-    /**
-     * Test for the existence of columns in the table, as specified by the Get.
-     * <p>
-     *
-     * This will return true if the Get matches one or more keys, false if not.
-     * <p>
-     *
-     * This is a server-side call so it prevents any data from being 
transfered to
-     * the client.
-     *
-     * @param get the Get
-     * @return true if the specified Get matches one or more keys, false if not
-     * @throws IOException e
-     */
-    boolean exists(Get get) throws IOException;
-
-    /**
-     * Test for the existence of columns in the table, as specified by the 
Gets.
-     * <p>
-     *
-     * This will return an array of booleans. Each value will be true if the 
related Get matches
-     * one or more keys, false if not.
-     * <p>
-     *
-     * This is a server-side call so it prevents any data from being 
transferred to
-     * the client.
-     *
-     * @param gets the Gets
-     * @return Array of boolean.  True if the specified Get matches one or 
more keys, false if not.
-     * @throws IOException e
-     */
-    boolean[] exists(List<Get> gets) throws IOException;
-
-    /**
-     * Test for the existence of columns in the table, as specified by the 
Gets.
-     * This will return an array of booleans. Each value will be true if the 
related Get matches
-     * one or more keys, false if not.
-     * This is a server-side call so it prevents any data from being 
transferred to
-     * the client.
-     *
-     * @param gets the Gets
-     * @return Array of boolean.  True if the specified Get matches one or 
more keys, false if not.
-     * @throws IOException e
-     * @deprecated since 2.0 version and will be removed in 3.0 version.
-     *             use {@link #exists(List)}
-     */
-    @Deprecated
-    default boolean[] existsAll(List<Get> gets) throws IOException {
-      return exists(gets);
-    }
-
-    /**
-     * Method that does a batch call on Deletes, Gets, Puts, Increments, 
Appends, RowMutations.
-     * The ordering of execution of the actions is not defined. Meaning if you 
do a Put and a
-     * Get in the same {@link #batch} call, you will not necessarily be
-     * guaranteed that the Get returns what the Put had put.
-     *
-     * @param actions list of Get, Put, Delete, Increment, Append, 
RowMutations.
-     * @param results Empty Object[], same size as actions. Provides access to 
partial
-     *                results, in case an exception is thrown. A null in the 
result array means that
-     *                the call for that action failed, even after retries. The 
order of the objects
-     *                in the results array corresponds to the order of actions 
in the request list.
-     * @throws IOException
-     * @since 0.90.0
-     */
-    void batch(final List<? extends Row> actions, final Object[] results) 
throws IOException,
-      InterruptedException;
-
-    /**
-     * Same as {@link #batch(List, Object[])}, but with a callback.
-     * @since 0.96.0
-     */
-    <R> void batchCallback(
-      final List<? extends Row> actions, final Object[] results, final 
Batch.Callback<R> callback
-    ) throws IOException, InterruptedException;
-
-    /**
-     * Extracts certain cells from a given row.
-     * @param get The object that specifies what data to fetch and from which 
row.
-     * @return The data coming from the specified row, if it exists.  If the 
row
-     * specified doesn't exist, the {@link Result} instance returned won't
-     * contain any {@link org.apache.hadoop.hbase.KeyValue}, as indicated by 
{@link Result#isEmpty()}.
-     * @throws IOException if a remote or network exception occurs.
-     * @since 0.20.0
-     */
-    Result get(Get get) throws IOException;
-
-    /**
-     * Extracts specified cells from the given rows, as a batch.
-     *
-     * @param gets The objects that specify what data to fetch and from which 
rows.
-     * @return The data coming from the specified rows, if it exists.  If the 
row specified doesn't
-     * exist, the {@link Result} instance returned won't contain any {@link
-     * org.apache.hadoop.hbase.Cell}s, as indicated by {@link 
Result#isEmpty()}. If there are any
-     * failures even after retries, there will be a <code>null</code> in the 
results' array for those
-     * Gets, AND an exception will be thrown. The ordering of the Result array 
corresponds to the order
-     * of the list of passed in Gets.
-     * @throws IOException if a remote or network exception occurs.
-     * @since 0.90.0
-     * @apiNote {@link #put(List)} runs pre-flight validations on the input 
list on client.
-     * Currently {@link #get(List)} doesn't run any validations on the 
client-side, currently there
-     * is no need, but this may change in the future. An
-     * {@link IllegalArgumentException} will be thrown in this case.
-     */
-    Result[] get(List<Get> gets) throws IOException;
-
-    /**
-     * Returns a scanner on the current table as specified by the {@link Scan}
-     * object.
-     * Note that the passed {@link Scan}'s start row and caching properties
-     * maybe changed.
-     *
-     * @param scan A configured {@link Scan} object.
-     * @return A scanner.
-     * @throws IOException if a remote or network exception occurs.
-     * @since 0.20.0
-     */
-    ResultScanner getScanner(Scan scan) throws IOException;
-
-    /**
-     * Gets a scanner on the current table for the given family.
-     *
-     * @param family The column family to scan.
-     * @return A scanner.
-     * @throws IOException if a remote or network exception occurs.
-     * @since 0.20.0
-     */
-    ResultScanner getScanner(byte[] family) throws IOException;
-
-    /**
-     * Gets a scanner on the current table for the given family and qualifier.
-     *
-     * @param family The column family to scan.
-     * @param qualifier The column qualifier to scan.
-     * @return A scanner.
-     * @throws IOException if a remote or network exception occurs.
-     * @since 0.20.0
-     */
-    ResultScanner getScanner(byte[] family, byte[] qualifier) throws 
IOException;
-
-
-    /**
-     * Puts some data in the table.
-     *
-     * @param put The data to put.
-     * @throws IOException if a remote or network exception occurs.
-     * @since 0.20.0
-     */
-    void put(Put put) throws IOException;
-
-    /**
-     * Batch puts the specified data into the table.
-     * <p>
-     * This can be used for group commit, or for submitting user defined 
batches. Before sending
-     * a batch of mutations to the server, the client runs a few validations 
on the input list. If an
-     * error is found, for example, a mutation was supplied but was missing 
it's column an
-     * {@link IllegalArgumentException} will be thrown and no mutations will 
be applied. If there
-     * are any failures even after retries, a {@link 
RetriesExhaustedWithDetailsException} will be
-     * thrown. RetriesExhaustedWithDetailsException contains lists of failed 
mutations and
-     * corresponding remote exceptions. The ordering of mutations and 
exceptions in the
-     * encapsulating exception corresponds to the order of the input list of 
Put requests.
-     *
-     * @param puts The list of mutations to apply.
-     * @throws IOException if a remote or network exception occurs.
-     * @since 0.20.0
-     */
-    void put(List<Put> puts) throws IOException;
-
-    /**
-     * Atomically checks if a row/family/qualifier value matches the expected
-     * value. If it does, it adds the put.  If the passed value is null, the 
check
-     * is for the lack of column (ie: non-existance)
-     *
-     * @param row to check
-     * @param family column family to check
-     * @param qualifier column qualifier to check
-     * @param value the expected value
-     * @param put data to put if check succeeds
-     * @throws IOException e
-     * @return true if the new put was executed, false otherwise
-     */
-    boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
-      byte[] value, Put put) throws IOException;
-
-    /**
-     * Atomically checks if a row/family/qualifier value matches the expected
-     * value. If it does, it adds the put.  If the passed value is null, the 
check
-     * is for the lack of column (ie: non-existence)
-     *
-     * The expected value argument of this call is on the left and the current
-     * value of the cell is on the right side of the comparison operator.
-     *
-     * Ie. eg. GREATER operator means expected value > existing <=> add the 
put.
-     *
-     * @param row to check
-     * @param family column family to check
-     * @param qualifier column qualifier to check
-     * @param compareOp comparison operator to use
-     * @param value the expected value
-     * @param put data to put if check succeeds
-     * @throws IOException e
-     * @return true if the new put was executed, false otherwise
-     * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use
-     *  {@link #checkAndPut(byte[], byte[], byte[], CompareOperator, byte[], 
Put)}}
-     */
-    @Deprecated
-    boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
-      CompareFilter.CompareOp compareOp, byte[] value, Put put) throws 
IOException;
-
-    /**
-     * Atomically checks if a row/family/qualifier value matches the expected
-     * value. If it does, it adds the put.  If the passed value is null, the 
check
-     * is for the lack of column (ie: non-existence)
-     *
-     * The expected value argument of this call is on the left and the current
-     * value of the cell is on the right side of the comparison operator.
-     *
-     * Ie. eg. GREATER operator means expected value > existing <=> add the 
put.
-     *
-     * @param row to check
-     * @param family column family to check
-     * @param qualifier column qualifier to check
-     * @param op comparison operator to use
-     * @param value the expected value
-     * @param put data to put if check succeeds
-     * @throws IOException e
-     * @return true if the new put was executed, false otherwise
-     */
-    boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
-                        CompareOperator op, byte[] value, Put put) throws 
IOException;
-
-    /**
-     * Deletes the specified cells/row.
-     *
-     * @param delete The object that specifies what to delete.
-     * @throws IOException if a remote or network exception occurs.
-     * @since 0.20.0
-     */
-    void delete(Delete delete) throws IOException;
-
-    /**
-     * Batch Deletes the specified cells/rows from the table.
-     * <p>
-     * If a specified row does not exist, {@link Delete} will report as though 
sucessful
-     * delete; no exception will be thrown. If there are any failures even 
after retries,
-     * a * {@link RetriesExhaustedWithDetailsException} will be thrown.
-     * RetriesExhaustedWithDetailsException contains lists of failed {@link 
Delete}s and
-     * corresponding remote exceptions.
-     *
-     * @param deletes List of things to delete. The input list gets modified 
by this
-     * method. All successfully applied {@link Delete}s in the list are 
removed (in particular it
-     * gets re-ordered, so the order in which the elements are inserted in the 
list gives no
-     * guarantee as to the order in which the {@link Delete}s are executed).
-     * @throws IOException if a remote or network exception occurs. In that 
case
-     * the {@code deletes} argument will contain the {@link Delete} instances
-     * that have not be successfully applied.
-     * @since 0.20.1
-     * @apiNote In 3.0.0 version, the input list {@code deletes} will no 
longer be modified. Also,
-     * {@link #put(List)} runs pre-flight validations on the input list on 
client. Currently
-     * {@link #delete(List)} doesn't run validations on the client, there is 
no need currently,
-     * but this may change in the future. An * {@link 
IllegalArgumentException} will be thrown
-     * in this case.
-     */
-    void delete(List<Delete> deletes) throws IOException;
-
-    /**
-     * Atomically checks if a row/family/qualifier value matches the expected
-     * value. If it does, it adds the delete.  If the passed value is null, the
-     * check is for the lack of column (ie: non-existance)
-     *
-     * @param row to check
-     * @param family column family to check
-     * @param qualifier column qualifier to check
-     * @param value the expected value
-     * @param delete data to delete if check succeeds
-     * @throws IOException e
-     * @return true if the new delete was executed, false otherwise
-     */
-    boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
-      byte[] value, Delete delete) throws IOException;
-
-    /**
-     * Atomically checks if a row/family/qualifier value matches the expected
-     * value. If it does, it adds the delete.  If the passed value is null, the
-     * check is for the lack of column (ie: non-existence)
-     *
-     * The expected value argument of this call is on the left and the current
-     * value of the cell is on the right side of the comparison operator.
-     *
-     * Ie. eg. GREATER operator means expected value > existing <=> add the 
delete.
-     *
-     * @param row to check
-     * @param family column family to check
-     * @param qualifier column qualifier to check
-     * @param compareOp comparison operator to use
-     * @param value the expected value
-     * @param delete data to delete if check succeeds
-     * @throws IOException e
-     * @return true if the new delete was executed, false otherwise
-     * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use
-     *  {@link #checkAndDelete(byte[], byte[], byte[], byte[], Delete)}
-     */
-    @Deprecated
-    boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
-      CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws 
IOException;
-
-    /**
-     * Atomically checks if a row/family/qualifier value matches the expected
-     * value. If it does, it adds the delete.  If the passed value is null, the
-     * check is for the lack of column (ie: non-existence)
-     *
-     * The expected value argument of this call is on the left and the current
-     * value of the cell is on the right side of the comparison operator.
-     *
-     * Ie. eg. GREATER operator means expected value > existing <=> add the 
delete.
-     *
-     * @param row to check
-     * @param family column family to check
-     * @param qualifier column qualifier to check
-     * @param op comparison operator to use
-     * @param value the expected value
-     * @param delete data to delete if check succeeds
-     * @throws IOException e
-     * @return true if the new delete was executed, false otherwise
-     */
-    boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
-                           CompareOperator op, byte[] value, Delete delete) 
throws IOException;
-
-    /**
-     * Performs multiple mutations atomically on a single row. Currently
-     * {@link Put} and {@link Delete} are supported.
-     *
-     * @param rm object that specifies the set of mutations to perform 
atomically
-     * @throws IOException
-     */
-    void mutateRow(final RowMutations rm) throws IOException;
-
-    /**
-     * Appends values to one or more columns within a single row.
-     * <p>
-     * This operation guaranteed atomicity to readers. Appends are done
-     * under a single row lock, so write operations to a row are synchronized, 
and
-     * readers are guaranteed to see this operation fully completed.
-     *
-     * @param append object that specifies the columns and amounts to be used
-     *                  for the increment operations
-     * @throws IOException e
-     * @return values of columns after the append operation (maybe null)
-     */
-    Result append(final Append append) throws IOException;
-
-    /**
-     * Increments one or more columns within a single row.
-     * <p>
-     * This operation ensures atomicity to readers. Increments are done
-     * under a single row lock, so write operations to a row are synchronized, 
and
-     * readers are guaranteed to see this operation fully completed.
-     *
-     * @param increment object that specifies the columns and amounts to be 
used
-     *                  for the increment operations
-     * @throws IOException e
-     * @return values of columns after the increment
-     */
-    Result increment(final Increment increment) throws IOException;
-
-    /**
-     * See {@link #incrementColumnValue(byte[], byte[], byte[], long, 
Durability)}
-     * <p>
-     * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}.
-     * @param row The row that contains the cell to increment.
-     * @param family The column family of the cell to increment.
-     * @param qualifier The column qualifier of the cell to increment.
-     * @param amount The amount to increment the cell with (or decrement, if 
the
-     * amount is negative).
-     * @return The new value, post increment.
-     * @throws IOException if a remote or network exception occurs.
-     */
-    long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
-      long amount) throws IOException;
-
-    /**
-     * Atomically increments a column value. If the column value already exists
-     * and is not a big-endian long, this could throw an exception. If the 
column
-     * value does not yet exist it is initialized to <code>amount</code> and
-     * written to the specified column.
-     *
-     * <p>Setting durability to {@link Durability#SKIP_WAL} means that in a 
fail
-     * scenario you will lose any increments that have not been flushed.
-     * @param row The row that contains the cell to increment.
-     * @param family The column family of the cell to increment.
-     * @param qualifier The column qualifier of the cell to increment.
-     * @param amount The amount to increment the cell with (or decrement, if 
the
-     * amount is negative).
-     * @param durability The persistence guarantee for this increment.
-     * @return The new value, post increment.
-     * @throws IOException if a remote or network exception occurs.
-     */
-    long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
-      long amount, Durability durability) throws IOException;
-
-    /**
-     * Releases any resources held or pending changes in internal buffers.
-     *
-     * @throws IOException if a remote or network exception occurs.
-     */
-    @Override
-    void close() throws IOException;
-
-    /**
-     * Creates and returns a {@link com.google.protobuf.RpcChannel} instance 
connected to the
-     * table region containing the specified row.  The row given does not 
actually have
-     * to exist.  Whichever region would contain the row based on start and 
end keys will
-     * be used.  Note that the {@code row} parameter is also not passed to the
-     * coprocessor handler registered for this protocol, unless the {@code row}
-     * is separately passed as an argument in the service request.  The 
parameter
-     * here is only used to locate the region used to handle the call.
-     *
-     * <p>
-     * The obtained {@link com.google.protobuf.RpcChannel} instance can be 
used to access a published
-     * coprocessor {@link com.google.protobuf.Service} using standard protobuf 
service invocations:
-     * </p>
-     *
-     * <div style="background-color: #cccccc; padding: 2px">
-     * <blockquote><pre>
-     * CoprocessorRpcChannel channel = myTable.coprocessorService(rowkey);
-     * MyService.BlockingInterface service = 
MyService.newBlockingStub(channel);
-     * MyCallRequest request = MyCallRequest.newBuilder()
-     *     ...
-     *     .build();
-     * MyCallResponse response = service.myCall(null, request);
-     * </pre></blockquote></div>
-     *
-     * @param row The row key used to identify the remote region location
-     * @return A CoprocessorRpcChannel instance
-     */
-    CoprocessorRpcChannel coprocessorService(byte[] row);
-
-    /**
-     * Creates an instance of the given {@link com.google.protobuf.Service} 
subclass for each table
-     * region spanning the range from the {@code startKey} row to {@code 
endKey} row (inclusive), and
-     * invokes the passed {@link 
org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method
-     * with each {@link com.google.protobuf.Service} instance.
-     *
-     * @param service the protocol buffer {@code Service} implementation to 
call
-     * @param startKey start region selection with region containing this row. 
 If {@code null}, the
-     * selection will start with the first table region.
-     * @param endKey select regions up to and including the region containing 
this row. If {@code
-     * null}, selection will continue through the last table region.
-     * @param callable this instance's {@link 
org.apache.hadoop.hbase.client.coprocessor.Batch
-     * .Call#call}
-     * method will be invoked once per table region, using the {@link 
com.google.protobuf.Service}
-     * instance connected to that region.
-     * @param <T> the {@link com.google.protobuf.Service} subclass to connect 
to
-     * @param <R> Return type for the {@code callable} parameter's {@link
-     * org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method
-     * @return a map of result values keyed by region name
-     */
-    <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> 
service,
-      byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
-      throws ServiceException, Throwable;
-
-    /**
-     * Creates an instance of the given {@link com.google.protobuf.Service} 
subclass for each table
-     * region spanning the range from the {@code startKey} row to {@code 
endKey} row (inclusive), and
-     * invokes the passed {@link 
org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method
-     * with each {@link Service} instance.
-     *
-     * <p> The given {@link 
org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[],
-     * byte[], Object)} method will be called with the return value from each 
region's {@link
-     * org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} invocation. 
</p>
-     *
-     * @param service the protocol buffer {@code Service} implementation to 
call
-     * @param startKey start region selection with region containing this row. 
 If {@code null}, the
-     * selection will start with the first table region.
-     * @param endKey select regions up to and including the region containing 
this row. If {@code
-     * null}, selection will continue through the last table region.
-     * @param callable this instance's {@link 
org.apache.hadoop.hbase.client.coprocessor.Batch
-     * .Call#call}
-     * method will be invoked once per table region, using the {@link Service} 
instance connected to
-     * that region.
-     * @param callback
-     * @param <T> the {@link Service} subclass to connect to
-     * @param <R> Return type for the {@code callable} parameter's {@link
-     * org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method
-     */
-    <T extends Service, R> void coprocessorService(final Class<T> service,
-      byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
-      final Batch.Callback<R> callback) throws ServiceException, Throwable;
-
-    /**
-     * Creates an instance of the given {@link com.google.protobuf.Service} 
subclass for each table
-     * region spanning the range from the {@code startKey} row to {@code 
endKey} row (inclusive), all
-     * the invocations to the same region server will be batched into one 
call. The coprocessor
-     * service is invoked according to the service instance, method name and 
parameters.
-     *
-     * @param methodDescriptor
-     *          the descriptor for the protobuf service method to call.
-     * @param request
-     *          the method call parameters
-     * @param startKey
-     *          start region selection with region containing this row. If 
{@code null}, the
-     *          selection will start with the first table region.
-     * @param endKey
-     *          select regions up to and including the region containing this 
row. If {@code null},
-     *          selection will continue through the last table region.
-     * @param responsePrototype
-     *          the proto type of the response of the method in Service.
-     * @param <R>
-     *          the response type for the coprocessor Service method
-     * @throws ServiceException
-     * @throws Throwable
-     * @return a map of result values keyed by region name
-     */
-    <R extends Message> Map<byte[], R> batchCoprocessorService(
-      Descriptors.MethodDescriptor methodDescriptor, Message request,
-      byte[] startKey, byte[] endKey, R responsePrototype) throws 
ServiceException, Throwable;
-
-    /**
-     * Creates an instance of the given {@link com.google.protobuf.Service} 
subclass for each table
-     * region spanning the range from the {@code startKey} row to {@code 
endKey} row (inclusive), all
-     * the invocations to the same region server will be batched into one 
call. The coprocessor
-     * service is invoked according to the service instance, method name and 
parameters.
-     *
-     * <p>
-     * The given
-     * {@link 
org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[],byte[],Object)}
-     * method will be called with the return value from each region's 
invocation.
-     * </p>
-     *
-     * @param methodDescriptor
-     *          the descriptor for the protobuf service method to call.
-     * @param request
-     *          the method call parameters
-     * @param startKey
-     *          start region selection with region containing this row. If 
{@code null}, the
-     *          selection will start with the first table region.
-     * @param endKey
-     *          select regions up to and including the region containing this 
row. If {@code null},
-     *          selection will continue through the last table region.
-     * @param responsePrototype
-     *          the proto type of the response of the method in Service.
-     * @param callback
-     *          callback to invoke with the response for each region
-     * @param <R>
-     *          the response type for the coprocessor Service method
-     * @throws ServiceException
-     * @throws Throwable
-     */
-    <R extends Message> void 
batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor,
-      Message request, byte[] startKey, byte[] endKey, R responsePrototype,
-      Batch.Callback<R> callback) throws ServiceException, Throwable;
-
-    /**
-     * Atomically checks if a row/family/qualifier value matches the expected 
value.
-     * If it does, it performs the row mutations.  If the passed value is 
null, the check
-     * is for the lack of column (ie: non-existence)
-     *
-     * The expected value argument of this call is on the left and the current
-     * value of the cell is on the right side of the comparison operator.
-     *
-     * Ie. eg. GREATER operator means expected value > existing <=> perform 
row mutations.
-     *
-     * @param row to check
-     * @param family column family to check
-     * @param qualifier column qualifier to check
-     * @param compareOp the comparison operator
-     * @param value the expected value
-     * @param mutation  mutations to perform if check succeeds
-     * @throws IOException e
-     * @return true if the new put was executed, false otherwise
-     * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use
-     * {@link #checkAndMutate(byte[], byte[], byte[], CompareOperator, byte[], 
RowMutations)}
-     */
-    @Deprecated
-    boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
-        CompareFilter.CompareOp compareOp, byte[] value, RowMutations 
mutation) throws IOException;
-
-    /**
-     * Atomically checks if a row/family/qualifier value matches the expected 
value.
-     * If it does, it performs the row mutations.  If the passed value is 
null, the check
-     * is for the lack of column (ie: non-existence)
-     *
-     * The expected value argument of this call is on the left and the current
-     * value of the cell is on the right side of the comparison operator.
-     *
-     * Ie. eg. GREATER operator means expected value > existing <=> perform 
row mutations.
-     *
-     * @param row to check
-     * @param family column family to check
-     * @param qualifier column qualifier to check
-     * @param op the comparison operator
-     * @param value the expected value
-     * @param mutation  mutations to perform if check succeeds
-     * @throws IOException e
-     * @return true if the new put was executed, false otherwise
-     */
-    boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, 
CompareOperator op,
-                           byte[] value, RowMutations mutation) throws 
IOException;
-
-    /**
-     * Get timeout of each rpc request in this Table instance. It will be 
overridden by a more
-     * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout.
-     * @see #getReadRpcTimeout(TimeUnit)
-     * @see #getWriteRpcTimeout(TimeUnit)
-     * @param unit the unit of time the timeout to be represented in
-     * @return rpc timeout in the specified time unit
-     */
-    long getRpcTimeout(TimeUnit unit);
-
-    /**
-     * Get timeout (millisecond) of each rpc request in this Table instance.
-     *
-     * @return Currently configured read timeout
-     * @deprecated use {@link #getReadRpcTimeout(TimeUnit)} or
-     *             {@link #getWriteRpcTimeout(TimeUnit)} instead
-     */
-    @Deprecated
-    int getRpcTimeout();
-
-    /**
-     * Set timeout (millisecond) of each rpc request in operations of this 
Table instance, will
-     * override the value of hbase.rpc.timeout in configuration.
-     * If a rpc request waiting too long, it will stop waiting and send a new 
request to retry until
-     * retries exhausted or operation timeout reached.
-     * <p>
-     * NOTE: This will set both the read and write timeout settings to the 
provided value.
-     *
-     * @param rpcTimeout the timeout of each rpc request in millisecond.
-     *
-     * @deprecated Use setReadRpcTimeout or setWriteRpcTimeout instead
-     */
-    @Deprecated
-    void setRpcTimeout(int rpcTimeout);
-
-    /**
-     * Get timeout of each rpc read request in this Table instance.
-     * @param unit the unit of time the timeout to be represented in
-     * @return read rpc timeout in the specified time unit
-     */
-    long getReadRpcTimeout(TimeUnit unit);
-
-    /**
-     * Get timeout (millisecond) of each rpc read request in this Table 
instance.
-     * @deprecated since 2.0 and will be removed in 3.0 version
-     *             use {@link #getReadRpcTimeout(TimeUnit)} instead
-     */
-    @Deprecated
-    int getReadRpcTimeout();
-
-    /**
-     * Set timeout (millisecond) of each rpc read request in operations of 
this Table instance, will
-     * override the value of hbase.rpc.read.timeout in configuration.
-     * If a rpc read request waiting too long, it will stop waiting and send a 
new request to retry
-     * until retries exhausted or operation timeout reached.
-     *
-     * @param readRpcTimeout the timeout for read rpc request in milliseconds
-     * @deprecated since 2.0.0, use {@link TableBuilder#setReadRpcTimeout} 
instead
-     */
-    @Deprecated
-    void setReadRpcTimeout(int readRpcTimeout);
-
-    /**
-     * Get timeout of each rpc write request in this Table instance.
-     * @param unit the unit of time the timeout to be represented in
-     * @return write rpc timeout in the specified time unit
-     */
-    long getWriteRpcTimeout(TimeUnit unit);
-
-    /**
-     * Get timeout (millisecond) of each rpc write request in this Table 
instance.
-     * @deprecated since 2.0 and will be removed in 3.0 version
-     *             use {@link #getWriteRpcTimeout(TimeUnit)} instead
-     */
-    @Deprecated
-    int getWriteRpcTimeout();
-
-    /**
-     * Set timeout (millisecond) of each rpc write request in operations of 
this Table instance, will
-     * override the value of hbase.rpc.write.timeout in configuration.
-     * If a rpc write request waiting too long, it will stop waiting and send 
a new request to retry
-     * until retries exhausted or operation timeout reached.
-     *
-     * @param writeRpcTimeout the timeout for write rpc request in milliseconds
-     * @deprecated since 2.0.0, use {@link TableBuilder#setWriteRpcTimeout} 
instead
-     */
-    @Deprecated
-    void setWriteRpcTimeout(int writeRpcTimeout);
-
-    /**
-     * Get timeout of each operation in Table instance.
-     * @param unit the unit of time the timeout to be represented in
-     * @return operation rpc timeout in the specified time unit
-     */
-    long getOperationTimeout(TimeUnit unit);
-
-    /**
-     * Get timeout (millisecond) of each operation for in Table instance.
-     * @deprecated since 2.0 and will be removed in 3.0 version
-     *             use {@link #getOperationTimeout(TimeUnit)} instead
-     */
-    @Deprecated
-    int getOperationTimeout();
-
-    /**
-     * Set timeout (millisecond) of each operation in this Table instance, 
will override the value
-     * of hbase.client.operation.timeout in configuration.
-     * Operation timeout is a top-level restriction that makes sure a blocking 
method will not be
-     * blocked more than this. In each operation, if rpc request fails because 
of timeout or
-     * other reason, it will retry until success or throw a 
RetriesExhaustedException. But if the
-     * total time being blocking reach the operation timeout before retries 
exhausted, it will break
-     * early and throw SocketTimeoutException.
-     * @param operationTimeout the total timeout of each operation in 
millisecond.
-     * @deprecated since 2.0.0, use {@link TableBuilder#setOperationTimeout} 
instead
-     */
-    @Deprecated
-    void setOperationTimeout(int operationTimeout);
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
index 8d19ae5..add403a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java
@@ -19,21 +19,21 @@ package org.apache.phoenix.transaction;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.schema.PTableType;
+import 
org.apache.phoenix.transaction.TephraTransactionProvider.TephraTransactionClient;
 import org.apache.tephra.Transaction;
 import org.apache.tephra.Transaction.VisibilityLevel;
 import org.apache.tephra.TransactionAware;
@@ -41,48 +41,24 @@ import org.apache.tephra.TransactionCodec;
 import org.apache.tephra.TransactionConflictException;
 import org.apache.tephra.TransactionContext;
 import org.apache.tephra.TransactionFailureException;
-import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionSystemClient;
 import org.apache.tephra.TxConstants;
-import org.apache.tephra.distributed.PooledClientProvider;
-import org.apache.tephra.distributed.TransactionService;
-import org.apache.tephra.distributed.TransactionServiceClient;
-import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
-import org.apache.tephra.inmemory.InMemoryTxSystemClient;
-import org.apache.tephra.metrics.TxMetricsCollector;
-import org.apache.tephra.persist.HDFSTransactionStateStorage;
-import org.apache.tephra.snapshot.SnapshotCodecProvider;
-import org.apache.tephra.util.TxUtils;
 import org.apache.tephra.visibility.FenceWait;
 import org.apache.tephra.visibility.VisibilityFence;
-import org.apache.tephra.zookeeper.TephraZKClientService;
-import org.apache.twill.discovery.DiscoveryService;
-import org.apache.twill.discovery.ZKDiscoveryService;
-import org.apache.twill.internal.utils.Networks;
-import org.apache.twill.zookeeper.RetryStrategies;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
-import org.apache.twill.zookeeper.ZKClients;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Lists;
-import com.google.inject.util.Providers;
 
-public class TephraTransactionContext implements PhoenixTransactionContext {
 
+public class TephraTransactionContext implements PhoenixTransactionContext {
+    private static final Logger logger = 
LoggerFactory.getLogger(TephraTransactionContext.class);
     private static final TransactionCodec CODEC = new TransactionCodec();
 
-    private static TransactionSystemClient txClient = null;
-    private static ZKClientService zkClient = null;
-    private static TransactionService txService = null;
-    private static TransactionManager txManager = null;
-
     private final List<TransactionAware> txAwares;
     private final TransactionContext txContext;
     private Transaction tx;
     private TransactionSystemClient txServiceClient;
-    private TransactionFailureException e;
 
     public TephraTransactionContext() {
         this.txServiceClient = null;
@@ -92,21 +68,21 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
 
     public TephraTransactionContext(byte[] txnBytes) throws IOException {
         this();
-        this.tx = (txnBytes != null && txnBytes.length > 0) ? CODEC
-                .decode(txnBytes) : null;
+        this.tx = CODEC.decode(txnBytes);
     }
 
     public TephraTransactionContext(PhoenixConnection connection) {
-        this.txServiceClient = txClient;  
+        PhoenixTransactionClient client = 
connection.getQueryServices().initTransactionClient(getProvider());  
+        assert (client instanceof TephraTransactionClient);
+        this.txServiceClient = 
((TephraTransactionClient)client).getTransactionClient();
         this.txAwares = Collections.emptyList();
         this.txContext = new TransactionContext(txServiceClient);
     }
 
-    public TephraTransactionContext(PhoenixTransactionContext ctx,
-            PhoenixConnection connection, boolean subTask) {
-        this.txServiceClient = txClient;  
+    private TephraTransactionContext(PhoenixTransactionContext ctx, boolean 
subTask) {
         assert (ctx instanceof TephraTransactionContext);
         TephraTransactionContext tephraTransactionContext = 
(TephraTransactionContext) ctx;
+        this.txServiceClient = tephraTransactionContext.txServiceClient;  
 
         if (subTask) {
             this.tx = tephraTransactionContext.getTransaction();
@@ -116,42 +92,13 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
             this.txAwares = Collections.emptyList();
             this.txContext = tephraTransactionContext.getContext();
         }
-
-        this.e = null;
-    }
-
-    @Override
-    public void setInMemoryTransactionClient(Configuration config) {
-        TransactionManager txnManager = new TransactionManager(config);
-        txClient = this.txServiceClient = new 
InMemoryTxSystemClient(txnManager);
     }
 
     @Override
-    public ZKClientService setTransactionClient(Configuration config, 
ReadOnlyProps props, ConnectionInfo connectionInfo) {
-        String zkQuorumServersString = 
props.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM);
-        if (zkQuorumServersString==null) {
-            zkQuorumServersString = 
connectionInfo.getZookeeperQuorum()+":"+connectionInfo.getPort();
-        }
-
-        int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, 
HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
-        // Create instance of the tephra zookeeper client
-        ZKClientService txZKClientService  = ZKClientServices.delegate(
-            ZKClients.reWatchOnExpire(
-                ZKClients.retryOnFailure(
-                     new TephraZKClientService(zkQuorumServersString, timeOut, 
null,
-                             ArrayListMultimap.<String, byte[]>create()), 
-                         RetryStrategies.exponentialDelay(500, 2000, 
TimeUnit.MILLISECONDS))
-                     )
-                );
-        txZKClientService.startAndWait();
-        ZKDiscoveryService zkDiscoveryService = new 
ZKDiscoveryService(txZKClientService);
-        PooledClientProvider pooledClientProvider = new PooledClientProvider(
-                config, zkDiscoveryService);
-        txClient = this.txServiceClient = new 
TransactionServiceClient(config,pooledClientProvider);
-        
-        return txZKClientService;
+    public TransactionFactory.Provider getProvider() {
+        return TransactionFactory.Provider.TEPHRA;
     }
-
+    
     @Override
     public void begin() throws SQLException {
         if (txContext == null) {
@@ -180,8 +127,6 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
         try {
             txContext.finish();
         } catch (TransactionFailureException e) {
-            this.e = e;
-
             if (e instanceof TransactionConflictException) {
                 throw new SQLExceptionInfo.Builder(
                         SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION)
@@ -203,14 +148,8 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
         }
 
         try {
-            if (e != null) {
-                txContext.abort(e);
-                e = null;
-            } else {
-                txContext.abort();
-            }
+            txContext.abort();
         } catch (TransactionFailureException e) {
-            this.e = null;
             throw new SQLExceptionInfo.Builder(
                     SQLExceptionCode.TRANSACTION_FAILED)
                     .setMessage(e.getMessage()).setRootCause(e).build()
@@ -248,7 +187,7 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
     }
 
     @Override
-    public void commitDDLFence(PTable dataTable, Logger logger)
+    public void commitDDLFence(PTable dataTable)
             throws SQLException {
         byte[] key = dataTable.getName().getBytes();
 
@@ -275,7 +214,12 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
         }
     }
 
+    @Override
     public void markDMLFence(PTable table) {
+        if (table.getType() == PTableType.INDEX) {
+            return;
+        }
+        
         byte[] logicalKey = table.getName().getBytes();
         TransactionAware logicalTxAware = VisibilityFence.create(logicalKey);
 
@@ -299,6 +243,9 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
 
     @Override
     public void join(PhoenixTransactionContext ctx) {
+        if (ctx == PhoenixTransactionContext.NULL_CONTEXT) {
+            return;
+        }
         assert (ctx instanceof TephraTransactionContext);
         TephraTransactionContext tephraContext = (TephraTransactionContext) 
ctx;
 
@@ -324,7 +271,6 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
     public void reset() {
         tx = null;
         txAwares.clear();
-        this.e = null;
     }
 
     @Override
@@ -405,86 +351,15 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
         assert (tx != null);
 
         try {
-            return CODEC.encode(tx);
+            byte[] encodedTxBytes = CODEC.encode(tx);
+            encodedTxBytes = Arrays.copyOf(encodedTxBytes, 
encodedTxBytes.length + 1);
+            encodedTxBytes[encodedTxBytes.length - 1] = 
getProvider().getCode();
+            return encodedTxBytes;
         } catch (IOException e) {
             throw new SQLException(e);
         }
     }
 
-    @Override
-    public long getMaxTransactionsPerSecond() {
-        return TxConstants.MAX_TX_PER_MS;
-    }
-
-    @Override
-    public boolean isPreExistingVersion(long version) {
-        return TxUtils.isPreExistingVersion(version);
-    }
-
-    @Override
-    public RegionObserver getCoprocessor() {
-        return new TransactionProcessor();
-    }
-
-    @Override
-    public byte[] getFamilyDeleteMarker() {
-        return TxConstants.FAMILY_DELETE_QUALIFIER;
-    }
-
-    @Override
-    public void setTxnConfigs(Configuration config, String tmpFolder, int 
defaultTxnTimeoutSeconds) throws IOException {
-        config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
-        config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, 
"n-times");
-        config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
-        config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, 
Networks.getRandomPort());
-        config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder);
-        config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, 
defaultTxnTimeoutSeconds);
-        config.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
-        config.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L);
-    }
-
-    @Override
-    public void setupTxManager(Configuration config, String url) throws 
SQLException {
-
-        if (txService != null) {
-            return;
-        }
-
-        ConnectionInfo connInfo = ConnectionInfo.create(url);
-        zkClient = ZKClientServices.delegate(
-          ZKClients.reWatchOnExpire(
-            ZKClients.retryOnFailure(
-              
ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
-                .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
-                        HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
-                .build(),
-              RetryStrategies.exponentialDelay(500, 2000, 
TimeUnit.MILLISECONDS)
-            )
-          )
-        );
-        zkClient.startAndWait();
-
-        DiscoveryService discovery = new ZKDiscoveryService(zkClient);
-        txManager = new TransactionManager(config, new 
HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new 
TxMetricsCollector()), new TxMetricsCollector());
-        txService = new TransactionService(config, zkClient, discovery, 
Providers.of(txManager));
-        txService.startAndWait();
-    }
-
-    @Override
-    public void tearDownTxManager() {
-        try {
-            if (txService != null) txService.stopAndWait();
-        } finally {
-            try {
-                if (zkClient != null) zkClient.stopAndWait();
-            } finally {
-                txService = null;
-                zkClient = null;
-                txManager = null;
-            }
-        }
-    }
-
     /**
      * TephraTransactionContext specific functions
      */
@@ -510,4 +385,17 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext {
             txAware.startTx(tx);
         }
     }
+
+    @Override
+    public PhoenixTransactionContext 
newTransactionContext(PhoenixTransactionContext context, boolean subTask) {
+        return new TephraTransactionContext(context, subTask);
+    }
+    
+    @Override
+    public Table getTransactionalTable(Table htable, boolean isImmutable) {
+        TransactionAwareHTable transactionAwareHTable = new 
TransactionAwareHTable(htable, isImmutable ? TxConstants.ConflictDetection.NONE 
: TxConstants.ConflictDetection.ROW);
+        this.addTransactionAware(transactionAwareHTable);
+        return transactionAwareHTable;
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
index 075c922..2e52efa 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java
@@ -18,16 +18,37 @@
 package org.apache.phoenix.transaction;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.phoenix.coprocessor.TephraTransactionalProcessor;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
+import org.apache.phoenix.transaction.TransactionFactory.Provider;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionSystemClient;
 import org.apache.tephra.TxConstants;
+import org.apache.tephra.distributed.PooledClientProvider;
+import org.apache.tephra.distributed.TransactionService;
+import org.apache.tephra.distributed.TransactionServiceClient;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.apache.tephra.zookeeper.TephraZKClientService;
+import org.apache.twill.discovery.DiscoveryService;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
 
-public class TephraTransactionProvider implements TransactionProvider {
+import com.google.common.collect.ArrayListMultimap;
+import com.google.inject.util.Providers;
+
+public class TephraTransactionProvider implements PhoenixTransactionProvider {
     private static final TephraTransactionProvider INSTANCE = new 
TephraTransactionProvider();
     
     public static final TephraTransactionProvider getInstance() {
@@ -37,12 +58,6 @@ public class TephraTransactionProvider implements 
TransactionProvider {
     private TephraTransactionProvider() {
     }
     
-    
-    @Override
-    public PhoenixTransactionContext getTransactionContext()  {
-        return new TephraTransactionContext();
-    }
-
     @Override
     public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) 
throws IOException {
        return new TephraTransactionContext(txnBytes);
@@ -54,23 +69,129 @@ public class TephraTransactionProvider implements 
TransactionProvider {
     }
 
     @Override
-    public PhoenixTransactionContext 
getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection 
connection, boolean subTask) {
-        return new TephraTransactionContext(contex, connection, subTask);
+    public PhoenixTransactionClient getTransactionClient(Configuration config, 
ConnectionInfo connectionInfo) {
+        if (connectionInfo.isConnectionless()) {
+            TransactionManager txnManager = new TransactionManager(config);
+            TransactionSystemClient txClient = new 
InMemoryTxSystemClient(txnManager);
+            return new TephraTransactionClient(txClient);
+            
+        }
+        String zkQuorumServersString = 
config.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM);
+        if (zkQuorumServersString==null) {
+            zkQuorumServersString = 
connectionInfo.getZookeeperConnectionString();
+        }
+
+        int timeOut = config.getInt(HConstants.ZK_SESSION_TIMEOUT, 
HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
+        // Create instance of the tephra zookeeper client
+        ZKClientService zkClientService  = ZKClientServices.delegate(
+            ZKClients.reWatchOnExpire(
+                ZKClients.retryOnFailure(
+                     new TephraZKClientService(zkQuorumServersString, timeOut, 
null,
+                             ArrayListMultimap.<String, byte[]>create()), 
+                         RetryStrategies.exponentialDelay(500, 2000, 
TimeUnit.MILLISECONDS))
+                     )
+                );
+        //txZKClientService.startAndWait();
+        ZKDiscoveryService zkDiscoveryService = new 
ZKDiscoveryService(zkClientService);
+        PooledClientProvider pooledClientProvider = new PooledClientProvider(
+                config, zkDiscoveryService);
+        TransactionServiceClient txClient = new 
TransactionServiceClient(config,pooledClientProvider);
+        TephraTransactionClient client = new 
TephraTransactionClient(zkClientService, txClient);
+        client.start();
+        
+        return client;
     }
 
     @Override
-    public PhoenixTransactionalTable 
getTransactionalTable(PhoenixTransactionContext ctx, Table htable) {
-        return new TephraTransactionTable(ctx, htable);
+    public PhoenixTransactionService getTransactionService(Configuration 
config, ConnectionInfo connInfo) {
+        ZKClientService zkClient = ZKClientServices.delegate(
+          ZKClients.reWatchOnExpire(
+            ZKClients.retryOnFailure(
+              
ZKClientService.Builder.of(connInfo.getZookeeperConnectionString())
+                .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT,
+                        HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
+                .build(),
+              RetryStrategies.exponentialDelay(500, 2000, 
TimeUnit.MILLISECONDS)
+            )
+          )
+        );
+
+        //zkClient.startAndWait();
+        DiscoveryService discovery = new ZKDiscoveryService(zkClient);
+        TransactionManager txManager = new TransactionManager(config, new 
HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new 
TxMetricsCollector()), new TxMetricsCollector());
+        TransactionService txService = new TransactionService(config, 
zkClient, discovery, Providers.of(txManager));
+        TephraTransactionService service = new 
TephraTransactionService(zkClient, txService);
+        //txService.startAndWait();            
+        service.start();
+        return service;
+    }
+
+    static class TephraTransactionService implements PhoenixTransactionService 
{
+        private final ZKClientService zkClient;
+        private final TransactionService txService;
+
+        public TephraTransactionService(ZKClientService zkClient, 
TransactionService txService) {
+            this.zkClient = zkClient;
+            this.txService = txService;
+        }
+        
+        public void start() {
+            zkClient.startAndWait();
+            txService.startAndWait();            
+        }
+        
+        @Override
+        public void close() throws IOException {
+            try {
+                if (txService != null) txService.stopAndWait();
+            } finally {
+                if (zkClient != null) zkClient.stopAndWait();
+            }
+        }
+        
     }
     
-    @Override
-    public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long 
timestamp) {
-        return CellUtil.createCell(row, family, 
TxConstants.FAMILY_DELETE_QUALIFIER, timestamp, KeyValue.Type.Put.getCode(), 
HConstants.EMPTY_BYTE_ARRAY);
+    static class TephraTransactionClient implements PhoenixTransactionClient {
+        private final ZKClientService zkClient;
+        private final TransactionSystemClient txClient;
+
+        public TephraTransactionClient(TransactionSystemClient txClient) {
+            this(null, txClient);
+        }
+        
+        public TephraTransactionClient(ZKClientService zkClient, 
TransactionSystemClient txClient) {
+            this.zkClient = zkClient;
+            this.txClient = txClient;
+        }
+        
+        public void start() {
+            zkClient.startAndWait();
+        }
+        
+        public TransactionSystemClient getTransactionClient() {
+            return txClient;
+        }
+        
+        @Override
+        public void close() throws IOException {
+            zkClient.stopAndWait();
+        }
+        
     }
     
     @Override
-    public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] 
qualifier, long timestamp) {
-        return CellUtil.createCell(row, family, qualifier, timestamp, 
KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY);
+    public Class<? extends RegionObserver> getCoprocessor() {
+        return TephraTransactionalProcessor.class;
+    }
+
+    @Override
+    public Provider getProvider() {
+        return TransactionFactory.Provider.TEPHRA;
+    }
+
+    @Override
+    public boolean isUnsupported(Feature feature) {
+        return false;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
deleted file mode 100644
index e28e98b..0000000
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.transaction;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CompareOperator;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
-import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
-import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTableType;
-import org.apache.tephra.TxConstants;
-
-import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
-
-public class TephraTransactionTable implements PhoenixTransactionalTable {
-
-    private TransactionAwareHTable transactionAwareHTable;
-
-    private TephraTransactionContext tephraTransactionContext;
-
-    public TephraTransactionTable(PhoenixTransactionContext ctx, Table hTable) 
{
-        this(ctx, hTable, null);
-    }
-
-    public TephraTransactionTable(PhoenixTransactionContext ctx, Table hTable, 
PTable pTable) {
-
-        assert(ctx instanceof TephraTransactionContext);
-
-        tephraTransactionContext = (TephraTransactionContext) ctx;
-
-        transactionAwareHTable = new TransactionAwareHTable(hTable, (pTable != 
null && pTable.isImmutableRows())
-                ? TxConstants.ConflictDetection.NONE : 
TxConstants.ConflictDetection.ROW);
-
-        tephraTransactionContext.addTransactionAware(transactionAwareHTable);
-
-        if (pTable != null && pTable.getType() != PTableType.INDEX) {
-            tephraTransactionContext.markDMLFence(pTable);
-        }
-    }
-
-    @Override
-    public Result get(Get get) throws IOException {
-        return transactionAwareHTable.get(get);
-    }
-
-    @Override
-    public void put(Put put) throws IOException {
-        transactionAwareHTable.put(put);
-    }
-
-    @Override
-    public void delete(Delete delete) throws IOException {
-        transactionAwareHTable.delete(delete);
-    }
-
-    @Override
-    public ResultScanner getScanner(Scan scan) throws IOException {
-        return transactionAwareHTable.getScanner(scan);
-    }
-
-
-    @Override
-    public Configuration getConfiguration() {
-        return transactionAwareHTable.getConfiguration();
-    }
-
-    @Override
-    public HTableDescriptor getTableDescriptor() throws IOException {
-        return transactionAwareHTable.getTableDescriptor();
-    }
-
-    @Override
-    public boolean exists(Get get) throws IOException {
-        return transactionAwareHTable.exists(get);
-    }
-
-    @Override
-    public Result[] get(List<Get> gets) throws IOException {
-        return transactionAwareHTable.get(gets);
-    }
-
-    @Override
-    public ResultScanner getScanner(byte[] family) throws IOException {
-        return transactionAwareHTable.getScanner(family);
-    }
-
-    @Override
-    public ResultScanner getScanner(byte[] family, byte[] qualifier)
-            throws IOException {
-        return transactionAwareHTable.getScanner(family, qualifier);
-    }
-
-    @Override
-    public void put(List<Put> puts) throws IOException {
-        transactionAwareHTable.put(puts);
-    }
-
-    @Override
-    public void delete(List<Delete> deletes) throws IOException {
-        transactionAwareHTable.delete(deletes);
-    }
-
-    @Override
-    public void close() throws IOException {
-        transactionAwareHTable.close();
-    }
-
-
-    @Override
-    public TableName getName() {
-        return transactionAwareHTable.getName();
-    }
-
-    @Override
-    public boolean[] existsAll(List<Get> gets) throws IOException {
-        return transactionAwareHTable.existsAll(gets);
-    }
-
-    @Override
-    public void batch(List<? extends Row> actions, Object[] results)
-            throws IOException, InterruptedException {
-        transactionAwareHTable.batch(actions, results);
-    }
-
-    @Override
-    public <R> void batchCallback(List<? extends Row> actions,
-            Object[] results, Callback<R> callback) throws IOException,
-            InterruptedException {
-        transactionAwareHTable.batchCallback(actions, results, callback);
-    }
-
-    @Override
-    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
-            byte[] value, Put put) throws IOException {
-        return transactionAwareHTable.checkAndPut(row, family, qualifier, 
value, put);
-    }
-
-    @Override
-    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
-            CompareOp compareOp, byte[] value, Put put) throws IOException {
-        return transactionAwareHTable.checkAndPut(row, family, qualifier, 
compareOp, value, put);
-    }
-
-    @Override
-    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
-            byte[] value, Delete delete) throws IOException {
-        return transactionAwareHTable.checkAndDelete(row, family, qualifier, 
value, delete);
-    }
-
-    @Override
-    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
-            CompareOp compareOp, byte[] value, Delete delete)
-            throws IOException {
-        return transactionAwareHTable.checkAndDelete(row, family, qualifier, 
compareOp, value, delete);
-    }
-
-    @Override
-    public void mutateRow(RowMutations rm) throws IOException {
-        transactionAwareHTable.mutateRow(rm);
-    }
-
-    @Override
-    public Result append(Append append) throws IOException {
-        return transactionAwareHTable.append(append);
-    }
-
-    @Override
-    public Result increment(Increment increment) throws IOException {
-        return transactionAwareHTable.increment(increment);
-    }
-
-    @Override
-    public long incrementColumnValue(byte[] row, byte[] family,
-            byte[] qualifier, long amount) throws IOException {
-        return transactionAwareHTable.incrementColumnValue(row, family, 
qualifier, amount);
-    }
-
-    @Override
-    public long incrementColumnValue(byte[] row, byte[] family,
-            byte[] qualifier, long amount, Durability durability)
-            throws IOException {
-        return transactionAwareHTable.incrementColumnValue(row, family, 
qualifier, amount, durability);
-    }
-
-    @Override
-    public CoprocessorRpcChannel coprocessorService(byte[] row) {
-        return transactionAwareHTable.coprocessorService(row);
-    }
-
-    @Override
-    public <T extends Service, R> Map<byte[], R> coprocessorService(
-            Class<T> service, byte[] startKey, byte[] endKey,
-            Call<T, R> callable) throws ServiceException, Throwable {
-        return transactionAwareHTable.coprocessorService(service, startKey, 
endKey, callable);
-    }
-
-    @Override
-    public <T extends Service, R> void coprocessorService(Class<T> service,
-            byte[] startKey, byte[] endKey, Call<T, R> callable,
-            Callback<R> callback) throws ServiceException, Throwable {
-        transactionAwareHTable.coprocessorService(service, startKey, endKey, 
callable, callback);
-    }
-
-    @Override
-    public <R extends Message> Map<byte[], R> batchCoprocessorService(
-            MethodDescriptor methodDescriptor, Message request,
-            byte[] startKey, byte[] endKey, R responsePrototype)
-            throws ServiceException, Throwable {
-        return 
transactionAwareHTable.batchCoprocessorService(methodDescriptor, request, 
startKey, endKey, responsePrototype);
-    }
-
-    @Override
-    public <R extends Message> void batchCoprocessorService(
-            MethodDescriptor methodDescriptor, Message request,
-            byte[] startKey, byte[] endKey, R responsePrototype,
-            Callback<R> callback) throws ServiceException, Throwable {
-        transactionAwareHTable.batchCoprocessorService(methodDescriptor, 
request, startKey, endKey, responsePrototype, callback);
-    }
-
-    @Override
-    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
-            CompareOp compareOp, byte[] value, RowMutations mutation)
-            throws IOException {
-        return transactionAwareHTable.checkAndMutate(row, family, qualifier, 
compareOp, value, mutation);
-    }
-
-    @Override
-    public void setOperationTimeout(int operationTimeout) {
-        transactionAwareHTable.setOperationTimeout(operationTimeout);
-    }
-
-    @Override
-    public int getOperationTimeout() {
-        return transactionAwareHTable.getOperationTimeout();
-    }
-
-    @Override
-    public void setRpcTimeout(int rpcTimeout) {
-        transactionAwareHTable.setRpcTimeout(rpcTimeout);
-    }
-
-    @Override
-    public int getRpcTimeout() {
-        return transactionAwareHTable.getRpcTimeout();
-    }
-
-    @Override
-    public TableDescriptor getDescriptor() throws IOException {
-        return transactionAwareHTable.getDescriptor();
-    }
-
-    @Override
-    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, 
CompareOperator op,
-            byte[] value, Put put) throws IOException {
-        return transactionAwareHTable.checkAndPut(row, family, qualifier, op, 
value, put);
-    }
-
-    @Override
-    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, 
CompareOperator op,
-            byte[] value, Delete delete) throws IOException {
-        return transactionAwareHTable.checkAndDelete(row, family, qualifier, 
op, value, delete);
-    }
-
-    @Override
-    public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
-        return transactionAwareHTable.checkAndMutate(row, family);
-    }
-
-    @Override
-    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, 
CompareOperator op,
-            byte[] value, RowMutations mutation) throws IOException {
-        return transactionAwareHTable.checkAndMutate(row, family, qualifier, 
op, value, mutation);
-    }
-
-    @Override
-    public int getReadRpcTimeout() {
-        return transactionAwareHTable.getReadRpcTimeout();
-    }
-
-    @Override
-    public void setReadRpcTimeout(int readRpcTimeout) {
-        transactionAwareHTable.setReadRpcTimeout(readRpcTimeout);
-    }
-
-    @Override
-    public int getWriteRpcTimeout() {
-        return transactionAwareHTable.getWriteRpcTimeout();
-    }
-
-    @Override
-    public void setWriteRpcTimeout(int writeRpcTimeout) {
-        transactionAwareHTable.setWriteRpcTimeout(writeRpcTimeout);
-    }
-
-    @Override
-    public boolean[] exists(List<Get> gets) throws IOException {
-        return transactionAwareHTable.exists(gets);
-    }
-
-    @Override
-    public long getRpcTimeout(TimeUnit unit) {
-        return transactionAwareHTable.getRpcTimeout();
-    }
-
-    @Override
-    public long getReadRpcTimeout(TimeUnit unit) {
-        return transactionAwareHTable.getReadRpcTimeout(unit);
-    }
-
-    @Override
-    public long getWriteRpcTimeout(TimeUnit unit) {
-        return transactionAwareHTable.getWriteRpcTimeout(unit);
-    }
-
-    @Override
-    public long getOperationTimeout(TimeUnit unit) {
-        return transactionAwareHTable.getOperationTimeout(unit);
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
index f32764b..62bd808 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java
@@ -17,24 +17,55 @@
  */
 package org.apache.phoenix.transaction;
 
+import java.io.IOException;
+
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
+
+
 public class TransactionFactory {
-    enum TransactionProcessor {
-        Tephra,
-        Omid
+    public enum Provider {
+        TEPHRA((byte)1, TephraTransactionProvider.getInstance()),
+        OMID((byte)2, OmidTransactionProvider.getInstance());
+        
+        private final byte code;
+        private final PhoenixTransactionProvider provider;
+        
+        Provider(byte code, PhoenixTransactionProvider provider) {
+            this.code = code;
+            this.provider = provider;
+        }
+        
+        public byte getCode() {
+            return this.code;
+        }
+
+        public static Provider fromCode(int code) {
+            if (code < 1 || code > Provider.values().length) {
+                throw new IllegalArgumentException("Invalid 
TransactionFactory.Provider " + code);
+            }
+            return Provider.values()[code-1];
+        }
+        
+        public static Provider getDefault() {
+            return TEPHRA;
+        }
+
+        public PhoenixTransactionProvider getTransactionProvider()  {
+            return provider;
+        }
     }
 
-    static public TransactionProvider getTransactionProvider() {
-        return TephraTransactionProvider.getInstance();
+    public static PhoenixTransactionProvider getTransactionProvider(Provider 
provider) {
+        return provider.getTransactionProvider();
     }
     
-    static public TransactionProvider 
getTransactionProvider(TransactionProcessor processor) {
-        switch (processor) {
-        case Tephra:
-            return TephraTransactionProvider.getInstance();
-        case Omid:
-            return OmidTransactionProvider.getInstance();
-        default:
-            throw new IllegalArgumentException("Unknown transaction processor: 
" + processor);
+    public static PhoenixTransactionContext getTransactionContext(byte[] 
txState, int clientVersion) throws IOException {
+        if (txState == null || txState.length == 0) {
+            return null;
         }
+        Provider provider = (clientVersion < 
MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0) 
+                ? Provider.OMID
+                : Provider.fromCode(txState[txState.length-1]);
+        return 
provider.getTransactionProvider().getTransactionContext(txState);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java
deleted file mode 100644
index 84211ab..0000000
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.transaction;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-
-public interface TransactionProvider {
-    public PhoenixTransactionContext getTransactionContext();
-    public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) 
throws IOException;
-    public PhoenixTransactionContext getTransactionContext(PhoenixConnection 
connection);
-    public PhoenixTransactionContext 
getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection 
connection, boolean subTask);
-    
-    public PhoenixTransactionalTable 
getTransactionalTable(PhoenixTransactionContext ctx, Table htable);
-    
-    public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long 
timestamp);
-    public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] 
qualifier, long timestamp);
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index a0912f3..e85732a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -87,7 +87,6 @@ import org.apache.phoenix.schema.RowKeyValueAccessor;
 import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.transaction.TransactionFactory;
 
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
@@ -1514,7 +1513,7 @@ public class PhoenixRuntime {
      * @return wall clock time in milliseconds (i.e. Epoch time) of a given 
Cell time stamp.
      */
     public static long getWallClockTimeFromCellTimeStamp(long tsOfCell) {
-        return 
TransactionFactory.getTransactionProvider().getTransactionContext().isPreExistingVersion(tsOfCell)
 ? tsOfCell : TransactionUtil.convertToMilliseconds(tsOfCell);
+        return TransactionUtil.isTransactionalTimestamp(tsOfCell) ? 
TransactionUtil.convertToMilliseconds(tsOfCell) : tsOfCell;
     }
 
     public static long getCurrentScn(ReadOnlyProps props) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index f0bc2d6..914417c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -59,6 +59,7 @@ import org.apache.phoenix.filter.DistinctPrefixFilter;
 import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.KeyRange.Bound;
@@ -88,6 +89,8 @@ import com.google.common.collect.Lists;
  */
 public class ScanUtil {
     public static final int[] SINGLE_COLUMN_SLOT_SPAN = new int[1];
+    public static final int UNKNOWN_CLIENT_VERSION = 
VersionUtil.encodeVersion(4, 4, 0);
+
     /*
      * Max length that we fill our key when we turn an inclusive key
      * into a exclusive key.
@@ -931,5 +934,17 @@ public class ScanUtil {
     public static boolean isIndexRebuild(Scan scan) {
         return scan.getAttribute((BaseScannerRegionObserver.REBUILD_INDEXES)) 
!= null;
     }
+ 
+    public static int getClientVersion(Scan scan) {
+        int clientVersion = UNKNOWN_CLIENT_VERSION;
+        byte[] clientVersionBytes = 
scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+        if (clientVersionBytes != null) {
+            clientVersion = Bytes.toInt(clientVersionBytes);
+        }
+        return clientVersion;
+    }
     
+    public static void setClientVersion(Scan scan, int version) {
+        scan.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, 
Bytes.toBytes(version));
+    }
 }

Reply via email to