looks good now. thanks for the fix.

regards
 marcel

On 30/10/14 10:12, "Amit Jain" <[email protected]> wrote:

>Hi,
>
>>>Would it be an option to use the ScheduledExecutorService from
>org.apache.jackrabbit.oak.Oak?
>
>In revision 1635436, I removed the custom executor service and used
>whiteboard instance to schedule in org.apache.jackrabbit.oak.Oak.
>
>Thanks
>Amit
>
>On Wed, Oct 29, 2014 at 10:10 PM, Marcel Reutegger <[email protected]>
>wrote:
>
>> Hi,
>>
>> I think this commit causes OOME when the build is run with the node
>>store
>> fixtures for MongoDB and RDB.
>>
>> The heap dump shows many DocumentNodeStore instances, which cannot be
>> garbage collected because the ScheduledExecutorService instance
>>introduce
>> with this change is still running and references the store.
>>
>> Would it be an option to use the ScheduledExecutorService from
>> org.apache.jackrabbit.oak.Oak?
>>
>> Regards
>>  Marcel
>>
>> On 29/10/14 07:17, "[email protected]" <[email protected]> wrote:
>>
>> >Author: amitj
>> >Date: Wed Oct 29 06:17:00 2014
>> >New Revision: 1635058
>> >
>> >URL: http://svn.apache.org/r1635058
>> >Log:
>> >OAK-2230: Execution Stats for async indexing
>> >Added resettable consolidated execution stats
>> >TimeSeries for trends
>> >
>> >Added:
>> >
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/st
>>>at
>> >s/TimeSeriesStatsUtil.java   (with props)
>> >Modified:
>> >
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/ap
>>>i/
>> >jmx/IndexStatsMBean.java
>> >
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/pl
>>>ug
>> >ins/index/AsyncIndexUpdate.java
>> >
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/st
>>>at
>> >s/RepositoryStats.java
>> >
>> >Modified:
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/ap
>>>i/
>> >jmx/IndexStatsMBean.java
>> >URL:
>> >
>> 
>>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/
>>o
>> 
>>>rg/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java?rev=1635058&r1=163
>>>50
>> >57&r2=1635058&view=diff
>> 
>>>========================================================================
>>>==
>> >====
>> >---
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/ap
>>>i/
>> >jmx/IndexStatsMBean.java (original)
>> >+++
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/ap
>>>i/
>> >jmx/IndexStatsMBean.java Wed Oct 29 06:17:00 2014
>> >@@ -17,6 +17,8 @@
>> >
>> > package org.apache.jackrabbit.oak.api.jmx;
>> >
>> >+import javax.management.openmbean.CompositeData;
>> >+
>> > public interface IndexStatsMBean {
>> >
>> >     String TYPE = "IndexStats";
>> >@@ -114,4 +116,28 @@ public interface IndexStatsMBean {
>> >      */
>> >     String getTemporaryCheckpoints();
>> >
>> >+    /**
>> >+     * Returns the number of executions as a {@link
>> >org.apache.jackrabbit.api.stats.TimeSeries}.
>> >+     *
>> >+     * @return the execution count time series
>> >+     */
>> >+    CompositeData getExecutionCount();
>> >+
>> >+    /**
>> >+     * Returns the execution time as a {@link
>> >org.apache.jackrabbit.api.stats.TimeSeries}.
>> >+     *
>> >+     * @return the execution times time series
>> >+     */
>> >+    CompositeData getExecutionTime();
>> >+
>> >+    /**
>> >+     * Returns the consolidated execution stats since last reset
>> >+     * @return consolidated execution stats
>> >+     */
>> >+    CompositeData getConsolidatedExecutionStats();
>> >+
>> >+    /**
>> >+     * Resets the consolidated stats.
>> >+     */
>> >+    void resetConsolidatedExecutionStats();
>> > }
>> >
>> >Modified:
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/pl
>>>ug
>> >ins/index/AsyncIndexUpdate.java
>> >URL:
>> >
>> 
>>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/
>>o
>> 
>>>rg/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1635058
>>>&r
>> >1=1635057&r2=1635058&view=diff
>> 
>>>========================================================================
>>>==
>> >====
>> >---
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/pl
>>>ug
>> >ins/index/AsyncIndexUpdate.java (original)
>> >+++
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/pl
>>>ug
>> >ins/index/AsyncIndexUpdate.java Wed Oct 29 06:17:00 2014
>> >@@ -28,10 +28,22 @@ import static org.apache.jackrabbit.oak.
>> > import java.util.Calendar;
>> > import java.util.HashSet;
>> > import java.util.Set;
>> >+import java.util.concurrent.Executors;
>> >+import java.util.concurrent.ScheduledExecutorService;
>> >+import java.util.concurrent.ThreadFactory;
>> > import java.util.concurrent.TimeUnit;
>> >+import java.util.concurrent.atomic.AtomicInteger;
>> >+import java.util.concurrent.atomic.AtomicLong;
>> >
>> > import javax.annotation.Nonnull;
>> >+import javax.management.openmbean.CompositeData;
>> >+import javax.management.openmbean.CompositeDataSupport;
>> >+import javax.management.openmbean.CompositeType;
>> >+import javax.management.openmbean.OpenDataException;
>> >+import javax.management.openmbean.OpenType;
>> >+import javax.management.openmbean.SimpleType;
>> >
>> >+import com.google.common.base.Stopwatch;
>> > import org.apache.jackrabbit.oak.api.CommitFailedException;
>> > import org.apache.jackrabbit.oak.api.PropertyState;
>> > import org.apache.jackrabbit.oak.api.Type;
>> >@@ -51,6 +63,8 @@ import org.apache.jackrabbit.oak.spi.sta
>> > import org.apache.jackrabbit.oak.spi.state.NodeState;
>> > import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
>> > import org.apache.jackrabbit.oak.spi.state.NodeStore;
>> >+import org.apache.jackrabbit.oak.stats.TimeSeriesStatsUtil;
>> >+import org.apache.jackrabbit.stats.TimeSeriesRecorder;
>> > import org.apache.jackrabbit.util.ISO8601;
>> > import org.slf4j.Logger;
>> > import org.slf4j.LoggerFactory;
>> >@@ -380,6 +394,7 @@ public class AsyncIndexUpdate implements
>> >                     reindexedDefinitions.clear();
>> >                 }
>> >                 postAsyncRunStatsStatus(indexStats);
>> >+                indexStats.resetConsolidatedExecutionStats();
>> >             }
>> >             mergeWithConcurrencyCheck(builder, beforeCheckpoint,
>> >callback.lease);
>> >         } finally {
>> >@@ -443,17 +458,29 @@ public class AsyncIndexUpdate implements
>> >
>> >         private volatile boolean isPaused;
>> >         private volatile long updates;
>> >+        private Stopwatch watch = Stopwatch.createUnstarted();
>> >+        private ExecutionStats execStats = new ExecutionStats();
>> >
>> >         public void start(String now) {
>> >             status = STATUS_RUNNING;
>> >             start = now;
>> >             done = "";
>> >+
>> >+            if (watch.isRunning()) {
>> >+                watch.reset();
>> >+            }
>> >+            watch.start();
>> >         }
>> >
>> >         public void done(String now) {
>> >             status = STATUS_DONE;
>> >-            start = "";
>> >             done = now;
>> >+            if (watch.isRunning()) {
>> >+                watch.stop();
>> >+            }
>> >+            execStats.incrementCounter();
>> >+
>> >execStats.recordExecution(watch.elapsed(TimeUnit.MILLISECONDS),
>>updates);
>> >+            watch.reset();
>> >         }
>> >
>> >         @Override
>> >@@ -534,6 +561,26 @@ public class AsyncIndexUpdate implements
>> >         }
>> >
>> >         @Override
>> >+        public CompositeData getExecutionCount() {
>> >+            return execStats.getExecutionCount();
>> >+        }
>> >+
>> >+        @Override
>> >+        public CompositeData getExecutionTime() {
>> >+            return execStats.getExecutionTime();
>> >+        }
>> >+
>> >+        @Override
>> >+        public CompositeData getConsolidatedExecutionStats() {
>> >+            return execStats.getConsolidatedStats();
>> >+        }
>> >+
>> >+        @Override
>> >+        public void resetConsolidatedExecutionStats() {
>> >+            execStats.resetConsolidatedStats();
>> >+        }
>> >+
>> >+        @Override
>> >         public String toString() {
>> >             return "AsyncIndexStats [start=" + start + ", done=" +
>>done
>> >                     + ", status=" + status + ", paused=" + isPaused
>> >@@ -541,6 +588,86 @@ public class AsyncIndexUpdate implements
>> >                     + referenceCp + ", processedCheckpoint=" +
>> >processedCp
>> >                     + " ,tempCheckpoints=" + tempCps + " ]";
>> >         }
>> >+
>> >+        class ExecutionStats {
>> >+            final ScheduledExecutorService executor;
>> >+            final TimeSeriesRecorder execCounter;
>> >+            final TimeSeriesRecorder execTimer;
>> >+
>> >+            /**
>> >+             * Captures consolidated execution stats since last reset
>> >+             */
>> >+            final AtomicLong consolidatedExecTime = new AtomicLong();
>> >+            final AtomicInteger consolidatedExecRuns = new
>> >AtomicInteger();
>> >+            final AtomicLong consolidatedNodes = new AtomicLong();
>> >+            final String[] names = {"Executions", "Execution Time",
>> >"Nodes"};
>> >+            CompositeType consolidatedType;
>> >+
>> >+            ExecutionStats() {
>> >+                executor =
>> >Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
>> >+                    @Override
>> >+                    public Thread newThread(@Nonnull Runnable r) {
>> >+                        Thread thread = new Thread(r,
>> >"AsyncIndexStats-ExecutionsStats");
>> >+                        thread.setDaemon(true);
>> >+                        return thread;
>> >+                    }
>> >+                });
>> >+                execCounter = new TimeSeriesRecorder(true);
>> >+                execTimer = new TimeSeriesRecorder(true);
>> >+
>> >+                executor.scheduleAtFixedRate(new Runnable() {
>> >+                    public void run() {
>> >+                        execCounter.recordOneSecond();
>> >+                        execTimer.recordOneSecond();
>> >+                    }
>> >+                }, 1, 1, TimeUnit.SECONDS);
>> >+
>> >+                try {
>> >+                    consolidatedType = new
>> >CompositeType("ConsolidatedStats",
>> >+                        "Consolidated stats", names,
>> >+                        names,
>> >+                        new OpenType[] {SimpleType.LONG,
>> >SimpleType.LONG, SimpleType.LONG});
>> >+                } catch (OpenDataException e) {
>> >+                    log.warn("Error in creating CompositeType for
>> >consolidated stats", e);
>> >+                }
>> >+            }
>> >+
>> >+            void incrementCounter() {
>> >+                execCounter.getCounter().incrementAndGet();
>> >+                consolidatedExecRuns.incrementAndGet();
>> >+            }
>> >+
>> >+            void recordExecution(long time, long updates) {
>> >+                execTimer.getCounter().addAndGet(time);
>> >+                consolidatedExecTime.addAndGet(time);
>> >+                consolidatedNodes.addAndGet(updates);
>> >+            }
>> >+
>> >+            public CompositeData getExecutionCount() {
>> >+                return
>>TimeSeriesStatsUtil.asCompositeData(execCounter,
>> >"ExecutionCount");
>> >+            }
>> >+
>> >+            public CompositeData getExecutionTime() {
>> >+                return TimeSeriesStatsUtil.asCompositeData(execTimer,
>> >"ExecutionTime");
>> >+            }
>> >+
>> >+            public CompositeData getConsolidatedStats() {
>> >+                try {
>> >+                    Long[] values = new
>> >Long[]{consolidatedExecRuns.longValue(),
>> >+                        consolidatedExecTime.longValue(),
>> >consolidatedNodes.longValue()};
>> >+                    return new CompositeDataSupport(consolidatedType,
>> >names, values);
>> >+                } catch (Exception e) {
>> >+                    log.error("Error retrieving consolidated stats",
>>e);
>> >+                    return null;
>> >+                }
>> >+            }
>> >+
>> >+            public void resetConsolidatedStats() {
>> >+                consolidatedExecRuns.set(0);
>> >+                consolidatedExecTime.set(0);
>> >+                consolidatedNodes.set(0);
>> >+            }
>> >+        }
>> >     }
>> >
>> >     /**
>> >
>> >Modified:
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/st
>>>at
>> >s/RepositoryStats.java
>> >URL:
>> >
>> 
>>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/
>>o
>> 
>>>rg/apache/jackrabbit/oak/stats/RepositoryStats.java?rev=1635058&r1=16350
>>>57
>> >&r2=1635058&view=diff
>> 
>>>========================================================================
>>>==
>> >====
>> >---
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/st
>>>at
>> >s/RepositoryStats.java (original)
>> >+++
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/st
>>>at
>> >s/RepositoryStats.java Wed Oct 29 06:17:00 2014
>> >@@ -35,13 +35,7 @@ import static org.apache.jackrabbit.api.
>> > import static
>> 
>>>org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.SESSION_WRITE_
>>>CO
>> >UNTER;
>> > import static
>> 
>>>org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.SESSION_WRITE_
>>>DU
>> >RATION;
>> >
>> >-import javax.management.openmbean.ArrayType;
>> > import javax.management.openmbean.CompositeData;
>> >-import javax.management.openmbean.CompositeDataSupport;
>> >-import javax.management.openmbean.CompositeType;
>> >-import javax.management.openmbean.OpenDataException;
>> >-import javax.management.openmbean.OpenType;
>> >-import javax.management.openmbean.SimpleType;
>> >
>> > import org.apache.jackrabbit.api.stats.RepositoryStatistics;
>> > import org.apache.jackrabbit.api.stats.TimeSeries;
>> >@@ -132,39 +126,16 @@ public class RepositoryStats implements
>> >
>> >     @Override
>> >     public CompositeData getObservationQueueMaxLength() {
>> >-        return asCompositeData(maxQueueLength, "maximal length of
>> >observation queue");
>> >+        return TimeSeriesStatsUtil
>> >+            .asCompositeData(maxQueueLength, "maximal length of
>> >observation queue");
>> >     }
>> >
>> >-    public static final String[] ITEM_NAMES = new String[] {
>> >-            "per second", "per minute", "per hour", "per week"};
>> >-
>> >     private TimeSeries getTimeSeries(Type type) {
>> >         return repoStats.getTimeSeries(type);
>> >     }
>> >
>> >     private CompositeData asCompositeData(Type type) {
>> >-        return asCompositeData(getTimeSeries(type), type.name());
>> >-    }
>> >-
>> >-    private static CompositeData asCompositeData(TimeSeries
>>timeSeries,
>> >String name) {
>> >-        try {
>> >-            long[][] values = new long[][] {
>> >-                timeSeries.getValuePerSecond(),
>> >-                timeSeries.getValuePerMinute(),
>> >-                timeSeries.getValuePerHour(),
>> >-                timeSeries.getValuePerWeek()};
>> >-            return new CompositeDataSupport(getCompositeType(name),
>> >ITEM_NAMES, values);
>> >-        } catch (Exception e) {
>> >-            LOG.error("Error creating CompositeData instance from
>> >TimeSeries", e);
>> >-            return null;
>> >-        }
>> >-    }
>> >-
>> >-    private static CompositeType getCompositeType(String name) throws
>> >OpenDataException {
>> >-        ArrayType<int[]> longArrayType = new
>> >ArrayType<int[]>(SimpleType.LONG, true);
>> >-        OpenType<?>[] itemTypes = new OpenType[] {
>> >-                longArrayType, longArrayType, longArrayType,
>> >longArrayType};
>> >-        return new CompositeType(name, name + " time series",
>> >ITEM_NAMES, ITEM_NAMES, itemTypes);
>> >+        return
>>TimeSeriesStatsUtil.asCompositeData(getTimeSeries(type),
>> >type.name());
>> >     }
>> >
>> > }
>> >
>> >Added:
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/st
>>>at
>> >s/TimeSeriesStatsUtil.java
>> >URL:
>> >
>> 
>>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/
>>o
>> 
>>>rg/apache/jackrabbit/oak/stats/TimeSeriesStatsUtil.java?rev=1635058&view
>>>=a
>> >uto
>> 
>>>========================================================================
>>>==
>> >====
>> >---
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/st
>>>at
>> >s/TimeSeriesStatsUtil.java (added)
>> >+++
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/st
>>>at
>> >s/TimeSeriesStatsUtil.java Wed Oct 29 06:17:00 2014
>> >@@ -0,0 +1,58 @@
>> >+/*
>> >+ * Licensed to the Apache Software Foundation (ASF) under one
>> >+ * or more contributor license agreements.  See the NOTICE file
>> >+ * distributed with this work for additional information
>> >+ * regarding copyright ownership.  The ASF licenses this file
>> >+ * to you under the Apache License, Version 2.0 (the
>> >+ * "License"); you may not use this file except in compliance
>> >+ * with the License.  You may obtain a copy of the License at
>> >+ *
>> >+ *   http://www.apache.org/licenses/LICENSE-2.0
>> >+ *
>> >+ * Unless required by applicable law or agreed to in writing,
>> >+ * software distributed under the License is distributed on an
>> >+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> >+ * KIND, either express or implied.  See the License for the
>> >+ * specific language governing permissions and limitations
>> >+ * under the License.
>> >+ */
>> >+package org.apache.jackrabbit.oak.stats;
>> >+
>> >+import org.apache.jackrabbit.api.stats.TimeSeries;
>> >+import org.slf4j.Logger;
>> >+import org.slf4j.LoggerFactory;
>> >+
>> >+import javax.management.openmbean.ArrayType;
>> >+import javax.management.openmbean.CompositeData;
>> >+import javax.management.openmbean.CompositeDataSupport;
>> >+import javax.management.openmbean.CompositeType;
>> >+import javax.management.openmbean.OpenDataException;
>> >+import javax.management.openmbean.OpenType;
>> >+import javax.management.openmbean.SimpleType;
>> >+
>> >+/**
>> >+ * Utility class for retrieving {@link
>> >javax.management.openmbean.CompositeData} for
>> >+ * {@link org.apache.jackrabbit.api.stats.TimeSeries}.
>> >+ */
>> >+public class TimeSeriesStatsUtil {
>> >+    public static final String[] ITEM_NAMES = new String[] {"per
>> >second", "per minute", "per hour", "per week"};
>> >+
>> >+    private static final Logger LOG =
>> >LoggerFactory.getLogger(TimeSeriesStatsUtil.class);
>> >+
>> >+    public static CompositeData asCompositeData(TimeSeries timeSeries,
>> >String name) {
>> >+        try {
>> >+            long[][] values = new long[][]
>> >{timeSeries.getValuePerSecond(), timeSeries.getValuePerMinute(),
>> >+                timeSeries.getValuePerHour(),
>> >timeSeries.getValuePerWeek()};
>> >+            return new CompositeDataSupport(getCompositeType(name),
>> >ITEM_NAMES, values);
>> >+        } catch (Exception e) {
>> >+            LOG.error("Error creating CompositeData instance from
>> >TimeSeries", e);
>> >+            return null;
>> >+        }
>> >+    }
>> >+
>> >+    static CompositeType getCompositeType(String name) throws
>> >OpenDataException {
>> >+        ArrayType<int[]> longArrayType = new
>> >ArrayType<int[]>(SimpleType.LONG, true);
>> >+        OpenType<?>[] itemTypes = new OpenType[] {longArrayType,
>> >longArrayType, longArrayType, longArrayType};
>> >+        return new CompositeType(name, name + " time series",
>> >ITEM_NAMES, ITEM_NAMES, itemTypes);
>> >+    }
>> >+}
>> >
>> >Propchange:
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/st
>>>at
>> >s/TimeSeriesStatsUtil.java
>> 
>>>------------------------------------------------------------------------
>>>--
>> >----
>> >    svn:eol-style = native
>> >
>> >
>>
>>

Reply via email to