Author: chetanm
Date: Tue Nov 15 11:18:52 2016
New Revision: 1769778
URL: http://svn.apache.org/viewvc?rev=1769778&view=rev
Log:
OAK-5110 - Use Metrics based stats collection in AsyncIndexUpdate
-- Switched to Metrics based stats
-- Retained a few JMX methods to avoid bumping the major version. Those
methods are now no-ops
-- Removed the scheduled job for ticking the stats, as it is not required with
the Metrics-based setup
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexMBeanRegistration.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java?rev=1769778&r1=1769777&r2=1769778&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
Tue Nov 15 11:18:52 2016
@@ -139,6 +139,7 @@ public interface IndexStatsMBean {
*
* @return the execution times time series
*/
+ @Deprecated
CompositeData getExecutionTime();
/**
@@ -157,6 +158,7 @@ public interface IndexStatsMBean {
/**
* Resets the consolidated stats.
*/
+ @Deprecated
void resetConsolidatedExecutionStats();
/**
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1769778&r1=1769777&r2=1769778&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
Tue Nov 15 11:18:52 2016
@@ -37,8 +37,6 @@ import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
@@ -50,6 +48,7 @@ import javax.management.openmbean.OpenTy
import javax.management.openmbean.SimpleType;
import com.google.common.collect.Lists;
+import org.apache.jackrabbit.api.stats.TimeSeries;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
@@ -76,7 +75,12 @@ import org.apache.jackrabbit.oak.spi.sta
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
-import org.apache.jackrabbit.stats.TimeSeriesRecorder;
+import org.apache.jackrabbit.oak.stats.Counting;
+import org.apache.jackrabbit.oak.stats.HistogramStats;
+import org.apache.jackrabbit.oak.stats.MeterStats;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatsOptions;
+import org.apache.jackrabbit.oak.stats.TimerStats;
import org.apache.jackrabbit.stats.TimeSeriesStatsUtil;
import org.apache.jackrabbit.util.ISO8601;
import org.slf4j.Logger;
@@ -131,7 +135,7 @@ public class AsyncIndexUpdate implements
private final long lifetime = DEFAULT_LIFETIME; // TODO: make configurable
- private final AsyncIndexStats indexStats = new AsyncIndexStats();
+ private final AsyncIndexStats indexStats;
/** Flag to switch to synchronous updates once the index caught up to the
repo */
private final boolean switchOnSync;
@@ -193,13 +197,19 @@ public class AsyncIndexUpdate implements
private List<ValidatorProvider> validatorProviders =
Collections.emptyList();
public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store,
- @Nonnull IndexEditorProvider provider, boolean switchOnSync) {
+ @Nonnull IndexEditorProvider provider, boolean
switchOnSync) {
+ this(name, store, provider, StatisticsProvider.NOOP, switchOnSync);
+ }
+
+ public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store,
+ @Nonnull IndexEditorProvider provider,
StatisticsProvider statsProvider, boolean switchOnSync) {
this.name = checkNotNull(name);
this.lastIndexedTo = name + "-LastIndexedTo";
this.store = checkNotNull(store);
this.provider = checkNotNull(provider);
this.switchOnSync = switchOnSync;
this.leaseTimeOut = DEFAULT_ASYNC_TIMEOUT;
+ this.indexStats = new AsyncIndexStats(name, statsProvider);
}
public AsyncIndexUpdate(@Nonnull String name, @Nonnull NodeStore store,
@@ -785,11 +795,11 @@ public class AsyncIndexUpdate implements
return indexStats.getStatus() == STATUS_DONE;
}
- final class AsyncIndexStats extends AnnotatedStandardMBean implements
- IndexStatsMBean, Runnable {
+ final class AsyncIndexStats extends AnnotatedStandardMBean implements
IndexStatsMBean {
- protected AsyncIndexStats() {
+ protected AsyncIndexStats(String name, StatisticsProvider
statsProvider) {
super(IndexStatsMBean.class);
+ this.execStats = new ExecutionStats(name, statsProvider);
}
private String start = "";
@@ -802,7 +812,7 @@ public class AsyncIndexUpdate implements
private volatile boolean isPaused;
private volatile long updates;
private final Stopwatch watch = Stopwatch.createUnstarted();
- private final ExecutionStats execStats = new ExecutionStats();
+ private final ExecutionStats execStats;
/** Flag to avoid repeatedly logging failure warnings */
private volatile boolean failing = false;
@@ -830,8 +840,7 @@ public class AsyncIndexUpdate implements
if (watch.isRunning()) {
watch.stop();
}
- execStats.incrementCounter();
- execStats.recordExecution(watch.elapsed(TimeUnit.MILLISECONDS),
updates);
+ execStats.doneOneCycle(watch.elapsed(TimeUnit.MILLISECONDS),
updates);
watch.reset();
}
@@ -986,7 +995,8 @@ public class AsyncIndexUpdate implements
@Override
public CompositeData getExecutionTime() {
- return execStats.getExecutionTime();
+ //Do nothing. Kept for backward compatibility
+ return null;
}
@Override
@@ -1001,7 +1011,7 @@ public class AsyncIndexUpdate implements
@Override
public void resetConsolidatedExecutionStats() {
- execStats.resetConsolidatedStats();
+ //Do nothing. Kept for backward compatibility
}
@Override
@@ -1016,68 +1026,70 @@ public class AsyncIndexUpdate implements
+ latestErrorTime + ", latestError=" + latestError + " ]";
}
- @Override
- public void run() {
- execStats.recordTick();
+ ExecutionStats getExecutionStats() {
+ return execStats;
}
- private class ExecutionStats {
- private final TimeSeriesRecorder execCounter;
- private final TimeSeriesRecorder execTimer;
- private final TimeSeriesRecorder indexedNodesCounter;
-
- /**
- * Captures consolidated execution stats since last reset
- */
- private final AtomicLong consolidatedExecTime = new AtomicLong();
- private final AtomicInteger consolidatedExecRuns = new
AtomicInteger();
- private final AtomicLong consolidatedNodes = new AtomicLong();
- private final String[] names = {"Executions", "Execution Time",
"Nodes"};
- private CompositeType consolidatedType;
+ class ExecutionStats {
+ public static final String INDEXER_COUNT = "INDEXER_COUNT";
+ public static final String INDEXER_NODE_COUNT =
"INDEXER_NODE_COUNT";
+ private final MeterStats indexerExecutionCountMeter;
+ private final MeterStats indexedNodeCountMeter;
+ private final TimerStats indexerTimer;
+ private final HistogramStats indexedNodePerCycleHisto;
+ private StatisticsProvider statisticsProvider;
- private ExecutionStats() {
- execCounter = new TimeSeriesRecorder(true);
- execTimer = new TimeSeriesRecorder(true);
- indexedNodesCounter = new TimeSeriesRecorder(true);
+ private final String[] names = {"Executions", "Nodes"};
+ private final String name;
+ private CompositeType consolidatedType;
+ public ExecutionStats(String name, StatisticsProvider
statsProvider) {
+ this.name = name;
+ this.statisticsProvider = statsProvider;
+ indexerExecutionCountMeter =
statsProvider.getMeter(stats(INDEXER_COUNT), StatsOptions.DEFAULT);
+ indexedNodeCountMeter =
statsProvider.getMeter(stats(INDEXER_NODE_COUNT), StatsOptions.DEFAULT);
+ indexerTimer = statsProvider.getTimer(stats("INDEXER_TIME"),
StatsOptions.METRICS_ONLY);
+ indexedNodePerCycleHisto =
statsProvider.getHistogram(stats("INDEXER_NODE_COUNT_HISTO"), StatsOptions
+ .METRICS_ONLY);
try {
consolidatedType = new CompositeType("ConsolidatedStats",
"Consolidated stats", names,
names,
- new OpenType[] {SimpleType.LONG, SimpleType.LONG,
SimpleType.LONG});
+ new OpenType[] {SimpleType.LONG, SimpleType.LONG});
} catch (OpenDataException e) {
- log.warn("[{}] Error in creating CompositeType for
consolidated stats", name, e);
+ log.warn("[{}] Error in creating CompositeType for
consolidated stats", AsyncIndexUpdate.this.name, e);
}
}
- private void incrementCounter() {
- execCounter.getCounter().incrementAndGet();
- consolidatedExecRuns.incrementAndGet();
+ public void doneOneCycle(long timeInMillis, long updates){
+ indexerExecutionCountMeter.mark();
+ indexedNodeCountMeter.mark(updates);
+ indexerTimer.update(timeInMillis, TimeUnit.MILLISECONDS);
+ indexedNodePerCycleHisto.update(updates);
}
- private void recordExecution(long time, long updates) {
- execTimer.getCounter().addAndGet(time);
- indexedNodesCounter.getCounter().addAndGet(updates);
- consolidatedExecTime.addAndGet(time);
- consolidatedNodes.addAndGet(updates);
+ public Counting getExecutionCounter() {
+ return indexerExecutionCountMeter;
}
- private CompositeData getExecutionCount() {
- return TimeSeriesStatsUtil.asCompositeData(execCounter,
"ExecutionCount");
+ public Counting getIndexedNodeCount() {
+ return indexedNodeCountMeter;
}
- private CompositeData getExecutionTime() {
- return TimeSeriesStatsUtil.asCompositeData(execTimer,
"ExecutionTime");
+ private CompositeData getExecutionCount() {
+ return
TimeSeriesStatsUtil.asCompositeData(getTimeSeries(stats(INDEXER_COUNT)),
+ "Indexer Execution Count");
}
private CompositeData getIndexedNodesCount() {
- return
TimeSeriesStatsUtil.asCompositeData(indexedNodesCounter, "ExecutionNodesCount");
+ return
TimeSeriesStatsUtil.asCompositeData(getTimeSeries(stats(INDEXER_NODE_COUNT)),
+ "Indexer Node Count");
}
private CompositeData getConsolidatedStats() {
try {
- Long[] values = new
Long[]{consolidatedExecRuns.longValue(),
- consolidatedExecTime.longValue(),
consolidatedNodes.longValue()};
+ Long[] values = new
Long[]{indexerExecutionCountMeter.getCount(),
+ indexedNodeCountMeter.getCount()};
return new CompositeDataSupport(consolidatedType, names,
values);
} catch (Exception e) {
log.error("[{}] Error retrieving consolidated stats",
name, e);
@@ -1085,16 +1097,12 @@ public class AsyncIndexUpdate implements
}
}
- private void resetConsolidatedStats() {
- consolidatedExecRuns.set(0);
- consolidatedExecTime.set(0);
- consolidatedNodes.set(0);
+ private String stats(String suffix){
+ return name + ":" + suffix;
}
- private void recordTick() {
- execCounter.recordOneSecond();
- execTimer.recordOneSecond();
- indexedNodesCounter.recordOneSecond();
+ private TimeSeries getTimeSeries(String name) {
+ return statisticsProvider.getStats().getTimeSeries(name, true);
}
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java?rev=1769778&r1=1769777&r2=1769778&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerService.java
Tue Nov 15 11:18:52 2016
@@ -39,6 +39,7 @@ import org.apache.jackrabbit.oak.spi.com
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardIndexEditorProvider;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,6 +82,9 @@ public class AsyncIndexerService {
@Reference(target = "(type=" + ChangeCollectorProvider.TYPE + ")")
private ValidatorProvider validatorProvider;
+ @Reference
+ private StatisticsProvider statisticsProvider;
+
private IndexMBeanRegistration indexRegistration;
@Activate
@@ -94,7 +98,8 @@ public class AsyncIndexerService {
long leaseTimeOutMin =
PropertiesUtil.toInteger(config.get(PROP_LEASE_TIME_OUT),
PROP_LEASE_TIMEOUT_DEFAULT);
for (AsyncConfig c : asyncIndexerConfig) {
- AsyncIndexUpdate task = new AsyncIndexUpdate(c.name, nodeStore,
indexEditorProvider);
+ AsyncIndexUpdate task = new AsyncIndexUpdate(c.name, nodeStore,
indexEditorProvider,
+ statisticsProvider, false);
task.setValidatorProviders(Collections.singletonList(validatorProvider));
task.setLeaseTimeOut(TimeUnit.MINUTES.toMillis(leaseTimeOutMin));
indexRegistration.registerAsyncIndexer(task, c.timeIntervalInSecs);
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexMBeanRegistration.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexMBeanRegistration.java?rev=1769778&r1=1769777&r2=1769778&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexMBeanRegistration.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexMBeanRegistration.java
Tue Nov 15 11:18:52 2016
@@ -45,8 +45,6 @@ public class IndexMBeanRegistration impl
regs.add(scheduleWithFixedDelay(whiteboard, task, config,
delayInSeconds, true, true));
regs.add(registerMBean(whiteboard, IndexStatsMBean.class,
task.getIndexStats(), IndexStatsMBean.TYPE, task.getName()));
- // Register AsyncIndexStats for execution stats update
- regs.add(scheduleWithFixedDelay(whiteboard, task.getIndexStats(), 1));
}
@Override
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1769778&r1=1769777&r2=1769778&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
Tue Nov 15 11:18:52 2016
@@ -35,6 +35,7 @@ import static org.junit.Assert.assertTha
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
@@ -42,6 +43,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -59,12 +62,14 @@ import org.apache.jackrabbit.oak.api.Pro
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
import
org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.AsyncIndexStats;
import
org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate.IndexTaskSpliter;
import
org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexLookup;
import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
+import org.apache.jackrabbit.oak.plugins.metric.MetricStatisticsProvider;
import org.apache.jackrabbit.oak.query.index.FilterImpl;
import org.apache.jackrabbit.oak.spi.commit.CommitContext;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
@@ -83,6 +88,7 @@ import org.apache.jackrabbit.oak.spi.sta
import org.apache.jackrabbit.oak.spi.state.ProxyNodeStore;
import org.apache.jackrabbit.oak.stats.Clock;
import org.apache.jackrabbit.util.ISO8601;
+import org.junit.After;
import org.junit.Test;
import ch.qos.logback.classic.Level;
@@ -92,6 +98,15 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
public class AsyncIndexUpdateTest {
+ private ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
+ private MetricStatisticsProvider statsProvider =
+ new
MetricStatisticsProvider(ManagementFactory.getPlatformMBeanServer(),executor);
+
+ @After
+ public void shutDown(){
+ statsProvider.close();
+ new ExecutorCloser(executor).close();
+ }
// TODO test index config deletes
@@ -1034,17 +1049,18 @@ public class AsyncIndexUpdateTest {
// merge it back in
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
- AsyncIndexUpdate async = new AsyncIndexUpdate("async", store,
provider);
+ AsyncIndexUpdate async = new AsyncIndexUpdate("async", store,
provider, statsProvider, false);
runOneCycle(async);
- assertEquals(1,
lastExecutionStats(async.getIndexStats().getExecutionCount()));
+ assertEquals(1,
async.getIndexStats().getExecutionStats().getExecutionCounter().getCount());
//Run a cycle so that change of reindex flag gets indexed
runOneCycle(async);
- assertEquals(1,
lastExecutionStats(async.getIndexStats().getExecutionCount()));
+ assertEquals(2,
async.getIndexStats().getExecutionStats().getExecutionCounter().getCount());
+ long indexedNodeCount =
async.getIndexStats().getExecutionStats().getIndexedNodeCount().getCount();
//Now run so that it results in an empty cycle
runOneCycle(async);
- assertEquals(1,
lastExecutionStats(async.getIndexStats().getExecutionCount()));
+ assertEquals(indexedNodeCount,
async.getIndexStats().getExecutionStats().getIndexedNodeCount().getCount());
//Do some updates and counter should increase
builder = store.getRoot().builder();
@@ -1052,7 +1068,7 @@ public class AsyncIndexUpdateTest {
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
runOneCycle(async);
- assertEquals(1,
lastExecutionStats(async.getIndexStats().getExecutionCount()));
+ assertEquals(4,
async.getIndexStats().getExecutionStats().getExecutionCounter().getCount());
//Do some updates but disable checkpoints. Counter should not increase
builder = store.getRoot().builder();
@@ -1073,7 +1089,6 @@ public class AsyncIndexUpdateTest {
private static void runOneCycle(AsyncIndexUpdate async){
async.run();
- async.getIndexStats().run();
}
@Test
Modified:
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java?rev=1769778&r1=1769777&r2=1769778&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexerServiceTest.java
Tue Nov 15 11:18:52 2016
@@ -37,6 +37,7 @@ import org.apache.jackrabbit.oak.spi.com
import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.apache.sling.testing.mock.osgi.MockOsgi;
import org.apache.sling.testing.mock.osgi.junit.OsgiContext;
import org.junit.Before;
@@ -57,6 +58,7 @@ public class AsyncIndexerServiceTest {
@Before
public void setUp() {
+ context.registerService(StatisticsProvider.class,
StatisticsProvider.NOOP);
context.registerService(NodeStore.class, nodeStore);
context.registerService(ValidatorProvider.class, new
ChangeCollectorProvider());
MockOsgi.injectServices(service, context.bundleContext());