Author: davide
Date: Fri Jun 5 10:07:56 2015
New Revision: 1683700

URL: http://svn.apache.org/r1683700
Log:
OAK-2961 - Async index fails with OakState0001: Unresolved conflicts in /:async
- ignored the test
- generic class for extending document cluster tests. DocumentClusterIT.
- test dependencies for running the IT

Added:
    jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/jcr/AsyncConflictsIT.java
Modified:
    jackrabbit/oak/trunk/oak-lucene/pom.xml

Added: jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java?rev=1683700&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java (added) +++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java Fri Jun 5 10:07:56 2015 @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.jcr; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest.dispose; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.jcr.Credentials; +import javax.jcr.Repository; +import javax.jcr.Session; +import javax.jcr.SimpleCredentials; + +import org.apache.jackrabbit.oak.plugins.document.DocumentMK; +import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection; +import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider; +import org.junit.After; +import org.junit.Before; +import org.slf4j.Logger; + +/** + * abstract class that can be inherited by an IT who has to run tests against a cluster of + * DocumentMKs for having some utility methods available. + */ +public abstract class DocumentClusterIT { + List<Repository> repos = new ArrayList<Repository>(); + List<DocumentMK> mks = new ArrayList<DocumentMK>(); + + /** + * the number of nodes we'd like to run against + */ + static final int NUM_CLUSTER_NODES = Integer.getInteger("it.documentmk.cluster.nodes", 5); + + /** + * credentials for logging in as {@code admin} + */ + static final Credentials ADMIN = new SimpleCredentials("admin", "admin".toCharArray()); + + static final int NOT_PROVIDED = Integer.MIN_VALUE; + + @Before + public void before() throws Exception { + dropDB(this.getClass()); + + List<Repository> rs = new ArrayList<Repository>(); + List<DocumentMK> ds = new ArrayList<DocumentMK>(); + + initRepository(this.getClass(), rs, ds, 1, NOT_PROVIDED); + + Repository repository = rs.get(0); + DocumentMK mk = ds.get(0); + + Session session = repository.login(ADMIN); + session.logout(); + dispose(repository); + mk.dispose(); // closes connection as well + } + + @After + public void after() throws Exception { + for (Repository repo : repos) { + 
dispose(repo); + } + for (DocumentMK mk : mks) { + mk.dispose(); + } + dropDB(this.getClass()); + } + + /** + * raise the exception passed into the provided Map + * + * @param exceptions + * @param log may be null. If valid Logger it will be logged + * @throws Exception + */ + static void raiseExceptions(@Nonnull final Map<String, Exception> exceptions, + @Nullable final Logger log) throws Exception { + if (exceptions != null) { + for (Map.Entry<String, Exception> entry : exceptions.entrySet()) { + if (log != null) { + log.error("Exception in thread {}", entry.getKey(), entry.getValue()); + } + throw entry.getValue(); + } + } + } + + /** + * <p> + * ensures that the cluster is aligned by running all the background operations + * </p> + * + * <p> + * In order to use this you have to initialise the cluster with {@code setAsyncDelay(0)}. + * </p> + * + * @param mks the list of {@link DocumentMK} composing the cluster. Cannot be null. + */ + static void alignCluster(@Nonnull final List<DocumentMK> mks) { + for (int i = 0; i < 2; i++) { + for (DocumentMK mk : mks) { + mk.getNodeStore().runBackgroundOperations(); + } + } + } + + /** + * set up the cluster connections + * + * @param clazz class used for logging into Mongo itself + * @param mks the list of mks to work on. 
+ * @param repos list of {@link Repository} created on each {@code mks} + * @throws Exception + */ + void setUpCluster(@Nonnull final Class<?> clazz, + @Nonnull final List<DocumentMK> mks, + @Nonnull final List<Repository> repos) throws Exception { + setUpCluster(clazz, mks, repos, NOT_PROVIDED); + } + + void setUpCluster(@Nonnull final Class<?> clazz, + @Nonnull final List<DocumentMK> mks, + @Nonnull final List<Repository> repos, + final int asyncDelay) throws Exception { + for (int i = 0; i < NUM_CLUSTER_NODES; i++) { + initRepository(clazz, repos, mks, i + 1, asyncDelay); + } + } + + static MongoConnection createConnection(@Nonnull final Class<?> clazz) throws Exception { + return OakMongoNSRepositoryStub.createConnection( + checkNotNull(clazz).getSimpleName()); + } + + static void dropDB(@Nonnull final Class<?> clazz) throws Exception { + MongoConnection con = createConnection(checkNotNull(clazz)); + try { + con.getDB().dropDatabase(); + } finally { + con.close(); + } + } + + /** + * initialise the repository + * + * @param clazz the current class. Used for logging. Cannot be null. + * @param repos list to which add the created repository. Cannot be null. + * @param mks list to which add the created MK. Cannot be null. + * @param clusterId the cluster ID to use. Must be greater than 0. + * @param asyncDelay the async delay to set. 
For default use {@link #NOT_PROVIDED} + * @throws Exception + */ + protected void initRepository(@Nonnull final Class<?> clazz, + @Nonnull final List<Repository> repos, + @Nonnull final List<DocumentMK> mks, + final int clusterId, + final int asyncDelay) throws Exception { + DocumentMK.Builder builder = new DocumentMK.Builder(); + builder.setMongoDB(createConnection(checkNotNull(clazz)).getDB()); + if (asyncDelay != NOT_PROVIDED) { + builder.setAsyncDelay(asyncDelay); + } + builder.setClusterId(clusterId); + + DocumentMK mk = builder.open(); + Jcr j = new Jcr(mk.getNodeStore()); + + Set<IndexEditorProvider> ieps = additionalIndexEditorProviders(); + if (ieps != null) { + for (IndexEditorProvider p : ieps) { + j = j.with(p); + } + } + + if (isAsyncIndexing()) { + j = j.withAsyncIndexing(); + } + + Repository repository = j.createRepository(); + + checkNotNull(repos).add(repository); + checkNotNull(mks).add(mk); + } + + /** + * <p> + * the default {@link #initRepository(Class, List, List, int, int)} uses this for registering + * any additional {@link IndexEditorProvider}. Override and return all the provider you'd like + * to have running other than the OOTB one. + * </p> + * + * <p> + * the default implementation returns {@code null} + * </p> + * @return + */ + protected Set<IndexEditorProvider> additionalIndexEditorProviders() { + return null; + } + + /** + * override to change default behaviour. If {@code true} will enable the async indexing in the + * cluster. 
Default is {@code false} + * + * @return + */ + protected boolean isAsyncIndexing() { + return false; + } +} Modified: jackrabbit/oak/trunk/oak-lucene/pom.xml URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/pom.xml?rev=1683700&r1=1683699&r2=1683700&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-lucene/pom.xml (original) +++ jackrabbit/oak/trunk/oak-lucene/pom.xml Fri Jun 5 10:07:56 2015 @@ -222,11 +222,21 @@ <!-- Test Dependencies --> <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> <dependency> + <groupId>org.mongodb</groupId> + <artifactId>mongo-java-driver</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.jackrabbit</groupId> <artifactId>oak-core</artifactId> <version>${project.version}</version> @@ -245,6 +255,13 @@ <version>${project.version}</version> <type>test-jar</type> <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.jackrabbit</groupId> + <artifactId>oak-commons</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> </dependency> <dependency> <groupId>org.apache.jackrabbit</groupId> Added: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/jcr/AsyncConflictsIT.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/jcr/AsyncConflictsIT.java?rev=1683700&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/jcr/AsyncConflictsIT.java (added) +++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/jcr/AsyncConflictsIT.java Fri Jun 5 10:07:56 2015 @@ -0,0 +1,216 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.jcr; + +import static org.apache.jackrabbit.JcrConstants.NT_UNSTRUCTURED; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME; +import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME; +import static org.apache.jackrabbit.oak.plugins.nodetype.NodeTypeConstants.NT_OAK_UNSTRUCTURED; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.Set; + +import javax.annotation.Nonnull; +import javax.jcr.Node; +import javax.jcr.Repository; +import javax.jcr.RepositoryException; +import javax.jcr.Session; + +import org.apache.jackrabbit.oak.commons.FixturesHelper; +import org.apache.jackrabbit.oak.commons.FixturesHelper.Fixture; +import org.apache.jackrabbit.oak.plugins.index.IndexConstants; +import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider; +import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexEditorProvider; +import 
org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.classic.spi.IThrowableProxy; +import ch.qos.logback.core.Appender; +import ch.qos.logback.core.filter.Filter; +import ch.qos.logback.core.read.ListAppender; +import ch.qos.logback.core.spi.FilterReply; + +import com.google.common.collect.ImmutableSet; + +public class AsyncConflictsIT extends DocumentClusterIT { + private static final Set<Fixture> FIXTURES = FixturesHelper.getFixtures(); + private static final String INDEX_DEF_NODE = "asyncconflict"; + private static final String INDEX_PROPERTY = "number"; + private static final Logger LOG = LoggerFactory.getLogger(AsyncConflictsIT.class); + + @BeforeClass + public static void assumptions() { + assumeTrue(FIXTURES.contains(Fixture.DOCUMENT_NS)); + assumeTrue(OakMongoNSRepositoryStub.isMongoDBAvailable()); + } + + @Test @Ignore("OAK-2961") + public void updates() throws Exception { + final Map<String, Exception> exceptions = Collections.synchronizedMap(new HashMap<String, Exception>()); + final Random generator = new Random(3); + final ListAppender<ILoggingEvent> logAppender = subscribeAppender(); + + setUpCluster(this.getClass(), mks, repos, NOT_PROVIDED); + defineIndex(repos.get(0)); + + final int numberNodes = 10000; + + LOG.info("adding {} nodes", numberNodes); + Session s = repos.get(0).login(ADMIN); + Node test = s.getRootNode().addNode("test"); + test.setPrimaryType(NT_OAK_UNSTRUCTURED); + + try { + for (int i = 0; i < numberNodes; i++) { + test.addNode("node" + i); + test.setProperty(INDEX_PROPERTY, generator.nextInt(numberNodes/3)); + if (i % 1024 == 0) { + s.save(); + } + } + + s.save(); + } catch (Exception e) { + exceptions.put(Thread.currentThread().getName(), e); + } finally { + s.logout(); + } + + LOG.info("Nodes 
added."); + + // issuing re-index + LOG.info("issuing re-index and wait for finish"); + s = repos.get(0).login(ADMIN); + try { + Node index = s.getNode("/oak:index/" + INDEX_DEF_NODE); + index.setProperty(REINDEX_PROPERTY_NAME, true); + s.save(); + } catch (Exception e) { + exceptions.put(Thread.currentThread().getName(), e); + } finally { + s.logout(); + } + while (!isReindexFinished()) { + Thread.sleep(5000); + } + + raiseExceptions(exceptions, LOG); + + // if following fails it means the Async index failed at least once. + assertTrue( + String.format("We should have not any '%s' in the logs", AsyncLogFilter.MESSAGE), + logAppender.list.isEmpty()); + + unsubscribe(logAppender); + } + + private boolean isReindexFinished() throws RepositoryException { + Session s = repos.get(0).login(ADMIN); + try { + boolean reindex = s.getNode("/oak:index/" + INDEX_DEF_NODE) + .getProperty(REINDEX_PROPERTY_NAME).getBoolean(); + return !reindex; + } finally { + s.logout(); + } + } + + private void defineIndex(@Nonnull final Repository repo) throws RepositoryException { + Session session = repo.login(ADMIN); + try { + Node n = session.getRootNode().getNode("oak:index"); + + n = n.addNode(INDEX_DEF_NODE); + n.setPrimaryType(IndexConstants.INDEX_DEFINITIONS_NODE_TYPE); + n.setProperty("compatVersion", 2); + n.setProperty(TYPE_PROPERTY_NAME, "lucene"); + n.setProperty(ASYNC_PROPERTY_NAME, "async"); + n = n.addNode("indexRules"); + n.setPrimaryType(NT_UNSTRUCTURED); + n = n.addNode("nt:unstructured"); + n = n.addNode("properties"); + n.setPrimaryType(NT_UNSTRUCTURED); + n = n.addNode("number"); + n.setPrimaryType(NT_UNSTRUCTURED); + n.setProperty("propertyIndex", true); + n.setProperty("name", INDEX_PROPERTY); + + session.save(); + } finally { + session.logout(); + } + } + + @Override + protected Set<IndexEditorProvider> additionalIndexEditorProviders() { + return ImmutableSet.of((IndexEditorProvider) new LuceneIndexEditorProvider()); + } + + @Override + protected boolean 
isAsyncIndexing() { + return true; + } + + private ListAppender<ILoggingEvent> subscribeAppender() { + Filter<ILoggingEvent> filter = new AsyncLogFilter(); + filter.start(); + ListAppender<ILoggingEvent> appender = new ListAppender<ILoggingEvent>(); + appender.setContext((LoggerContext) LoggerFactory.getILoggerFactory()); + appender.setName("asynclogcollector"); + appender.addFilter(filter); + appender.start(); + ((LoggerContext) LoggerFactory.getILoggerFactory()).getLogger( + ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME).addAppender(appender); + return appender; + + } + + private void unsubscribe(@Nonnull final Appender<ILoggingEvent> appender) { + ((LoggerContext) LoggerFactory.getILoggerFactory()).getLogger( + ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME).detachAppender(appender); + } + + private static class AsyncLogFilter extends Filter<ILoggingEvent> { + public static final String MESSAGE = "Unresolved conflicts in /:async"; + + @Override + public FilterReply decide(ILoggingEvent event) { + final IThrowableProxy tp = event.getThrowableProxy(); + + if (event.getLevel().isGreaterOrEqual(Level.WARN) && + tp != null && + tp.getMessage().contains(MESSAGE)) { + return FilterReply.ACCEPT; + } else { + return FilterReply.DENY; + } + } + + } +}