http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyRegionOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyRegionOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyRegionOperation.java index 33cfa09..3cc988f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyRegionOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyRegionOperation.java @@ -140,11 +140,11 @@ public class DestroyRegionOperation extends DistributedCacheOperation { protected HashMap subregionSerialNumbers; protected boolean notifyOfRegionDeparture; + /** * true if need to automatically recreate region, and mark destruction as a reinitialization */ protected transient LocalRegion lockRoot = null; // used for early destroy - // lock acquisition @Override protected InternalCacheEvent createEvent(DistributedRegion rgn) throws EntryNotFoundException { @@ -158,9 +158,8 @@ public class DestroyRegionOperation extends DistributedCacheOperation { } protected RegionEventImpl createRegionEvent(DistributedRegion rgn) { - RegionEventImpl event = new RegionEventImpl(rgn, getOperation(), this.callbackArg, - true /* originRemote */, getSender()); - return event; + return new RegionEventImpl(rgn, getOperation(), this.callbackArg, true /* originRemote */, + getSender()); } private Runnable destroyOp(final DistributionManager dm, final LocalRegion lclRgn, @@ -183,12 +182,12 @@ public class DestroyRegionOperation extends DistributedCacheOperation { advisee = PartitionedRegionHelper.getProxyBucketRegion(GemFireCacheImpl.getInstance(), regionPath, waitForBucketInitializationToComplete); - } catch (PRLocallyDestroyedException e) { + } catch (PRLocallyDestroyedException ignore) { // region not found - it's been destroyed - } catch (RegionDestroyedException e) { + } catch (RegionDestroyedException ignore) { // ditto } catch (PartitionedRegionException e) { - if (e.getMessage().indexOf("destroyed") == -1) { + if (!e.getMessage().contains("destroyed")) { throw e; } // region failed registration & is unusable @@ -228,11 +227,11 @@ public class DestroyRegionOperation extends DistributedCacheOperation { } doRegionDestroy(event); - } catch (RegionDestroyedException e) { + } catch (RegionDestroyedException ignore) { logger.debug("{} Region destroyed: nothing to do", this); - } catch (CancelException e) { + } catch (CancelException ignore) { logger.debug("{} Cancelled: nothing to do", this); - } catch (EntryNotFoundException e) { + } catch (EntryNotFoundException ignore) { logger.debug("{} Entry not found, nothing to do", this); } catch (VirtualMachineError err) { SystemFailure.initiateFailure(err); @@ -292,7 +291,7 @@ public class DestroyRegionOperation extends DistributedCacheOperation { // pool, the entry // update is allowed to complete. dm.getWaitingThreadPool().execute(destroyOp(dm, lclRgn, sendReply)); - } catch (RejectedExecutionException e) { + } catch (RejectedExecutionException ignore) { // rejected while trying to execute destroy thread // must be shutting down, just quit } @@ -303,19 +302,19 @@ public class DestroyRegionOperation extends DistributedCacheOperation { // shared region, since another cache may // have already destroyed it in shared memory, in which our listeners // still need to be called and java region object cleaned up. - GemFireCacheImpl c = (GemFireCacheImpl) CacheFactory.getInstance(sys); + InternalCache cache = (InternalCache) CacheFactory.getInstance(sys); // only get the region while holding the appropriate destroy lock. // this prevents us from getting a "stale" region if (getOperation().isDistributed()) { String rootName = GemFireCacheImpl.parsePath(path)[0]; - this.lockRoot = (LocalRegion) c.getRegion(rootName); + this.lockRoot = (LocalRegion) cache.getRegion(rootName); if (this.lockRoot == null) return null; this.lockRoot.acquireDestroyLock(); } - return (LocalRegion) c.getRegion(path); + return (LocalRegion) cache.getRegion(path); } private void disableRegionDepartureNotification() { @@ -411,15 +410,15 @@ public class DestroyRegionOperation extends DistributedCacheOperation { rgn.basicDestroyRegion(ev, false /* cacheWrite */, false /* lock */, true/* cacheCallbacks */); } - } catch (CacheWriterException e) { + } catch (CacheWriterException ignore) { throw new Error( LocalizedStrings.DestroyRegionOperation_CACHEWRITER_SHOULD_NOT_HAVE_BEEN_CALLED .toLocalizedString()); - } catch (TimeoutException e) { + } catch (TimeoutException ignore) { throw new Error( LocalizedStrings.DestroyRegionOperation_DISTRIBUTEDLOCK_SHOULD_NOT_HAVE_BEEN_ACQUIRED .toLocalizedString()); - } catch (RejectedExecutionException e) { + } catch (RejectedExecutionException ignore) { // rejected while trying to execute recreate thread // must be shutting down, so what we were trying to do must not be // important anymore, so just quit @@ -468,13 +467,13 @@ public class DestroyRegionOperation extends DistributedCacheOperation { } public static final class DestroyRegionWithContextMessage extends DestroyRegionMessage { + protected transient Object context; @Override final public RegionEventImpl createRegionEvent(DistributedRegion rgn) { - ClientRegionEventImpl event = new ClientRegionEventImpl(rgn, getOperation(), this.callbackArg, + return new ClientRegionEventImpl(rgn, getOperation(), this.callbackArg, true /* originRemote */, getSender(), (ClientProxyMembershipID) this.context); - return event; } @Override
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/DiskEntry.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskEntry.java index bf7c4d2..f78a6c1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskEntry.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskEntry.java @@ -27,7 +27,6 @@ import org.apache.geode.internal.ByteArrayDataInput; import org.apache.geode.internal.HeapDataOutputStream; import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.DiskStoreImpl.AsyncDiskEntry; -import org.apache.geode.internal.cache.Token.Tombstone; import org.apache.geode.internal.cache.lru.EnableLRU; import org.apache.geode.internal.cache.lru.LRUClockNode; import org.apache.geode.internal.cache.lru.LRUEntry; @@ -52,18 +51,14 @@ import org.apache.geode.internal.util.BlobHelper; * provides accessor and mutator methods for a disk entry's state. This allows us to abstract all of * the interesting behavior into a {@linkplain DiskEntry.Helper helper class} that we only need to * implement once. - * - * <P> - * - * Each <code>DiskEntry</code> has a unique <code>id</code> that is used by the {@link DiskRegion} - * to identify the key/value pair. Before the disk entry is written to disk, the value of the - * <code>id</code> is {@link DiskRegion#INVALID_ID invalid}. Once the object has been written to - * disk, the <code>id</code> is a positive number. If the value is {@linkplain Helper#update - * updated}, then the <code>id</code> is negated to signify that the value on disk is dirty. + * <p> + * Each {@code DiskEntry} has a unique {@code id} that is used by the {@link DiskRegion} to identify + * the key/value pair. Before the disk entry is written to disk, the value of the {@code id} is + * {@link DiskRegion#INVALID_ID invalid}. Once the object has been written to disk, the {@code id} + * is a positive number. If the value is {@linkplain Helper#update updated}, then the {@code id} is + * negated to signify that the value on disk is dirty. * * @see DiskRegion - * - * * @since GemFire 3.2 */ public interface DiskEntry extends RegionEntry { @@ -78,8 +73,6 @@ public interface DiskEntry extends RegionEntry { /** * In some cases we need to do something just before we drop the value from a DiskEntry that is * being moved (i.e. overflowed) to disk. - * - * @param context */ public void handleValueOverflow(RegionEntryContext context); @@ -90,12 +83,10 @@ public interface DiskEntry extends RegionEntry { public boolean isRemovedFromDisk(); /** - * Returns the id of this <code>DiskEntry</code> + * Returns the id of this {@code DiskEntry} */ public DiskId getDiskId(); - public void _removePhase1(); - public int updateAsyncEntrySize(EnableLRU capacityController); public DiskEntry getPrev(); @@ -119,10 +110,8 @@ public interface DiskEntry extends RegionEntry { */ public static final byte[] TOMBSTONE_BYTES = new byte[0]; - /////////////////////// Inner Classes ////////////////////// - /** - * A Helper class for performing functions common to all <code>DiskEntry</code>s. + * A Helper class for performing functions common to all {@code DiskEntry}s. */ public static class Helper { private static final Logger logger = LogService.getLogger(); @@ -185,12 +174,10 @@ public interface DiskEntry extends RegionEntry { } } - /** * Get the value of an entry that is on disk without faulting it in . It checks for the presence * in the buffer also. This method is used for concurrent map operations and CQ processing * - * @throws DiskAccessException * @since GemFire 5.1 */ static Object getValueOnDiskOrBuffer(DiskEntry entry, DiskRegion dr, @@ -223,8 +210,8 @@ public interface DiskEntry extends RegionEntry { synchronized (syncObj) { if (did != null && did.isPendingAsync()) { @Retained - Object v = entry._getValueRetain(context, true); // TODO:KIRK:OK Rusty had Object v = - // entry.getValueWithContext(context); + Object v = entry._getValueRetain(context, true); + if (Token.isRemovedFromDisk(v)) { v = null; } @@ -309,9 +296,11 @@ public interface DiskEntry extends RegionEntry { entry.setLastModified(mgr, de.getLastModified()); ReferenceCountHelper.setReferenceCountOwner(entry); - v = de._getValueRetain(context, true); // OFFHEAP copied to heap entry; todo allow entry - // to refer to offheap since it will be copied to - // network. + + // OFFHEAP copied to heap entry; + // TODO: allow entry to refer to offheap since it will be copied to network. + v = de._getValueRetain(context, true); + ReferenceCountHelper.setReferenceCountOwner(null); if (v == null) { if (did == null) { @@ -331,7 +320,7 @@ public interface DiskEntry extends RegionEntry { BytesAndBits bb = null; try { bb = dr.getBytesAndBits(did, false); - } catch (DiskAccessException dae) { + } catch (DiskAccessException ignore) { return false; } if (EntryBits.isInvalid(bb.getBits())) { @@ -367,8 +356,7 @@ public interface DiskEntry extends RegionEntry { Object tmp = cd.getValue(); if (tmp instanceof byte[]) { - byte[] bb = (byte[]) tmp; - entry.value = bb; + entry.value = (byte[]) tmp; entry.setSerialized(true); } else { try { @@ -378,11 +366,10 @@ public interface DiskEntry extends RegionEntry { entry.value = hdos; entry.setSerialized(true); } catch (IOException e) { - RuntimeException e2 = new IllegalArgumentException( + throw new IllegalArgumentException( LocalizedStrings.DiskEntry_AN_IOEXCEPTION_WAS_THROWN_WHILE_SERIALIZING - .toLocalizedString()); - e2.initCause(e); - throw e2; + .toLocalizedString(), + e); } } } @@ -460,7 +447,7 @@ public interface DiskEntry extends RegionEntry { } else { entry.setValueWithContext(drv, entry.prepareValueForCache((RegionEntryContext) r, re.getValue(), false)); - if (!Tombstone.isInvalidOrRemoved(re.getValue())) { + if (!Token.isInvalidOrRemoved(re.getValue())) { updateStats(drv, r, 1/* InVM */, 0/* OnDisk */, 0); } } @@ -574,7 +561,7 @@ public interface DiskEntry extends RegionEntry { if (this.bytes == null) { return "null"; } - StringBuffer sb = new StringBuffer(); + StringBuilder sb = new StringBuilder(); int len = getLength(); for (int i = 0; i < len; i++) { sb.append(this.bytes[i]).append(", "); @@ -808,8 +795,6 @@ public interface DiskEntry extends RegionEntry { /** * Writes the key/value object stored in the given entry to disk * - * @throws RegionClearedException - * * @see DiskRegion#put */ private static void writeToDisk(DiskEntry entry, LocalRegion region, boolean async, @@ -833,8 +818,6 @@ public interface DiskEntry extends RegionEntry { /** * Updates the value of the disk entry with a new value. This allows us to free up disk space in * the non-backup case. - * - * @throws RegionClearedException */ public static void update(DiskEntry entry, LocalRegion region, Object newValue, EntryEventImpl event) throws RegionClearedException { @@ -892,7 +875,7 @@ public interface DiskEntry extends RegionEntry { if (caughtCacheClosed) { // 47616: not to set the value to be removedFromDisk since it failed to persist } else { - // Asif Ensure that the value is rightly set despite clear so + // Ensure that the value is rightly set despite clear so // that it can be distributed correctly entry.setValueWithContext(region, newValue); // OFFHEAP newValue was already // preparedForCache @@ -1010,12 +993,12 @@ public interface DiskEntry extends RegionEntry { @Retained public static Object getValueOffHeapOrDiskWithoutFaultIn(DiskEntry entry, LocalRegion region) { @Retained - Object v = entry._getValueRetain(region, true); // TODO:KIRK:OK Object v = - // entry.getValueWithContext(region); + Object v = entry._getValueRetain(region, true); + if (v == null || Token.isRemovedFromDisk(v) && !region.isIndexCreationThread()) { synchronized (entry) { - v = entry._getValueRetain(region, true); // TODO:KIRK:OK v = - // entry.getValueWithContext(region); + v = entry._getValueRetain(region, true); + if (v == null) { v = Helper.getOffHeapValueOnDiskOrBuffer(entry, region.getDiskRegion(), region); } @@ -1024,24 +1007,10 @@ public interface DiskEntry extends RegionEntry { if (Token.isRemovedFromDisk(v)) { // fix for bug 31800 v = null; - // } else if (v instanceof ByteSource) { - // // If the ByteSource contains a Delta or ListOfDelta then we want to deserialize it - // Object deserVal = ((CachedDeserializable)v).getDeserializedForReading(); - // if (deserVal != v) { - // OffHeapHelper.release(v); - // v = deserVal; - // } } return v; } - /** - * - * @param entry - * @param region - * @return Value - * @throws DiskAccessException - */ public static Object faultInValue(DiskEntry entry, LocalRegion region) { return faultInValue(entry, region, false); } @@ -1058,8 +1027,8 @@ public interface DiskEntry extends RegionEntry { private static Object faultInValue(DiskEntry entry, LocalRegion region, boolean retainResult) { DiskRegion dr = region.getDiskRegion(); @Retained - Object v = entry._getValueRetain(region, true); // TODO:KIRK:OK Object v = - // entry.getValueWithContext(region); + Object v = entry._getValueRetain(region, true); + boolean lruFaultedIn = false; boolean done = false; try { @@ -1071,7 +1040,7 @@ public interface DiskEntry extends RegionEntry { // See if it is pending async because of a faultOut. // If so then if we are not a backup then we can unschedule the pending async. // In either case we need to do the lruFaultIn logic. - boolean evicted = ((LRUEntry) entry).testEvicted(); + boolean evicted = ((LRUClockNode) entry).testEvicted(); if (evicted) { if (!dr.isBackup()) { // @todo do we also need a bit that tells us if it is in the async queue? @@ -1086,8 +1055,8 @@ public interface DiskEntry extends RegionEntry { } if (!done && (v == null || Token.isRemovedFromDisk(v) && !region.isIndexCreationThread())) { synchronized (entry) { - v = entry._getValueRetain(region, true); // TODO:KIRK:OK v = - // entry.getValueWithContext(region); + v = entry._getValueRetain(region, true); + if (v == null) { v = readValueFromDisk(entry, region); if (entry instanceof LRUEntry) { @@ -1126,8 +1095,7 @@ public interface DiskEntry extends RegionEntry { DiskId did = entry.getDiskId(); if (did != null) { Object value = null; - DiskRecoveryStore region = recoveryStore; - DiskRegionView dr = region.getDiskRegionView(); + DiskRegionView dr = recoveryStore.getDiskRegionView(); dr.acquireReadLock(); try { synchronized (did) { @@ -1135,7 +1103,7 @@ public interface DiskEntry extends RegionEntry { if (oplogId == did.getOplogId()) { value = getValueFromDisk(dr, did, in); if (value != null) { - setValueOnFaultIn(value, did, entry, dr, region); + setValueOnFaultIn(value, did, entry, dr, recoveryStore); } } } @@ -1194,7 +1162,7 @@ public interface DiskEntry extends RegionEntry { try { if (recoveryStore.getEvictionAttributes() != null && recoveryStore.getEvictionAttributes().getAlgorithm().isLIFO()) { - ((VMLRURegionMap) recoveryStore.getRegionMap()).updateStats(); + ((AbstractLRURegionMap) recoveryStore.getRegionMap()).updateStats(); return; } // this must be done after releasing synchronization @@ -1314,24 +1282,18 @@ public interface DiskEntry extends RegionEntry { } /** - * Writes the value of this <code>DiskEntry</code> to disk and <code>null</code> s out the - * reference to the value to free up VM space. + * Writes the value of this {@code DiskEntry} to disk and {@code null} s out the reference to + * the value to free up VM space. * <p> * Note that if the value had already been written to disk, it is not written again. * <p> * Caller must synchronize on entry and it is assumed the entry is evicted - * - * see #writeToDisk - * - * @throws RegionClearedException */ public static int overflowToDisk(DiskEntry entry, LocalRegion region, EnableLRU ccHelper) throws RegionClearedException { DiskRegion dr = region.getDiskRegion(); - final int oldSize = region.calculateRegionEntryValueSize(entry);; - // Asif:Get diskID . If it is null, it implies it is - // overflow only mode. - // long id = entry.getDiskId().getKeyId(); + final int oldSize = region.calculateRegionEntryValueSize(entry); + // Get diskID . If it is null, it implies it is overflow only mode. DiskId did = entry.getDiskId(); if (did == null) { ((LRUEntry) entry).setDelayedDiskId(region); @@ -1348,7 +1310,7 @@ public interface DiskEntry extends RegionEntry { return 0; } - // TODO:Asif: Check if we need to overflow even when id is = 0 + // TODO: Check if we need to overflow even when id is = 0 boolean wasAlreadyPendingAsync = did.isPendingAsync(); if (did.needsToBeWritten()) { if (dr.isSync()) { @@ -1474,7 +1436,7 @@ public interface DiskEntry extends RegionEntry { // Only setValue to null if this was an evict. // We could just be a backup that is writing async. if (!Token.isInvalid(entryVal) && (entryVal != Token.TOMBSTONE) - && entry instanceof LRUEntry && ((LRUEntry) entry).testEvicted()) { + && entry instanceof LRUEntry && ((LRUClockNode) entry).testEvicted()) { // Moved this here to fix bug 40116. region.updateSizeOnEvict(entry.getKey(), entryValSize); updateStats(dr, region, -1/* InVM */, 1/* OnDisk */, did.getValueLength()); @@ -1603,11 +1565,6 @@ public interface DiskEntry extends RegionEntry { return result; } - /** - * @param entry - * @param region - * @param tag - */ public static void updateVersionOnly(DiskEntry entry, LocalRegion region, VersionTag tag) { DiskRegion dr = region.getDiskRegion(); if (!dr.isBackup()) { @@ -1709,7 +1666,6 @@ public interface DiskEntry extends RegionEntry { } /** - * * @return byte indicating the user bits. The correct value is returned only in the specific * case of entry recovered from oplog ( & not rolled to Htree) & the RECOVER_VALUES flag * is false . In other cases the exact value is not needed http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/DiskRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskRegion.java index 6d4b598..f8b8289 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskRegion.java @@ -279,12 +279,12 @@ public class DiskRegion extends AbstractDiskRegion { private void destroyOldTomstones(final DiskRecoveryStore drs) { // iterate over all region entries in drs drs.foreachRegionEntry(new RegionEntryCallback() { - public void handleRegionEntry(RegionEntry re) { - DiskEntry de = (DiskEntry) re; + public void handleRegionEntry(RegionEntry regionEntry) { + DiskEntry de = (DiskEntry) regionEntry; synchronized (de) { DiskId id = de.getDiskId(); - if (id != null && re.isTombstone()) { - VersionStamp stamp = re.getVersionStamp(); + if (id != null && regionEntry.isTombstone()) { + VersionStamp stamp = regionEntry.getVersionStamp(); if (getRegionVersionVector().isTombstoneTooOld(stamp.getMemberID(), stamp.getRegionVersion())) { drs.destroyRecoveredEntry(de.getKey()); @@ -299,8 +299,8 @@ public class DiskRegion extends AbstractDiskRegion { private void destroyRemainingRecoveredEntries(final DiskRecoveryStore drs) { // iterate over all region entries in drs drs.foreachRegionEntry(new RegionEntryCallback() { - public void handleRegionEntry(RegionEntry re) { - DiskEntry de = (DiskEntry) re; + public void handleRegionEntry(RegionEntry regionEntry) { + DiskEntry de = (DiskEntry) regionEntry; synchronized (de) { DiskId id = de.getDiskId(); if (id != null) { @@ -320,8 +320,8 @@ public class DiskRegion extends AbstractDiskRegion { public void resetRecoveredEntries(final DiskRecoveryStore drs) { // iterate over all region entries in drs drs.foreachRegionEntry(new RegionEntryCallback() { - public void handleRegionEntry(RegionEntry re) { - DiskEntry de = (DiskEntry) re; + public void handleRegionEntry(RegionEntry regionEntry) { + DiskEntry de = (DiskEntry) regionEntry; synchronized (de) { DiskId id = de.getDiskId(); if (id != null) { @@ -770,13 +770,13 @@ public class DiskRegion extends AbstractDiskRegion { return; } region.foreachRegionEntry(new RegionEntryCallback() { - public void handleRegionEntry(RegionEntry re) { - DiskEntry de = (DiskEntry) re; + public void handleRegionEntry(RegionEntry regionEntry) { + DiskEntry de = (DiskEntry) regionEntry; DiskId id = de.getDiskId(); if (id != null) { synchronized (id) { - re.setValueToNull(); // TODO why call _setValue twice in a row? - re.removePhase2(); + regionEntry.setValueToNull(); // TODO why call _setValue twice in a row? + regionEntry.removePhase2(); id.unmarkForWriting(); if (EntryBits.isNeedsValue(id.getUserBits())) { long oplogId = id.getOplogId(); http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreBackup.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreBackup.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreBackup.java index 6f50c9f..309dea3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreBackup.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreBackup.java @@ -26,8 +26,6 @@ import org.apache.geode.internal.cache.persistence.BackupInspector; * oplogs that still need to be backed up, along with the lists of oplog files that should be * deleted when the oplog is backed up. See * {@link DiskStoreImpl#startBackup(File, BackupInspector, org.apache.geode.internal.cache.persistence.RestoreScript)} - * - * */ public class DiskStoreBackup { http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreFactoryImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreFactoryImpl.java index 000bf0d..7a7044b 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreFactoryImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreFactoryImpl.java @@ -18,7 +18,6 @@ import java.io.File; import java.util.Arrays; import org.apache.geode.GemFireIOException; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.DiskStoreFactory; import org.apache.geode.cache.DiskStore; import org.apache.geode.distributed.internal.ResourceEvent; @@ -35,14 +34,15 @@ import org.apache.geode.pdx.internal.TypeRegistry; * @since GemFire prPersistSprint2 */ public class DiskStoreFactoryImpl implements DiskStoreFactory { - private final Cache cache; + + private final InternalCache cache; private final DiskStoreAttributes attrs = new DiskStoreAttributes(); - public DiskStoreFactoryImpl(Cache cache) { + public DiskStoreFactoryImpl(InternalCache cache) { this.cache = cache; } - public DiskStoreFactoryImpl(Cache cache, DiskStoreAttributes attrs) { + public DiskStoreFactoryImpl(InternalCache cache, DiskStoreAttributes attrs) { this.attrs.name = attrs.name; setAutoCompact(attrs.getAutoCompact()); setAllowForceCompaction(attrs.getAllowForceCompaction()); @@ -90,13 +90,13 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory { if (compactionThreshold < 0) { throw new IllegalArgumentException( LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_POSITIVE_NUMBER_AND_THE_VALUE_GIVEN_1_IS_NOT_ACCEPTABLE - .toLocalizedString(new Object[] {CacheXml.COMPACTION_THRESHOLD, - Integer.valueOf(compactionThreshold)})); + .toLocalizedString( + new Object[] {CacheXml.COMPACTION_THRESHOLD, compactionThreshold})); } else if (compactionThreshold > 100) { throw new IllegalArgumentException( LocalizedStrings.DiskWriteAttributesImpl_0_HAS_TO_BE_LESS_THAN_2_BUT_WAS_1 - .toLocalizedString(new Object[] {CacheXml.COMPACTION_THRESHOLD, - Integer.valueOf(compactionThreshold), Integer.valueOf(100)})); + .toLocalizedString( + new Object[] {CacheXml.COMPACTION_THRESHOLD, compactionThreshold, 100})); } this.attrs.compactionThreshold = compactionThreshold; return this; @@ -106,7 +106,7 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory { if (timeInterval < 0) { throw new IllegalArgumentException( LocalizedStrings.DiskWriteAttributesFactory_TIME_INTERVAL_SPECIFIED_HAS_TO_BE_A_NONNEGATIVE_NUMBER_AND_THE_VALUE_GIVEN_0_IS_NOT_ACCEPTABLE - .toLocalizedString(Long.valueOf(timeInterval))); + .toLocalizedString(timeInterval)); } this.attrs.timeInterval = timeInterval; return this; @@ -116,14 +116,12 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory { InternalRegionArguments internalRegionArgs) { this.attrs.name = name; synchronized (this.cache) { - assert this.cache instanceof GemFireCacheImpl; - GemFireCacheImpl gfc = (GemFireCacheImpl) this.cache; DiskStoreImpl ds = - new DiskStoreImpl(gfc, this.attrs, true/* ownedByRegion */, internalRegionArgs); + new DiskStoreImpl(this.cache, this.attrs, true/* ownedByRegion */, internalRegionArgs); if (isOwnedByPR) { ds.doInitialRecovery(); } - gfc.addRegionOwnedDiskStore(ds); + this.cache.addRegionOwnedDiskStore(ds); return ds; } } @@ -137,15 +135,14 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory { result = findExisting(name); if (result == null) { if (this.cache instanceof GemFireCacheImpl) { - GemFireCacheImpl gfc = (GemFireCacheImpl) this.cache; - TypeRegistry registry = gfc.getPdxRegistry(); - DiskStoreImpl dsi = new DiskStoreImpl(gfc, this.attrs); + TypeRegistry registry = this.cache.getPdxRegistry(); + DiskStoreImpl dsi = new DiskStoreImpl(this.cache, this.attrs); result = dsi; - /** Added for M&M **/ - gfc.getInternalDistributedSystem().handleResourceEvent(ResourceEvent.DISKSTORE_CREATE, - dsi); + // Added for M&M + this.cache.getInternalDistributedSystem() + .handleResourceEvent(ResourceEvent.DISKSTORE_CREATE, dsi); dsi.doInitialRecovery(); - gfc.addDiskStore(dsi); + this.cache.addDiskStore(dsi); if (registry != null) { registry.creatingDiskStore(dsi); } @@ -163,8 +160,7 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory { // member depends on state that goes into this disk store // that isn't backed up. if (this.cache instanceof GemFireCacheImpl) { - GemFireCacheImpl gfc = (GemFireCacheImpl) this.cache; - BackupManager backup = gfc.getBackupManager(); + BackupManager backup = this.cache.getBackupManager(); if (backup != null) { backup.waitForBackup(); } @@ -175,8 +171,7 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory { private DiskStore findExisting(String name) { DiskStore existing = null; if (this.cache instanceof GemFireCacheImpl) { - GemFireCacheImpl gfc = (GemFireCacheImpl) this.cache; - existing = gfc.findDiskStore(name); + existing = this.cache.findDiskStore(name); if (existing != null) { if (((DiskStoreImpl) existing).sameAs(this.attrs)) { return existing; @@ -192,8 +187,7 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory { if (diskDirSizes.length != diskDirs.length) { throw new IllegalArgumentException( LocalizedStrings.AttributesFactory_NUMBER_OF_DISKSIZES_IS_0_WHICH_IS_NOT_EQUAL_TO_NUMBER_OF_DISK_DIRS_WHICH_IS_1 - .toLocalizedString(new Object[] {Integer.valueOf(diskDirSizes.length), - Integer.valueOf(diskDirs.length)})); + .toLocalizedString(new Object[] {diskDirSizes.length, diskDirs.length})); } verifyNonNegativeDirSize(diskDirSizes); checkIfDirectoriesExist(diskDirs); @@ -207,8 +201,6 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory { /** * Checks if directories exist, if they don't then create those directories - * - * @param diskDirs */ public static void checkIfDirectoriesExist(File[] diskDirs) { for (int i = 0; i < diskDirs.length; i++) { @@ -225,15 +217,13 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory { /** * Verify all directory sizes are positive - * - * @param sizes */ public static void verifyNonNegativeDirSize(int[] sizes) { for (int i = 0; i < sizes.length; i++) { if (sizes[i] < 0) { throw new IllegalArgumentException( LocalizedStrings.AttributesFactory_DIR_SIZE_CANNOT_BE_NEGATIVE_0 - .toLocalizedString(Integer.valueOf(sizes[i]))); + .toLocalizedString(sizes[i])); } } } @@ -254,7 +244,7 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory { } else if (maxOplogSize < 0) { throw new IllegalArgumentException( LocalizedStrings.DiskWriteAttributesFactory_MAXIMUM_OPLOG_SIZE_SPECIFIED_HAS_TO_BE_A_NONNEGATIVE_NUMBER_AND_THE_VALUE_GIVEN_0_IS_NOT_ACCEPTABLE - .toLocalizedString(Long.valueOf(maxOplogSize))); + .toLocalizedString(maxOplogSize)); } this.attrs.maxOplogSizeInBytes = maxOplogSize * (1024 * 1024); return this; @@ -267,7 +257,7 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory { if (maxOplogSizeInBytes < 0) { throw new IllegalArgumentException( LocalizedStrings.DiskWriteAttributesFactory_MAXIMUM_OPLOG_SIZE_SPECIFIED_HAS_TO_BE_A_NONNEGATIVE_NUMBER_AND_THE_VALUE_GIVEN_0_IS_NOT_ACCEPTABLE - .toLocalizedString(Long.valueOf(maxOplogSizeInBytes))); + .toLocalizedString(maxOplogSizeInBytes)); } this.attrs.maxOplogSizeInBytes = maxOplogSizeInBytes; return this; @@ -277,7 +267,7 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory { if (queueSize < 0) { throw new IllegalArgumentException( LocalizedStrings.DiskWriteAttributesFactory_QUEUE_SIZE_SPECIFIED_HAS_TO_BE_A_NONNEGATIVE_NUMBER_AND_THE_VALUE_GIVEN_0_IS_NOT_ACCEPTABLE - .toLocalizedString(Integer.valueOf(queueSize))); + .toLocalizedString(queueSize)); } this.attrs.queueSize = queueSize; return this; @@ -285,10 +275,10 @@ public class DiskStoreFactoryImpl implements DiskStoreFactory { public DiskStoreFactory setWriteBufferSize(int writeBufferSize) { if (writeBufferSize < 0) { - // TODO Gester add a message for WriteBufferSize + // TODO add a message for WriteBufferSize throw new IllegalArgumentException( LocalizedStrings.DiskWriteAttributesFactory_QUEUE_SIZE_SPECIFIED_HAS_TO_BE_A_NONNEGATIVE_NUMBER_AND_THE_VALUE_GIVEN_0_IS_NOT_ACCEPTABLE - .toLocalizedString(Integer.valueOf(writeBufferSize))); + .toLocalizedString(writeBufferSize)); } this.attrs.writeBufferSize = writeBufferSize; return this; http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java index d1609ca..aeabbbc 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java @@ -14,19 +14,62 @@ */ package org.apache.geode.internal.cache; -import static org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FILE; -import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; -import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; +import static org.apache.geode.distributed.ConfigurationProperties.*; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.PrintStream; +import java.net.InetAddress; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import it.unimi.dsi.fastutil.longs.LongOpenHashSet; import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.Logger; + import org.apache.geode.CancelCriterion; import org.apache.geode.CancelException; import org.apache.geode.StatisticsFactory; import org.apache.geode.SystemFailure; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheClosedException; +import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.DiskAccessException; import org.apache.geode.cache.DiskStore; import org.apache.geode.cache.DiskStoreFactory; @@ -72,55 +115,11 @@ import org.apache.geode.pdx.internal.EnumInfo; import org.apache.geode.pdx.internal.PdxField; import org.apache.geode.pdx.internal.PdxType; import org.apache.geode.pdx.internal.PeerTypeRegistration; -import org.apache.logging.log4j.Logger; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.FilenameFilter; -import java.io.IOException; -import java.io.PrintStream; -import java.net.InetAddress; -import java.nio.channels.ClosedByInterruptException; -import java.nio.channels.FileChannel; -import java.nio.channels.FileLock; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.TreeSet; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; -import java.util.regex.Matcher; -import java.util.regex.Pattern; /** * Represents a (disk-based) persistent store for region data. Used for both persistent recoverable * regions and overflow-only regions. - * - * + * * @since GemFire 3.2 */ @SuppressWarnings("synthetic-access") @@ -128,6 +127,7 @@ public class DiskStoreImpl implements DiskStore { private static final Logger logger = LogService.getLogger(); private static final String BACKUP_DIR_PREFIX = "dir"; + public static final boolean KRF_DEBUG = Boolean.getBoolean("disk.KRF_DEBUG"); public static final int MAX_OPEN_INACTIVE_OPLOGS = @@ -166,6 +166,7 @@ public class DiskStoreImpl implements DiskStore { public static final String RECOVER_VALUE_PROPERTY_NAME = DistributionConfig.GEMFIRE_PREFIX + "disk.recoverValues"; + public static final String RECOVER_VALUES_SYNC_PROPERTY_NAME = DistributionConfig.GEMFIRE_PREFIX + "disk.recoverValuesSync"; @@ -177,9 +178,12 @@ public class DiskStoreImpl implements DiskStore { DistributionConfig.GEMFIRE_PREFIX + "disk.recoverLruValues"; boolean RECOVER_VALUES = getBoolean(DiskStoreImpl.RECOVER_VALUE_PROPERTY_NAME, true); + boolean RECOVER_VALUES_SYNC = getBoolean(DiskStoreImpl.RECOVER_VALUES_SYNC_PROPERTY_NAME, false); + boolean FORCE_KRF_RECOVERY = getBoolean(DistributionConfig.GEMFIRE_PREFIX + "disk.FORCE_KRF_RECOVERY", false); + final boolean RECOVER_LRU_VALUES = getBoolean(DiskStoreImpl.RECOVER_LRU_VALUES_PROPERTY_NAME, false); @@ -188,7 +192,9 @@ public class DiskStoreImpl implements DiskStore { } public static final long MIN_RESERVED_DRID = 1; + public static final long MAX_RESERVED_DRID = 8; + static final long MIN_DRID = MAX_RESERVED_DRID + 1; /** @@ -205,9 +211,7 @@ public class DiskStoreImpl implements DiskStore { private final int MAX_OPLOGS_PER_COMPACTION = Integer.getInteger( DistributionConfig.GEMFIRE_PREFIX + "MAX_OPLOGS_PER_COMPACTION", Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAX_OPLOGS_PER_ROLL", 1).intValue()); - /** - * - */ + public static final int MAX_CONCURRENT_COMPACTIONS = Integer.getInteger( DistributionConfig.GEMFIRE_PREFIX + "MAX_CONCURRENT_COMPACTIONS", Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAX_CONCURRENT_ROLLS", 1).intValue()); @@ -219,6 +223,7 @@ public class DiskStoreImpl implements DiskStore { */ public static final int MAX_PENDING_TASKS = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "disk.MAX_PENDING_TASKS", 6); + /** * This system property indicates that IF should also be preallocated. This property will be used * in conjunction with the PREALLOCATE_OPLOGS property. If PREALLOCATE_OPLOGS is ON the below will @@ -227,6 +232,7 @@ public class DiskStoreImpl implements DiskStore { static final boolean PREALLOCATE_IF = !System.getProperty(DistributionConfig.GEMFIRE_PREFIX + "preAllocateIF", "true") .equalsIgnoreCase("false"); + /** * This system property indicates that Oplogs should be preallocated till the maxOplogSize as * specified for the disk store. @@ -252,19 +258,14 @@ public class DiskStoreImpl implements DiskStore { public static volatile HashSet<String> TEST_CHK_FALLOC_DIRS; public static volatile HashSet<String> TEST_NO_FALLOC_DIRS; - // /** delay for slowing down recovery, for testing purposes only */ - // public static volatile int recoverDelay = 0; - - // //////////////////// Instance Fields /////////////////////// - - private final GemFireCacheImpl cache; + private final InternalCache cache; /** The stats for this store */ private final DiskStoreStats stats; /** - * Asif:Added as stop gap arrangement to fix bug 39380. It is not a clean fix as keeping track of - * the threads acquiring read lock, etc is not a good idea to solve the issue + * Added as stop gap arrangement to fix bug 39380. It is not a clean fix as keeping track of the + * threads acquiring read lock, etc is not a good idea to solve the issue */ private final AtomicInteger entryOpsCount = new AtomicInteger(); /** @@ -291,10 +292,11 @@ public class DiskStoreImpl implements DiskStore { * is forced. If this value is 0 then no limit. */ private final int maxAsyncItems; + private final AtomicInteger forceFlushCount; + private final Object asyncMonitor; - // complex vars /** Compactor task which does the compaction. Null if compaction not possible. */ private final OplogCompactor oplogCompactor; @@ -303,7 +305,9 @@ public class DiskStoreImpl implements DiskStore { private volatile DiskStoreBackup diskStoreBackup = null; private final ReentrantReadWriteLock compactorLock = new ReentrantReadWriteLock(); + private final WriteLock compactorWriteLock = compactorLock.writeLock(); + private final ReadLock compactorReadLock = compactorLock.readLock(); /** @@ -316,37 +320,21 @@ public class DiskStoreImpl implements DiskStore { new AtomicReference<DiskAccessException>(); PersistentOplogSet persistentOplogs = new PersistentOplogSet(this); - OverflowOplogSet overflowOplogs = new OverflowOplogSet(this); - - // private boolean isThreadWaitingForSpace = false; - - /** - * Get the next available dir - */ - - // /** - // * Max timed wait for disk space to become available for an entry operation - // , - // * in milliseconds. This will be the maximum time for which a - // * create/modify/remove operation will wait so as to allow switch over & get - // a - // * new Oplog for writing. If no space is available in that time, - // * DiskAccessException will be thrown. The default wait will be for 120 - // * seconds - // */ - // private static final long MAX_WAIT_FOR_SPACE = Integer.getInteger( - // "MAX_WAIT_FOR_SPACE", 20).intValue() * 1000; + OverflowOplogSet overflowOplogs = new OverflowOplogSet(this); private final AtomicLong regionIdCtr = new AtomicLong(MIN_DRID); + /** * Only contains backup DiskRegions. The Value could be a RecoveredDiskRegion or a DiskRegion */ private final ConcurrentMap<Long, DiskRegion> drMap = new ConcurrentHashMap<Long, DiskRegion>(); + /** * A set of overflow only regions that are using this disk store. */ private final Set<DiskRegion> overflowMap = new ConcurrentHashSet<DiskRegion>(); + /** * Contains all of the disk recovery stores for which we are recovering values asnynchronously. */ @@ -369,9 +357,8 @@ public class DiskStoreImpl implements DiskStore { private final ThreadPoolExecutor diskStoreTaskPool; private final ThreadPoolExecutor delayedWritePool; - private volatile Future lastDelayedWrite; - // ///////////////////// Constructors ///////////////////////// + private volatile Future lastDelayedWrite; private static int calcCompactionThreshold(int ct) { if (ct == DiskStoreFactory.DEFAULT_COMPACTION_THRESHOLD) { @@ -387,19 +374,19 @@ public class DiskStoreImpl implements DiskStore { } /** - * Creates a new <code>DiskRegion</code> that access disk on behalf of the given region. + * Creates a new {@code DiskRegion} that access disk on behalf of the given region. */ - DiskStoreImpl(Cache cache, DiskStoreAttributes props) { + DiskStoreImpl(InternalCache cache, DiskStoreAttributes props) { this(cache, props, false, null); } - DiskStoreImpl(Cache cache, DiskStoreAttributes props, boolean ownedByRegion, + DiskStoreImpl(InternalCache cache, DiskStoreAttributes props, boolean ownedByRegion, InternalRegionArguments internalRegionArgs) { this(cache, props.getName(), props, ownedByRegion, internalRegionArgs, false, false/* upgradeVersionOnly */, false, false, true, false/* offlineModify */); } - DiskStoreImpl(Cache cache, String name, DiskStoreAttributes props, boolean ownedByRegion, + DiskStoreImpl(InternalCache cache, String name, DiskStoreAttributes props, boolean ownedByRegion, InternalRegionArguments internalRegionArgs, boolean offline, boolean upgradeVersionOnly, boolean offlineValidating, boolean offlineCompacting, boolean needsOplogs, boolean offlineModify) { @@ -427,7 +414,7 @@ public class DiskStoreImpl implements DiskStore { this.warningPercent = props.getDiskUsageWarningPercentage(); this.criticalPercent = props.getDiskUsageCriticalPercentage(); - this.cache = (GemFireCacheImpl) cache; + this.cache = cache; StatisticsFactory factory = cache.getDistributedSystem(); this.stats = new DiskStoreStats(factory, getName()); @@ -474,7 +461,7 @@ public class DiskStoreImpl implements DiskStore { this.maxDirSize = tempMaxDirSize * 1024 * 1024; this.infoFileDirIndex = 0; // Now that we no longer have db files, use all directories for oplogs - /** + /* * The infoFileDir contains the lock file and the init file. It will be directories[0] on a * brand new disk store. On an existing disk store it will be the directory the init file is * found in. @@ -495,7 +482,7 @@ public class DiskStoreImpl implements DiskStore { int MAXT = DiskStoreImpl.MAX_CONCURRENT_COMPACTIONS; final ThreadGroup compactThreadGroup = - LoggingThreadGroup.createThreadGroup("Oplog Compactor Thread Group", this.logger); + LoggingThreadGroup.createThreadGroup("Oplog Compactor Thread Group", logger); final ThreadFactory compactThreadFactory = GemfireCacheHelper.CreateThreadFactory(compactThreadGroup, "Idle OplogCompactor"); this.diskStoreTaskPool = new ThreadPoolExecutor(MAXT, MAXT, 10, TimeUnit.SECONDS, @@ -504,7 +491,7 @@ public class DiskStoreImpl implements DiskStore { final ThreadGroup deleteThreadGroup = - LoggingThreadGroup.createThreadGroup("Oplog Delete Thread Group", this.logger); + LoggingThreadGroup.createThreadGroup("Oplog Delete Thread Group", logger); final ThreadFactory deleteThreadFactory = GemfireCacheHelper.CreateThreadFactory(deleteThreadGroup, "Oplog Delete Task"); @@ -583,7 +570,7 @@ public class DiskStoreImpl implements DiskStore { } /** - * Returns the <code>DiskStoreStats</code> for this store + * Returns the {@code DiskStoreStats} for this store */ public DiskStoreStats getStats() { return this.stats; @@ -697,7 +684,7 @@ public class DiskStoreImpl implements DiskStore { * @param entry The entry which is going to be written to disk * @throws RegionClearedException If a clear operation completed before the put operation * completed successfully, resulting in the put operation to abort. - * @throws IllegalArgumentException If <code>id</code> is less than zero + * @throws IllegalArgumentException If {@code id} is less than zero */ final void put(LocalRegion region, DiskEntry entry, ValueWrapper value, boolean async) throws RegionClearedException { @@ -886,7 +873,6 @@ public class DiskStoreImpl implements DiskStore { * Given a BytesAndBits object convert it to the relevant Object (deserialize if necessary) and * return the object * - * @param bb * @return the converted object */ static Object convertBytesAndBitsIntoObject(BytesAndBits bb) { @@ -909,7 +895,6 @@ public class DiskStoreImpl implements DiskStore { /** * Given a BytesAndBits object get the serialized blob * - * @param bb * @return the converted object */ static Object convertBytesAndBitsToSerializedForm(BytesAndBits bb) { @@ -1029,7 +1014,7 @@ public class DiskStoreImpl implements DiskStore { * HTree with the oplog being destroyed * * @return null if entry has nothing stored on disk (id == INVALID_ID) - * @throws IllegalArgumentException If <code>id</code> is less than zero, no action is taken. + * @throws IllegalArgumentException If {@code id} is less than zero, no action is taken. */ public final Object getNoBuffer(DiskRegion dr, DiskId id) { BytesAndBits bb = null; @@ -1067,8 +1052,8 @@ public class DiskStoreImpl implements DiskStore { * * @throws RegionClearedException If a clear operation completed before the put operation * completed successfully, resulting in the put operation to abort. - * @throws IllegalArgumentException If <code>id</code> is {@linkplain #INVALID_ID invalid}or is - * less than zero, no action is taken. + * @throws IllegalArgumentException If {@code id} is {@linkplain #INVALID_ID invalid}or is less + * than zero, no action is taken. */ final void remove(LocalRegion region, DiskEntry entry, boolean async, boolean isClear) throws RegionClearedException { @@ -1191,7 +1176,7 @@ public class DiskStoreImpl implements DiskStore { if (currentOpsInProgress == 0) { synchronized (this.closeRegionGuard) { if (dr.isRegionClosed() && entryOpsCount.get() == 0) { - this.closeRegionGuard.notify(); + this.closeRegionGuard.notifyAll(); } } } @@ -1237,7 +1222,6 @@ public class DiskStoreImpl implements DiskStore { /** * Get serialized form of data off the disk * - * @param id * @since GemFire 5.7 */ public Object getSerializedData(DiskRegion dr, DiskId id) { @@ -1269,7 +1253,7 @@ public class DiskStoreImpl implements DiskStore { DiskEntry entry = ade.de; DiskEntry.Helper.handleFullAsyncQueue(entry, region, tag); } - } catch (RegionDestroyedException ex) { + } catch (RegionDestroyedException ignore) { // Normally we flush before closing or destroying a region // but in some cases it is closed w/o flushing. // So just ignore it; see bug 41305. @@ -1397,8 +1381,7 @@ public class DiskStoreImpl implements DiskStore { private int fillDrainList() { synchronized (this.drainSync) { this.drainList = new ArrayList(asyncQueue.size()); - int drainCount = asyncQueue.drainTo(this.drainList); - return drainCount; + return asyncQueue.drainTo(this.drainList); } } @@ -1410,8 +1393,6 @@ public class DiskStoreImpl implements DiskStore { * To fix bug 41770 clear the list in a way that will not break a concurrent iterator that is not * synced on drainSync. Only clear from it entries on the given region. Currently we do this by * clearing the isPendingAsync bit on each entry in this list. - * - * @param rvv */ void clearDrainList(LocalRegion r, RegionVersionVector rvv) { synchronized (this.drainSync) { @@ -1516,7 +1497,7 @@ public class DiskStoreImpl implements DiskStore { try { this.flusherThread.join(waitMs); return true; - } catch (InterruptedException ie) { + } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); } return false; @@ -1532,7 +1513,7 @@ public class DiskStoreImpl implements DiskStore { } } - public GemFireCacheImpl getCache() { + public InternalCache getCache() { return this.cache; } @@ -1759,7 +1740,7 @@ public class DiskStoreImpl implements DiskStore { } } } // else - } catch (RegionDestroyedException ex) { + } catch (RegionDestroyedException ignore) { // Normally we flush before closing or destroying a region // but in some cases it is closed w/o flushing. // So just ignore it; see bug 41305. @@ -2050,18 +2031,8 @@ public class DiskStoreImpl implements DiskStore { return this.directories[this.infoFileDirIndex]; } - /** For Testing * */ - // void addToOplogSet(int oplogID, File opFile, DirectoryHolder dirHolder) { - // Oplog oplog = new Oplog(oplogID, this); - // oplog.addRecoveredFile(opFile, dirHolder); - // // @todo check callers to see if they need drf support - // this.oplogSet.add(oplog); - // } - - /** For Testing * */ /** * returns the size of the biggest directory available to the region - * */ public long getMaxDirSize() { return maxDirSize; @@ -2143,8 +2114,6 @@ public class DiskStoreImpl implements DiskStore { /** * Removes anything found in the async queue for the given region - * - * @param rvv */ private void clearAsyncQueue(LocalRegion region, boolean needsWriteLock, RegionVersionVector rvv) { @@ -2263,7 +2232,7 @@ public class DiskStoreImpl implements DiskStore { if (diskException.get() != null) { try { _testHandleDiskAccessException.await(); - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); } } @@ -2466,25 +2435,26 @@ public class DiskStoreImpl implements DiskStore { dr.setRegionClosed(true); } gotLock = true; - } catch (CancelException e) { + } catch (CancelException ignore) { synchronized (this.closeRegionGuard) { if (!dr.isRegionClosed()) { if (!closeDataOnly) { dr.setRegionClosed(true); } - // Asif: I am quite sure that it should also be Ok if instead + // I am quite sure that it should also be Ok if instead // while it is a If Check below. Because if acquireReadLock // thread - // has acquired thelock, it is bound to see the isRegionClose as + // has acquired the lock, it is bound to see the isRegionClose as // true - // and so will realse teh lock causing decrement to zeo , before + // and so will release the lock causing decrement to zero , before // releasing the closeRegionGuard. But still...not to take any // chance while (this.entryOpsCount.get() > 0) { try { + // TODO: calling wait while holding two locks this.closeRegionGuard.wait(20000); - } catch (InterruptedException ie) { + } catch (InterruptedException ignored) { // Exit without closing the region, do not know what else // can be done Thread.currentThread().interrupt(); @@ -2534,8 +2504,6 @@ public class DiskStoreImpl implements DiskStore { /** * stops the compactor outside the write lock. Once stopped then it proceeds to destroy the * current & old oplogs - * - * @param dr */ void beginDestroyRegion(LocalRegion region, DiskRegion dr) { if (dr.isBackup()) { @@ -2571,7 +2539,7 @@ public class DiskStoreImpl implements DiskStore { while (this.backgroundTasks.get() > 0) { try { this.backgroundTasks.wait(500L); - } catch (InterruptedException ex) { + } catch (InterruptedException ignore) { interrupted = true; } } @@ -2720,7 +2688,7 @@ public class DiskStoreImpl implements DiskStore { return null; } - return l.toArray(new CompactableOplog[0]); + return l.toArray(new CompactableOplog[l.size()]); } /** @@ -2745,7 +2713,6 @@ public class DiskStoreImpl implements DiskStore { * @param baselineCopyMap this will be populated with baseline oplogs Files that will be used in * the restore script. * @return an array of Oplogs to be copied for an incremental backup. - * @throws IOException */ private Oplog[] filterBaselineOplogs(BackupInspector baselineInspector, Map<File, File> baselineCopyMap) throws IOException { @@ -2796,11 +2763,9 @@ public class DiskStoreImpl implements DiskStore { } // Convert the filtered oplog list to an array - return oplogList.toArray(new Oplog[] {}); + return oplogList.toArray(new Oplog[oplogList.size()]); } - - /** * Get all of the oplogs */ @@ -3013,7 +2978,7 @@ public class DiskStoreImpl implements DiskStore { while (this.scheduled) { try { wait(); - } catch (InterruptedException ex) { + } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); } } @@ -3114,30 +3079,13 @@ public class DiskStoreImpl implements DiskStore { if (dr.isRegionClosed()) { return; } - // // Stop the compactor if running, without taking lock. - // if (this.oplogCompactor != null) { - // try { - // this.oplogCompactor.stopCompactor(); - // } - // catch (CancelException ignore) { - // // Asif:To fix Bug 39380 , ignore the cache closed exception here. - // // allow it to call super .close so that it would be able to close - // the - // // oplogs - // // Though I do not think this exception will be thrown by - // // the stopCompactor. Still not taking chance and ignoring it - - // } - // } - // // if (!isSync()) { - // stopAsyncFlusher(true); // do this before writeLock - // // } + boolean gotLock = false; try { try { acquireWriteLock(dr); gotLock = true; - } catch (CancelException e) { + } catch (CancelException ignore) { // see workaround below. } @@ -3163,8 +3111,9 @@ public class DiskStoreImpl implements DiskStore { } boolean interrupted = Thread.interrupted(); try { + // TODO: calling wait while holding two locks this.closeRegionGuard.wait(1000); - } catch (InterruptedException ie) { + } catch (InterruptedException ignore) { interrupted = true; } finally { if (interrupted) { @@ -3175,7 +3124,7 @@ public class DiskStoreImpl implements DiskStore { if (this.entryOpsCount.get() > 0) { logger.warn(LocalizedMessage.create( LocalizedStrings.DisKRegion_OUTSTANDING_OPS_REMAIN_AFTER_0_SECONDS_FOR_DISK_REGION_1, - new Object[] {Integer.valueOf(loopCount), dr.getName()})); + new Object[] {loopCount, dr.getName()})); for (;;) { if (this.entryOpsCount.get() == 0) { @@ -3183,8 +3132,9 @@ public class DiskStoreImpl implements DiskStore { } boolean interrupted = Thread.interrupted(); try { + // TODO: calling wait while holding two locks this.closeRegionGuard.wait(1000); - } catch (InterruptedException ie) { + } catch (InterruptedException ignore) { interrupted = true; } finally { if (interrupted) { @@ -3233,7 +3183,7 @@ public class DiskStoreImpl implements DiskStore { dr.resetRVV(); dr.setRVVTrusted(false); dr.writeRVV(null, null); // just persist the empty rvv with trust=false - } catch (RegionDestroyedException rde) { + } catch (RegionDestroyedException ignore) { // ignore a RegionDestroyedException at this stage } if (this.initFile != null && dr.isBackup()) { @@ -4111,11 +4061,6 @@ public class DiskStoreImpl implements DiskStore { * Start the backup process. This is the second step of the backup process. In this method, we * define the data we're backing up by copying the init file and rolling to the next file. After * this method returns operations can proceed as normal, except that we don't remove oplogs. - * - * @param targetDir - * @param baselineInspector - * @param restoreScript - * @throws IOException */ public void startBackup(File targetDir, BackupInspector baselineInspector, RestoreScript restoreScript) throws IOException { @@ -4130,7 +4075,7 @@ public class DiskStoreImpl implements DiskStore { } // Get an appropriate lock object for each set of oplogs. - Object childLock = childOplog.lock;; + Object childLock = childOplog.lock; // TODO - We really should move this lock into the disk store, but // until then we need to do this magic to make sure we're actually @@ -4201,9 +4146,6 @@ public class DiskStoreImpl implements DiskStore { /** * Copy the oplogs to the backup directory. This is the final step of the backup process. The * oplogs we copy are defined in the startBackup method. - * - * @param backupManager - * @throws IOException */ public void finishBackup(BackupManager backupManager) throws IOException { if (diskStoreBackup == null) { @@ -4312,17 +4254,17 @@ public class DiskStoreImpl implements DiskStore { props.setProperty(CACHE_XML_FILE, ""); DistributedSystem ds = DistributedSystem.connect(props); offlineDS = ds; - Cache c = org.apache.geode.cache.CacheFactory.create(ds); - offlineCache = c; - org.apache.geode.cache.DiskStoreFactory dsf = c.createDiskStoreFactory(); + InternalCache cache = (InternalCache) CacheFactory.create(ds); + offlineCache = cache; + DiskStoreFactory dsf = cache.createDiskStoreFactory(); dsf.setDiskDirs(dsDirs); if (offlineCompacting && maxOplogSize != -1L) { dsf.setMaxOplogSize(maxOplogSize); } - DiskStoreImpl dsi = new DiskStoreImpl(c, dsName, + DiskStoreImpl dsi = new DiskStoreImpl(cache, dsName, ((DiskStoreFactoryImpl) dsf).getDiskStoreAttributes(), false, null, true, upgradeVersionOnly, offlineValidate, offlineCompacting, needsOplogs, offlineModify); - ((GemFireCacheImpl) c).addDiskStore(dsi); + cache.addDiskStore(dsi); return dsi; } @@ -4536,7 +4478,7 @@ public class DiskStoreImpl implements DiskStore { while (!isClosing() && currentAsyncValueRecoveryMap.containsKey(diskRegion.getId())) { try { currentAsyncValueRecoveryMap.wait(); - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { interrupted = true; } } @@ -4591,9 +4533,9 @@ public class DiskStoreImpl implements DiskStore { if (lastWriteTask != null) { try { lastWriteTask.get(); - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); - } catch (Exception e) { + } catch (Exception ignore) { // do nothing, an exception from the write task was already logged. } } @@ -4684,7 +4626,7 @@ public class DiskStoreImpl implements DiskStore { delayedWritePool.shutdown(); try { delayedWritePool.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { + } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); } } http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java index 551f733..ac72361 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreMonitor.java @@ -14,6 +14,19 @@ */ package org.apache.geode.internal.cache; +import java.io.File; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.logging.log4j.Logger; + import org.apache.geode.cache.DiskAccessException; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.InternalDistributedSystem; @@ -22,25 +35,16 @@ import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.LoggingThreadGroup; import org.apache.geode.internal.logging.log4j.LocalizedMessage; import org.apache.geode.internal.logging.log4j.LogMarker; -import org.apache.logging.log4j.Logger; - -import java.io.File; -import java.util.HashSet; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.*; public class DiskStoreMonitor { private static final Logger logger = LogService.getLogger(); private static final boolean DISABLE_MONITOR = Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "DISK_USAGE_DISABLE_MONITORING"); - // private static final boolean AUTO_RECONNECT = - // Boolean.getBoolean("gemfire.DISK_USAGE_ENABLE_AUTO_RECONNECT"); private static final int USAGE_CHECK_INTERVAL = Integer .getInteger(DistributionConfig.GEMFIRE_PREFIX + "DISK_USAGE_POLLING_INTERVAL_MILLIS", 10000); + private static final float LOG_WARNING_THRESHOLD_PCT = Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "DISK_USAGE_LOG_WARNING_PERCENT", 99); @@ -67,7 +71,7 @@ public class DiskStoreMonitor { if (val < 0 || val > 100) { throw new IllegalArgumentException( LocalizedStrings.DiskWriteAttributesFactory_DISK_USAGE_WARNING_INVALID_0 - .toLocalizedString(Float.valueOf(val))); + .toLocalizedString(val)); } } @@ -80,17 +84,15 @@ public class DiskStoreMonitor { if (val < 0 || val > 100) { throw new IllegalArgumentException( LocalizedStrings.DiskWriteAttributesFactory_DISK_USAGE_CRITICAL_INVALID_0 - .toLocalizedString(Float.valueOf(val))); + .toLocalizedString(val)); } } private final ScheduledExecutorService exec; private final Map<DiskStoreImpl, Set<DirectoryHolderUsage>> disks; - private final LogUsage logDisk; - // // this is set when we go into auto_reconnect mode - // private volatile DirectoryHolderUsage criticalDisk; + private final LogUsage logDisk; volatile DiskStateAction _testAction; @@ -209,9 +211,9 @@ public class DiskStoreMonitor { private File getLogDir() { File log = null; - GemFireCacheImpl gci = GemFireCacheImpl.getInstance(); - if (gci != null) { - InternalDistributedSystem ds = gci.getInternalDistributedSystem(); + InternalCache internalCache = GemFireCacheImpl.getInstance(); + if (internalCache != null) { + InternalDistributedSystem ds = internalCache.getInternalDistributedSystem(); if (ds != null) { DistributionConfig conf = ds.getConfig(); if (conf != null) { @@ -230,7 +232,7 @@ public class DiskStoreMonitor { return log; } - abstract class DiskUsage { + abstract static class DiskUsage { private DiskState state; DiskUsage() { @@ -305,7 +307,7 @@ public class DiskStoreMonitor { protected abstract void handleStateChange(DiskState next, String pct); } - class LogUsage extends DiskUsage { + static class LogUsage extends DiskUsage { private final File dir; public LogUsage(File dir) { @@ -382,41 +384,12 @@ public class DiskStoreMonitor { logger.error(LogMarker.DISK_STORE_MONITOR, LocalizedMessage.create(LocalizedStrings.DiskStoreMonitor_DISK_CRITICAL, args)); - try { - // // prepare for restart - // if (AUTO_RECONNECT) { - // disk.getCache().saveCacheXmlForReconnect(); - // criticalDisk = this; - // } - } finally { - // pull the plug - disk.handleDiskAccessException(new DiskAccessException(msg, disk)); - } + // TODO: this is weird... + disk.handleDiskAccessException(new DiskAccessException(msg, disk)); break; } } - // private void performReconnect(String msg) { - // try { - // // don't try to reconnect before the cache is closed - // disk._testHandleDiskAccessException.await(); - // - // // now reconnect, clear out the var first so a close can interrupt the - // // reconnect - // criticalDisk = null; - // boolean restart = disk.getCache().getDistributedSystem().tryReconnect(true, msg, - // disk.getCache()); - // if (LogMarker.DISK_STORE_MONITOR || logger.isDebugEnabled()) { - // String pre = restart ? "Successfully" : "Unsuccessfully"; - // logger.info(LocalizedStrings.DEBUG, pre + " attempted to restart cache"); - // } - // } catch (InterruptedException e) { - // Thread.currentThread().interrupt(); - // } finally { - // close(); - // } - // } - @Override protected File dir() { return dir.getDir(); http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java index 36ad9ce..e22e1d9 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXCommitMessage.java @@ -48,12 +48,10 @@ import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LogMarker; -/** - * - */ public class DistTXCommitMessage extends TXMessage { private static final Logger logger = LogService.getLogger(); + protected ArrayList<ArrayList<DistTxThinEntryState>> entryStateList = null; /** for deserialization */ @@ -75,7 +73,7 @@ public class DistTXCommitMessage extends TXMessage { logger.debug("DistTXCommitMessage.operateOnTx: Tx {}", txId); } - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); TXManagerImpl txMgr = cache.getTXMgr(); final TXStateProxy txStateProxy = txMgr.getTXState(); TXCommitMessage cmsg = null; @@ -256,7 +254,7 @@ public class DistTXCommitMessage extends TXMessage { @Override public String toString() { - StringBuffer sb = new StringBuffer(); + StringBuilder sb = new StringBuilder(); sb.append("DistTXCommitPhaseTwoReplyMessage ").append("processorid=").append(this.processorId) .append(" reply to sender ").append(this.getSender()); return sb.toString(); @@ -339,7 +337,7 @@ public class DistTXCommitMessage extends TXMessage { (DistTxCommitExceptionCollectingException) this.exception; return cce.getCacheClosedMembers(); } else { - return Collections.EMPTY_SET; + return Collections.emptySet(); } } @@ -349,7 +347,7 @@ public class DistTXCommitMessage extends TXMessage { (DistTxCommitExceptionCollectingException) this.exception; return cce.getRegionDestroyedMembers(regionFullPath); } else { - return Collections.EMPTY_SET; + return Collections.emptySet(); } } @@ -387,14 +385,12 @@ public class DistTXCommitMessage extends TXMessage { /** * Determine if the commit processing was incomplete, if so throw a detailed exception * indicating the source of the problem - * - * @param msgMap */ public void handlePotentialCommitFailure( HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) { if (fatalExceptions.size() > 0) { - StringBuffer errorMessage = new StringBuffer("Incomplete commit of transaction ").append(id) - .append(". Caused by the following exceptions: "); + StringBuilder errorMessage = new StringBuilder("Incomplete commit of transaction ") + .append(id).append(". Caused by the following exceptions: "); for (Iterator i = fatalExceptions.entrySet().iterator(); i.hasNext();) { Map.Entry me = (Map.Entry) i.next(); DistributedMember mem = (DistributedMember) me.getKey(); @@ -428,16 +424,13 @@ public class DistTXCommitMessage extends TXMessage { public Set getRegionDestroyedMembers(String regionFullPath) { Set members = (Set) this.regionExceptions.get(regionFullPath); if (members == null) { - members = Collections.EMPTY_SET; + members = Collections.emptySet(); } return members; } /** * Protected by (this) - * - * @param member - * @param exceptions */ public void addExceptionsFromMember(InternalDistributedMember member, Set exceptions) { for (Iterator iter = exceptions.iterator(); iter.hasNext();) { http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java index ffbc3ba..0ab2cc3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java @@ -54,7 +54,7 @@ import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LogMarker; /** - * + * */ public final class DistTXPrecommitMessage extends TXMessage { @@ -107,7 +107,7 @@ public final class DistTXPrecommitMessage extends TXMessage { /* * Perform precommit - * + * * [DISTTX] Handle different exceptions here */ txMgr.precommit(); @@ -202,7 +202,7 @@ public final class DistTXPrecommitMessage extends TXMessage { * Return the value from the get operation, serialize it bytes as late as possible to avoid * making un-neccesary byte[] copies. De-serialize those same bytes as late as possible to avoid * using precious threads (aka P2P readers). - * + * * @param recipient the origin VM that performed the get * @param processorId the processor on which the origin thread is waiting * @param val the raw value that will eventually be serialized @@ -218,7 +218,7 @@ public final class DistTXPrecommitMessage extends TXMessage { /** * Processes this message. This method is invoked by the receiver of the message. - * + * * @param dm the distribution manager that is processing the message. */ @Override @@ -272,9 +272,9 @@ public final class DistTXPrecommitMessage extends TXMessage { /** * Reply processor which collects all CommitReplyExceptions for Dist Tx and emits a detailed * failure exception if problems occur - * + * * @see TXCommitMessage.CommitReplyProcessor - * + * * [DISTTX] TODO see if need ReliableReplyProcessor21? departed members? */ public static final class DistTxPrecommitReplyProcessor extends ReplyProcessor21 { @@ -361,7 +361,7 @@ public final class DistTXPrecommitMessage extends TXMessage { /** * An Exception that collects many remote CommitExceptions - * + * * @see TXCommitMessage.CommitExceptionCollectingException */ public static class DistTxPrecommitExceptionCollectingException extends ReplyException { @@ -388,7 +388,7 @@ public final class DistTXPrecommitMessage extends TXMessage { /** * Determine if the commit processing was incomplete, if so throw a detailed exception * indicating the source of the problem - * + * * @param msgMap */ public void handlePotentialCommitFailure( @@ -436,7 +436,7 @@ public final class DistTXPrecommitMessage extends TXMessage { /** * Protected by (this) - * + * * @param member * @param exceptions */ http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java index bfe302a..d4f5943 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXRollbackMessage.java @@ -75,7 +75,7 @@ public final class DistTXRollbackMessage extends TXMessage { logger.debug("Dist TX: Rollback: {}", txId); } - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + InternalCache cache = GemFireCacheImpl.getInstance(); TXManagerImpl txMgr = cache.getTXMgr(); final TXStateProxy txState = txMgr.getTXState(); boolean rollbackSuccessful = false; @@ -87,10 +87,6 @@ public final class DistTXRollbackMessage extends TXMessage { "DistTXRollbackMessage.operateOnTx: found a previously committed transaction:{}", txId); } - // TXCommitMessage cmsg = txMgr.getRecentlyCompletedMessage(txId); - // if (txMgr.isExceptionToken(cmsg)) { - // throw txMgr.getExceptionForToken(cmsg, txId); - // } } else if (txState != null) { // [DISTTX] TODO - Handle scenarios of no txState // if no TXState was created (e.g. due to only getEntry/size operations @@ -219,7 +215,7 @@ public final class DistTXRollbackMessage extends TXMessage { @Override public String toString() { - StringBuffer sb = new StringBuffer(); + StringBuilder sb = new StringBuilder(); sb.append("DistTXRollbackReplyMessage ").append("processorid=").append(this.processorId) .append(" reply to sender ").append(this.getSender()); return sb.toString(); @@ -232,7 +228,6 @@ public final class DistTXRollbackMessage extends TXMessage { /** * A processor to capture the value returned by {@link DistTXRollbackReplyMessage} - * */ public static class DistTXRollbackResponse extends RemoteOperationResponse { private volatile Boolean rollbackState; @@ -275,9 +270,6 @@ public final class DistTXRollbackMessage extends TXMessage { final String msg = "DistTXRollbackResponse got RemoteOperationException; rethrowing"; logger.debug(msg, e); throw e; - } catch (TransactionDataNotColocatedException e) { - // Throw this up to user! - throw e; } return rollbackState; } @@ -354,7 +346,7 @@ public final class DistTXRollbackMessage extends TXMessage { (DistTxRollbackExceptionCollectingException) this.exception; return cce.getCacheClosedMembers(); } else { - return Collections.EMPTY_SET; + return Collections.emptySet(); } } @@ -364,7 +356,7 @@ public final class DistTXRollbackMessage extends TXMessage { (DistTxRollbackExceptionCollectingException) this.exception; return cce.getRegionDestroyedMembers(regionFullPath); } else { - return Collections.EMPTY_SET; + return Collections.emptySet(); } } @@ -402,14 +394,12 @@ public final class DistTXRollbackMessage extends TXMessage { /** * Determine if the commit processing was incomplete, if so throw a detailed exception * indicating the source of the problem - * - * @param msgMap */ public void handlePotentialCommitFailure( HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) { if (fatalExceptions.size() > 0) { - StringBuffer errorMessage = new StringBuffer("Incomplete commit of transaction ").append(id) - .append(". Caused by the following exceptions: "); + StringBuilder errorMessage = new StringBuilder("Incomplete commit of transaction ") + .append(id).append(". Caused by the following exceptions: "); for (Iterator i = fatalExceptions.entrySet().iterator(); i.hasNext();) { Map.Entry me = (Map.Entry) i.next(); DistributedMember mem = (DistributedMember) me.getKey(); @@ -443,16 +433,13 @@ public final class DistTXRollbackMessage extends TXMessage { public Set getRegionDestroyedMembers(String regionFullPath) { Set members = (Set) this.regionExceptions.get(regionFullPath); if (members == null) { - members = Collections.EMPTY_SET; + members = Collections.emptySet(); } return members; } /** * Protected by (this) - * - * @param member - * @param exceptions */ public void addExceptionsFromMember(InternalDistributedMember member, Set exceptions) { for (Iterator iter = exceptions.iterator(); iter.hasNext();) {