Repository: phoenix
Updated Branches:
  refs/heads/txn d49c3bf6f -> bd271b214


Moved TransactionContext from MutationState to PhoenixConnection


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/bd271b21
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/bd271b21
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/bd271b21

Branch: refs/heads/txn
Commit: bd271b214dab942631798ea77688e51f9442ef18
Parents: d49c3bf
Author: Thomas D'Silva <twdsi...@gmail.com>
Authored: Thu Mar 12 00:46:33 2015 -0700
Committer: Thomas D'Silva <twdsi...@gmail.com>
Committed: Thu Mar 12 00:46:33 2015 -0700

----------------------------------------------------------------------
 .../apache/phoenix/execute/MutationState.java   | 198 ++++++++-----------
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  67 ++++++-
 2 files changed, 143 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/bd271b21/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 6e37cc5..45b9dd6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -27,9 +27,12 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import co.cask.tephra.TransactionAware;
+import co.cask.tephra.hbase98.TransactionAwareHTable;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -39,15 +42,12 @@ import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionCode;
-import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.monitoring.PhoenixMetrics;
-import org.apache.phoenix.query.HBaseFactoryProvider;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.MetaDataClient;
@@ -63,24 +63,11 @@ import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.ServerUtil;
-import org.apache.twill.discovery.ZKDiscoveryService;
-import org.apache.twill.zookeeper.RetryStrategies;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
-import org.apache.twill.zookeeper.ZKClients;
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.TraceScope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import co.cask.tephra.TransactionAware;
-import co.cask.tephra.TransactionContext;
-import co.cask.tephra.TransactionFailureException;
-import co.cask.tephra.distributed.PooledClientProvider;
-import co.cask.tephra.distributed.TransactionServiceClient;
-import co.cask.tephra.hbase98.TransactionAwareHTable;
-
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -96,53 +83,45 @@ public class MutationState implements SQLCloseable {
     private static final Logger logger = 
LoggerFactory.getLogger(MutationState.class);
 
     private PhoenixConnection connection;
-    private final TransactionServiceClient transactionServiceClient;
     private final long maxSize;
     private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
-    private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> 
mutations; // TODO: Sizing?
+    // map from table to rows 
+    //   rows - map from rowkey to columns
+    //      columns - map from column to value
+    private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> 
mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing?
+    // map from table ref to htable (possibly a TransactionAwareHTable)
+    private Map<TableRef, HTableInterface> tableRefToHTableMap;
     private long sizeOffset;
-    private int numRows;
+    private int numRows = 0;
     
-    public MutationState(int maxSize, PhoenixConnection connection) throws 
SQLException {
+    public MutationState(int maxSize, PhoenixConnection connection) {
         this(maxSize,connection,0);
     }
     
-    public MutationState(int maxSize, PhoenixConnection connection, long 
sizeOffset) throws SQLException {
-       this(maxSize, connection, sizeOffset, Maps.<TableRef, 
Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>newHashMapWithExpectedSize(3), 0);
+    public MutationState(int maxSize, PhoenixConnection connection, long 
sizeOffset) {
+        this.maxSize = maxSize;
+        this.connection = connection;
+        this.sizeOffset = sizeOffset;
     }
     
-    public MutationState(TableRef table, 
Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long 
maxSize, PhoenixConnection connection) throws SQLException {
-       this(maxSize, connection, sizeOffset, 
Maps.newHashMap(ImmutableMap.of(table, mutations)), mutations.size());
+    public MutationState(TableRef table, 
Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long 
maxSize, PhoenixConnection connection) {
+        this.maxSize = maxSize;
+        this.connection = connection;
+        this.mutations.put(table, mutations);
+        this.sizeOffset = sizeOffset;
+        this.numRows = mutations.size();
+        throwIfTooBig();
     }
     
-    public MutationState(long maxSize, PhoenixConnection connection, long 
sizeOffset, Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> 
mutations, int numRows) throws SQLException {
+    private MutationState(List<Map.Entry<TableRef, 
Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries, long sizeOffset, long 
maxSize, PhoenixConnection connection) {
         this.maxSize = maxSize;
         this.connection = connection;
         this.sizeOffset = sizeOffset;
-        this.mutations = mutations; 
-        this.numRows = numRows;
+        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> 
entry : entries) {
+            numRows += entry.getValue().size();
+            this.mutations.put(entry.getKey(), entry.getValue());
+        }
         throwIfTooBig();
-        
-        //create a transaction service client
-//        Configuration config = 
HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
-//             String zkQuorumServersString = 
ConnectionInfo.getZookeeperConnectionString(connection.getURL());
-//             ZKClientService zkClientService = ZKClientServices.delegate(
-//                           ZKClients.reWatchOnExpire(
-//                             ZKClients.retryOnFailure(
-//                               
ZKClientService.Builder.of(zkQuorumServersString)
-//                                 
.setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, 
HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
-//                                 .build(),
-//                               RetryStrategies.exponentialDelay(500, 2000, 
TimeUnit.MILLISECONDS)
-//                             )
-//                           )
-//                         );
-//             zkClientService.startAndWait();
-//             ZKDiscoveryService zkDiscoveryService = new 
ZKDiscoveryService(zkClientService);
-//             PooledClientProvider pooledClientProvider = new 
PooledClientProvider(
-//                             config, zkDiscoveryService);
-//             this.transactionServiceClient = new 
TransactionServiceClient(config,
-//                             pooledClientProvider);
-               this.transactionServiceClient = null;
     }
     
     private void throwIfTooBig() {
@@ -381,49 +360,31 @@ public class MutationState implements SQLCloseable {
         }
     }
     
-    @SuppressWarnings("deprecation")
-    public void commit() throws SQLException {
-       // create list of transaction aware htables
-        List<TransactionAware> txAwareHTables = 
Lists.newArrayListWithExpectedSize(mutations.size());
-        // create list of htables (some of which could be transactional)
-        List<HTableInterface> hTables = 
Lists.newArrayListWithExpectedSize(mutations.size());
-        for ( TableRef tableRef : this.mutations.keySet()) {
-               PTable table = tableRef.getTable();
-                       byte[] hTableName = table.getPhysicalName().getBytes();
-               HTableInterface hTable = 
connection.getQueryServices().getTable(hTableName);
-               if (table.isTransactional()) {
-                       TransactionAwareHTable transactionAwareHTable = new 
TransactionAwareHTable(hTable);
-               txAwareHTables.add(transactionAwareHTable);
-               hTable = transactionAwareHTable;
-               }
-               hTables.add(hTable);
-        }
-        
-        if (txAwareHTables.isEmpty()) {
-               commitMutations(hTables);
-        }
-        else {
-               TransactionContext transactionContext = new 
TransactionContext(transactionServiceClient, txAwareHTables);
-                       try {
-                               transactionContext.start();
-                               commitMutations(hTables);
-                               transactionContext.finish();
-                       } catch (TransactionFailureException e) {
-                               try {
-                                       transactionContext.abort();
-                                       throw new SQLExceptionInfo.Builder(
-                                                       
SQLExceptionCode.TRANSACTION_FINISH_EXCEPTION)
-                                                       
.setRootCause(e).build().buildException();
-                               } catch (TransactionFailureException e1) {
-                                       throw new SQLExceptionInfo.Builder(
-                                                       
SQLExceptionCode.TRANSACTION_ABORT_EXCEPTION)
-                                                       
.setRootCause(e1).build().buildException();
-                               }
-                       }
-        }
+    /**
+     * Creates a map from table ref to htable which is used by {@link 
PhoenixConnection} 
+     * for transactions
+     * @return list of transaction aware htables
+     */
+    public List<TransactionAware> preSend() throws SQLException {
+       List<TransactionAware> txAwareHTables = 
Lists.newArrayListWithExpectedSize(mutations.size());
+       tableRefToHTableMap = Maps.newHashMapWithExpectedSize(mutations.size());
+       for ( TableRef tableRef : this.mutations.keySet()) {
+               PTable table = tableRef.getTable();
+               byte[] hTableName = table.getPhysicalName().getBytes();
+               HTableInterface hTable = 
connection.getQueryServices().getTable(hTableName);
+               if (table.isTransactional()) {
+                       TransactionAwareHTable transactionAwareHTable = new 
TransactionAwareHTable(hTable);
+               txAwareHTables.add(transactionAwareHTable);
+               hTable = transactionAwareHTable;
+               }
+               tableRefToHTableMap.put(tableRef, hTable);
+       }
+       return txAwareHTables;
     }
     
-    public void commitMutations(List<HTableInterface> hTables) throws 
SQLException {
+    
+    @SuppressWarnings("deprecation")
+    public void send() throws SQLException {
         int i = 0;
         byte[] tenantId = connection.getTenantId() == null ? null : 
connection.getTenantId().getBytes();
         long[] serverTimeStamps = validate();
@@ -486,10 +447,7 @@ public class MutationState implements SQLCloseable {
                     }
                     
                     SQLException sqlE = null;
-                    HTableInterface hTable = 
connection.getQueryServices().getTable(htableName);
-                    if (table.isTransactional()) {
-                        hTable = new TransactionAwareHTable(hTable);
-                    }
+                    HTableInterface hTable = tableRefToHTableMap.get(tableRef);
                     try {
                         logMutationSize(hTable, mutations, connection);
                         MUTATION_BATCH_SIZE.update(mutations.size());
@@ -524,31 +482,15 @@ public class MutationState implements SQLCloseable {
                         }
                         // Throw to client with both what was committed so far 
and what is left to be committed.
                         // That way, client can either undo what was done or 
try again with what was not done.
-                        int numCommitedMutations = 0;
-                        Map<TableRef, 
Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> commitedMutations = 
Maps.newHashMapWithExpectedSize(committedList.size());
-                        for (Map.Entry<TableRef, 
Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> committedEntry : committedList) {
-                               numCommitedMutations += 
committedEntry.getValue().size();
-                               commitedMutations.put(committedEntry.getKey(), 
committedEntry.getValue());
-                        }
-                        sqlE = new CommitException(e, this, new 
MutationState(this.maxSize, this.connection, this.sizeOffset, 
commitedMutations, numCommitedMutations));
+                        sqlE = new CommitException(e, this, new 
MutationState(committedList, this.sizeOffset, this.maxSize, this.connection));
                     } finally {
                         try {
-                            hTable.close();
-                        } catch (IOException e) {
-                            if (sqlE != null) {
-                                
sqlE.setNextException(ServerUtil.parseServerException(e));
-                            } else {
-                                sqlE = ServerUtil.parseServerException(e);
+                            if (cache != null) {
+                                cache.close();
                             }
                         } finally {
-                            try {
-                                if (cache != null) {
-                                    cache.close();
-                                }
-                            } finally {
-                                if (sqlE != null) {
-                                    throw sqlE;
-                                }
+                            if (sqlE != null) {
+                                throw sqlE;
                             }
                         }
                     }
@@ -565,7 +507,29 @@ public class MutationState implements SQLCloseable {
         assert(this.mutations.isEmpty());
     }
     
-    public void rollback(PhoenixConnection connection) throws SQLException {
+    public void postSend() throws SQLException {
+       SQLException sqlE = null;
+               for (Entry<TableRef, HTableInterface> entry : 
tableRefToHTableMap.entrySet()) {
+                       HTableInterface hTable = entry.getValue();
+                       try {
+                               hTable.close();
+                       } 
+                       catch (IOException e) {
+                               if (sqlE != null) {
+                                       
sqlE.setNextException(ServerUtil.parseServerException(e));
+                               } else {
+                                       sqlE = 
ServerUtil.parseServerException(e);
+                               }
+                       } 
+                       finally {
+                               if (sqlE != null) {
+                    throw sqlE;
+                }
+                       }
+               }
+    }
+    
+    public void clear(PhoenixConnection connection) throws SQLException {
         this.mutations.clear();
         numRows = 0;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/bd271b21/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index d5687f0..b5c9a2d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -51,9 +51,11 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 
 import javax.annotation.Nullable;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.phoenix.call.CallRunner;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -61,9 +63,11 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.expression.function.FunctionArgumentType;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.jdbc.PhoenixStatement.PhoenixStatementParser;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.DelegateConnectionQueryServices;
+import org.apache.phoenix.query.HBaseFactoryProvider;
 import org.apache.phoenix.query.MetaDataMutated;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
@@ -92,9 +96,20 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.SQLCloseables;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
 import org.cloudera.htrace.Sampler;
 import org.cloudera.htrace.TraceScope;
 
+import co.cask.tephra.TransactionAware;
+import co.cask.tephra.TransactionContext;
+import co.cask.tephra.TransactionFailureException;
+import co.cask.tephra.distributed.PooledClientProvider;
+import co.cask.tephra.distributed.TransactionServiceClient;
+
 import com.google.common.base.Objects;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
@@ -123,6 +138,7 @@ public class PhoenixConnection implements Connection, 
org.apache.phoenix.jdbc.Jd
     private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
     private final Map<PDataType<?>, Format> formatters = new HashMap<>();
     private final MutationState mutationState;
+    private final TransactionServiceClient transactionServiceClient;
     private final int mutateBatchSize;
     private final Long scn;
     private boolean isAutoCommit = false;
@@ -241,7 +257,25 @@ public class PhoenixConnection implements Connection, 
org.apache.phoenix.jdbc.Jd
         // setup tracing, if its enabled
         this.sampler = Tracing.getConfiguredSampler(this);
         this.customTracingAnnotations = getImmutableCustomTracingAnnotations();
-               
+        
+        //create a transaction service client
+        Configuration config = 
HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+        String zkQuorumServersString = 
ConnectionInfo.getZookeeperConnectionString(url);
+        ZKClientService zkClientService = ZKClientServices.delegate(
+                     ZKClients.reWatchOnExpire(
+                       ZKClients.retryOnFailure(
+                         ZKClientService.Builder.of(zkQuorumServersString)
+                           
.setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, 
HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
+                           .build(),
+                         RetryStrategies.exponentialDelay(500, 2000, 
TimeUnit.MILLISECONDS)
+                       )
+                     )
+                   );
+        zkClientService.startAndWait();
+        ZKDiscoveryService zkDiscoveryService = new 
ZKDiscoveryService(zkClientService);
+        PooledClientProvider pooledClientProvider = new PooledClientProvider(
+                       config, zkDiscoveryService);
+        this.transactionServiceClient = new 
TransactionServiceClient(config,pooledClientProvider);
     }
     
     private ImmutableMap<String, String> 
getImmutableCustomTracingAnnotations() {
@@ -398,7 +432,7 @@ public class PhoenixConnection implements Connection, 
org.apache.phoenix.jdbc.Jd
         // from modifying this list.
         this.statements = Lists.newArrayList();
         try {
-            mutationState.rollback(this);
+            mutationState.clear(this);
         } finally {
             try {
                 SQLCloseables.closeAll(statements);
@@ -426,13 +460,36 @@ public class PhoenixConnection implements Connection, 
org.apache.phoenix.jdbc.Jd
             isClosed = true;
         }
     }
-    
+
     @Override
     public void commit() throws SQLException {
         CallRunner.run(new CallRunner.CallableThrowable<Void, SQLException>() {
             @Override
             public Void call() throws SQLException {
-                mutationState.commit();
+               List<TransactionAware> txAwareHTables = mutationState.preSend();
+                               if (txAwareHTables.isEmpty()) {
+                                       mutationState.send();
+                               } 
+                               else {
+                                       TransactionContext transactionContext = 
new TransactionContext(transactionServiceClient, txAwareHTables);
+                                       try {
+                                               transactionContext.start();
+                               mutationState.send();
+                                               transactionContext.finish();
+                                       } catch (TransactionFailureException e) 
{
+                                               try {
+                                                       
transactionContext.abort();
+                                                       throw new 
SQLExceptionInfo.Builder(
+                                                                       
SQLExceptionCode.TRANSACTION_FINISH_EXCEPTION)
+                                                                       
.setRootCause(e).build().buildException();
+                                               } catch 
(TransactionFailureException e1) {
+                                                       throw new 
SQLExceptionInfo.Builder(
+                                                                       
SQLExceptionCode.TRANSACTION_ABORT_EXCEPTION)
+                                                                       
.setRootCause(e1).build().buildException();
+                                               }
+                                       }
+                               }
+                               mutationState.postSend();
                 return null;
             }
         }, Tracing.withTracing(this, "committing mutations"));
@@ -635,7 +692,7 @@ public class PhoenixConnection implements Connection, 
org.apache.phoenix.jdbc.Jd
 
     @Override
     public void rollback() throws SQLException {
-        mutationState.rollback(this);
+        mutationState.clear(this);
     }
 
     @Override

Reply via email to