Repository: phoenix Updated Branches: refs/heads/5.x-HBase-2.0 113904275 -> 62027bff1
PHOENIX-4350 Replace deprecated or changed Region methods with new APIs(Rajeshbabu) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/62027bff Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/62027bff Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/62027bff Branch: refs/heads/5.x-HBase-2.0 Commit: 62027bff132dda23e6f7ae30334f191e68072ba2 Parents: 1139042 Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Authored: Wed Nov 8 17:05:31 2017 +0530 Committer: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Committed: Wed Nov 8 17:05:31 2017 +0530 ---------------------------------------------------------------------- ...ReplayWithIndexWritesAndCompressedWALIT.java | 8 ++- .../DataTableLocalIndexRegionScanner.java | 3 +- .../coprocessor/MetaDataEndpointImpl.java | 57 ++++++++------------ .../coprocessor/SequenceRegionObserver.java | 21 +++----- .../UngroupedAggregateRegionObserver.java | 30 ++++++----- .../hbase/index/IndexRegionSplitPolicy.java | 25 ++++----- .../org/apache/phoenix/hbase/index/Indexer.java | 3 +- .../stats/DefaultStatisticsCollector.java | 2 +- .../java/org/apache/phoenix/util/IndexUtil.java | 4 +- .../org/apache/phoenix/util/ServerUtil.java | 20 +++++++ pom.xml | 2 +- 11 files changed, 85 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java index 5ca6de9..dfff8fe 100644 --- a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java +++ b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java @@ -183,15 +183,14 @@ public class WALReplayWithIndexWritesAndCompressedWALIT { CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder(); builder.addIndexGroup(fam1); builder.build(htd); + WALFactory walFactory = new WALFactory(this.conf, null, "localhost,1234"); + WAL wal = createWAL(this.conf, walFactory); // create the region + its WAL - HRegion region0 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd); // FIXME: Uses private type + HRegion region0 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd, wal); // FIXME: Uses private type region0.close(); region0.getWAL().close(); - WALFactory walFactory = new WALFactory(this.conf, null, "localhost,1234"); - - WAL wal = createWAL(this.conf, walFactory); HRegionServer mockRS = Mockito.mock(HRegionServer.class); // mock out some of the internals of the RSS, so we can run CPs when(mockRS.getWAL(null)).thenReturn(wal); @@ -202,7 +201,6 @@ public class WALReplayWithIndexWritesAndCompressedWALIT { when(mockRS.getServerName()).thenReturn(mockServerName); HRegion region = spy(new HRegion(basedir, wal, this.fs, this.conf, hri, htd, mockRS)); region.initialize(); - when(region.getSequenceId()).thenReturn(0l); //make an attempted write to the primary that should also be indexed http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java index 64d4ac4..eee6c93 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java @@ -94,8 +94,7 @@ public class DataTableLocalIndexRegionScanner extends DelegateRegionScanner { boolean next = super.next(dataTableResults); addMutations(dataTableResults); if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)||!next) { - region.batchMutate(mutationList.toArray(new Mutation[mutationList.size()]), HConstants.NO_NONCE, - HConstants.NO_NONCE); + region.batchMutate(mutationList.toArray(new Mutation[mutationList.size()])); mutationList.clear(); } return next; http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index a42e1b7..c2124d0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -1405,7 +1405,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // Place a lock using key for the table to be created byte[] tableKey = SchemaUtil.getTableKey(tenantIdBytes, schemaName, tableName); try { - acquireLock(region, tableKey, locks); + ServerUtil.acquireLock(region, tableKey, locks); // If the table key resides outside the region, return without doing anything MetaDataMutationResult result = checkTableKeyInRegion(tableKey, region); @@ -1423,7 +1423,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // For an index on view, the view header row needs to be locked. result = checkTableKeyInRegion(parentTableKey, region); if (result == null) { - acquireLock(region, parentTableKey, locks); + ServerUtil.acquireLock(region, parentTableKey, locks); parentCacheKey = new ImmutableBytesPtr(parentTableKey); parentTable = loadTable(env, parentTableKey, parentCacheKey, clientTimeStamp, clientTimeStamp, clientVersion); @@ -1632,7 +1632,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(builder.build()); return; } finally { - region.releaseRowLocks(locks); + ServerUtil.releaseRowLocks(locks); } } catch (Throwable t) { logger.error("createTable failed", t); @@ -1648,16 +1648,6 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso QueryServicesOptions.DEFAULT_MAX_INDEXES_PER_TABLE); } - private static RowLock acquireLock(Region region, byte[] key, List<RowLock> locks) - throws IOException { - RowLock rowLock = region.getRowLock(key, false); - if (rowLock == null) { - throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key)); - } - locks.add(rowLock); - return rowLock; - } - private static final byte[] CHILD_TABLE_BYTES = new byte[] {PTable.LinkType.CHILD_TABLE.getSerializedValue()}; @@ -1883,7 +1873,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(MetaDataMutationResult.toProto(result)); return; } finally { - region.releaseRowLocks(locks); + ServerUtil.releaseRowLocks(locks); } } catch (Throwable t) { logger.error("dropTable failed", t); @@ -1962,7 +1952,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso byte[] viewKey = SchemaUtil.getTableKey(viewTenantId, viewSchemaName, viewName); Delete delete = new Delete(viewKey, clientTimeStamp); rowsToDelete.add(delete); - acquireLock(region, viewKey, locks); + ServerUtil.acquireLock(region, viewKey, locks); MetaDataMutationResult result = doDropTable(viewKey, viewTenantId, viewSchemaName, viewName, null, PTableType.VIEW, rowsToDelete, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false, clientVersion); @@ -2025,7 +2015,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // of the client. Delete delete = new Delete(indexKey, clientTimeStamp); rowsToDelete.add(delete); - acquireLock(region, indexKey, locks); + ServerUtil.acquireLock(region, indexKey, locks); MetaDataMutationResult result = doDropTable(indexKey, tenantId, schemaName, indexName, tableName, PTableType.INDEX, rowsToDelete, invalidateList, locks, tableNamesToDelete, sharedTablesToDelete, false, clientVersion); @@ -2061,7 +2051,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } List<RowLock> locks = Lists.newArrayList(); try { - acquireLock(region, key, locks); + ServerUtil.acquireLock(region, key, locks); ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key); List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>(); invalidateList.add(cacheKey); @@ -2155,7 +2145,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, currentTime, table); } } finally { - region.releaseRowLocks(locks); + ServerUtil.releaseRowLocks(locks); } } catch (Throwable t) { ServerUtil.throwIOException(SchemaUtil.getTableName(schemaName, tableName), t); @@ -2356,7 +2346,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso byte[] viewKey = SchemaUtil.getTableKey(tenantId, schema, table); // lock the rows corresponding to views so that no other thread can modify the view meta-data - RowLock viewRowLock = acquireLock(region, viewKey, locks); + RowLock viewRowLock = ServerUtil.acquireLock(region, viewKey, locks); PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock, clientVersion); ColumnOrdinalPositionUpdateList ordinalPositionList = new ColumnOrdinalPositionUpdateList(); @@ -2681,7 +2671,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // lock the rows corresponding to views so that no other thread can modify the view // meta-data - RowLock viewRowLock = acquireLock(region, viewKey, locks); + RowLock viewRowLock = ServerUtil.acquireLock(region, viewKey, locks); PTable view = doGetTable(viewKey, clientTimeStamp, viewRowLock, clientVersion); ColumnOrdinalPositionUpdateList ordinalPositionList = @@ -3223,10 +3213,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso if(functionsAvailable.size() == numFunctions) return functionsAvailable; return null; } finally { - for (Region.RowLock lock : rowLocks) { - lock.release(); - } - rowLocks.clear(); + ServerUtil.releaseRowLocks(rowLocks); } } @@ -3380,7 +3367,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso // Since we're dropping the index, lock it to ensure // that a change in index state doesn't // occur while we're dropping it. - acquireLock(region, indexKey, locks); + ServerUtil.acquireLock(region, indexKey, locks); // Drop the index table. The doDropTable will expand // this to all of the table rows and invalidate the // index table @@ -3772,7 +3759,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso long clientTimeStamp = request.getClientTimestamp(); List<RowLock> locks = Lists.newArrayList(); try { - acquireLock(region, lockKey, locks); + ServerUtil.acquireLock(region, lockKey, locks); // Get as of latest timestamp so we can detect if we have a // newer schema that already // exists without making an additional query @@ -3802,7 +3789,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(builder.build()); return; } finally { - region.releaseRowLocks(locks); + ServerUtil.releaseRowLocks(locks); } } @@ -3876,7 +3863,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso List<RowLock> locks = Lists.newArrayList(); long clientTimeStamp = MetaDataUtil.getClientTimeStamp(functionMetaData); try { - acquireLock(region, lockKey, locks); + ServerUtil.acquireLock(region, lockKey, locks); // Get as of latest timestamp so we can detect if we have a newer function that already // exists without making an additional query ImmutableBytesPtr cacheKey = new FunctionBytesPtr(lockKey); @@ -3919,7 +3906,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(builder.build()); return; } finally { - region.releaseRowLocks(locks); + ServerUtil.releaseRowLocks(locks); } } catch (Throwable t) { logger.error("createFunction failed", t); @@ -3948,7 +3935,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso List<RowLock> locks = Lists.newArrayList(); long clientTimeStamp = MetaDataUtil.getClientTimeStamp(functionMetaData); try { - acquireLock(region, lockKey, locks); + ServerUtil.acquireLock(region, lockKey, locks); List<byte[]> keys = new ArrayList<byte[]>(1); keys.add(lockKey); List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>(); @@ -3971,7 +3958,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(MetaDataMutationResult.toProto(result)); return; } finally { - region.releaseRowLocks(locks); + ServerUtil.releaseRowLocks(locks); } } catch (Throwable t) { logger.error("dropFunction failed", t); @@ -4046,7 +4033,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso List<RowLock> locks = Lists.newArrayList(); long clientTimeStamp = MetaDataUtil.getClientTimeStamp(schemaMutations); try { - acquireLock(region, lockKey, locks); + ServerUtil.acquireLock(region, lockKey, locks); // Get as of latest timestamp so we can detect if we have a newer schema that already exists without // making an additional query ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(lockKey); @@ -4086,7 +4073,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(builder.build()); return; } finally { - region.releaseRowLocks(locks); + ServerUtil.releaseRowLocks(locks); } } catch (Throwable t) { logger.error("Creating the schema" + schemaName + "failed", t); @@ -4110,7 +4097,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso List<RowLock> locks = Lists.newArrayList(); long clientTimeStamp = MetaDataUtil.getClientTimeStamp(schemaMetaData); try { - acquireLock(region, lockKey, locks); + ServerUtil.acquireLock(region, lockKey, locks); List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>(1); result = doDropSchema(clientTimeStamp, schemaName, lockKey, schemaMetaData, invalidateList); if (result.getMutationCode() != MutationCode.SCHEMA_ALREADY_EXISTS) { @@ -4129,7 +4116,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso done.run(MetaDataMutationResult.toProto(result)); return; } finally { - region.releaseRowLocks(locks); + ServerUtil.releaseRowLocks(locks); } } catch (Throwable t) { logger.error("drop schema failed:", t); http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java index 8ef5e80..c004818 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java @@ -89,15 +89,6 @@ public class SequenceRegionObserver implements RegionObserver { QueryConstants.EMPTY_COLUMN_BYTES, timestamp, errorCodeBuf))); } - private static void acquireLock(Region region, byte[] key, List<RowLock> locks) - throws IOException { - RowLock rowLock = region.getRowLock(key, false); - if (rowLock == null) { - throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key)); - } - locks.add(rowLock); - } - /** * Use PreIncrement hook of BaseRegionObserver to overcome deficiencies in Increment * implementation (HBASE-10254): @@ -121,7 +112,7 @@ public class SequenceRegionObserver implements RegionObserver { TimeRange tr = increment.getTimeRange(); region.startRegionOperation(); try { - acquireLock(region, row, locks); + ServerUtil.acquireLock(region, row, locks); try { long maxTimestamp = tr.getMax(); boolean validateOnly = true; @@ -278,11 +269,11 @@ public class SequenceRegionObserver implements RegionObserver { } // update the KeyValues on the server Mutation[] mutations = new Mutation[]{put}; - region.batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE); + region.batchMutate(mutations); // return a Result with the updated KeyValues return Result.create(cells); } finally { - region.releaseRowLocks(locks); + ServerUtil.releaseRowLocks(locks); } } catch (Throwable t) { ServerUtil.throwIOException("Increment of sequence " + Bytes.toStringBinary(row), t); @@ -378,7 +369,7 @@ public class SequenceRegionObserver implements RegionObserver { List<RowLock> locks = Lists.newArrayList(); region.startRegionOperation(); try { - acquireLock(region, row, locks); + ServerUtil.acquireLock(region, row, locks); try { byte[] family = CellUtil.cloneFamily(keyValue); byte[] qualifier = CellUtil.cloneQualifier(keyValue); @@ -428,7 +419,7 @@ public class SequenceRegionObserver implements RegionObserver { } } Mutation[] mutations = new Mutation[]{m}; - region.batchMutate(mutations, HConstants.NO_NONCE, HConstants.NO_NONCE); + region.batchMutate(mutations); long serverTimestamp = MetaDataUtil.getClientTimeStamp(m); // Return result with single KeyValue. The only piece of information // the client cares about is the timestamp, which is the timestamp of @@ -436,7 +427,7 @@ public class SequenceRegionObserver implements RegionObserver { return Result.create(Collections.singletonList( (Cell)KeyValueUtil.newKeyValue(row, PhoenixDatabaseMetaData.SYSTEM_SEQUENCE_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, serverTimestamp, SUCCESS_VALUE))); } finally { - region.releaseRowLocks(locks); + ServerUtil.releaseRowLocks(locks); } } catch (Throwable t) { ServerUtil.throwIOException("Increment of sequence " + Bytes.toStringBinary(row), t); http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index a770aa0..e68f95e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -54,12 +54,14 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -252,7 +254,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } // TODO: should we use the one that is all or none? logger.debug("Committing bactch of " + mutations.size() + " mutations for " + region.getRegionInfo().getTable().getNameAsString()); - region.batchMutate(mutations.toArray(mutationArray), HConstants.NO_NONCE, HConstants.NO_NONCE); + region.batchMutate(mutations.toArray(mutationArray)); } private void setIndexAndTransactionProperties(List<Mutation> mutations, byte[] indexUUID, byte[] indexMaintainersPtr, byte[] txState, boolean useIndexProto) { @@ -269,14 +271,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } - private void commitBatchWithHTable(HTable table, List<Mutation> mutations) throws IOException { + private void commitBatchWithHTable(Table table, List<Mutation> mutations) throws IOException { if (mutations.isEmpty()) { return; } logger.debug("Committing batch of " + mutations.size() + " mutations for " + table); try { - table.batch(mutations); + table.batch(mutations, null); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); @@ -412,13 +414,15 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver byte[] deleteCQ = null; byte[] deleteCF = null; byte[] emptyCF = null; - HTable targetHTable = null; + Table targetHTable = null; boolean isPKChanging = false; ImmutableBytesWritable ptr = new ImmutableBytesWritable(); if (upsertSelectTable != null) { isUpsert = true; projectedTable = deserializeTable(upsertSelectTable); - targetHTable = new HTable(upsertSelectConfig, projectedTable.getPhysicalName().getBytes()); + targetHTable = + ConnectionFactory.createConnection(upsertSelectConfig).getTable( + TableName.valueOf(projectedTable.getPhysicalName().getBytes())); selectExpressions = deserializeExpressions(scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS)); values = new byte[projectedTable.getPKColumns().size()][]; isPKChanging = ExpressionUtil.isPkPositionChanging(new TableRef(projectedTable), selectExpressions); @@ -457,7 +461,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver MutationList mutations = new MutationList(); boolean needToWrite = false; Configuration conf = env.getConfiguration(); - long flushSize = region.getTableDesc().getMemStoreFlushSize(); + long flushSize = region.getTableDescriptor().getMemStoreFlushSize(); if (flushSize <= 0) { flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, @@ -858,7 +862,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } private void commit(Region region, List<Mutation> mutations, byte[] indexUUID, long blockingMemStoreSize, - byte[] indexMaintainersPtr, byte[] txState, HTable targetHTable, boolean useIndexProto, + byte[] indexMaintainersPtr, byte[] txState, Table targetHTable, boolean useIndexProto, boolean isPKChanging) throws IOException { List<Mutation> localRegionMutations = Lists.newArrayList(); @@ -872,7 +876,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver remoteRegionMutations.clear(); } - private void separateLocalAndRemoteMutations(HTable targetHTable, Region region, List<Mutation> mutations, + private void separateLocalAndRemoteMutations(Table targetHTable, Region region, List<Mutation> mutations, List<Mutation> localRegionMutations, List<Mutation> remoteRegionMutations, boolean isPKChanging){ boolean areMutationsInSameTable = areMutationsInSameTable(targetHTable, region); @@ -894,8 +898,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } - private boolean areMutationsInSameTable(HTable targetHTable, Region region) { - return (targetHTable == null || Bytes.compareTo(targetHTable.getTableName(), + private boolean areMutationsInSameTable(Table targetHTable, Region region) { + return (targetHTable == null || Bytes.compareTo(targetHTable.getName(), region.getTableDesc().getTableName().getName()) == 0); } @@ -1074,8 +1078,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } if (ServerUtil.readyToCommit(mutations.size(), mutations.byteSize(), maxBatchSize, maxBatchSizeBytes)) { - region.batchMutate(mutations.toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE, - HConstants.NO_NONCE); + region.batchMutate(mutations.toArray(new Mutation[mutations.size()])); uuidValue = ServerCacheClient.generateId(); mutations.clear(); } @@ -1084,8 +1087,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } while (hasMore); if (!mutations.isEmpty()) { - region.batchMutate(mutations.toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE, - HConstants.NO_NONCE); + region.batchMutate(mutations.toArray(new Mutation[mutations.size()])); } } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionSplitPolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionSplitPolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionSplitPolicy.java index 13a3047..8fd3da5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionSplitPolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionSplitPolicy.java @@ -18,9 +18,10 @@ package org.apache.phoenix.hbase.index; import java.util.List; +import java.util.Optional; +import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy; -import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.query.QueryConstants; @@ -42,29 +43,29 @@ public class IndexRegionSplitPolicy extends IncreasingToUpperBoundRegionSplitPol protected byte[] getSplitPoint() { byte[] oldSplitPoint = super.getSplitPoint(); if (oldSplitPoint == null) return null; - List<Store> stores = region.getStores(); + List<HStore> stores = region.getStores(); byte[] splitPointFromLargestStore = null; long largestStoreSize = 0; boolean isLocalIndexKey = false; - for (Store s : stores) { - if (s.getFamily().getNameAsString() + for (HStore s : stores) { + if (s.getColumnFamilyName() .startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) { - byte[] splitPoint = s.getSplitPoint(); - if (oldSplitPoint != null && splitPoint != null - && Bytes.compareTo(oldSplitPoint, splitPoint) == 0) { + Optional<byte[]> splitPoint = s.getSplitPoint(); + if (oldSplitPoint != null && splitPoint.isPresent() + && Bytes.compareTo(oldSplitPoint, splitPoint.get()) == 0) { isLocalIndexKey = true; } } } if (!isLocalIndexKey) return oldSplitPoint; - for (Store s : stores) { - if (!s.getFamily().getNameAsString() + for (HStore s : stores) { + if (!s.getColumnFamilyName() .startsWith(QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX)) { - byte[] splitPoint = s.getSplitPoint(); + Optional<byte[]> splitPoint = s.getSplitPoint(); long storeSize = s.getSize(); - if (splitPoint != null && largestStoreSize < storeSize) { - splitPointFromLargestStore = splitPoint; + if (splitPoint.isPresent() && largestStoreSize < storeSize) { + splitPointFromLargestStore = splitPoint.get(); largestStoreSize = storeSize; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index 1c78fff..5f4afe5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -335,8 +335,7 @@ public class Indexer implements RegionObserver, RegionCoprocessor { if (!mutations.isEmpty()) { Region region = e.getEnvironment().getRegion(); // Otherwise, submit the mutations directly here - region.batchMutate(mutations.toArray(new Mutation[0]), HConstants.NO_NONCE, - HConstants.NO_NONCE); + region.batchMutate(mutations.toArray(new Mutation[0])); } return Result.EMPTY_RESULT; } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java index 8f36fd6..c1cce01 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/DefaultStatisticsCollector.java @@ -203,7 +203,7 @@ class DefaultStatisticsCollector implements StatisticsCollector { // families when we're collecting stats for a local index. boolean collectingForLocalIndex = scan != null && !scan.getFamilyMap().isEmpty() && MetaDataUtil.isLocalIndexFamily(scan.getFamilyMap().keySet().iterator().next()); for (Store store : region.getStores()) { - ImmutableBytesPtr cfKey = new ImmutableBytesPtr(store.getFamily().getName()); + ImmutableBytesPtr cfKey = new ImmutableBytesPtr(store.getColumnFamilyDescriptor().getName()); boolean isLocalIndexStore = MetaDataUtil.isLocalIndexFamily(cfKey); if (isLocalIndexStore != collectingForLocalIndex) { continue; http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index c3182c5..65bff14 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -711,9 +711,7 @@ public class IndexUtil { m.setDurability(Durability.SKIP_WAL); } } - region.batchMutate( - mutations.toArray(new Mutation[mutations.size()]), - HConstants.NO_NONCE, HConstants.NO_NONCE); + region.batchMutate(mutations.toArray(new Mutation[mutations.size()])); } public static MetaDataMutationResult updateIndexState(String indexTableName, long minTimeStamp, http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java index a3c8787..1d7678a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java @@ -20,6 +20,7 @@ package org.apache.phoenix.util; import java.io.IOException; import java.sql.SQLException; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.Region.RowLock; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException; import org.apache.phoenix.exception.PhoenixIOException; @@ -234,4 +236,22 @@ public class ServerUtil { endKey) < 0)); } + public static RowLock acquireLock(Region region, byte[] key, List<RowLock> locks) + throws IOException { + RowLock rowLock = region.getRowLock(key, false); + if (rowLock == null) { + throw new IOException("Failed to acquire lock on " + Bytes.toStringBinary(key)); + } + locks.add(rowLock); + return rowLock; + } + + public static void releaseRowLocks(List<RowLock> rowLocks) { + if (rowLocks != null) { + for (RowLock rowLock : rowLocks) { + rowLock.release(); + } + rowLocks.clear(); + } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/62027bff/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ea90e8a..1ad8f53 100644 --- a/pom.xml +++ b/pom.xml @@ -66,7 +66,7 @@ <top.dir>${project.basedir}</top.dir> <!-- Hadoop Versions --> - <hbase.version>2.0.0-alpha4-SNAPSHOT</hbase.version> + <hbase.version>2.0.0-alpha4</hbase.version> <hadoop-two.version>2.7.1</hadoop-two.version> <!-- Dependency versions -->