DonalEvans commented on a change in pull request #6179:
URL: https://github.com/apache/geode/pull/6179#discussion_r600886136
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelper.java
##########
@@ -295,79 +99,36 @@ static int decodeAddressToDataSize(long addr) {
* @throws UnsupportedOperationException if the address has compressed data
Review comment:
This method and the method on the delegate do not throw an
`UnsupportedOperationException`, but rather an `AssertionError`.
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperInstance.java
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.offheap;
+
+import java.util.function.Function;
+
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.internal.cache.CachedDeserializableFactory;
+import org.apache.geode.internal.cache.DiskId;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.RegionEntryContext;
+import org.apache.geode.internal.cache.Token;
+import org.apache.geode.internal.cache.entries.DiskEntry;
+import org.apache.geode.internal.cache.entries.OffHeapRegionEntry;
+import org.apache.geode.internal.offheap.annotations.Released;
+import org.apache.geode.internal.offheap.annotations.Retained;
+import org.apache.geode.internal.offheap.annotations.Unretained;
+import org.apache.geode.internal.serialization.DSCODE;
+
+/**
+ * The class just has static methods that operate on instances of {@link
OffHeapRegionEntry}. It
+ * allows common code to be shared for all the classes we have that implement
+ * {@link OffHeapRegionEntry}.
+ *
+ * @since Geode 1.0
+ */
+class OffHeapRegionEntryHelperInstance {
+
+ static final long NULL_ADDRESS = 0L << 1;
+ static final long INVALID_ADDRESS = 1L << 1;
+ static final long LOCAL_INVALID_ADDRESS = 2L << 1;
+ static final long DESTROYED_ADDRESS = 3L << 1;
+ static final long REMOVED_PHASE1_ADDRESS = 4L << 1;
+ static final long REMOVED_PHASE2_ADDRESS = 5L << 1;
+ static final long END_OF_STREAM_ADDRESS = 6L << 1;
+ static final long NOT_AVAILABLE_ADDRESS = 7L << 1;
+ static final long TOMBSTONE_ADDRESS = 8L << 1;
+ static final int MAX_LENGTH_FOR_DATA_AS_ADDRESS = 8;
+
+ private final Function<Long, OffHeapStoredObject> offHeapStoredObjectFactory;
+ private final ReferenceCounterInstance referenceCounter;
+
+ OffHeapRegionEntryHelperInstance() {
+ this(OffHeapStoredObject::new, new ReferenceCounterInstance());
+ }
+
+ @VisibleForTesting
+ OffHeapRegionEntryHelperInstance(Function<Long, OffHeapStoredObject>
offHeapStoredObjectFactory,
+ ReferenceCounterInstance referenceCounter) {
+ this.offHeapStoredObjectFactory = offHeapStoredObjectFactory;
+ this.referenceCounter = referenceCounter;
+ }
+
+ /**
+ * This method may release the object stored at ohAddress if the result
needs to be decompressed
+ * and the decompress parameter is true. This decompressed result will be on
the heap.
+ *
+ * @param ohAddress OFF_HEAP_ADDRESS
+ * @param decompress true if off-heap value should be decompressed before
returning
+ * @param context used for decompression
+ * @return OFF_HEAP_OBJECT (sometimes)
+ */
+ @Unretained
+ @Retained
+ Object addressToObject(@Released @Retained long ohAddress, boolean
decompress,
+ RegionEntryContext context) {
+ if (isOffHeap(ohAddress)) {
+ @Unretained
+ OffHeapStoredObject chunk = offHeapStoredObjectFactory.apply(ohAddress);
+ @Unretained
+ Object result = chunk;
+ if (decompress && chunk.isCompressed()) {
+ try {
+ // to fix bug 47982 need to:
+ byte[] decompressedBytes = chunk.getDecompressedBytes(context);
+ if (chunk.isSerialized()) {
+ // return a VMCachedDeserializable with the decompressed
serialized bytes since chunk is
+ // serialized
+ result = CachedDeserializableFactory.create(decompressedBytes,
context.getCache());
+ } else {
+ // return a byte[] since chunk is not serialized
+ result = decompressedBytes;
+ }
+ } finally {
+ // decompress is only true when this method is called by
_getValueRetain.
+ // In that case the caller has already retained ohAddress because it
thought
+ // we would return it. But we have unwrapped it and are returning
the decompressed
+ // results.
+ // So we need to release the chunk here.
+ chunk.release();
+ }
+ }
+ return result;
+ } else if ((ohAddress & ENCODED_BIT) != 0) {
+ TinyStoredObject daa = new TinyStoredObject(ohAddress);
+ Object result = daa;
+ if (decompress && daa.isCompressed()) {
+ byte[] decompressedBytes = daa.getDecompressedBytes(context);
+ if (daa.isSerialized()) {
+ // return a VMCachedDeserializable with the decompressed serialized
bytes since daa is
+ // serialized
+ result = CachedDeserializableFactory.create(decompressedBytes,
context.getCache());
+ } else {
+ // return a byte[] since daa is not serialized
+ result = decompressedBytes;
+ }
+ }
+ return result;
+ } else {
+ return TokenAddress.addrToObj[(int) ohAddress >> 1];
+ }
+ }
+
+ int getSerializedLength(TinyStoredObject dataAsAddress) {
+ final long ohAddress = dataAsAddress.getAddress();
+
+ if ((ohAddress & ENCODED_BIT) != 0) {
+ boolean isLong = (ohAddress & LONG_BIT) != 0;
+ if (isLong) {
+ return 9;
+ } else {
+ return (int) ((ohAddress & SIZE_MASK) >> SIZE_SHIFT);
+ }
+ } else {
+ return 0;
+ }
+ }
+
+ /*
+ * This method is optimized for cases where if the caller wants to convert
address to a Token
+ * compared to addressToObject which would deserialize the value.
+ */
+ private Token addressToToken(long ohAddress) {
+ if (isOffHeap(ohAddress) || (ohAddress & ENCODED_BIT) != 0) {
+ return Token.NOT_A_TOKEN;
+ } else {
+ return TokenAddress.addrToObj[(int) ohAddress >> 1];
+ }
+ }
+
+ private void releaseAddress(@Released long ohAddress) {
+ if (isOffHeap(ohAddress)) {
+ referenceCounter.release(ohAddress);
+ }
+ }
+
+ /**
+ * The address in 're' will be @Released.
+ */
+ public void releaseEntry(@Released OffHeapRegionEntry re) {
+ if (re instanceof DiskEntry) {
+ DiskId did = ((DiskEntry) re).getDiskId();
+ if (did != null && did.isPendingAsync()) {
+ synchronized (did) {
+ // This may not be needed so remove this call if it causes problems.
+ // We no longer need this entry to be written to disk so unschedule
it
+ // before we change its value to REMOVED_PHASE2.
+ did.setPendingAsync(false);
+ setValue(re, Token.REMOVED_PHASE2);
+ return;
+ }
+ }
+ }
+ setValue(re, Token.REMOVED_PHASE2);
+ }
+
+ public void releaseEntry(@Unretained OffHeapRegionEntry re,
+ @Released StoredObject expectedValue) {
+ long oldAddress = TokenAddress.objectToAddress(expectedValue);
+ final long newAddress = TokenAddress.objectToAddress(Token.REMOVED_PHASE2);
+ if (re.setAddress(oldAddress, newAddress)) {
+ releaseAddress(oldAddress);
+ } /*
+ * else { if (!calledSetValue || re.getAddress() != newAddress) {
expectedValue.release(); } }
+ */
+ }
+
+ /**
+ * This bit is set to indicate that this address has data encoded in it.
+ */
+ private static final long ENCODED_BIT = 1L;
+ /**
+ * This bit is set to indicate that the encoded data is serialized.
+ */
+ static final long SERIALIZED_BIT = 2L;
+ /**
+ * This bit is set to indicate that the encoded data is compressed.
+ */
+ static final long COMPRESSED_BIT = 4L;
+ /**
+ * This bit is set to indicate that the encoded data is a long whose value
fits in 7 bytes.
+ */
+ private static final long LONG_BIT = 8L;
+ /**
+ * size is in the range 0..7 so we only need 3 bits.
+ */
+ private static final long SIZE_MASK = 0x70L;
+ /**
+ * number of bits to shift the size by.
+ */
+ private static final int SIZE_SHIFT = 4;
+ // the msb of this byte is currently unused
+
+ /**
+ * Returns 0 if the data could not be encoded as an address.
+ */
+ long encodeDataAsAddress(byte[] v, boolean isSerialized, boolean
isCompressed) {
+ if (v.length < MAX_LENGTH_FOR_DATA_AS_ADDRESS) {
+ long result = 0L;
+ for (int i = 0; i < v.length; i++) {
+ result |= v[i] & 0x00ff;
+ result <<= 8;
+ }
+ result |= (v.length << SIZE_SHIFT) | ENCODED_BIT;
+ if (isSerialized) {
+ result |= SERIALIZED_BIT;
+ }
+ if (isCompressed) {
+ result |= COMPRESSED_BIT;
+ }
+ return result;
+ } else if (isSerialized && !isCompressed) {
+ // Check for some special types that take more than 7 bytes to serialize
+ // but that might be able to be inlined with less than 8 bytes.
+ if (v[0] == DSCODE.LONG.toByte()) {
+ // A long is currently always serialized as 8 bytes (9 if you include
the dscode).
+ // But many long values will actually be small enough for is to encode
in 7 bytes.
+ if ((v[1] == 0 && (v[2] & 0x80) == 0) || (v[1] == -1 && (v[2] & 0x80)
!= 0)) {
+ // The long can be encoded as 7 bytes since the most signification
byte
+ // is simply an extension of the sign byte on the second most
signification byte.
+ long result = 0L;
+ for (int i = 2; i < v.length; i++) {
+ result |= v[i] & 0x00ff;
+ result <<= 8;
+ }
+ result |= (7 << SIZE_SHIFT) | LONG_BIT | SERIALIZED_BIT |
ENCODED_BIT;
+ return result;
+ }
+ }
+ }
+ return 0L;
+ }
+
+ Object decodeAddressToObject(long ohAddress) {
+ byte[] bytes = decodeUncompressedAddressToBytes(ohAddress);
+
+ boolean isSerialized = (ohAddress & SERIALIZED_BIT) != 0;
+ if (isSerialized) {
+ return EntryEventImpl.deserialize(bytes);
+ } else {
+ return bytes;
+ }
+ }
+
+ int decodeAddressToDataSize(long addr) {
+ if ((addr & ENCODED_BIT) == 0) {
+ throw new AssertionError("Invalid address: " + addr);
+ }
+ boolean isLong = (addr & LONG_BIT) != 0;
+ if (isLong) {
+ return 9;
+ }
+ return (int) ((addr & SIZE_MASK) >> SIZE_SHIFT);
+ }
+
+ /**
+ * Returns the bytes encoded in the given address. Note that compressed
addresses are not
+ * supported by this method.
+ *
+ * @throws UnsupportedOperationException if the address has compressed data
+ */
+ byte[] decodeUncompressedAddressToBytes(long addr) {
+ if ((addr & COMPRESSED_BIT) != 0) {
+ throw new AssertionError("Did not expect encoded address to be
compressed");
+ }
+ return decodeAddressToRawBytes(addr);
+ }
+
+ /**
+ * Returns the "raw" bytes that have been encoded in the given address. Note
that if address is
+ * compressed then the raw bytes are the compressed bytes.
+ */
+ byte[] decodeAddressToRawBytes(long addr) {
+ if ((addr & ENCODED_BIT) == 0) {
+ throw new AssertionError("Invalid address: " + addr);
+ }
+ int size = (int) ((addr & SIZE_MASK) >> SIZE_SHIFT);
+ boolean isLong = (addr & LONG_BIT) != 0;
+ byte[] bytes;
+ if (isLong) {
+ bytes = new byte[9];
+ bytes[0] = DSCODE.LONG.toByte();
+ for (int i = 8; i >= 2; i--) {
+ addr >>= 8;
+ bytes[i] = (byte) (addr & 0x00ff);
+ }
+ if ((bytes[2] & 0x80) != 0) {
+ bytes[1] = -1;
+ } else {
+ bytes[1] = 0;
+ }
+ } else {
+ bytes = new byte[size];
+ for (int i = size - 1; i >= 0; i--) {
+ addr >>= 8;
+ bytes[i] = (byte) (addr & 0x00ff);
+ }
+ }
+ return bytes;
+ }
+
+ /**
+ * The previous value at the address in 're' will be @Released and then the
address in 're' will
+ * be set to the @Unretained address of 'v'.
+ */
+ public void setValue(@Released OffHeapRegionEntry re, @Unretained Object v) {
+ // setValue is called when synced so I don't need to worry
+ // about oldAddress being released by someone else.
+ final long newAddress = TokenAddress.objectToAddress(v);
+ long oldAddress;
+ do {
+ oldAddress = re.getAddress();
+ } while (!re.setAddress(oldAddress, newAddress));
+ ReferenceCountHelper.setReferenceCountOwner(re);
+ releaseAddress(oldAddress);
+ ReferenceCountHelper.setReferenceCountOwner(null);
+ }
+
+ public Token getValueAsToken(@Unretained OffHeapRegionEntry re) {
+ return addressToToken(re.getAddress());
+ }
+
+ @Unretained
+ public Object _getValue(@Unretained OffHeapRegionEntry re) {
+ return addressToObject(re.getAddress(), false, null); // no context needed
so decompress is
+ // false
+ }
+
+ public boolean isOffHeap(long addr) {
+ if ((addr & ENCODED_BIT) != 0)
+ return false;
+ if (addr < 0)
+ return true;
+ addr >>= 1; // shift right 1 to convert to array index;
+ return addr >= TokenAddress.addrToObj.length;
+ }
+
+ /**
+ * If the value stored at the location held in 're' is returned, then it
will be Retained. If the
+ * value returned is 're' decompressed into another off-heap location, then
're' will be
+ * Unretained but the new, decompressed value will be Retained. Therefore,
whichever is returned
+ * (the value at the address in 're' or the decompressed value) it will have
been Retained.
+ *
+ * @return possible OFF_HEAP_OBJECT (caller must release)
+ */
+ @Retained
+ public Object _getValueRetain(@Retained @Unretained OffHeapRegionEntry re,
+ boolean decompress, RegionEntryContext context) {
+ int retryCount = 0;
+ @Retained
+ long addr = re.getAddress();
+ while (isOffHeap(addr)) {
+ if (referenceCounter.retain(addr)) {
+ @Unretained
+ long addr2 = re.getAddress();
+ if (addr != addr2) {
+ retryCount = 0;
+ referenceCounter.release(addr);
+ // spin around and try again.
+ addr = addr2;
+ } else {
+ return addressToObject(addr, decompress, context);
+ }
+ } else {
+ // spin around and try again
+ long addr2 = re.getAddress();
+ retryCount++;
+ if (retryCount > 100) {
+ throw new IllegalStateException("retain failed addr=" + addr + "
addr2=" + addr
+ + " 100 times" + " history=" +
ReferenceCountHelper.getFreeRefCountInfo(addr));
+ }
+ addr = addr2;
+ // Since retain returned false our region entry should have a different
+ // value in it. However the actual address could be the exact same one
+ // because addr was released, then reallocated from the free list and
set
+ // back into this region entry. See bug 47782
+ }
+ }
+ return addressToObject(addr, decompress, context);
+ }
+
+ public boolean isSerialized(long address) {
+ return (address & SERIALIZED_BIT) != 0;
+ }
+
+ public boolean isCompressed(long address) {
+ return (address & COMPRESSED_BIT) != 0;
+ }
+
+ private static class TokenAddress {
+
+ @Immutable
+ private static final Token[] addrToObj =
+ new Token[] {null, Token.INVALID, Token.LOCAL_INVALID, Token.DESTROYED,
+ Token.REMOVED_PHASE1,
+ Token.REMOVED_PHASE2, Token.END_OF_STREAM, Token.NOT_AVAILABLE,
Token.TOMBSTONE,};
+
+ private static long objectToAddress(@Unretained Object v) {
+ if (v instanceof StoredObject)
Review comment:
While you're refactoring this code, would it be possible to add curly
brackets to these if statements?
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/offheap/ReferenceCountHelper.java
##########
@@ -81,72 +85,72 @@ public static Object createReferenceCountOwner() {
* of this method must also call unskipRefCountTracking after the allocation
or free is done.
*/
public static void skipRefCountTracking() {
- getInstance().skipRefCountTracking();
+ delegate().skipRefCountTracking();
}
/**
* Returns true if currently tracking reference counts.
*/
- public static boolean isRefCountTracking() {
- return getInstance().isRefCountTracking();
+ static boolean isRefCountTracking() {
Review comment:
This method and several others in this class are unused. Would it be
appropriate to remove them?
##########
File path:
geode-core/src/test/java/org/apache/geode/internal/offheap/ReferenceCountHelperImplTest.java
##########
@@ -12,976 +12,966 @@
* or implied. See the License for the specific language governing permissions
and limitations under
* the License.
*/
-
package org.apache.geode.internal.offheap;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
-import org.junit.Rule;
+import org.junit.Before;
import org.junit.Test;
-import org.junit.contrib.java.lang.system.SystemOutRule;
-import org.junit.runner.RunWith;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
import org.apache.geode.internal.cache.RegionEntry;
-/*
- * PowerMock used in this test to verify static method
MemoryAllocatorImpl.debugLog
- */
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore({"*.UnitTest"})
-@PrepareForTest({MemoryAllocatorImpl.class})
public class ReferenceCountHelperImplTest {
- ReferenceCountHelperImpl rchi;
+ private BiConsumer<String, Boolean> debugLogger;
- @Rule
- public SystemOutRule sor = new SystemOutRule();
+ @Before
+ public void setUp() {
+ debugLogger = uncheckedCast(mock(BiConsumer.class));
+ }
@Test
public void doTrackReferenceCountsWithTrackRefsTrueAndTrackFreesTrue() {
- rchi = getTrueTrue();
- assertTrue(rchi.trackReferenceCounts());
+ ReferenceCountHelperImpl referenceCountHelperImpl =
newReferenceCountHelperImpl_TrueTrue();
+ assertThat(referenceCountHelperImpl.trackReferenceCounts()).isTrue();
}
@Test
public void doTrackReferenceCountsWithTrackRefsTrueAndTrackFreesFalse() {
- rchi = getTrueFalse();
- assertTrue(rchi.trackReferenceCounts());
+ ReferenceCountHelperImpl referenceCountHelperImpl =
newReferenceCountHelperImpl_TrueFalse();
+ assertThat(referenceCountHelperImpl.trackReferenceCounts()).isTrue();
}
@Test
public void doTrackReferenceCountsWithTrackRefsFalseAndTrackFreesTrue() {
- rchi = getFalseTrue();
- assertFalse(rchi.trackReferenceCounts());
+ ReferenceCountHelperImpl referenceCountHelperImpl =
newReferenceCountHelperImpl_FalseTrue();
+ assertThat(referenceCountHelperImpl.trackReferenceCounts()).isFalse();
}
@Test
public void doTrackReferenceCountsWithTrackRefsFalseAndTrackFreesFalse() {
- rchi = getFalseFalse();
- assertFalse(rchi.trackReferenceCounts());
+ ReferenceCountHelperImpl referenceCountHelperImpl =
newReferenceCountHelperImpl_FalseFalse();
+ assertThat(referenceCountHelperImpl.trackReferenceCounts()).isFalse();
}
@Test
public void doTrackFreedReferenceCountsWithTrackRefsTrueAndTrackFreesTrue() {
- rchi = getTrueTrue();
- assertTrue(rchi.trackFreedReferenceCounts());
+ ReferenceCountHelperImpl referenceCountHelperImpl =
newReferenceCountHelperImpl_TrueTrue();
+ assertThat(referenceCountHelperImpl.trackFreedReferenceCounts()).isTrue();
}
@Test
public void doTrackFreedReferenceCountsWithTrackRefsTrueAndTrackFreesFalse()
{
- rchi = getTrueFalse();
- assertFalse(rchi.trackFreedReferenceCounts());
+ ReferenceCountHelperImpl referenceCountHelperImpl =
newReferenceCountHelperImpl_TrueFalse();
+ assertThat(referenceCountHelperImpl.trackFreedReferenceCounts()).isFalse();
}
@Test
public void doTrackFreedReferenceCountsWithTrackRefsFalseAndTrackFreesTrue()
{
- rchi = getFalseTrue();
- assertTrue(rchi.trackFreedReferenceCounts());
+ ReferenceCountHelperImpl referenceCountHelperImpl =
newReferenceCountHelperImpl_FalseTrue();
+ assertThat(referenceCountHelperImpl.trackFreedReferenceCounts()).isTrue();
}
@Test
public void
doTrackFreedReferenceCountsWithTrackRefsFalseAndTrackFreesFalse() {
- rchi = getFalseFalse();
- assertFalse(rchi.trackFreedReferenceCounts());
+ ReferenceCountHelperImpl referenceCountHelperImpl =
newReferenceCountHelperImpl_FalseFalse();
+ assertThat(referenceCountHelperImpl.trackFreedReferenceCounts()).isFalse();
}
@Test
public void doSkipRefCountTrackingWithTrackRefsTrueAndTrackFreesTrue() {
- rchi = getTrueTrue();
- Object preOwner = rchi.getReferenceCountOwner();
+ ReferenceCountHelperImpl referenceCountHelperImpl =
newReferenceCountHelperImpl_TrueTrue();
- rchi.skipRefCountTracking();
- Object postOwner = rchi.getReferenceCountOwner();
+ Object preOwner = referenceCountHelperImpl.getReferenceCountOwner();
+ referenceCountHelperImpl.skipRefCountTracking();
+ Object postOwner = referenceCountHelperImpl.getReferenceCountOwner();
- assertTrue(postOwner != preOwner); // skip sets owner to
SKIP_REF_COUNT_TRACKING
-
- assertFalse(rchi.isRefCountTracking());
+ // skip sets owner to SKIP_REF_COUNT_TRACKING
+ assertThat(postOwner).isNotEqualTo(preOwner);
+ assertThat(referenceCountHelperImpl.isRefCountTracking()).isFalse();
Long address = (long) 0x1000;
- boolean decRefCount = false;
- int rc = 1;
- rchi.refCountChanged(address, decRefCount, rc);
- List<RefCountChangeInfo> list = rchi.peekRefCountInfo(address);
- assertEquals(null, list);
+ referenceCountHelperImpl.refCountChanged(address, false, 1);
+ List<RefCountChangeInfo> list =
referenceCountHelperImpl.peekRefCountInfo(address);
+ assertThat(list).isNull();
- rchi.unskipRefCountTracking();
- postOwner = rchi.getReferenceCountOwner();
- assertEquals(postOwner, preOwner);
+ referenceCountHelperImpl.unskipRefCountTracking();
+ postOwner = referenceCountHelperImpl.getReferenceCountOwner();
- assertTrue(rchi.isRefCountTracking());
+ assertThat(preOwner).isEqualTo(postOwner);
+ assertThat(referenceCountHelperImpl.isRefCountTracking()).isTrue();
}
@Test
public void doSkipRefCountTrackingWithTrackRefsFalseAndTrackFreesTrue() {
- rchi = getFalseTrue();
- Object preOwner = rchi.getReferenceCountOwner();
- assertEquals(null, preOwner); // getReferenceCountOwner returns null if
not tracking
+ ReferenceCountHelperImpl referenceCountHelperImpl =
newReferenceCountHelperImpl_FalseTrue();
+ Object preOwner = referenceCountHelperImpl.getReferenceCountOwner();
+ // getReferenceCountOwner returns null if not tracking
+ assertThat(preOwner).isNull();
- rchi.skipRefCountTracking();
- assertFalse(rchi.isRefCountTracking());
+ referenceCountHelperImpl.skipRefCountTracking();
+ assertThat(referenceCountHelperImpl.isRefCountTracking()).isFalse();
- rchi.unskipRefCountTracking();
- assertFalse(rchi.isRefCountTracking()); // system prop not set
+ referenceCountHelperImpl.unskipRefCountTracking();
+ assertThat(referenceCountHelperImpl.isRefCountTracking()).isFalse(); //
system prop not set
}
@Test
public void doSkipRefCountTrackingWithTrackRefsFalseAndTrackFreesFalse() {
- rchi = getFalseFalse();
- Object preOwner = rchi.getReferenceCountOwner();
- assertEquals(null, preOwner); // getReferenceCountOwner returns null if
not tracking
+ ReferenceCountHelperImpl referenceCountHelperImpl =
newReferenceCountHelperImpl_FalseFalse();
+ Object preOwner = referenceCountHelperImpl.getReferenceCountOwner();
+ // getReferenceCountOwner returns null if not tracking
+ assertThat(preOwner).isNull();
- rchi.skipRefCountTracking();
- assertFalse(rchi.isRefCountTracking());
+ referenceCountHelperImpl.skipRefCountTracking();
+ assertThat(referenceCountHelperImpl.isRefCountTracking()).isFalse();
- rchi.unskipRefCountTracking();
- assertFalse(rchi.isRefCountTracking()); // system prop not set
+ referenceCountHelperImpl.unskipRefCountTracking();
+ // system prop not set
+ assertThat(referenceCountHelperImpl.isRefCountTracking()).isFalse();
}
@Test
public void doSkipRefCountTrackingWithTrackRefsTrueAndTrackFreesFalse() {
- rchi = getTrueFalse();
- Object preOwner = rchi.getReferenceCountOwner();
-
- rchi.skipRefCountTracking();
- Object postOwner = rchi.getReferenceCountOwner();
+ ReferenceCountHelperImpl referenceCountHelperImpl =
newReferenceCountHelperImpl_TrueFalse();
- assertTrue(postOwner != preOwner); // skip sets owner to
SKIP_REF_COUNT_TRACKING
+ Object preOwner = referenceCountHelperImpl.getReferenceCountOwner();
+ referenceCountHelperImpl.skipRefCountTracking();
+ Object postOwner = referenceCountHelperImpl.getReferenceCountOwner();
+ // skip sets owner to SKIP_REF_COUNT_TRACKING
+ assertThat(postOwner).isNotEqualTo(preOwner);
+ assertThat(referenceCountHelperImpl.isRefCountTracking()).isFalse();
- assertFalse(rchi.isRefCountTracking());
-
- rchi.unskipRefCountTracking();
- postOwner = rchi.getReferenceCountOwner();
- assertEquals(postOwner, preOwner);
-
- assertTrue(rchi.isRefCountTracking());
+ referenceCountHelperImpl.unskipRefCountTracking();
+ postOwner = referenceCountHelperImpl.getReferenceCountOwner();
+ assertThat(preOwner).isEqualTo(postOwner);
+ assertThat(referenceCountHelperImpl.isRefCountTracking()).isTrue();
}
@Test
public void doSetReferenceCountOwnerWithTrackRefsTrueAndTrackFreesTrue() {
- rchi = getTrueTrue();
+ ReferenceCountHelperImpl referenceCountHelperImpl =
newReferenceCountHelperImpl_TrueTrue();
String owner = null;
- rchi.setReferenceCountOwner(owner);
- AtomicInteger ai = rchi.getReenterCount();
- assertEquals(0, ai.get());
-
- owner = new String("SomeOwner");
- rchi.setReferenceCountOwner(owner);
- ai = rchi.getReenterCount();
- assertEquals(1, ai.get());
- assertEquals(rchi.getReferenceCountOwner(), owner);
-
- String owner2 = new String("SomeOwner2");
- rchi.setReferenceCountOwner(owner2);
- ai = rchi.getReenterCount();
- assertEquals(2, ai.get());
- assertTrue(rchi.getReferenceCountOwner() != owner2); // stays original
owner until cnt = 0
+ referenceCountHelperImpl.setReferenceCountOwner(owner);
+ AtomicInteger reenterCount = referenceCountHelperImpl.getReenterCount();
+ assertThat(reenterCount.get()).isZero();
+
+ owner = "SomeOwner";
+ referenceCountHelperImpl.setReferenceCountOwner(owner);
+ reenterCount = referenceCountHelperImpl.getReenterCount();
+ assertThat(reenterCount.get()).isOne();
+
assertThat(referenceCountHelperImpl.getReferenceCountOwner()).isEqualTo(owner);
+
+ String owner2 = "SomeOwner2";
+ referenceCountHelperImpl.setReferenceCountOwner(owner2);
+ reenterCount = referenceCountHelperImpl.getReenterCount();
+ assertThat(reenterCount.get()).isEqualTo(2);
+ // stays original owner until cnt = 0
+
assertThat(referenceCountHelperImpl.getReferenceCountOwner()).isNotEqualTo(owner2);
String owner3 = null;
- rchi.setReferenceCountOwner(owner3);
- ai = rchi.getReenterCount();
- assertEquals(1, ai.get());
- assertEquals(rchi.getReferenceCountOwner(), owner);
+ referenceCountHelperImpl.setReferenceCountOwner(owner3);
+ reenterCount = referenceCountHelperImpl.getReenterCount();
+ assertThat(reenterCount.get()).isOne();
+
assertThat(referenceCountHelperImpl.getReferenceCountOwner()).isEqualTo(owner);
owner = null;
- rchi.setReferenceCountOwner(owner);
- ai = rchi.getReenterCount();
- assertEquals(0, ai.get());
- assertEquals(rchi.getReferenceCountOwner(), null);
+ referenceCountHelperImpl.setReferenceCountOwner(owner);
+ reenterCount = referenceCountHelperImpl.getReenterCount();
+ assertThat(reenterCount.get()).isZero();
+ assertThat(referenceCountHelperImpl.getReferenceCountOwner()).isNull();
- RegionEntry re = mock(RegionEntry.class);
- rchi.setReferenceCountOwner(re);
- ai = rchi.getReenterCount();
- assertEquals(1, ai.get());
- assertEquals(rchi.getReferenceCountOwner(), re);
+ RegionEntry regionEntry = mock(RegionEntry.class);
+ referenceCountHelperImpl.setReferenceCountOwner(regionEntry);
+ reenterCount = referenceCountHelperImpl.getReenterCount();
+ assertThat(reenterCount.get()).isOne();
+
assertThat(regionEntry).isEqualTo(referenceCountHelperImpl.getReferenceCountOwner());
Long address = (long) 0x1000;
Review comment:
Multiple compiler warnings in this class can be fixed by replacing lines
like this with just `long address = 0x1000;`. Alternatively, as the address used
is always the same in every test, it could be extracted to a constant.
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperInstance.java
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.offheap;
+
+import java.util.function.Function;
+
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.internal.cache.CachedDeserializableFactory;
+import org.apache.geode.internal.cache.DiskId;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.RegionEntryContext;
+import org.apache.geode.internal.cache.Token;
+import org.apache.geode.internal.cache.entries.DiskEntry;
+import org.apache.geode.internal.cache.entries.OffHeapRegionEntry;
+import org.apache.geode.internal.offheap.annotations.Released;
+import org.apache.geode.internal.offheap.annotations.Retained;
+import org.apache.geode.internal.offheap.annotations.Unretained;
+import org.apache.geode.internal.serialization.DSCODE;
+
+/**
+ * The class just has static methods that operate on instances of {@link
OffHeapRegionEntry}. It
+ * allows common code to be shared for all the classes we have that implement
+ * {@link OffHeapRegionEntry}.
+ *
+ * @since Geode 1.0
+ */
+class OffHeapRegionEntryHelperInstance {
+
+ static final long NULL_ADDRESS = 0L << 1;
+ static final long INVALID_ADDRESS = 1L << 1;
+ static final long LOCAL_INVALID_ADDRESS = 2L << 1;
+ static final long DESTROYED_ADDRESS = 3L << 1;
+ static final long REMOVED_PHASE1_ADDRESS = 4L << 1;
+ static final long REMOVED_PHASE2_ADDRESS = 5L << 1;
+ static final long END_OF_STREAM_ADDRESS = 6L << 1;
+ static final long NOT_AVAILABLE_ADDRESS = 7L << 1;
+ static final long TOMBSTONE_ADDRESS = 8L << 1;
+ static final int MAX_LENGTH_FOR_DATA_AS_ADDRESS = 8;
+
+ private final Function<Long, OffHeapStoredObject> offHeapStoredObjectFactory;
+ private final ReferenceCounterInstance referenceCounter;
+
+ OffHeapRegionEntryHelperInstance() {
+ this(OffHeapStoredObject::new, new ReferenceCounterInstance());
+ }
+
+ @VisibleForTesting
+ OffHeapRegionEntryHelperInstance(Function<Long, OffHeapStoredObject>
offHeapStoredObjectFactory,
+ ReferenceCounterInstance referenceCounter) {
+ this.offHeapStoredObjectFactory = offHeapStoredObjectFactory;
+ this.referenceCounter = referenceCounter;
+ }
+
+ /**
+ * This method may release the object stored at ohAddress if the result
needs to be decompressed
+ * and the decompress parameter is true. This decompressed result will be on
the heap.
+ *
+ * @param ohAddress OFF_HEAP_ADDRESS
+ * @param decompress true if off-heap value should be decompressed before
returning
+ * @param context used for decompression
+ * @return OFF_HEAP_OBJECT (sometimes)
+ */
+ @Unretained
+ @Retained
+ Object addressToObject(@Released @Retained long ohAddress, boolean
decompress,
+ RegionEntryContext context) {
+ if (isOffHeap(ohAddress)) {
+ @Unretained
+ OffHeapStoredObject chunk = offHeapStoredObjectFactory.apply(ohAddress);
+ @Unretained
+ Object result = chunk;
+ if (decompress && chunk.isCompressed()) {
+ try {
+ // to fix bug 47982 need to:
+ byte[] decompressedBytes = chunk.getDecompressedBytes(context);
+ if (chunk.isSerialized()) {
+ // return a VMCachedDeserializable with the decompressed
serialized bytes since chunk is
+ // serialized
+ result = CachedDeserializableFactory.create(decompressedBytes,
context.getCache());
+ } else {
+ // return a byte[] since chunk is not serialized
+ result = decompressedBytes;
+ }
+ } finally {
+ // decompress is only true when this method is called by
_getValueRetain.
+ // In that case the caller has already retained ohAddress because it
thought
+ // we would return it. But we have unwrapped it and are returning
the decompressed
+ // results.
+ // So we need to release the chunk here.
+ chunk.release();
+ }
+ }
+ return result;
+ } else if ((ohAddress & ENCODED_BIT) != 0) {
+ TinyStoredObject daa = new TinyStoredObject(ohAddress);
+ Object result = daa;
+ if (decompress && daa.isCompressed()) {
+ byte[] decompressedBytes = daa.getDecompressedBytes(context);
+ if (daa.isSerialized()) {
+ // return a VMCachedDeserializable with the decompressed serialized
bytes since daa is
+ // serialized
+ result = CachedDeserializableFactory.create(decompressedBytes,
context.getCache());
+ } else {
+ // return a byte[] since daa is not serialized
+ result = decompressedBytes;
+ }
+ }
+ return result;
+ } else {
+ return TokenAddress.addrToObj[(int) ohAddress >> 1];
+ }
+ }
+
+ int getSerializedLength(TinyStoredObject dataAsAddress) {
+ final long ohAddress = dataAsAddress.getAddress();
+
+ if ((ohAddress & ENCODED_BIT) != 0) {
+ boolean isLong = (ohAddress & LONG_BIT) != 0;
+ if (isLong) {
+ return 9;
+ } else {
+ return (int) ((ohAddress & SIZE_MASK) >> SIZE_SHIFT);
+ }
+ } else {
+ return 0;
+ }
+ }
+
+ /*
+ * This method is optimized for cases where if the caller wants to convert
address to a Token
+ * compared to addressToObject which would deserialize the value.
+ */
+ private Token addressToToken(long ohAddress) {
+ if (isOffHeap(ohAddress) || (ohAddress & ENCODED_BIT) != 0) {
+ return Token.NOT_A_TOKEN;
+ } else {
+ return TokenAddress.addrToObj[(int) ohAddress >> 1];
+ }
+ }
+
+ private void releaseAddress(@Released long ohAddress) {
+ if (isOffHeap(ohAddress)) {
+ referenceCounter.release(ohAddress);
+ }
+ }
+
+ /**
+ * The address in 're' will be @Released.
+ */
+ public void releaseEntry(@Released OffHeapRegionEntry re) {
+ if (re instanceof DiskEntry) {
+ DiskId did = ((DiskEntry) re).getDiskId();
+ if (did != null && did.isPendingAsync()) {
+ synchronized (did) {
+ // This may not be needed so remove this call if it causes problems.
+ // We no longer need this entry to be written to disk so unschedule
it
+ // before we change its value to REMOVED_PHASE2.
+ did.setPendingAsync(false);
+ setValue(re, Token.REMOVED_PHASE2);
+ return;
+ }
+ }
+ }
+ setValue(re, Token.REMOVED_PHASE2);
+ }
+
+ public void releaseEntry(@Unretained OffHeapRegionEntry re,
+ @Released StoredObject expectedValue) {
+ long oldAddress = TokenAddress.objectToAddress(expectedValue);
+ final long newAddress = TokenAddress.objectToAddress(Token.REMOVED_PHASE2);
+ if (re.setAddress(oldAddress, newAddress)) {
+ releaseAddress(oldAddress);
+ } /*
+ * else { if (!calledSetValue || re.getAddress() != newAddress) {
expectedValue.release(); } }
Review comment:
This comment can be removed, I think.
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/offheap/ReferenceCounterInstance.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.offheap;
+
+class ReferenceCounterInstance {
+
+ int getRefCount(long memAddr) {
+ return AddressableMemoryManager.readInt(memAddr +
OffHeapStoredObject.REF_COUNT_OFFSET)
+ & OffHeapStoredObject.REF_COUNT_MASK;
+ }
+
+ boolean retain(long memAddr) {
+ MemoryAllocatorImpl.validateAddress(memAddr);
+ int uc;
+ int rawBits;
+ int retryCount = 0;
+ do {
+ rawBits =
+ AddressableMemoryManager.readIntVolatile(memAddr +
OffHeapStoredObject.REF_COUNT_OFFSET);
+ if ((rawBits & OffHeapStoredObject.MAGIC_MASK) !=
OffHeapStoredObject.MAGIC_NUMBER) {
+ // same as uc == 0
+ // TODO MAGIC_NUMBER rethink its use and interaction with compactor
fragments
+ return false;
+ }
+ uc = rawBits & OffHeapStoredObject.REF_COUNT_MASK;
+ if (uc == OffHeapStoredObject.MAX_REF_COUNT) {
+ throw new IllegalStateException(
+ "Maximum use count exceeded. rawBits=" +
Integer.toHexString(rawBits));
+ } else if (uc == 0) {
+ return false;
+ }
+ retryCount++;
+ if (retryCount > 1000) {
+ throw new IllegalStateException("tried to write " + (rawBits + 1) + "
to @"
+ + Long.toHexString(memAddr) + " 1,000 times.");
+ }
+ } while (!AddressableMemoryManager.writeIntVolatile(
+ memAddr + OffHeapStoredObject.REF_COUNT_OFFSET, rawBits,
+ rawBits + 1));
+ // debugLog("use inced ref count " + (uc+1) + " @" +
Long.toHexString(memAddr), true);
+ if (ReferenceCountHelper.trackReferenceCounts()) {
+ ReferenceCountHelper.refCountChanged(memAddr, false, uc + 1);
+ }
+
+ return true;
+ }
+
+ void release(final long memAddr) {
+ release(memAddr, null);
+ }
+
+ void release(final long memAddr, FreeListManager freeListManager) {
+ MemoryAllocatorImpl.validateAddress(memAddr);
+ int newCount;
+ int rawBits;
+ boolean returnToAllocator;
+ do {
+ returnToAllocator = false;
+ rawBits =
+ AddressableMemoryManager.readIntVolatile(memAddr +
OffHeapStoredObject.REF_COUNT_OFFSET);
+ if ((rawBits & OffHeapStoredObject.MAGIC_MASK) !=
OffHeapStoredObject.MAGIC_NUMBER) {
+ String msg = "It looks like off heap memory @" +
Long.toHexString(memAddr)
+ + " was already freed. rawBits=" + Integer.toHexString(rawBits) +
" history="
+ + ReferenceCountHelper.getFreeRefCountInfo(memAddr);
+ // debugLog(msg, true);
+ throw new IllegalStateException(msg);
+ }
+ int curCount = rawBits & OffHeapStoredObject.REF_COUNT_MASK;
+ if ((curCount) == 0) {
+ // debugLog("too many frees @" + Long.toHexString(memAddr), true);
Review comment:
This commented-out code can be removed.
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/offheap/ReferenceCounterInstance.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.offheap;
+
+class ReferenceCounterInstance {
+
+ int getRefCount(long memAddr) {
+ return AddressableMemoryManager.readInt(memAddr +
OffHeapStoredObject.REF_COUNT_OFFSET)
+ & OffHeapStoredObject.REF_COUNT_MASK;
+ }
+
+ boolean retain(long memAddr) {
+ MemoryAllocatorImpl.validateAddress(memAddr);
+ int uc;
+ int rawBits;
+ int retryCount = 0;
+ do {
+ rawBits =
+ AddressableMemoryManager.readIntVolatile(memAddr +
OffHeapStoredObject.REF_COUNT_OFFSET);
+ if ((rawBits & OffHeapStoredObject.MAGIC_MASK) !=
OffHeapStoredObject.MAGIC_NUMBER) {
+ // same as uc == 0
+ // TODO MAGIC_NUMBER rethink its use and interaction with compactor
fragments
+ return false;
+ }
+ uc = rawBits & OffHeapStoredObject.REF_COUNT_MASK;
+ if (uc == OffHeapStoredObject.MAX_REF_COUNT) {
+ throw new IllegalStateException(
+ "Maximum use count exceeded. rawBits=" +
Integer.toHexString(rawBits));
+ } else if (uc == 0) {
+ return false;
+ }
+ retryCount++;
+ if (retryCount > 1000) {
+ throw new IllegalStateException("tried to write " + (rawBits + 1) + "
to @"
+ + Long.toHexString(memAddr) + " 1,000 times.");
+ }
+ } while (!AddressableMemoryManager.writeIntVolatile(
+ memAddr + OffHeapStoredObject.REF_COUNT_OFFSET, rawBits,
+ rawBits + 1));
+ // debugLog("use inced ref count " + (uc+1) + " @" +
Long.toHexString(memAddr), true);
+ if (ReferenceCountHelper.trackReferenceCounts()) {
+ ReferenceCountHelper.refCountChanged(memAddr, false, uc + 1);
+ }
+
+ return true;
+ }
+
+ void release(final long memAddr) {
+ release(memAddr, null);
+ }
+
+ void release(final long memAddr, FreeListManager freeListManager) {
+ MemoryAllocatorImpl.validateAddress(memAddr);
+ int newCount;
+ int rawBits;
+ boolean returnToAllocator;
+ do {
+ returnToAllocator = false;
+ rawBits =
+ AddressableMemoryManager.readIntVolatile(memAddr +
OffHeapStoredObject.REF_COUNT_OFFSET);
+ if ((rawBits & OffHeapStoredObject.MAGIC_MASK) !=
OffHeapStoredObject.MAGIC_NUMBER) {
+ String msg = "It looks like off heap memory @" +
Long.toHexString(memAddr)
+ + " was already freed. rawBits=" + Integer.toHexString(rawBits) +
" history="
+ + ReferenceCountHelper.getFreeRefCountInfo(memAddr);
+ // debugLog(msg, true);
Review comment:
This commented-out code can be removed.
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/offheap/ReferenceCounterInstance.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.offheap;
+
+class ReferenceCounterInstance {
+
+ int getRefCount(long memAddr) {
+ return AddressableMemoryManager.readInt(memAddr +
OffHeapStoredObject.REF_COUNT_OFFSET)
+ & OffHeapStoredObject.REF_COUNT_MASK;
+ }
+
+ boolean retain(long memAddr) {
+ MemoryAllocatorImpl.validateAddress(memAddr);
+ int uc;
+ int rawBits;
+ int retryCount = 0;
+ do {
+ rawBits =
+ AddressableMemoryManager.readIntVolatile(memAddr +
OffHeapStoredObject.REF_COUNT_OFFSET);
+ if ((rawBits & OffHeapStoredObject.MAGIC_MASK) !=
OffHeapStoredObject.MAGIC_NUMBER) {
+ // same as uc == 0
+ // TODO MAGIC_NUMBER rethink its use and interaction with compactor
fragments
+ return false;
+ }
+ uc = rawBits & OffHeapStoredObject.REF_COUNT_MASK;
+ if (uc == OffHeapStoredObject.MAX_REF_COUNT) {
+ throw new IllegalStateException(
+ "Maximum use count exceeded. rawBits=" +
Integer.toHexString(rawBits));
+ } else if (uc == 0) {
+ return false;
+ }
+ retryCount++;
+ if (retryCount > 1000) {
+ throw new IllegalStateException("tried to write " + (rawBits + 1) + "
to @"
+ + Long.toHexString(memAddr) + " 1,000 times.");
+ }
+ } while (!AddressableMemoryManager.writeIntVolatile(
+ memAddr + OffHeapStoredObject.REF_COUNT_OFFSET, rawBits,
+ rawBits + 1));
+ // debugLog("use inced ref count " + (uc+1) + " @" +
Long.toHexString(memAddr), true);
Review comment:
This commented-out code can be removed.
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/offheap/OffHeapRegionEntryHelperInstance.java
##########
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.offheap;
+
+import java.util.function.Function;
+
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.internal.cache.CachedDeserializableFactory;
+import org.apache.geode.internal.cache.DiskId;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.RegionEntryContext;
+import org.apache.geode.internal.cache.Token;
+import org.apache.geode.internal.cache.entries.DiskEntry;
+import org.apache.geode.internal.cache.entries.OffHeapRegionEntry;
+import org.apache.geode.internal.offheap.annotations.Released;
+import org.apache.geode.internal.offheap.annotations.Retained;
+import org.apache.geode.internal.offheap.annotations.Unretained;
+import org.apache.geode.internal.serialization.DSCODE;
+
+/**
+ * The class just has static methods that operate on instances of {@link
OffHeapRegionEntry}. It
+ * allows common code to be shared for all the classes we have that implement
+ * {@link OffHeapRegionEntry}.
+ *
+ * @since Geode 1.0
+ */
+class OffHeapRegionEntryHelperInstance {
+
+ static final long NULL_ADDRESS = 0L << 1;
+ static final long INVALID_ADDRESS = 1L << 1;
+ static final long LOCAL_INVALID_ADDRESS = 2L << 1;
+ static final long DESTROYED_ADDRESS = 3L << 1;
+ static final long REMOVED_PHASE1_ADDRESS = 4L << 1;
+ static final long REMOVED_PHASE2_ADDRESS = 5L << 1;
+ static final long END_OF_STREAM_ADDRESS = 6L << 1;
+ static final long NOT_AVAILABLE_ADDRESS = 7L << 1;
+ static final long TOMBSTONE_ADDRESS = 8L << 1;
+ static final int MAX_LENGTH_FOR_DATA_AS_ADDRESS = 8;
+
+ private final Function<Long, OffHeapStoredObject> offHeapStoredObjectFactory;
+ private final ReferenceCounterInstance referenceCounter;
+
+ OffHeapRegionEntryHelperInstance() {
+ this(OffHeapStoredObject::new, new ReferenceCounterInstance());
+ }
+
+ @VisibleForTesting
+ OffHeapRegionEntryHelperInstance(Function<Long, OffHeapStoredObject>
offHeapStoredObjectFactory,
+ ReferenceCounterInstance referenceCounter) {
+ this.offHeapStoredObjectFactory = offHeapStoredObjectFactory;
+ this.referenceCounter = referenceCounter;
+ }
+
+ /**
+ * This method may release the object stored at ohAddress if the result
needs to be decompressed
+ * and the decompress parameter is true. This decompressed result will be on
the heap.
+ *
+ * @param ohAddress OFF_HEAP_ADDRESS
+ * @param decompress true if off-heap value should be decompressed before
returning
+ * @param context used for decompression
+ * @return OFF_HEAP_OBJECT (sometimes)
+ */
+ @Unretained
+ @Retained
+ Object addressToObject(@Released @Retained long ohAddress, boolean
decompress,
+ RegionEntryContext context) {
+ if (isOffHeap(ohAddress)) {
+ @Unretained
+ OffHeapStoredObject chunk = offHeapStoredObjectFactory.apply(ohAddress);
+ @Unretained
+ Object result = chunk;
+ if (decompress && chunk.isCompressed()) {
+ try {
+ // to fix bug 47982 need to:
+ byte[] decompressedBytes = chunk.getDecompressedBytes(context);
+ if (chunk.isSerialized()) {
+ // return a VMCachedDeserializable with the decompressed
serialized bytes since chunk is
+ // serialized
+ result = CachedDeserializableFactory.create(decompressedBytes,
context.getCache());
+ } else {
+ // return a byte[] since chunk is not serialized
+ result = decompressedBytes;
+ }
+ } finally {
+ // decompress is only true when this method is called by
_getValueRetain.
+ // In that case the caller has already retained ohAddress because it
thought
+ // we would return it. But we have unwrapped it and are returning
the decompressed
+ // results.
+ // So we need to release the chunk here.
+ chunk.release();
+ }
+ }
+ return result;
+ } else if ((ohAddress & ENCODED_BIT) != 0) {
+ TinyStoredObject daa = new TinyStoredObject(ohAddress);
+ Object result = daa;
+ if (decompress && daa.isCompressed()) {
+ byte[] decompressedBytes = daa.getDecompressedBytes(context);
+ if (daa.isSerialized()) {
+ // return a VMCachedDeserializable with the decompressed serialized
bytes since daa is
+ // serialized
+ result = CachedDeserializableFactory.create(decompressedBytes,
context.getCache());
+ } else {
+ // return a byte[] since daa is not serialized
+ result = decompressedBytes;
+ }
+ }
+ return result;
+ } else {
+ return TokenAddress.addrToObj[(int) ohAddress >> 1];
+ }
+ }
+
+ int getSerializedLength(TinyStoredObject dataAsAddress) {
+ final long ohAddress = dataAsAddress.getAddress();
+
+ if ((ohAddress & ENCODED_BIT) != 0) {
+ boolean isLong = (ohAddress & LONG_BIT) != 0;
+ if (isLong) {
+ return 9;
+ } else {
+ return (int) ((ohAddress & SIZE_MASK) >> SIZE_SHIFT);
+ }
+ } else {
+ return 0;
+ }
+ }
+
+ /*
+ * This method is optimized for cases where if the caller wants to convert
address to a Token
+ * compared to addressToObject which would deserialize the value.
+ */
+ private Token addressToToken(long ohAddress) {
+ if (isOffHeap(ohAddress) || (ohAddress & ENCODED_BIT) != 0) {
+ return Token.NOT_A_TOKEN;
+ } else {
+ return TokenAddress.addrToObj[(int) ohAddress >> 1];
+ }
+ }
+
+ private void releaseAddress(@Released long ohAddress) {
+ if (isOffHeap(ohAddress)) {
+ referenceCounter.release(ohAddress);
+ }
+ }
+
+ /**
+ * The address in 're' will be @Released.
+ */
+ public void releaseEntry(@Released OffHeapRegionEntry re) {
+ if (re instanceof DiskEntry) {
+ DiskId did = ((DiskEntry) re).getDiskId();
+ if (did != null && did.isPendingAsync()) {
+ synchronized (did) {
+ // This may not be needed so remove this call if it causes problems.
+ // We no longer need this entry to be written to disk so unschedule
it
+ // before we change its value to REMOVED_PHASE2.
+ did.setPendingAsync(false);
+ setValue(re, Token.REMOVED_PHASE2);
+ return;
+ }
+ }
+ }
+ setValue(re, Token.REMOVED_PHASE2);
+ }
+
+ public void releaseEntry(@Unretained OffHeapRegionEntry re,
+ @Released StoredObject expectedValue) {
+ long oldAddress = TokenAddress.objectToAddress(expectedValue);
+ final long newAddress = TokenAddress.objectToAddress(Token.REMOVED_PHASE2);
+ if (re.setAddress(oldAddress, newAddress)) {
+ releaseAddress(oldAddress);
+ } /*
+ * else { if (!calledSetValue || re.getAddress() != newAddress) {
expectedValue.release(); } }
+ */
+ }
+
+ /**
+ * This bit is set to indicate that this address has data encoded in it.
+ */
+ private static final long ENCODED_BIT = 1L;
+ /**
+ * This bit is set to indicate that the encoded data is serialized.
+ */
+ static final long SERIALIZED_BIT = 2L;
+ /**
+ * This bit is set to indicate that the encoded data is compressed.
+ */
+ static final long COMPRESSED_BIT = 4L;
+ /**
+ * This bit is set to indicate that the encoded data is a long whose value
fits in 7 bytes.
+ */
+ private static final long LONG_BIT = 8L;
+ /**
+ * size is in the range 0..7 so we only need 3 bits.
+ */
+ private static final long SIZE_MASK = 0x70L;
+ /**
+ * number of bits to shift the size by.
+ */
+ private static final int SIZE_SHIFT = 4;
+ // the msb of this byte is currently unused
+
+ /**
+ * Returns 0 if the data could not be encoded as an address.
+ */
+ long encodeDataAsAddress(byte[] v, boolean isSerialized, boolean
isCompressed) {
+ if (v.length < MAX_LENGTH_FOR_DATA_AS_ADDRESS) {
+ long result = 0L;
+ for (int i = 0; i < v.length; i++) {
+ result |= v[i] & 0x00ff;
+ result <<= 8;
+ }
+ result |= (v.length << SIZE_SHIFT) | ENCODED_BIT;
+ if (isSerialized) {
+ result |= SERIALIZED_BIT;
+ }
+ if (isCompressed) {
+ result |= COMPRESSED_BIT;
+ }
+ return result;
+ } else if (isSerialized && !isCompressed) {
+ // Check for some special types that take more than 7 bytes to serialize
+ // but that might be able to be inlined with less than 8 bytes.
+ if (v[0] == DSCODE.LONG.toByte()) {
+ // A long is currently always serialized as 8 bytes (9 if you include
the dscode).
+ // But many long values will actually be small enough for is to encode
in 7 bytes.
+ if ((v[1] == 0 && (v[2] & 0x80) == 0) || (v[1] == -1 && (v[2] & 0x80)
!= 0)) {
+ // The long can be encoded as 7 bytes since the most signification
byte
+ // is simply an extension of the sign byte on the second most
signification byte.
+ long result = 0L;
+ for (int i = 2; i < v.length; i++) {
+ result |= v[i] & 0x00ff;
+ result <<= 8;
+ }
+ result |= (7 << SIZE_SHIFT) | LONG_BIT | SERIALIZED_BIT |
ENCODED_BIT;
+ return result;
+ }
+ }
+ }
+ return 0L;
+ }
+
+ Object decodeAddressToObject(long ohAddress) {
+ byte[] bytes = decodeUncompressedAddressToBytes(ohAddress);
+
+ boolean isSerialized = (ohAddress & SERIALIZED_BIT) != 0;
+ if (isSerialized) {
+ return EntryEventImpl.deserialize(bytes);
+ } else {
+ return bytes;
+ }
+ }
+
+ int decodeAddressToDataSize(long addr) {
+ if ((addr & ENCODED_BIT) == 0) {
+ throw new AssertionError("Invalid address: " + addr);
+ }
+ boolean isLong = (addr & LONG_BIT) != 0;
+ if (isLong) {
+ return 9;
+ }
+ return (int) ((addr & SIZE_MASK) >> SIZE_SHIFT);
+ }
+
+ /**
+ * Returns the bytes encoded in the given address. Note that compressed
addresses are not
+ * supported by this method.
+ *
+ * @throws UnsupportedOperationException if the address has compressed data
+ */
+ byte[] decodeUncompressedAddressToBytes(long addr) {
+ if ((addr & COMPRESSED_BIT) != 0) {
+ throw new AssertionError("Did not expect encoded address to be
compressed");
+ }
+ return decodeAddressToRawBytes(addr);
+ }
+
+ /**
+ * Returns the "raw" bytes that have been encoded in the given address. Note
that if address is
+ * compressed then the raw bytes are the compressed bytes.
+ */
+ byte[] decodeAddressToRawBytes(long addr) {
+ if ((addr & ENCODED_BIT) == 0) {
+ throw new AssertionError("Invalid address: " + addr);
+ }
+ int size = (int) ((addr & SIZE_MASK) >> SIZE_SHIFT);
+ boolean isLong = (addr & LONG_BIT) != 0;
+ byte[] bytes;
+ if (isLong) {
+ bytes = new byte[9];
+ bytes[0] = DSCODE.LONG.toByte();
+ for (int i = 8; i >= 2; i--) {
+ addr >>= 8;
+ bytes[i] = (byte) (addr & 0x00ff);
+ }
+ if ((bytes[2] & 0x80) != 0) {
+ bytes[1] = -1;
+ } else {
+ bytes[1] = 0;
+ }
+ } else {
+ bytes = new byte[size];
+ for (int i = size - 1; i >= 0; i--) {
+ addr >>= 8;
+ bytes[i] = (byte) (addr & 0x00ff);
+ }
+ }
+ return bytes;
+ }
+
+ /**
+ * The previous value at the address in 're' will be @Released and then the
address in 're' will
+ * be set to the @Unretained address of 'v'.
+ */
+ public void setValue(@Released OffHeapRegionEntry re, @Unretained Object v) {
+ // setValue is called when synced so I don't need to worry
+ // about oldAddress being released by someone else.
+ final long newAddress = TokenAddress.objectToAddress(v);
+ long oldAddress;
+ do {
+ oldAddress = re.getAddress();
+ } while (!re.setAddress(oldAddress, newAddress));
+ ReferenceCountHelper.setReferenceCountOwner(re);
+ releaseAddress(oldAddress);
+ ReferenceCountHelper.setReferenceCountOwner(null);
+ }
+
+ public Token getValueAsToken(@Unretained OffHeapRegionEntry re) {
+ return addressToToken(re.getAddress());
+ }
+
+ @Unretained
+ public Object _getValue(@Unretained OffHeapRegionEntry re) {
+ return addressToObject(re.getAddress(), false, null); // no context needed
so decompress is
+ // false
+ }
+
+ public boolean isOffHeap(long addr) {
+ if ((addr & ENCODED_BIT) != 0)
Review comment:
Could these if statements have curly brackets added?
##########
File path:
geode-core/src/main/java/org/apache/geode/internal/offheap/ReferenceCountHelperImpl.java
##########
@@ -278,28 +290,47 @@ void freeRefCountInfo(Long address) {
* Returns the thread local owner
*/
Object getReferenceCountOwner() {
- if (!trackReferenceCounts())
+ if (!trackReferenceCounts()) {
return null;
+ }
return refCountOwner.get();
}
/**
* Returns the thread local count of the number of times ref count has been
updated
*/
AtomicInteger getReenterCount() {
- if (!trackReferenceCounts())
+ if (!trackReferenceCounts()) {
return null;
+ }
return refCountReenterCount.get();
}
/**
* Returns a list of any free operation tracking information. This is used
to describe who did the
* previous free(s) when an extra one ends up being done and fails.
*/
- public List<RefCountChangeInfo> getFreeRefCountInfo(long address) {
- if (!trackReferenceCounts() || !trackFreedReferenceCounts())
+ List<RefCountChangeInfo> getFreeRefCountInfo(long address) {
+ if (!trackReferenceCounts() || !trackFreedReferenceCounts()) {
return null;
+ }
return freedStacktraces.get(address);
}
+ /**
+ * This method is overridden during testing to allow simulation of a
concurrent update occurring
+ * between stacktraces.get and stacktraces.replace
+ */
+ void getReferenceCountInfoTestHook(ConcurrentMap<Long,
List<RefCountChangeInfo>> stacktraces,
+ long address) {
+ // nothing
+ }
+
+ /**
+ * This method is overridden during testing to allow simulation of a race to
be the first to
+ * reference a given address
+ */
+ void refCountChangedTestHook(Long address, boolean decRefCount, int rc) {
Review comment:
The `address` argument in this method is a `Long` but the `address`
argument in `getReferenceCountInfoTestHook()` is a `long`. For consistency,
would it be best for both to be the same type?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]