Author: mduerig Date: Tue Oct 30 12:58:57 2018 New Revision: 1845234 URL: http://svn.apache.org/viewvc?rev=1845234&view=rev Log: OAK-7838: oak-run check crashes JVM Do away with the background thread and piggy back purge operations instead
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CommitsTracker.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStats.java jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CommitsTrackerTest.java jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/tooling/CheckInvalidRepositoryTest.java Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CommitsTracker.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CommitsTracker.java?rev=1845234&r1=1845233&r2=1845234&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CommitsTracker.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CommitsTracker.java Tue Oct 30 12:58:57 2018 @@ -19,17 +19,17 @@ package org.apache.jackrabbit.oak.segment; -import static java.util.concurrent.TimeUnit.MINUTES; +import static com.google.common.collect.Maps.newHashMap; +import static com.google.common.collect.Queues.newConcurrentLinkedQueue; -import java.io.Closeable; import java.util.HashMap; import java.util.Map; +import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.stream.Stream; import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; -import org.apache.jackrabbit.oak.segment.file.Scheduler; /** * A simple tracker for the source of commits (writes) in @@ -42,26 +42,28 @@ import org.apache.jackrabbit.oak.segment * * This class delegates thread-safety to its underlying state variables. */ -class CommitsTracker implements Closeable { - private final boolean collectStackTraces; +class CommitsTracker { private final String[] threadGroups; + private final int otherWritersLimit; + private final boolean collectStackTraces; private final ConcurrentMap<String, String> queuedWritersMap; - private final ConcurrentMap<String, Long> commitsCountPerThreadGroup; - private final ConcurrentMap<String, Long> commitsCountOtherThreads; - private final ConcurrentMap<String, Long> commitsCountPerThreadGroupLastMinute; - private final Scheduler commitsTrackerScheduler = new Scheduler("CommitsTracker background tasks"); + private final Queue<Commit> commits = newConcurrentLinkedQueue(); + + private static final class Commit { + final long timeStamp; + final String thread; + + Commit(long timeStamp, String thread) { + this.timeStamp = timeStamp; + this.thread = thread; + } + } CommitsTracker(String[] threadGroups, int otherWritersLimit, boolean collectStackTraces) { this.threadGroups = threadGroups; + this.otherWritersLimit = otherWritersLimit; this.collectStackTraces = collectStackTraces; - this.commitsCountPerThreadGroup = new ConcurrentHashMap<>(); - this.commitsCountPerThreadGroupLastMinute = new ConcurrentHashMap<>(); - this.commitsCountOtherThreads = new ConcurrentLinkedHashMap.Builder<String, Long>() - .maximumWeightedCapacity(otherWritersLimit).build(); this.queuedWritersMap = new ConcurrentHashMap<>(); - - commitsTrackerScheduler.scheduleWithFixedDelay("TarMK commits tracker stats resetter", 1, MINUTES, - this::resetStatistics); } public void trackQueuedCommitOf(Thread t) { @@ -79,23 +81,23 @@ class CommitsTracker implements Closeabl queuedWritersMap.remove(t.getName()); } - public void trackExecutedCommitOf(Thread t) { - String group = findGroupFor(t); - - if (group.equals("other")) { - commitsCountOtherThreads.compute(t.getName(), (w, v) -> v == null ? 1 : v + 1); - } + public void trackExecutedCommitOf(Thread thread) { + long t = System.currentTimeMillis(); + commits.removeIf(c -> c.timeStamp < t - 60000); + commits.offer(new Commit(t, thread.getName())); + } - commitsCountPerThreadGroup.compute(group, (w, v) -> v == null ? 1 : v + 1); + public Map<String, String> getQueuedWritersMap() { + return new HashMap<>(queuedWritersMap); } - private String findGroupFor(Thread t) { + private String findGroupFor(String thread) { if (threadGroups == null) { return "other"; } - + for (String group : threadGroups) { - if (t.getName().matches(group)) { + if (thread.matches(group)) { return group; } } @@ -103,31 +105,32 @@ class CommitsTracker implements Closeabl return "other"; } - private void resetStatistics() { - commitsCountPerThreadGroupLastMinute.clear(); - commitsCountPerThreadGroupLastMinute.putAll(commitsCountPerThreadGroup); - commitsCountPerThreadGroup.clear(); - commitsCountOtherThreads.clear(); - } - - @Override - public void close() { - commitsTrackerScheduler.close(); - } - - public Map<String, String> getQueuedWritersMap() { - return new HashMap<>(queuedWritersMap); - } - public Map<String, Long> getCommitsCountPerGroupLastMinute() { - return new HashMap<>(commitsCountPerThreadGroupLastMinute); + Map<String, Long> commitsPerGroup = newHashMap(); + long t = System.currentTimeMillis() - 60000; + for (Commit commit : commits) { + if (commit.timeStamp > t) { + String group = findGroupFor(commit.thread); + if (!"other".equals(group)) { + commitsPerGroup.compute(group, (w, v) -> v == null ? 1 : v + 1); + } + } + } + return commitsPerGroup; } public Map<String, Long> getCommitsCountOthers() { - return new HashMap<>(commitsCountOtherThreads); - } - - Map<String, Long> getCommitsCountPerGroup() { - return new HashMap<>(commitsCountPerThreadGroup); + Map<String, Long> commitsOther = new ConcurrentLinkedHashMap.Builder<String, Long>() + .maximumWeightedCapacity(otherWritersLimit).build(); + long t = System.currentTimeMillis() - 60000; + for (Commit commit : commits) { + if (commit.timeStamp > t) { + String group = findGroupFor(commit.thread); + if ("other".equals(group)) { + commitsOther.compute(commit.thread, (w, v) -> v == null ? 1 : v + 1); + } + } + } + return commitsOther; } } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStats.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStats.java?rev=1845234&r1=1845233&r2=1845234&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStats.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStats.java Tue Oct 30 12:58:57 2018 @@ -181,7 +181,6 @@ public class SegmentNodeStoreStats imple @Override public void setCollectStackTraces(boolean flag) { this.collectStackTraces = flag; - commitsTracker.close(); commitsTracker = new CommitsTracker(writerGroups, otherWritersLimit, collectStackTraces); } @@ -198,7 +197,6 @@ public class SegmentNodeStoreStats imple @Override public void setNumberOfOtherWritersToDetail(int otherWritersLimit) { this.otherWritersLimit = otherWritersLimit; - commitsTracker.close(); commitsTracker = new CommitsTracker(writerGroups, otherWritersLimit, collectStackTraces); } @@ -210,7 +208,6 @@ public class SegmentNodeStoreStats imple @Override public void setWriterGroupsForLastMinuteCounts(String[] writerGroups) { this.writerGroups = writerGroups; - commitsTracker.close(); commitsTracker = new CommitsTracker(writerGroups, otherWritersLimit, collectStackTraces); } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CommitsTrackerTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CommitsTrackerTest.java?rev=1845234&r1=1845233&r2=1845234&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CommitsTrackerTest.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CommitsTrackerTest.java Tue Oct 30 12:58:57 2018 @@ -96,7 +96,6 @@ public class CommitsTrackerTest { queuedWritersMap = commitsTracker.getQueuedWritersMap(); assertEquals(0, queuedWritersMap.size()); } finally { - commitsTracker.close(); new ExecutorCloser(executorService).close(); } } @@ -122,7 +121,7 @@ public class CommitsTrackerTest { latch.await(); - Map<String, Long> commitsCountPerGroup = commitsTracker.getCommitsCountPerGroup(); + Map<String, Long> commitsCountPerGroup = commitsTracker.getCommitsCountPerGroupLastMinute(); assertEquals(3, commitsCountPerGroup.size()); for (String group : groups) { @@ -131,7 +130,6 @@ public class CommitsTrackerTest { assertEquals(10, (long) groupCount); } } finally { - commitsTracker.close(); new ExecutorCloser(executorService).close(); } } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/tooling/CheckInvalidRepositoryTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/tooling/CheckInvalidRepositoryTest.java?rev=1845234&r1=1845233&r2=1845234&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/tooling/CheckInvalidRepositoryTest.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/tooling/CheckInvalidRepositoryTest.java Tue Oct 30 12:58:57 2018 @@ -34,7 +34,6 @@ import org.apache.jackrabbit.oak.segment import org.apache.jackrabbit.oak.segment.file.tar.LocalJournalFile; import org.apache.jackrabbit.oak.segment.tool.Check; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; /** @@ -218,7 +217,6 @@ public class CheckInvalidRepositoryTest "Error while traversing /b: java.lang.IllegalArgumentException: Segment reference out of bounds")); } - @Ignore("OAK-7838") @Test public void testLargeJournal() throws IOException { StringWriter strOut = new StringWriter();