http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java deleted file mode 100644 index deceac6..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java +++ /dev/null @@ -1,798 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.phoenix.transaction; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CompareOperator; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Append; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; -import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.client.RowMutations; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.TableBuilder; -import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.client.coprocessor.Batch; -import org.apache.hadoop.hbase.filter.CompareFilter; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; - -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.Service; -import com.google.protobuf.ServiceException; - -public interface PhoenixTransactionalTable extends Table { - /** - * Gets the fully qualified table name instance of this table. - */ - TableName getName(); - - /** - * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance. - * <p> - * The reference returned is not a copy, so any change made to it will - * affect this instance. - */ - Configuration getConfiguration(); - - /** - * Gets the {@link org.apache.hadoop.hbase.HTableDescriptor table descriptor} for this table. 
- * @throws java.io.IOException if a remote or network exception occurs. - * @deprecated since 2.0 version and will be removed in 3.0 version. - * use {@link #getDescriptor()} - */ - @Deprecated - HTableDescriptor getTableDescriptor() throws IOException; - - /** - * Gets the {@link org.apache.hadoop.hbase.client.TableDescriptor table descriptor} for this table. - * @throws java.io.IOException if a remote or network exception occurs. - */ - TableDescriptor getDescriptor() throws IOException; - - /** - * Test for the existence of columns in the table, as specified by the Get. - * <p> - * - * This will return true if the Get matches one or more keys, false if not. - * <p> - * - * This is a server-side call so it prevents any data from being transfered to - * the client. - * - * @param get the Get - * @return true if the specified Get matches one or more keys, false if not - * @throws IOException e - */ - boolean exists(Get get) throws IOException; - - /** - * Test for the existence of columns in the table, as specified by the Gets. - * <p> - * - * This will return an array of booleans. Each value will be true if the related Get matches - * one or more keys, false if not. - * <p> - * - * This is a server-side call so it prevents any data from being transferred to - * the client. - * - * @param gets the Gets - * @return Array of boolean. True if the specified Get matches one or more keys, false if not. - * @throws IOException e - */ - boolean[] exists(List<Get> gets) throws IOException; - - /** - * Test for the existence of columns in the table, as specified by the Gets. - * This will return an array of booleans. Each value will be true if the related Get matches - * one or more keys, false if not. - * This is a server-side call so it prevents any data from being transferred to - * the client. - * - * @param gets the Gets - * @return Array of boolean. True if the specified Get matches one or more keys, false if not. 
- * @throws IOException e - * @deprecated since 2.0 version and will be removed in 3.0 version. - * use {@link #exists(List)} - */ - @Deprecated - default boolean[] existsAll(List<Get> gets) throws IOException { - return exists(gets); - } - - /** - * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends, RowMutations. - * The ordering of execution of the actions is not defined. Meaning if you do a Put and a - * Get in the same {@link #batch} call, you will not necessarily be - * guaranteed that the Get returns what the Put had put. - * - * @param actions list of Get, Put, Delete, Increment, Append, RowMutations. - * @param results Empty Object[], same size as actions. Provides access to partial - * results, in case an exception is thrown. A null in the result array means that - * the call for that action failed, even after retries. The order of the objects - * in the results array corresponds to the order of actions in the request list. - * @throws IOException - * @since 0.90.0 - */ - void batch(final List<? extends Row> actions, final Object[] results) throws IOException, - InterruptedException; - - /** - * Same as {@link #batch(List, Object[])}, but with a callback. - * @since 0.96.0 - */ - <R> void batchCallback( - final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback - ) throws IOException, InterruptedException; - - /** - * Extracts certain cells from a given row. - * @param get The object that specifies what data to fetch and from which row. - * @return The data coming from the specified row, if it exists. If the row - * specified doesn't exist, the {@link Result} instance returned won't - * contain any {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. - * @throws IOException if a remote or network exception occurs. - * @since 0.20.0 - */ - Result get(Get get) throws IOException; - - /** - * Extracts specified cells from the given rows, as a batch. 
- * - * @param gets The objects that specify what data to fetch and from which rows. - * @return The data coming from the specified rows, if it exists. If the row specified doesn't - * exist, the {@link Result} instance returned won't contain any {@link - * org.apache.hadoop.hbase.Cell}s, as indicated by {@link Result#isEmpty()}. If there are any - * failures even after retries, there will be a <code>null</code> in the results' array for those - * Gets, AND an exception will be thrown. The ordering of the Result array corresponds to the order - * of the list of passed in Gets. - * @throws IOException if a remote or network exception occurs. - * @since 0.90.0 - * @apiNote {@link #put(List)} runs pre-flight validations on the input list on client. - * Currently {@link #get(List)} doesn't run any validations on the client-side, currently there - * is no need, but this may change in the future. An - * {@link IllegalArgumentException} will be thrown in this case. - */ - Result[] get(List<Get> gets) throws IOException; - - /** - * Returns a scanner on the current table as specified by the {@link Scan} - * object. - * Note that the passed {@link Scan}'s start row and caching properties - * maybe changed. - * - * @param scan A configured {@link Scan} object. - * @return A scanner. - * @throws IOException if a remote or network exception occurs. - * @since 0.20.0 - */ - ResultScanner getScanner(Scan scan) throws IOException; - - /** - * Gets a scanner on the current table for the given family. - * - * @param family The column family to scan. - * @return A scanner. - * @throws IOException if a remote or network exception occurs. - * @since 0.20.0 - */ - ResultScanner getScanner(byte[] family) throws IOException; - - /** - * Gets a scanner on the current table for the given family and qualifier. - * - * @param family The column family to scan. - * @param qualifier The column qualifier to scan. - * @return A scanner. 
- * @throws IOException if a remote or network exception occurs. - * @since 0.20.0 - */ - ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException; - - - /** - * Puts some data in the table. - * - * @param put The data to put. - * @throws IOException if a remote or network exception occurs. - * @since 0.20.0 - */ - void put(Put put) throws IOException; - - /** - * Batch puts the specified data into the table. - * <p> - * This can be used for group commit, or for submitting user defined batches. Before sending - * a batch of mutations to the server, the client runs a few validations on the input list. If an - * error is found, for example, a mutation was supplied but was missing it's column an - * {@link IllegalArgumentException} will be thrown and no mutations will be applied. If there - * are any failures even after retries, a {@link RetriesExhaustedWithDetailsException} will be - * thrown. RetriesExhaustedWithDetailsException contains lists of failed mutations and - * corresponding remote exceptions. The ordering of mutations and exceptions in the - * encapsulating exception corresponds to the order of the input list of Put requests. - * - * @param puts The list of mutations to apply. - * @throws IOException if a remote or network exception occurs. - * @since 0.20.0 - */ - void put(List<Put> puts) throws IOException; - - /** - * Atomically checks if a row/family/qualifier value matches the expected - * value. If it does, it adds the put. 
If the passed value is null, the check - * is for the lack of column (ie: non-existance) - * - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param value the expected value - * @param put data to put if check succeeds - * @throws IOException e - * @return true if the new put was executed, false otherwise - */ - boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, - byte[] value, Put put) throws IOException; - - /** - * Atomically checks if a row/family/qualifier value matches the expected - * value. If it does, it adds the put. If the passed value is null, the check - * is for the lack of column (ie: non-existence) - * - * The expected value argument of this call is on the left and the current - * value of the cell is on the right side of the comparison operator. - * - * Ie. eg. GREATER operator means expected value > existing <=> add the put. - * - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param compareOp comparison operator to use - * @param value the expected value - * @param put data to put if check succeeds - * @throws IOException e - * @return true if the new put was executed, false otherwise - * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use - * {@link #checkAndPut(byte[], byte[], byte[], CompareOperator, byte[], Put)}} - */ - @Deprecated - boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, - CompareFilter.CompareOp compareOp, byte[] value, Put put) throws IOException; - - /** - * Atomically checks if a row/family/qualifier value matches the expected - * value. If it does, it adds the put. If the passed value is null, the check - * is for the lack of column (ie: non-existence) - * - * The expected value argument of this call is on the left and the current - * value of the cell is on the right side of the comparison operator. - * - * Ie. eg. 
GREATER operator means expected value > existing <=> add the put. - * - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param op comparison operator to use - * @param value the expected value - * @param put data to put if check succeeds - * @throws IOException e - * @return true if the new put was executed, false otherwise - */ - boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, - CompareOperator op, byte[] value, Put put) throws IOException; - - /** - * Deletes the specified cells/row. - * - * @param delete The object that specifies what to delete. - * @throws IOException if a remote or network exception occurs. - * @since 0.20.0 - */ - void delete(Delete delete) throws IOException; - - /** - * Batch Deletes the specified cells/rows from the table. - * <p> - * If a specified row does not exist, {@link Delete} will report as though sucessful - * delete; no exception will be thrown. If there are any failures even after retries, - * a * {@link RetriesExhaustedWithDetailsException} will be thrown. - * RetriesExhaustedWithDetailsException contains lists of failed {@link Delete}s and - * corresponding remote exceptions. - * - * @param deletes List of things to delete. The input list gets modified by this - * method. All successfully applied {@link Delete}s in the list are removed (in particular it - * gets re-ordered, so the order in which the elements are inserted in the list gives no - * guarantee as to the order in which the {@link Delete}s are executed). - * @throws IOException if a remote or network exception occurs. In that case - * the {@code deletes} argument will contain the {@link Delete} instances - * that have not be successfully applied. - * @since 0.20.1 - * @apiNote In 3.0.0 version, the input list {@code deletes} will no longer be modified. Also, - * {@link #put(List)} runs pre-flight validations on the input list on client. 
Currently - * {@link #delete(List)} doesn't run validations on the client, there is no need currently, - * but this may change in the future. An * {@link IllegalArgumentException} will be thrown - * in this case. - */ - void delete(List<Delete> deletes) throws IOException; - - /** - * Atomically checks if a row/family/qualifier value matches the expected - * value. If it does, it adds the delete. If the passed value is null, the - * check is for the lack of column (ie: non-existance) - * - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param value the expected value - * @param delete data to delete if check succeeds - * @throws IOException e - * @return true if the new delete was executed, false otherwise - */ - boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - byte[] value, Delete delete) throws IOException; - - /** - * Atomically checks if a row/family/qualifier value matches the expected - * value. If it does, it adds the delete. If the passed value is null, the - * check is for the lack of column (ie: non-existence) - * - * The expected value argument of this call is on the left and the current - * value of the cell is on the right side of the comparison operator. - * - * Ie. eg. GREATER operator means expected value > existing <=> add the delete. - * - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param compareOp comparison operator to use - * @param value the expected value - * @param delete data to delete if check succeeds - * @throws IOException e - * @return true if the new delete was executed, false otherwise - * @deprecated Since 2.0.0. Will be removed in 3.0.0. 
Use - * {@link #checkAndDelete(byte[], byte[], byte[], byte[], Delete)} - */ - @Deprecated - boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - CompareFilter.CompareOp compareOp, byte[] value, Delete delete) throws IOException; - - /** - * Atomically checks if a row/family/qualifier value matches the expected - * value. If it does, it adds the delete. If the passed value is null, the - * check is for the lack of column (ie: non-existence) - * - * The expected value argument of this call is on the left and the current - * value of the cell is on the right side of the comparison operator. - * - * Ie. eg. GREATER operator means expected value > existing <=> add the delete. - * - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param op comparison operator to use - * @param value the expected value - * @param delete data to delete if check succeeds - * @throws IOException e - * @return true if the new delete was executed, false otherwise - */ - boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - CompareOperator op, byte[] value, Delete delete) throws IOException; - - /** - * Performs multiple mutations atomically on a single row. Currently - * {@link Put} and {@link Delete} are supported. - * - * @param rm object that specifies the set of mutations to perform atomically - * @throws IOException - */ - void mutateRow(final RowMutations rm) throws IOException; - - /** - * Appends values to one or more columns within a single row. - * <p> - * This operation guaranteed atomicity to readers. Appends are done - * under a single row lock, so write operations to a row are synchronized, and - * readers are guaranteed to see this operation fully completed. 
- * - * @param append object that specifies the columns and amounts to be used - * for the increment operations - * @throws IOException e - * @return values of columns after the append operation (maybe null) - */ - Result append(final Append append) throws IOException; - - /** - * Increments one or more columns within a single row. - * <p> - * This operation ensures atomicity to readers. Increments are done - * under a single row lock, so write operations to a row are synchronized, and - * readers are guaranteed to see this operation fully completed. - * - * @param increment object that specifies the columns and amounts to be used - * for the increment operations - * @throws IOException e - * @return values of columns after the increment - */ - Result increment(final Increment increment) throws IOException; - - /** - * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)} - * <p> - * The {@link Durability} is defaulted to {@link Durability#SYNC_WAL}. - * @param row The row that contains the cell to increment. - * @param family The column family of the cell to increment. - * @param qualifier The column qualifier of the cell to increment. - * @param amount The amount to increment the cell with (or decrement, if the - * amount is negative). - * @return The new value, post increment. - * @throws IOException if a remote or network exception occurs. - */ - long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, - long amount) throws IOException; - - /** - * Atomically increments a column value. If the column value already exists - * and is not a big-endian long, this could throw an exception. If the column - * value does not yet exist it is initialized to <code>amount</code> and - * written to the specified column. - * - * <p>Setting durability to {@link Durability#SKIP_WAL} means that in a fail - * scenario you will lose any increments that have not been flushed. - * @param row The row that contains the cell to increment. 
- * @param family The column family of the cell to increment. - * @param qualifier The column qualifier of the cell to increment. - * @param amount The amount to increment the cell with (or decrement, if the - * amount is negative). - * @param durability The persistence guarantee for this increment. - * @return The new value, post increment. - * @throws IOException if a remote or network exception occurs. - */ - long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, - long amount, Durability durability) throws IOException; - - /** - * Releases any resources held or pending changes in internal buffers. - * - * @throws IOException if a remote or network exception occurs. - */ - @Override - void close() throws IOException; - - /** - * Creates and returns a {@link com.google.protobuf.RpcChannel} instance connected to the - * table region containing the specified row. The row given does not actually have - * to exist. Whichever region would contain the row based on start and end keys will - * be used. Note that the {@code row} parameter is also not passed to the - * coprocessor handler registered for this protocol, unless the {@code row} - * is separately passed as an argument in the service request. The parameter - * here is only used to locate the region used to handle the call. - * - * <p> - * The obtained {@link com.google.protobuf.RpcChannel} instance can be used to access a published - * coprocessor {@link com.google.protobuf.Service} using standard protobuf service invocations: - * </p> - * - * <div style="background-color: #cccccc; padding: 2px"> - * <blockquote><pre> - * CoprocessorRpcChannel channel = myTable.coprocessorService(rowkey); - * MyService.BlockingInterface service = MyService.newBlockingStub(channel); - * MyCallRequest request = MyCallRequest.newBuilder() - * ... 
- * .build(); - * MyCallResponse response = service.myCall(null, request); - * </pre></blockquote></div> - * - * @param row The row key used to identify the remote region location - * @return A CoprocessorRpcChannel instance - */ - CoprocessorRpcChannel coprocessorService(byte[] row); - - /** - * Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table - * region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), and - * invokes the passed {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method - * with each {@link com.google.protobuf.Service} instance. - * - * @param service the protocol buffer {@code Service} implementation to call - * @param startKey start region selection with region containing this row. If {@code null}, the - * selection will start with the first table region. - * @param endKey select regions up to and including the region containing this row. If {@code - * null}, selection will continue through the last table region. - * @param callable this instance's {@link org.apache.hadoop.hbase.client.coprocessor.Batch - * .Call#call} - * method will be invoked once per table region, using the {@link com.google.protobuf.Service} - * instance connected to that region. 
- * @param <T> the {@link com.google.protobuf.Service} subclass to connect to - * @param <R> Return type for the {@code callable} parameter's {@link - * org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method - * @return a map of result values keyed by region name - */ - <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service, - byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable) - throws ServiceException, Throwable; - - /** - * Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table - * region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), and - * invokes the passed {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method - * with each {@link Service} instance. - * - * <p> The given {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], - * byte[], Object)} method will be called with the return value from each region's {@link - * org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} invocation. </p> - * - * @param service the protocol buffer {@code Service} implementation to call - * @param startKey start region selection with region containing this row. If {@code null}, the - * selection will start with the first table region. - * @param endKey select regions up to and including the region containing this row. If {@code - * null}, selection will continue through the last table region. - * @param callable this instance's {@link org.apache.hadoop.hbase.client.coprocessor.Batch - * .Call#call} - * method will be invoked once per table region, using the {@link Service} instance connected to - * that region. 
- * @param callback - * @param <T> the {@link Service} subclass to connect to - * @param <R> Return type for the {@code callable} parameter's {@link - * org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method - */ - <T extends Service, R> void coprocessorService(final Class<T> service, - byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable, - final Batch.Callback<R> callback) throws ServiceException, Throwable; - - /** - * Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table - * region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), all - * the invocations to the same region server will be batched into one call. The coprocessor - * service is invoked according to the service instance, method name and parameters. - * - * @param methodDescriptor - * the descriptor for the protobuf service method to call. - * @param request - * the method call parameters - * @param startKey - * start region selection with region containing this row. If {@code null}, the - * selection will start with the first table region. - * @param endKey - * select regions up to and including the region containing this row. If {@code null}, - * selection will continue through the last table region. - * @param responsePrototype - * the proto type of the response of the method in Service. 
- * @param <R> - * the response type for the coprocessor Service method - * @throws ServiceException - * @throws Throwable - * @return a map of result values keyed by region name - */ - <R extends Message> Map<byte[], R> batchCoprocessorService( - Descriptors.MethodDescriptor methodDescriptor, Message request, - byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable; - - /** - * Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table - * region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), all - * the invocations to the same region server will be batched into one call. The coprocessor - * service is invoked according to the service instance, method name and parameters. - * - * <p> - * The given - * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[],byte[],Object)} - * method will be called with the return value from each region's invocation. - * </p> - * - * @param methodDescriptor - * the descriptor for the protobuf service method to call. - * @param request - * the method call parameters - * @param startKey - * start region selection with region containing this row. If {@code null}, the - * selection will start with the first table region. - * @param endKey - * select regions up to and including the region containing this row. If {@code null}, - * selection will continue through the last table region. - * @param responsePrototype - * the proto type of the response of the method in Service. 
- * @param callback - * callback to invoke with the response for each region - * @param <R> - * the response type for the coprocessor Service method - * @throws ServiceException - * @throws Throwable - */ - <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, - Message request, byte[] startKey, byte[] endKey, R responsePrototype, - Batch.Callback<R> callback) throws ServiceException, Throwable; - - /** - * Atomically checks if a row/family/qualifier value matches the expected value. - * If it does, it performs the row mutations. If the passed value is null, the check - * is for the lack of column (ie: non-existence) - * - * The expected value argument of this call is on the left and the current - * value of the cell is on the right side of the comparison operator. - * - * Ie. eg. GREATER operator means expected value > existing <=> perform row mutations. - * - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param compareOp the comparison operator - * @param value the expected value - * @param mutation mutations to perform if check succeeds - * @throws IOException e - * @return true if the new put was executed, false otherwise - * @deprecated Since 2.0.0. Will be removed in 3.0.0. Use - * {@link #checkAndMutate(byte[], byte[], byte[], CompareOperator, byte[], RowMutations)} - */ - @Deprecated - boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, - CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException; - - /** - * Atomically checks if a row/family/qualifier value matches the expected value. - * If it does, it performs the row mutations. If the passed value is null, the check - * is for the lack of column (ie: non-existence) - * - * The expected value argument of this call is on the left and the current - * value of the cell is on the right side of the comparison operator. - * - * Ie. eg. 
GREATER operator means expected value > existing <=> perform row mutations. - * - * @param row to check - * @param family column family to check - * @param qualifier column qualifier to check - * @param op the comparison operator - * @param value the expected value - * @param mutation mutations to perform if check succeeds - * @throws IOException e - * @return true if the new put was executed, false otherwise - */ - boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, - byte[] value, RowMutations mutation) throws IOException; - - /** - * Get timeout of each rpc request in this Table instance. It will be overridden by a more - * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout. - * @see #getReadRpcTimeout(TimeUnit) - * @see #getWriteRpcTimeout(TimeUnit) - * @param unit the unit of time the timeout to be represented in - * @return rpc timeout in the specified time unit - */ - long getRpcTimeout(TimeUnit unit); - - /** - * Get timeout (millisecond) of each rpc request in this Table instance. - * - * @return Currently configured read timeout - * @deprecated use {@link #getReadRpcTimeout(TimeUnit)} or - * {@link #getWriteRpcTimeout(TimeUnit)} instead - */ - @Deprecated - int getRpcTimeout(); - - /** - * Set timeout (millisecond) of each rpc request in operations of this Table instance, will - * override the value of hbase.rpc.timeout in configuration. - * If a rpc request waiting too long, it will stop waiting and send a new request to retry until - * retries exhausted or operation timeout reached. - * <p> - * NOTE: This will set both the read and write timeout settings to the provided value. - * - * @param rpcTimeout the timeout of each rpc request in millisecond. - * - * @deprecated Use setReadRpcTimeout or setWriteRpcTimeout instead - */ - @Deprecated - void setRpcTimeout(int rpcTimeout); - - /** - * Get timeout of each rpc read request in this Table instance. 
- * @param unit the unit of time the timeout to be represented in - * @return read rpc timeout in the specified time unit - */ - long getReadRpcTimeout(TimeUnit unit); - - /** - * Get timeout (millisecond) of each rpc read request in this Table instance. - * @deprecated since 2.0 and will be removed in 3.0 version - * use {@link #getReadRpcTimeout(TimeUnit)} instead - */ - @Deprecated - int getReadRpcTimeout(); - - /** - * Set timeout (millisecond) of each rpc read request in operations of this Table instance, will - * override the value of hbase.rpc.read.timeout in configuration. - * If a rpc read request waiting too long, it will stop waiting and send a new request to retry - * until retries exhausted or operation timeout reached. - * - * @param readRpcTimeout the timeout for read rpc request in milliseconds - * @deprecated since 2.0.0, use {@link TableBuilder#setReadRpcTimeout} instead - */ - @Deprecated - void setReadRpcTimeout(int readRpcTimeout); - - /** - * Get timeout of each rpc write request in this Table instance. - * @param unit the unit of time the timeout to be represented in - * @return write rpc timeout in the specified time unit - */ - long getWriteRpcTimeout(TimeUnit unit); - - /** - * Get timeout (millisecond) of each rpc write request in this Table instance. - * @deprecated since 2.0 and will be removed in 3.0 version - * use {@link #getWriteRpcTimeout(TimeUnit)} instead - */ - @Deprecated - int getWriteRpcTimeout(); - - /** - * Set timeout (millisecond) of each rpc write request in operations of this Table instance, will - * override the value of hbase.rpc.write.timeout in configuration. - * If a rpc write request waiting too long, it will stop waiting and send a new request to retry - * until retries exhausted or operation timeout reached. 
- * - * @param writeRpcTimeout the timeout for write rpc request in milliseconds - * @deprecated since 2.0.0, use {@link TableBuilder#setWriteRpcTimeout} instead - */ - @Deprecated - void setWriteRpcTimeout(int writeRpcTimeout); - - /** - * Get timeout of each operation in Table instance. - * @param unit the unit of time the timeout to be represented in - * @return operation rpc timeout in the specified time unit - */ - long getOperationTimeout(TimeUnit unit); - - /** - * Get timeout (millisecond) of each operation for in Table instance. - * @deprecated since 2.0 and will be removed in 3.0 version - * use {@link #getOperationTimeout(TimeUnit)} instead - */ - @Deprecated - int getOperationTimeout(); - - /** - * Set timeout (millisecond) of each operation in this Table instance, will override the value - * of hbase.client.operation.timeout in configuration. - * Operation timeout is a top-level restriction that makes sure a blocking method will not be - * blocked more than this. In each operation, if rpc request fails because of timeout or - * other reason, it will retry until success or throw a RetriesExhaustedException. But if the - * total time being blocking reach the operation timeout before retries exhausted, it will break - * early and throw SocketTimeoutException. - * @param operationTimeout the total timeout of each operation in millisecond. - * @deprecated since 2.0.0, use {@link TableBuilder#setOperationTimeout} instead - */ - @Deprecated - void setOperationTimeout(int operationTimeout); -}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java index 8d19ae5..add403a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java @@ -19,21 +19,21 @@ package org.apache.phoenix.transaction; import java.io.IOException; import java.sql.SQLException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.transaction.TephraTransactionProvider.TephraTransactionClient; import org.apache.tephra.Transaction; import org.apache.tephra.Transaction.VisibilityLevel; import org.apache.tephra.TransactionAware; @@ -41,48 +41,24 @@ import org.apache.tephra.TransactionCodec; import org.apache.tephra.TransactionConflictException; import org.apache.tephra.TransactionContext; import org.apache.tephra.TransactionFailureException; -import org.apache.tephra.TransactionManager; 
import org.apache.tephra.TransactionSystemClient; import org.apache.tephra.TxConstants; -import org.apache.tephra.distributed.PooledClientProvider; -import org.apache.tephra.distributed.TransactionService; -import org.apache.tephra.distributed.TransactionServiceClient; -import org.apache.tephra.hbase.coprocessor.TransactionProcessor; -import org.apache.tephra.inmemory.InMemoryTxSystemClient; -import org.apache.tephra.metrics.TxMetricsCollector; -import org.apache.tephra.persist.HDFSTransactionStateStorage; -import org.apache.tephra.snapshot.SnapshotCodecProvider; -import org.apache.tephra.util.TxUtils; import org.apache.tephra.visibility.FenceWait; import org.apache.tephra.visibility.VisibilityFence; -import org.apache.tephra.zookeeper.TephraZKClientService; -import org.apache.twill.discovery.DiscoveryService; -import org.apache.twill.discovery.ZKDiscoveryService; -import org.apache.twill.internal.utils.Networks; -import org.apache.twill.zookeeper.RetryStrategies; -import org.apache.twill.zookeeper.ZKClientService; -import org.apache.twill.zookeeper.ZKClientServices; -import org.apache.twill.zookeeper.ZKClients; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; -import com.google.inject.util.Providers; -public class TephraTransactionContext implements PhoenixTransactionContext { +public class TephraTransactionContext implements PhoenixTransactionContext { + private static final Logger logger = LoggerFactory.getLogger(TephraTransactionContext.class); private static final TransactionCodec CODEC = new TransactionCodec(); - private static TransactionSystemClient txClient = null; - private static ZKClientService zkClient = null; - private static TransactionService txService = null; - private static TransactionManager txManager = null; - private final List<TransactionAware> txAwares; private final TransactionContext txContext; private Transaction tx; private 
TransactionSystemClient txServiceClient; - private TransactionFailureException e; public TephraTransactionContext() { this.txServiceClient = null; @@ -92,21 +68,21 @@ public class TephraTransactionContext implements PhoenixTransactionContext { public TephraTransactionContext(byte[] txnBytes) throws IOException { this(); - this.tx = (txnBytes != null && txnBytes.length > 0) ? CODEC - .decode(txnBytes) : null; + this.tx = CODEC.decode(txnBytes); } public TephraTransactionContext(PhoenixConnection connection) { - this.txServiceClient = txClient; + PhoenixTransactionClient client = connection.getQueryServices().initTransactionClient(getProvider()); + assert (client instanceof TephraTransactionClient); + this.txServiceClient = ((TephraTransactionClient)client).getTransactionClient(); this.txAwares = Collections.emptyList(); this.txContext = new TransactionContext(txServiceClient); } - public TephraTransactionContext(PhoenixTransactionContext ctx, - PhoenixConnection connection, boolean subTask) { - this.txServiceClient = txClient; + private TephraTransactionContext(PhoenixTransactionContext ctx, boolean subTask) { assert (ctx instanceof TephraTransactionContext); TephraTransactionContext tephraTransactionContext = (TephraTransactionContext) ctx; + this.txServiceClient = tephraTransactionContext.txServiceClient; if (subTask) { this.tx = tephraTransactionContext.getTransaction(); @@ -116,42 +92,13 @@ public class TephraTransactionContext implements PhoenixTransactionContext { this.txAwares = Collections.emptyList(); this.txContext = tephraTransactionContext.getContext(); } - - this.e = null; - } - - @Override - public void setInMemoryTransactionClient(Configuration config) { - TransactionManager txnManager = new TransactionManager(config); - txClient = this.txServiceClient = new InMemoryTxSystemClient(txnManager); } @Override - public ZKClientService setTransactionClient(Configuration config, ReadOnlyProps props, ConnectionInfo connectionInfo) { - String 
zkQuorumServersString = props.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM); - if (zkQuorumServersString==null) { - zkQuorumServersString = connectionInfo.getZookeeperQuorum()+":"+connectionInfo.getPort(); - } - - int timeOut = props.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); - // Create instance of the tephra zookeeper client - ZKClientService txZKClientService = ZKClientServices.delegate( - ZKClients.reWatchOnExpire( - ZKClients.retryOnFailure( - new TephraZKClientService(zkQuorumServersString, timeOut, null, - ArrayListMultimap.<String, byte[]>create()), - RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)) - ) - ); - txZKClientService.startAndWait(); - ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(txZKClientService); - PooledClientProvider pooledClientProvider = new PooledClientProvider( - config, zkDiscoveryService); - txClient = this.txServiceClient = new TransactionServiceClient(config,pooledClientProvider); - - return txZKClientService; + public TransactionFactory.Provider getProvider() { + return TransactionFactory.Provider.TEPHRA; } - + @Override public void begin() throws SQLException { if (txContext == null) { @@ -180,8 +127,6 @@ public class TephraTransactionContext implements PhoenixTransactionContext { try { txContext.finish(); } catch (TransactionFailureException e) { - this.e = e; - if (e instanceof TransactionConflictException) { throw new SQLExceptionInfo.Builder( SQLExceptionCode.TRANSACTION_CONFLICT_EXCEPTION) @@ -203,14 +148,8 @@ public class TephraTransactionContext implements PhoenixTransactionContext { } try { - if (e != null) { - txContext.abort(e); - e = null; - } else { - txContext.abort(); - } + txContext.abort(); } catch (TransactionFailureException e) { - this.e = null; throw new SQLExceptionInfo.Builder( SQLExceptionCode.TRANSACTION_FAILED) .setMessage(e.getMessage()).setRootCause(e).build() @@ -248,7 +187,7 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext { } @Override - public void commitDDLFence(PTable dataTable, Logger logger) + public void commitDDLFence(PTable dataTable) throws SQLException { byte[] key = dataTable.getName().getBytes(); @@ -275,7 +214,12 @@ public class TephraTransactionContext implements PhoenixTransactionContext { } } + @Override public void markDMLFence(PTable table) { + if (table.getType() == PTableType.INDEX) { + return; + } + byte[] logicalKey = table.getName().getBytes(); TransactionAware logicalTxAware = VisibilityFence.create(logicalKey); @@ -299,6 +243,9 @@ public class TephraTransactionContext implements PhoenixTransactionContext { @Override public void join(PhoenixTransactionContext ctx) { + if (ctx == PhoenixTransactionContext.NULL_CONTEXT) { + return; + } assert (ctx instanceof TephraTransactionContext); TephraTransactionContext tephraContext = (TephraTransactionContext) ctx; @@ -324,7 +271,6 @@ public class TephraTransactionContext implements PhoenixTransactionContext { public void reset() { tx = null; txAwares.clear(); - this.e = null; } @Override @@ -405,86 +351,15 @@ public class TephraTransactionContext implements PhoenixTransactionContext { assert (tx != null); try { - return CODEC.encode(tx); + byte[] encodedTxBytes = CODEC.encode(tx); + encodedTxBytes = Arrays.copyOf(encodedTxBytes, encodedTxBytes.length + 1); + encodedTxBytes[encodedTxBytes.length - 1] = getProvider().getCode(); + return encodedTxBytes; } catch (IOException e) { throw new SQLException(e); } } - @Override - public long getMaxTransactionsPerSecond() { - return TxConstants.MAX_TX_PER_MS; - } - - @Override - public boolean isPreExistingVersion(long version) { - return TxUtils.isPreExistingVersion(version); - } - - @Override - public RegionObserver getCoprocessor() { - return new TransactionProcessor(); - } - - @Override - public byte[] getFamilyDeleteMarker() { - return TxConstants.FAMILY_DELETE_QUALIFIER; - } - - @Override - public void setTxnConfigs(Configuration config, String 
tmpFolder, int defaultTxnTimeoutSeconds) throws IOException { - config.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false); - config.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times"); - config.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1); - config.setInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT, Networks.getRandomPort()); - config.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder); - config.setInt(TxConstants.Manager.CFG_TX_TIMEOUT, defaultTxnTimeoutSeconds); - config.unset(TxConstants.Manager.CFG_TX_HDFS_USER); - config.setLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL, 5L); - } - - @Override - public void setupTxManager(Configuration config, String url) throws SQLException { - - if (txService != null) { - return; - } - - ConnectionInfo connInfo = ConnectionInfo.create(url); - zkClient = ZKClientServices.delegate( - ZKClients.reWatchOnExpire( - ZKClients.retryOnFailure( - ZKClientService.Builder.of(connInfo.getZookeeperConnectionString()) - .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, - HConstants.DEFAULT_ZK_SESSION_TIMEOUT)) - .build(), - RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS) - ) - ) - ); - zkClient.startAndWait(); - - DiscoveryService discovery = new ZKDiscoveryService(zkClient); - txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector()), new TxMetricsCollector()); - txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager)); - txService.startAndWait(); - } - - @Override - public void tearDownTxManager() { - try { - if (txService != null) txService.stopAndWait(); - } finally { - try { - if (zkClient != null) zkClient.stopAndWait(); - } finally { - txService = null; - zkClient = null; - txManager = null; - } - } - } - /** * TephraTransactionContext specific functions */ @@ -510,4 +385,17 @@ public class TephraTransactionContext implements 
PhoenixTransactionContext { txAware.startTx(tx); } } + + @Override + public PhoenixTransactionContext newTransactionContext(PhoenixTransactionContext context, boolean subTask) { + return new TephraTransactionContext(context, subTask); + } + + @Override + public Table getTransactionalTable(Table htable, boolean isImmutable) { + TransactionAwareHTable transactionAwareHTable = new TransactionAwareHTable(htable, isImmutable ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW); + this.addTransactionAware(transactionAwareHTable); + return transactionAwareHTable; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java index 075c922..2e52efa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java @@ -18,16 +18,37 @@ package org.apache.phoenix.transaction; import java.io.IOException; +import java.util.concurrent.TimeUnit; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.phoenix.coprocessor.TephraTransactionalProcessor; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; +import org.apache.phoenix.transaction.TransactionFactory.Provider; +import org.apache.tephra.TransactionManager; +import 
org.apache.tephra.TransactionSystemClient; import org.apache.tephra.TxConstants; +import org.apache.tephra.distributed.PooledClientProvider; +import org.apache.tephra.distributed.TransactionService; +import org.apache.tephra.distributed.TransactionServiceClient; +import org.apache.tephra.inmemory.InMemoryTxSystemClient; +import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.HDFSTransactionStateStorage; +import org.apache.tephra.snapshot.SnapshotCodecProvider; +import org.apache.tephra.zookeeper.TephraZKClientService; +import org.apache.twill.discovery.DiscoveryService; +import org.apache.twill.discovery.ZKDiscoveryService; +import org.apache.twill.zookeeper.RetryStrategies; +import org.apache.twill.zookeeper.ZKClientService; +import org.apache.twill.zookeeper.ZKClientServices; +import org.apache.twill.zookeeper.ZKClients; -public class TephraTransactionProvider implements TransactionProvider { +import com.google.common.collect.ArrayListMultimap; +import com.google.inject.util.Providers; + +public class TephraTransactionProvider implements PhoenixTransactionProvider { private static final TephraTransactionProvider INSTANCE = new TephraTransactionProvider(); public static final TephraTransactionProvider getInstance() { @@ -37,12 +58,6 @@ public class TephraTransactionProvider implements TransactionProvider { private TephraTransactionProvider() { } - - @Override - public PhoenixTransactionContext getTransactionContext() { - return new TephraTransactionContext(); - } - @Override public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException { return new TephraTransactionContext(txnBytes); @@ -54,23 +69,129 @@ public class TephraTransactionProvider implements TransactionProvider { } @Override - public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask) { - return new TephraTransactionContext(contex, connection, subTask); + public 
PhoenixTransactionClient getTransactionClient(Configuration config, ConnectionInfo connectionInfo) { + if (connectionInfo.isConnectionless()) { + TransactionManager txnManager = new TransactionManager(config); + TransactionSystemClient txClient = new InMemoryTxSystemClient(txnManager); + return new TephraTransactionClient(txClient); + + } + String zkQuorumServersString = config.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM); + if (zkQuorumServersString==null) { + zkQuorumServersString = connectionInfo.getZookeeperConnectionString(); + } + + int timeOut = config.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); + // Create instance of the tephra zookeeper client + ZKClientService zkClientService = ZKClientServices.delegate( + ZKClients.reWatchOnExpire( + ZKClients.retryOnFailure( + new TephraZKClientService(zkQuorumServersString, timeOut, null, + ArrayListMultimap.<String, byte[]>create()), + RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)) + ) + ); + //txZKClientService.startAndWait(); + ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService); + PooledClientProvider pooledClientProvider = new PooledClientProvider( + config, zkDiscoveryService); + TransactionServiceClient txClient = new TransactionServiceClient(config,pooledClientProvider); + TephraTransactionClient client = new TephraTransactionClient(zkClientService, txClient); + client.start(); + + return client; } @Override - public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, Table htable) { - return new TephraTransactionTable(ctx, htable); + public PhoenixTransactionService getTransactionService(Configuration config, ConnectionInfo connInfo) { + ZKClientService zkClient = ZKClientServices.delegate( + ZKClients.reWatchOnExpire( + ZKClients.retryOnFailure( + ZKClientService.Builder.of(connInfo.getZookeeperConnectionString()) + .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, + 
HConstants.DEFAULT_ZK_SESSION_TIMEOUT)) + .build(), + RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS) + ) + ) + ); + + //zkClient.startAndWait(); + DiscoveryService discovery = new ZKDiscoveryService(zkClient); + TransactionManager txManager = new TransactionManager(config, new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector()), new TxMetricsCollector()); + TransactionService txService = new TransactionService(config, zkClient, discovery, Providers.of(txManager)); + TephraTransactionService service = new TephraTransactionService(zkClient, txService); + //txService.startAndWait(); + service.start(); + return service; + } + + static class TephraTransactionService implements PhoenixTransactionService { + private final ZKClientService zkClient; + private final TransactionService txService; + + public TephraTransactionService(ZKClientService zkClient, TransactionService txService) { + this.zkClient = zkClient; + this.txService = txService; + } + + public void start() { + zkClient.startAndWait(); + txService.startAndWait(); + } + + @Override + public void close() throws IOException { + try { + if (txService != null) txService.stopAndWait(); + } finally { + if (zkClient != null) zkClient.stopAndWait(); + } + } + } - @Override - public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp) { - return CellUtil.createCell(row, family, TxConstants.FAMILY_DELETE_QUALIFIER, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY); + static class TephraTransactionClient implements PhoenixTransactionClient { + private final ZKClientService zkClient; + private final TransactionSystemClient txClient; + + public TephraTransactionClient(TransactionSystemClient txClient) { + this(null, txClient); + } + + public TephraTransactionClient(ZKClientService zkClient, TransactionSystemClient txClient) { + this.zkClient = zkClient; + this.txClient = txClient; + } + + public void start() { + 
zkClient.startAndWait(); + } + + public TransactionSystemClient getTransactionClient() { + return txClient; + } + + @Override + public void close() throws IOException { + zkClient.stopAndWait(); + } + } @Override - public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp) { - return CellUtil.createCell(row, family, qualifier, timestamp, KeyValue.Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY); + public Class<? extends RegionObserver> getCoprocessor() { + return TephraTransactionalProcessor.class; + } + + @Override + public Provider getProvider() { + return TransactionFactory.Provider.TEPHRA; + } + + @Override + public boolean isUnsupported(Feature feature) { + return false; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java deleted file mode 100644 index e28e98b..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionTable.java +++ /dev/null @@ -1,360 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.transaction; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CompareOperator; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Append; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.client.RowMutations; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; -import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; -import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.PTableType; -import org.apache.tephra.TxConstants; - -import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.Service; -import com.google.protobuf.ServiceException; - -public class 
TephraTransactionTable implements PhoenixTransactionalTable { - - private TransactionAwareHTable transactionAwareHTable; - - private TephraTransactionContext tephraTransactionContext; - - public TephraTransactionTable(PhoenixTransactionContext ctx, Table hTable) { - this(ctx, hTable, null); - } - - public TephraTransactionTable(PhoenixTransactionContext ctx, Table hTable, PTable pTable) { - - assert(ctx instanceof TephraTransactionContext); - - tephraTransactionContext = (TephraTransactionContext) ctx; - - transactionAwareHTable = new TransactionAwareHTable(hTable, (pTable != null && pTable.isImmutableRows()) - ? TxConstants.ConflictDetection.NONE : TxConstants.ConflictDetection.ROW); - - tephraTransactionContext.addTransactionAware(transactionAwareHTable); - - if (pTable != null && pTable.getType() != PTableType.INDEX) { - tephraTransactionContext.markDMLFence(pTable); - } - } - - @Override - public Result get(Get get) throws IOException { - return transactionAwareHTable.get(get); - } - - @Override - public void put(Put put) throws IOException { - transactionAwareHTable.put(put); - } - - @Override - public void delete(Delete delete) throws IOException { - transactionAwareHTable.delete(delete); - } - - @Override - public ResultScanner getScanner(Scan scan) throws IOException { - return transactionAwareHTable.getScanner(scan); - } - - - @Override - public Configuration getConfiguration() { - return transactionAwareHTable.getConfiguration(); - } - - @Override - public HTableDescriptor getTableDescriptor() throws IOException { - return transactionAwareHTable.getTableDescriptor(); - } - - @Override - public boolean exists(Get get) throws IOException { - return transactionAwareHTable.exists(get); - } - - @Override - public Result[] get(List<Get> gets) throws IOException { - return transactionAwareHTable.get(gets); - } - - @Override - public ResultScanner getScanner(byte[] family) throws IOException { - return transactionAwareHTable.getScanner(family); - } - - @Override 
- public ResultScanner getScanner(byte[] family, byte[] qualifier) - throws IOException { - return transactionAwareHTable.getScanner(family, qualifier); - } - - @Override - public void put(List<Put> puts) throws IOException { - transactionAwareHTable.put(puts); - } - - @Override - public void delete(List<Delete> deletes) throws IOException { - transactionAwareHTable.delete(deletes); - } - - @Override - public void close() throws IOException { - transactionAwareHTable.close(); - } - - - @Override - public TableName getName() { - return transactionAwareHTable.getName(); - } - - @Override - public boolean[] existsAll(List<Get> gets) throws IOException { - return transactionAwareHTable.existsAll(gets); - } - - @Override - public void batch(List<? extends Row> actions, Object[] results) - throws IOException, InterruptedException { - transactionAwareHTable.batch(actions, results); - } - - @Override - public <R> void batchCallback(List<? extends Row> actions, - Object[] results, Callback<R> callback) throws IOException, - InterruptedException { - transactionAwareHTable.batchCallback(actions, results, callback); - } - - @Override - public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, - byte[] value, Put put) throws IOException { - return transactionAwareHTable.checkAndPut(row, family, qualifier, value, put); - } - - @Override - public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, Put put) throws IOException { - return transactionAwareHTable.checkAndPut(row, family, qualifier, compareOp, value, put); - } - - @Override - public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - byte[] value, Delete delete) throws IOException { - return transactionAwareHTable.checkAndDelete(row, family, qualifier, value, delete); - } - - @Override - public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, Delete delete) - throws IOException { - 
return transactionAwareHTable.checkAndDelete(row, family, qualifier, compareOp, value, delete); - } - - @Override - public void mutateRow(RowMutations rm) throws IOException { - transactionAwareHTable.mutateRow(rm); - } - - @Override - public Result append(Append append) throws IOException { - return transactionAwareHTable.append(append); - } - - @Override - public Result increment(Increment increment) throws IOException { - return transactionAwareHTable.increment(increment); - } - - @Override - public long incrementColumnValue(byte[] row, byte[] family, - byte[] qualifier, long amount) throws IOException { - return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount); - } - - @Override - public long incrementColumnValue(byte[] row, byte[] family, - byte[] qualifier, long amount, Durability durability) - throws IOException { - return transactionAwareHTable.incrementColumnValue(row, family, qualifier, amount, durability); - } - - @Override - public CoprocessorRpcChannel coprocessorService(byte[] row) { - return transactionAwareHTable.coprocessorService(row); - } - - @Override - public <T extends Service, R> Map<byte[], R> coprocessorService( - Class<T> service, byte[] startKey, byte[] endKey, - Call<T, R> callable) throws ServiceException, Throwable { - return transactionAwareHTable.coprocessorService(service, startKey, endKey, callable); - } - - @Override - public <T extends Service, R> void coprocessorService(Class<T> service, - byte[] startKey, byte[] endKey, Call<T, R> callable, - Callback<R> callback) throws ServiceException, Throwable { - transactionAwareHTable.coprocessorService(service, startKey, endKey, callable, callback); - } - - @Override - public <R extends Message> Map<byte[], R> batchCoprocessorService( - MethodDescriptor methodDescriptor, Message request, - byte[] startKey, byte[] endKey, R responsePrototype) - throws ServiceException, Throwable { - return transactionAwareHTable.batchCoprocessorService(methodDescriptor, 
request, startKey, endKey, responsePrototype); - } - - @Override - public <R extends Message> void batchCoprocessorService( - MethodDescriptor methodDescriptor, Message request, - byte[] startKey, byte[] endKey, R responsePrototype, - Callback<R> callback) throws ServiceException, Throwable { - transactionAwareHTable.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, callback); - } - - @Override - public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, RowMutations mutation) - throws IOException { - return transactionAwareHTable.checkAndMutate(row, family, qualifier, compareOp, value, mutation); - } - - @Override - public void setOperationTimeout(int operationTimeout) { - transactionAwareHTable.setOperationTimeout(operationTimeout); - } - - @Override - public int getOperationTimeout() { - return transactionAwareHTable.getOperationTimeout(); - } - - @Override - public void setRpcTimeout(int rpcTimeout) { - transactionAwareHTable.setRpcTimeout(rpcTimeout); - } - - @Override - public int getRpcTimeout() { - return transactionAwareHTable.getRpcTimeout(); - } - - @Override - public TableDescriptor getDescriptor() throws IOException { - return transactionAwareHTable.getDescriptor(); - } - - @Override - public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, - byte[] value, Put put) throws IOException { - return transactionAwareHTable.checkAndPut(row, family, qualifier, op, value, put); - } - - @Override - public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, - byte[] value, Delete delete) throws IOException { - return transactionAwareHTable.checkAndDelete(row, family, qualifier, op, value, delete); - } - - @Override - public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { - return transactionAwareHTable.checkAndMutate(row, family); - } - - @Override - public boolean 
checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, - byte[] value, RowMutations mutation) throws IOException { - return transactionAwareHTable.checkAndMutate(row, family, qualifier, op, value, mutation); - } - - @Override - public int getReadRpcTimeout() { - return transactionAwareHTable.getReadRpcTimeout(); - } - - @Override - public void setReadRpcTimeout(int readRpcTimeout) { - transactionAwareHTable.setReadRpcTimeout(readRpcTimeout); - } - - @Override - public int getWriteRpcTimeout() { - return transactionAwareHTable.getWriteRpcTimeout(); - } - - @Override - public void setWriteRpcTimeout(int writeRpcTimeout) { - transactionAwareHTable.setWriteRpcTimeout(writeRpcTimeout); - } - - @Override - public boolean[] exists(List<Get> gets) throws IOException { - return transactionAwareHTable.exists(gets); - } - - @Override - /** Returns the wrapped table's RPC timeout expressed in the requested unit. */ public long getRpcTimeout(TimeUnit unit) { - return transactionAwareHTable.getRpcTimeout(unit); /* forward the unit like the sibling TimeUnit getters; the no-arg overload ignores it */ - } - - @Override - public long getReadRpcTimeout(TimeUnit unit) { - return transactionAwareHTable.getReadRpcTimeout(unit); - } - - @Override - public long getWriteRpcTimeout(TimeUnit unit) { - return transactionAwareHTable.getWriteRpcTimeout(unit); - } - - @Override - public long getOperationTimeout(TimeUnit unit) { - return transactionAwareHTable.getOperationTimeout(unit); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java index f32764b..62bd808 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionFactory.java @@ -17,24 +17,55 @@ */ package
org.apache.phoenix.transaction; +import java.io.IOException; + +import org.apache.phoenix.coprocessor.MetaDataProtocol; + + public class TransactionFactory { - enum TransactionProcessor { - Tephra, - Omid + /** Available transaction providers; each constant carries a one-byte code and its provider instance. */ public enum Provider { + TEPHRA((byte)1, TephraTransactionProvider.getInstance()), + OMID((byte)2, OmidTransactionProvider.getInstance()); + + private final byte code; + private final PhoenixTransactionProvider provider; + + Provider(byte code, PhoenixTransactionProvider provider) { + this.code = code; + this.provider = provider; + } + + public byte getCode() { + return this.code; + } + + /** Maps a 1-based code to the constant declared at position code-1; any other value raises IllegalArgumentException. */ public static Provider fromCode(int code) { + if (code < 1 || code > Provider.values().length) { + throw new IllegalArgumentException("Invalid TransactionFactory.Provider " + code); + } + return Provider.values()[code-1]; + } + + public static Provider getDefault() { + return TEPHRA; + } + + public PhoenixTransactionProvider getTransactionProvider() { + return provider; + } } - static public TransactionProvider getTransactionProvider() { - return TephraTransactionProvider.getInstance(); + public static PhoenixTransactionProvider getTransactionProvider(Provider provider) { + return provider.getTransactionProvider(); } - static public TransactionProvider getTransactionProvider(TransactionProcessor processor) { - switch (processor) { - case Tephra: - return TephraTransactionProvider.getInstance(); - case Omid: - return OmidTransactionProvider.getInstance(); - default: - throw new IllegalArgumentException("Unknown transaction processor: " + processor); + /** Returns null for a null or empty txState; otherwise selects a provider (TEPHRA when clientVersion is below the 4.14.0 constant, else the code stored in the last byte of txState) and lets it build the context from txState. NOTE(review): clientVersion is compared with a constant named MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0 - confirm both sides use the same encoding. */ public static PhoenixTransactionContext getTransactionContext(byte[] txState, int clientVersion) throws IOException { + if (txState == null || txState.length == 0) { + return null; } + Provider provider = (clientVersion < MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0) + ?
Provider.TEPHRA /* legacy clients: fall back to Tephra, the provider the removed no-arg getTransactionProvider() always returned and the value of getDefault() */ + : Provider.fromCode(txState[txState.length-1]); + return provider.getTransactionProvider().getTransactionContext(txState); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java deleted file mode 100644 index 84211ab..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TransactionProvider.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.phoenix.transaction; - -import java.io.IOException; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.client.Table; -import org.apache.phoenix.jdbc.PhoenixConnection; - -public interface TransactionProvider { - public PhoenixTransactionContext getTransactionContext(); - public PhoenixTransactionContext getTransactionContext(byte[] txnBytes) throws IOException; - public PhoenixTransactionContext getTransactionContext(PhoenixConnection connection); - public PhoenixTransactionContext getTransactionContext(PhoenixTransactionContext contex, PhoenixConnection connection, boolean subTask); - - public PhoenixTransactionalTable getTransactionalTable(PhoenixTransactionContext ctx, Table htable); - - public Cell newDeleteFamilyMarker(byte[] row, byte[] family, long timestamp); - public Cell newDeleteColumnMarker(byte[] row, byte[] family, byte[] qualifier, long timestamp); -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index a0912f3..e85732a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -87,7 +87,6 @@ import org.apache.phoenix.schema.RowKeyValueAccessor; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.types.PDataType; -import org.apache.phoenix.transaction.TransactionFactory; import com.google.common.base.Function; import com.google.common.base.Joiner; @@ -1514,7 +1513,7 @@ public class PhoenixRuntime { * @return wall clock time in milliseconds (i.e. Epoch time) of a given Cell time stamp. 
*/ public static long getWallClockTimeFromCellTimeStamp(long tsOfCell) { - return TransactionFactory.getTransactionProvider().getTransactionContext().isPreExistingVersion(tsOfCell) ? tsOfCell : TransactionUtil.convertToMilliseconds(tsOfCell); + return TransactionUtil.isTransactionalTimestamp(tsOfCell) ? TransactionUtil.convertToMilliseconds(tsOfCell) : tsOfCell; } public static long getCurrentScn(ReadOnlyProps props) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/15fa00fa/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index f0bc2d6..914417c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -59,6 +59,7 @@ import org.apache.phoenix.filter.DistinctPrefixFilter; import org.apache.phoenix.filter.MultiEncodedCQKeyValueComparisonFilter; import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.hbase.index.util.VersionUtil; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.KeyRange.Bound; @@ -88,6 +89,8 @@ import com.google.common.collect.Lists; */ public class ScanUtil { public static final int[] SINGLE_COLUMN_SLOT_SPAN = new int[1]; + public static final int UNKNOWN_CLIENT_VERSION = VersionUtil.encodeVersion(4, 4, 0); + /* * Max length that we fill our key when we turn an inclusive key * into a exclusive key. 
@@ -931,5 +934,17 @@ public class ScanUtil { public static boolean isIndexRebuild(Scan scan) { return scan.getAttribute((BaseScannerRegionObserver.REBUILD_INDEXES)) != null; } + + /** Reads the client version stored on the scan under BaseScannerRegionObserver.CLIENT_VERSION; returns UNKNOWN_CLIENT_VERSION when the attribute is absent. */ public static int getClientVersion(Scan scan) { + int clientVersion = UNKNOWN_CLIENT_VERSION; + byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION); + if (clientVersionBytes != null) { + clientVersion = Bytes.toInt(clientVersionBytes); + } + return clientVersion; + } + /** Stores the given version on the scan as the Bytes.toBytes(int) encoding read back by getClientVersion. */ public static void setClientVersion(Scan scan, int version) { + scan.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, Bytes.toBytes(version)); + } }