Fix leak detection strong reference loop using weak reference patch by Ariel Weisberg; reviewed by Jeremiah Jordan for CASSANDRA-11120
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/15092e63 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/15092e63 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/15092e63 Branch: refs/heads/trunk Commit: 15092e6344a23612cb1793b82d1f80a1cbb1dafa Parents: 0a1cfaa Author: Ariel Weisberg <ariel.weisb...@datastax.com> Authored: Fri Feb 5 11:09:00 2016 -0500 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Mon Feb 8 15:38:19 2016 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../io/sstable/format/SSTableReader.java | 17 +++++---- .../apache/cassandra/utils/concurrent/Ref.java | 15 ++++++-- .../utils/concurrent/RefCountedTest.java | 36 ++++++++++++++++++++ 4 files changed, 60 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/15092e63/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1fbe301..03a8bc8 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.4 + * Fix leak detection strong reference loop using weak reference (CASSANDRA-11120) * Configurie BatchlogManager to stop delayed tasks on shutdown (CASSANDRA-11062) * Hadoop integration is incompatible with Cassandra Driver 3.0.0 (CASSANDRA-11001) Merged from 2.2.6 http://git-wip-us.apache.org/repos/asf/cassandra/blob/15092e63/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 8788766..1618516 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -18,6 +18,7 @@ package org.apache.cassandra.io.sstable.format; import java.io.*; +import java.lang.ref.WeakReference; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; @@ -2200,6 +2201,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS */ static final class GlobalTidy implements Tidy { + static WeakReference<ScheduledFuture<?>> NULL = new WeakReference<>(null); // keyed by descriptor, mapping to the shared GlobalTidy for that descriptor static final ConcurrentMap<Descriptor, Ref<GlobalTidy>> lookup = new ConcurrentHashMap<>(); @@ -2209,7 +2211,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS private RestorableMeter readMeter; // the scheduled persistence of the readMeter, that we will cancel once all instances of this logical // sstable have been released - private ScheduledFuture readMeterSyncFuture; + private WeakReference<ScheduledFuture<?>> readMeterSyncFuture = NULL; // shared state managing if the logical sstable has been compacted; this is used in cleanup private volatile Runnable obsoletion; @@ -2228,13 +2230,13 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS if (Schema.isSystemKeyspace(desc.ksname)) { readMeter = null; - readMeterSyncFuture = null; + readMeterSyncFuture = NULL; return; } readMeter = SystemKeyspace.getSSTableReadMeter(desc.ksname, desc.cfname, desc.generation); // sync the average read rate to system.sstable_activity every five minutes, starting one minute from now - readMeterSyncFuture = syncExecutor.scheduleAtFixedRate(new Runnable() + readMeterSyncFuture = new WeakReference<>(syncExecutor.scheduleAtFixedRate(new Runnable() { public void run() { @@ -2244,15 +2246,16 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname, desc.generation, 
readMeter); } } - }, 1, 5, TimeUnit.MINUTES); + }, 1, 5, TimeUnit.MINUTES)); } private void stopReadMeterPersistence() { - if (readMeterSyncFuture != null) + ScheduledFuture<?> readMeterSyncFutureLocal = readMeterSyncFuture.get(); + if (readMeterSyncFutureLocal != null) { - readMeterSyncFuture.cancel(true); - readMeterSyncFuture = null; + readMeterSyncFutureLocal.cancel(true); + readMeterSyncFuture = NULL; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/15092e63/src/java/org/apache/cassandra/utils/concurrent/Ref.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java index 25ebde9..02eccbb 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java @@ -1,7 +1,9 @@ package org.apache.cassandra.utils.concurrent; import java.lang.ref.PhantomReference; +import java.lang.ref.Reference; import java.lang.ref.ReferenceQueue; +import java.lang.ref.WeakReference; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import java.util.*; @@ -14,7 +16,6 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.Iterables; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.db.ColumnFamilyStore; @@ -466,7 +467,7 @@ public final class Ref<T> implements RefCounted<T> while (collectionIterator.hasNext() && (nextItem = collectionIterator.next()) == null){} if (nextItem != null) { - if (isMapIterator && nextItem instanceof Map.Entry) + if (isMapIterator & nextItem instanceof Map.Entry) { Map.Entry entry = (Map.Entry)nextItem; mapEntryValue = entry.getValue(); @@ -487,6 +488,13 @@ public final class Ref<T> implements RefCounted<T> Field nextField = nextField(); if (nextField == null) return null; + + //A 
weak reference isn't strongly reachable + //subclasses of WeakReference contain strong references in their fields, so those need to be traversed + //The weak reference fields are in the common Reference class base so filter those out + if (o instanceof WeakReference & nextField.getDeclaringClass() == Reference.class) + continue; + Object nextObject = nextField.get(o); if (nextObject != null) return Pair.create(nextField.get(o), nextField); @@ -509,6 +517,7 @@ public final class Ref<T> implements RefCounted<T> @VisibleForTesting long iterations = 0; GlobalState visiting; + Set<GlobalState> haveLoops; public void run() { @@ -576,6 +585,8 @@ public final class Ref<T> implements RefCounted<T> } else if (visiting == child) { + if (haveLoops != null) + haveLoops.add(visiting); NoSpamLogger.log(logger, NoSpamLogger.Level.ERROR, rootObject.getClass().getName(), http://git-wip-us.apache.org/repos/asf/cassandra/blob/15092e63/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java b/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java index 1a1864f..0582ad4 100644 --- a/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java +++ b/test/unit/org/apache/cassandra/utils/concurrent/RefCountedTest.java @@ -23,7 +23,9 @@ import org.junit.Test; import junit.framework.Assert; import java.io.File; +import java.lang.ref.WeakReference; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -369,4 +371,38 @@ public class RefCountedTest //Should iterate over the array touching roughly the same number of objects as entries Assert.assertTrue(visitor.iterations > (entryCount / 2) && visitor.iterations < (entryCount / 2) + fudgeFactor); } + + //Make sure a weak ref is ignored by the visitor looking for strong ref leaks + @Test + 
public void testWeakRef() throws Exception + { + AtomicReference dontRefMe = new AtomicReference(); + + WeakReference<Object> weakRef = new WeakReference(dontRefMe); + + RefCounted.Tidy tidier = new RefCounted.Tidy() { + WeakReference<Object> ref = weakRef; + + @Override + public void tidy() throws Exception + { + } + + @Override + public String name() + { + return "42"; + } + }; + + Ref<Object> ref = new Ref(dontRefMe, tidier); + dontRefMe.set(ref); + + Visitor visitor = new Visitor(); + visitor.haveLoops = new HashSet<>(); + visitor.run(); + ref.close(); + + Assert.assertTrue(visitor.haveLoops.isEmpty()); + } }