Author: mduerig
Date: Wed Mar 19 13:58:58 2014
New Revision: 1579234
URL: http://svn.apache.org/r1579234
Log:
OAK-1484: additional observation queue jmx attributes
Add time series for the maximum number of pending revisions in the observation
revision queue
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stats/TimeSeriesMax.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/RepositoryStatsMBean.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stats/RepositoryStats.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stats/StatisticManager.java
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/RepositoryStatsMBean.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/RepositoryStatsMBean.java?rev=1579234&r1=1579233&r2=1579234&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/RepositoryStatsMBean.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/RepositoryStatsMBean.java
Wed Mar 19 13:58:58 2014
@@ -99,4 +99,9 @@ public interface RepositoryStatsMBean {
* @see
org.apache.jackrabbit.api.stats.RepositoryStatistics.Type#OBSERVATION_EVENT_AVERAGE
*/
CompositeData getObservationEventAverage();
+
+ /**
+ * Maximum length of observation queue in the respective time period.
+ */
+ CompositeData getObservationQueueMaxLength();
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stats/RepositoryStats.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stats/RepositoryStats.java?rev=1579234&r1=1579233&r2=1579234&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stats/RepositoryStats.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stats/RepositoryStats.java
Wed Mar 19 13:58:58 2014
@@ -53,9 +53,11 @@ public class RepositoryStats implements
private static final Logger LOG =
LoggerFactory.getLogger(RepositoryStats.class);
private final RepositoryStatistics repoStats;
+ private final TimeSeries maxQueueLength;
- public RepositoryStats(RepositoryStatistics repoStats) {
+ public RepositoryStats(RepositoryStatistics repoStats, TimeSeries
maxQueueLength) {
this.repoStats = repoStats;
+ this.maxQueueLength = maxQueueLength;
}
@Override
@@ -128,29 +130,40 @@ public class RepositoryStats implements
return asCompositeData(OBSERVATION_EVENT_AVERAGE);
}
+ @Override
+ public CompositeData getObservationQueueMaxLength() {
+ return asCompositeData(maxQueueLength, "maximal length of observation
queue");
+ }
+
public static final String[] ITEM_NAMES = new String[] {
"per second", "per minute", "per hour", "per week"};
+ private TimeSeries getTimeSeries(Type type) {
+ return repoStats.getTimeSeries(type);
+ }
+
private CompositeData asCompositeData(Type type) {
+ return asCompositeData(getTimeSeries(type), type.name());
+ }
+
+ private static CompositeData asCompositeData(TimeSeries timeSeries, String
name) {
try {
- TimeSeries timeSeries = repoStats.getTimeSeries(type);
long[][] values = new long[][] {
timeSeries.getValuePerSecond(),
timeSeries.getValuePerMinute(),
timeSeries.getValuePerHour(),
timeSeries.getValuePerWeek()};
- return new CompositeDataSupport(getCompositeType(type),
ITEM_NAMES, values);
+ return new CompositeDataSupport(getCompositeType(name),
ITEM_NAMES, values);
} catch (Exception e) {
LOG.error("Error creating CompositeData instance from TimeSeries",
e);
return null;
}
}
- private static CompositeType getCompositeType(Type type) throws
OpenDataException {
+ private static CompositeType getCompositeType(String name) throws
OpenDataException {
ArrayType<int[]> longArrayType = new ArrayType<int[]>(SimpleType.LONG,
true);
OpenType<?>[] itemTypes = new OpenType[] {
longArrayType, longArrayType, longArrayType, longArrayType};
- String name = type.toString();
return new CompositeType(name, name + " time series", ITEM_NAMES,
ITEM_NAMES, itemTypes);
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stats/StatisticManager.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stats/StatisticManager.java?rev=1579234&r1=1579233&r2=1579234&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stats/StatisticManager.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stats/StatisticManager.java
Wed Mar 19 13:58:58 2014
@@ -41,6 +41,7 @@ import org.apache.jackrabbit.stats.jmx.Q
public class StatisticManager {
private final QueryStatImpl queryStat = new QueryStatImpl();
private final RepositoryStatisticsImpl repoStats;
+ private final TimeSeriesMax maxQueueLength;
private final CompositeRegistration registration;
/**
@@ -50,10 +51,11 @@ public class StatisticManager {
*/
public StatisticManager(Whiteboard whiteboard, ScheduledExecutorService
executor) {
repoStats = new RepositoryStatisticsImpl(executor);
+ maxQueueLength = new TimeSeriesMax(executor);
registration = new CompositeRegistration(
registerMBean(whiteboard, QueryStatManagerMBean.class, new
QueryStatManager(queryStat),
"QueryStat", "Oak Query Statistics"),
- registerMBean(whiteboard, RepositoryStatsMBean.class, new
RepositoryStats(repoStats),
+ registerMBean(whiteboard, RepositoryStatsMBean.class, new
RepositoryStats(repoStats, maxQueueLength),
RepositoryStats.TYPE, "Oak Repository Statistics"));
}
@@ -78,6 +80,10 @@ public class StatisticManager {
return repoStats.getCounter(type);
}
+ public TimeSeriesMax maxQueLengthRecorder() {
+ return maxQueueLength;
+ }
+
/**
* Unregister all statistics previously registered with the whiteboard
passed
* to the constructor.
@@ -85,4 +91,5 @@ public class StatisticManager {
public void dispose() {
registration.unregister();
}
+
}
Added:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stats/TimeSeriesMax.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stats/TimeSeriesMax.java?rev=1579234&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stats/TimeSeriesMax.java
(added)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stats/TimeSeriesMax.java
Wed Mar 19 13:58:58 2014
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.stats;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jackrabbit.api.stats.TimeSeries;
+
+/**
+ * Time series of the maximum value recorded in a period
+ */
+public class TimeSeriesMax implements TimeSeries {
+ private final MaxValue max = new MaxValue(0);
+ private final long[] perSecond = new long[60];
+ private final long[] perMinute = new long[60];
+ private final long[] perHour = new long[7 * 24];
+ private final long[] perWeek = new long[3 * 52];
+
+ /** Current second (index in {@link #perSecond}) */
+ private int seconds;
+
+ /** Current minute (index in {@link #perMinute}) */
+ private int minutes;
+
+ /** Current hour (index in {@link #perHour}) */
+ private int hours;
+
+ /** Current week (index in {@link #perWeek}) */
+ private int weeks;
+
+ public TimeSeriesMax(ScheduledExecutorService executor) {
+ executor.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ perSecond[seconds++] = max.getAndSetValue(0);
+ if (seconds == perSecond.length) {
+ seconds = 0;
+ perMinute[minutes++] = max(perSecond);
+ }
+ if (minutes == perMinute.length) {
+ minutes = 0;
+ perHour[hours++] = max(perMinute);
+ }
+ if (hours == perHour.length) {
+ hours = 0;
+ perWeek[weeks++] = max(perHour);
+ }
+ if (weeks == perWeek.length) {
+ weeks = 0;
+ }
+ }
+ }, 1, 1, TimeUnit.SECONDS);
+ }
+
+ public void recordValue(long value) {
+ max.setIfMaximal(value);
+ }
+
+ @Override
+ public synchronized long[] getValuePerSecond() {
+ return cyclicCopyFrom(perSecond, seconds);
+ }
+
+ @Override
+ public synchronized long[] getValuePerMinute() {
+ return cyclicCopyFrom(perMinute, minutes);
+ }
+
+ @Override
+ public synchronized long[] getValuePerHour() {
+ return cyclicCopyFrom(perHour, hours);
+ }
+
+ @Override
+ public synchronized long[] getValuePerWeek() {
+ return cyclicCopyFrom(perWeek, weeks);
+ }
+
+ /**
+ * Returns the maximum of all entries in the given array.
+ */
+ private static long max(long[] array) {
+ long max = Long.MIN_VALUE;
+ for (long v : array) {
+ if (v > max) {
+ max = v;
+ }
+ }
+ return max;
+ }
+
+ /**
+ * Returns a copy of the given cyclical array, with the element at
+ * the given position as the first element of the returned array.
+ *
+ * @param array cyclical array
+ * @param pos position of the first element
+ * @return copy of the array
+ */
+ private static long[] cyclicCopyFrom(long[] array, int pos) {
+ long[] reverse = new long[array.length];
+ for (int i = 0; i < array.length; i++) {
+ reverse[i] = array[(pos + i) % array.length];
+ }
+ return reverse;
+ }
+
+ private static class MaxValue {
+ private long value;
+
+ public MaxValue(long value) {
+ this.value = value;
+ }
+
+ public synchronized long getAndSetValue(long value) {
+ long v = this.value;
+ this.value = value;
+ return v;
+ }
+
+ public synchronized void setIfMaximal(long value) {
+ if (value > this.value) {
+ this.value = value;
+ }
+ }
+ }
+}
Modified:
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java?rev=1579234&r1=1579233&r2=1579234&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
(original)
+++
jackrabbit/oak/trunk/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
Wed Mar 19 13:58:58 2014
@@ -60,6 +60,7 @@ import org.apache.jackrabbit.oak.spi.whi
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
import org.apache.jackrabbit.oak.stats.StatisticManager;
+import org.apache.jackrabbit.oak.stats.TimeSeriesMax;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,6 +82,7 @@ class ChangeProcessor implements Observe
private final AtomicReference<List<FilterProvider>> filterProvider;
private final AtomicLong eventCount;
private final AtomicLong eventDuration;
+ private final TimeSeriesMax maxQueueLength;
private final int queueLength;
private final CommitRateLimiter commitRateLimiter;
@@ -104,6 +106,7 @@ class ChangeProcessor implements Observe
filterProvider = new AtomicReference<List<FilterProvider>>(filters);
this.eventCount =
statisticManager.getCounter(OBSERVATION_EVENT_COUNTER);
this.eventDuration =
statisticManager.getCounter(OBSERVATION_EVENT_DURATION);
+ this.maxQueueLength = statisticManager.maxQueLengthRecorder();
this.queueLength = queueLength;
this.commitRateLimiter = commitRateLimiter;
}
@@ -151,6 +154,7 @@ class ChangeProcessor implements Observe
@Override
protected void added(int queueSize) {
+ maxQueueLength.recordValue(queueSize);
if (warnWhenFull && queueSize == queueLength) {
warnWhenFull = false;
if (commitRateLimiter != null) {