Removes obsolete plugins. Adds tests and implements entity-data plugin.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2689a015 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2689a015 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2689a015 Branch: refs/heads/USERGRID-933 Commit: 2689a015938e6b05ceb6f6cc2c228f61cb5e3b17 Parents: acbf3a1 Author: Todd Nine <tn...@apigee.com> Authored: Thu Sep 17 16:17:43 2015 -0600 Committer: Todd Nine <tn...@apigee.com> Committed: Thu Sep 17 16:17:43 2015 -0600 ---------------------------------------------------------------------- .../usergrid/corepersistence/CoreModule.java | 53 +++++-- .../migration/CoreMigration.java | 45 ++---- .../migration/CoreMigrationPlugin.java | 31 ++-- .../migration/DeDupConnectionDataMigration.java | 93 ++++++++++++ .../migration/EntityTypeMappingMigration.java | 104 -------------- .../corepersistence/migration/Versions.java | 31 ---- .../DeDupConnectionDataMigrationTest.java | 144 +++++++++++++++++++ .../migration/EntityTypeMappingMigrationIT.java | 125 ---------------- .../migration/data/TestProgressObserver.java | 10 +- 9 files changed, 314 insertions(+), 322 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/2689a015/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java index 7ed95e2..11707be 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java @@ -16,22 +16,43 @@ package org.apache.usergrid.corepersistence; -import com.google.inject.AbstractModule; -import com.google.inject.TypeLiteral; -import com.google.inject.assistedinject.FactoryModuleBuilder; -import com.google.inject.multibindings.Multibinder; +import org.safehaus.guicyfig.GuicyFigModule; + import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.asyncevents.AsyncIndexProvider; import org.apache.usergrid.corepersistence.asyncevents.EventBuilder; import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl; -import org.apache.usergrid.corepersistence.index.*; +import org.apache.usergrid.corepersistence.index.ApplicationIndexBucketLocator; +import org.apache.usergrid.corepersistence.index.CoreIndexFig; +import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; +import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactoryImpl; +import org.apache.usergrid.corepersistence.index.IndexProcessorFig; +import org.apache.usergrid.corepersistence.index.IndexService; +import org.apache.usergrid.corepersistence.index.IndexServiceImpl; +import org.apache.usergrid.corepersistence.index.ReIndexService; +import org.apache.usergrid.corepersistence.index.ReIndexServiceImpl; import org.apache.usergrid.corepersistence.migration.CoreMigration; import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin; -import org.apache.usergrid.corepersistence.migration.EntityTypeMappingMigration; +import org.apache.usergrid.corepersistence.migration.DeDupConnectionDataMigration; import org.apache.usergrid.corepersistence.migration.MigrationModuleVersionPlugin; import org.apache.usergrid.corepersistence.pipeline.PipelineModule; -import org.apache.usergrid.corepersistence.rx.impl.*; -import org.apache.usergrid.corepersistence.service.*; +import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable; +import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservableImpl; +import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemImpl; +import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable; +import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservableImpl; +import org.apache.usergrid.corepersistence.rx.impl.AllNodesInGraphImpl; +import org.apache.usergrid.corepersistence.service.AggregationService; +import org.apache.usergrid.corepersistence.service.AggregationServiceFactory; +import org.apache.usergrid.corepersistence.service.AggregationServiceImpl; +import org.apache.usergrid.corepersistence.service.ApplicationService; +import org.apache.usergrid.corepersistence.service.ApplicationServiceImpl; +import org.apache.usergrid.corepersistence.service.CollectionService; +import org.apache.usergrid.corepersistence.service.CollectionServiceImpl; +import org.apache.usergrid.corepersistence.service.ConnectionService; +import org.apache.usergrid.corepersistence.service.ConnectionServiceImpl; +import org.apache.usergrid.corepersistence.service.StatusService; +import org.apache.usergrid.corepersistence.service.StatusServiceImpl; import org.apache.usergrid.persistence.collection.guice.CollectionModule; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.core.guice.CommonModule; @@ -42,7 +63,11 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.guice.GraphModule; import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode; import org.apache.usergrid.persistence.index.guice.IndexModule; -import org.safehaus.guicyfig.GuicyFigModule; + +import com.google.inject.AbstractModule; +import com.google.inject.TypeLiteral; +import com.google.inject.assistedinject.FactoryModuleBuilder; +import com.google.inject.multibindings.Multibinder; /** @@ -105,7 +130,7 @@ public class CoreModule extends AbstractModule { new TypeLiteral<DataMigration>() {}, CoreMigration.class ); - dataMigrationMultibinder.addBinding().to( EntityTypeMappingMigration.class ); + dataMigrationMultibinder.addBinding().to( DeDupConnectionDataMigration.class ); //wire up the collection migration plugin @@ -164,6 +189,14 @@ public class CoreModule extends AbstractModule { bind( ApplicationService.class ).to( ApplicationServiceImpl.class ); bind( StatusService.class ).to( StatusServiceImpl.class ); + + /** + * Install migration services + */ + + //migrations + //we want to make sure our generics are retained, so we use a typeliteral + } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/2689a015/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreMigration.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreMigration.java index 4df32da..b491e1f 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreMigration.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreMigration.java @@ -1,40 +1,21 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ -package org.apache.usergrid.corepersistence.migration;/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ +package org.apache.usergrid.corepersistence.migration; import java.lang.annotation.Retention; http://git-wip-us.apache.org/repos/asf/usergrid/blob/2689a015/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreMigrationPlugin.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreMigrationPlugin.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreMigrationPlugin.java index 810f9fb..b69451a 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreMigrationPlugin.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/CoreMigrationPlugin.java @@ -1,20 +1,18 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.usergrid.corepersistence.migration; @@ -22,9 +20,10 @@ package org.apache.usergrid.corepersistence.migration; import java.util.Set; -import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization; +import org.apache.usergrid.persistence.collection.serialization.impl.migration.CollectionMigration; import org.apache.usergrid.persistence.core.migration.data.AbstractMigrationPlugin; import org.apache.usergrid.persistence.core.migration.data.DataMigration; +import org.apache.usergrid.persistence.core.migration.data.MigrationInfoSerialization; import org.apache.usergrid.persistence.core.migration.data.PluginPhase; import com.google.inject.Inject; @@ -37,7 +36,7 @@ import com.google.inject.Singleton; @Singleton public class CoreMigrationPlugin extends AbstractMigrationPlugin { - public static final String PLUGIN_NAME = "core-data"; + public static final String PLUGIN_NAME = "collections-entity-data"; http://git-wip-us.apache.org/repos/asf/usergrid/blob/2689a015/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/DeDupConnectionDataMigration.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/DeDupConnectionDataMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/DeDupConnectionDataMigration.java new file mode 100644 index 0000000..f65864a --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/DeDupConnectionDataMigration.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.usergrid.corepersistence.migration; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable; +import org.apache.usergrid.corepersistence.service.ConnectionService; +import org.apache.usergrid.persistence.core.migration.data.DataMigration; +import org.apache.usergrid.persistence.core.migration.data.ProgressObserver; + +import com.google.inject.Inject; +import com.google.inject.Singleton; + + +@Singleton +public class DeDupConnectionDataMigration implements DataMigration { + + private static final Logger logger = LoggerFactory.getLogger(DeDupConnectionDataMigration.class); + + private static final long UPDATE_COUNT = 1000; + + private final ConnectionService connectionService; + private final AllApplicationsObservable allApplicationsObservable; + + + @Inject + public DeDupConnectionDataMigration( final ConnectionService connectionService, + final AllApplicationsObservable allApplicationsObservable ) { + this.connectionService = connectionService; + this.allApplicationsObservable = allApplicationsObservable; + } + + + @Override + public int migrate( final int currentVersion, final ProgressObserver observer ) { + + final int migrationVersion = getMaxVersion(); + + observer.start(); + + connectionService.deDupeConnections( allApplicationsObservable.getData() ).reduce( 0l, (count, deDuped) ->{ + + final long newCount = count+1; + + /** + * Update our progress observer + */ + if(newCount % UPDATE_COUNT == 0){ + logger.info( "De duped {} edges", newCount ); + observer.update( migrationVersion, String.format("De duped %d edges", newCount) ); + } + + return newCount; + + }).doOnNext( total -> { + logger.info("Completed de-duping {} edges", total ); + observer.complete();; + }).subscribe(); + + return migrationVersion; + + } + + + @Override + public boolean supports( final int currentVersion ) { + return currentVersion == getMaxVersion() - 1; + } + + + @Override + public int getMaxVersion() { + return 1; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/2689a015/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java deleted file mode 100644 index ef65ff2..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.migration; - - -import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.usergrid.corepersistence.ManagerCache; -import org.apache.usergrid.corepersistence.util.CpNamingUtils; -import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; -import org.apache.usergrid.persistence.core.migration.data.DataMigration; -import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider; -import org.apache.usergrid.persistence.core.migration.data.ProgressObserver; -import org.apache.usergrid.persistence.map.MapManager; -import org.apache.usergrid.persistence.map.MapScope; - -import com.google.inject.Inject; - -import rx.Observable; -import rx.schedulers.Schedulers; - - -/** - * Migration to ensure that our entity id is written into our map data - */ -public class EntityTypeMappingMigration implements DataMigration { - - private final ManagerCache managerCache; - private final MigrationDataProvider<EntityIdScope> allEntitiesInSystemObservable; - - - @Inject - public EntityTypeMappingMigration( final ManagerCache managerCache, - final MigrationDataProvider<EntityIdScope> allEntitiesInSystemObservable ) { - this.managerCache = managerCache; - this.allEntitiesInSystemObservable = allEntitiesInSystemObservable; - } - - - @Override - public int migrate( final int currentVersion, - final ProgressObserver observer ) { - - final AtomicLong atomicLong = new AtomicLong(); - - - //migrate up to 100 types simultaneously - allEntitiesInSystemObservable.getData().flatMap( entityIdScope -> { - return Observable.just( entityIdScope ).doOnNext( entityIdScopeObservable -> { - final MapScope ms = CpNamingUtils - .getEntityTypeMapScope( entityIdScope.getApplicationScope().getApplication() ); - - final MapManager mapManager = managerCache.getMapManager( ms ); - - final UUID entityUuid = entityIdScope.getId().getUuid(); - final String entityType = entityIdScope.getId().getType(); - - mapManager.putString( entityUuid.toString(), entityType ); - - if ( atomicLong.incrementAndGet() % 100 == 0 ) { - observer.update( getMaxVersion(), - String.format( "Updated %d entities", atomicLong.get() ) ); - } - - } ).subscribeOn( Schedulers.io() ); - }, 100 ).count().toBlocking().last(); - - - return getMaxVersion(); - - - } - - - @Override - public boolean supports( final int currentVersion ) { - //we move from the migration version fix to the current version - return CoreDataVersions.INITIAL.getVersion() == currentVersion; - } - - - @Override - public int getMaxVersion() { - return CoreDataVersions.ID_MAP_FIX.getVersion(); - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/2689a015/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java deleted file mode 100644 index d921353..0000000 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * - * * Licensed to the Apache Software Foundation (ASF) under one - * * or more contributor license agreements. See the NOTICE file - * * distributed with this work for additional information - * * regarding copyright ownership. The ASF licenses this file - * * to you under the Apache License, Version 2.0 (the - * * "License"); you may not use this file except in compliance - * * with the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, - * * software distributed under the License is distributed on an - * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * * KIND, either express or implied. See the License for the - * * specific language governing permissions and limitations - * * under the License. - * - */ - -package org.apache.usergrid.corepersistence.migration; - - -/** - * Simple class to hold the constants of all versions - */ -public class Versions { - - -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/2689a015/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/DeDupConnectionDataMigrationTest.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/DeDupConnectionDataMigrationTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/DeDupConnectionDataMigrationTest.java new file mode 100644 index 0000000..c84ad2b --- /dev/null +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/DeDupConnectionDataMigrationTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.usergrid.corepersistence.migration; + + +import java.util.List; + +import org.junit.Test; + +import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable; +import org.apache.usergrid.corepersistence.service.ConnectionScope; +import org.apache.usergrid.corepersistence.service.ConnectionService; +import org.apache.usergrid.corepersistence.util.CpNamingUtils; +import org.apache.usergrid.persistence.core.migration.data.TestProgressObserver; +import org.apache.usergrid.persistence.core.scope.ApplicationScope; +import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; +import org.apache.usergrid.persistence.graph.Edge; + +import rx.Observable; +import rx.Subscriber; + +import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class DeDupConnectionDataMigrationTest { + + @Test + public void testVersion() { + + //mock up an initial system state + final int version = 0; + + final ConnectionService connectionService = mock( ConnectionService.class ); + + final AllApplicationsObservable allApplicationsObservable = mock( AllApplicationsObservable.class ); + + + final DeDupConnectionDataMigration plugin = + new DeDupConnectionDataMigration( connectionService, allApplicationsObservable ); + + + assertTrue( plugin.supports( version ) ); + + assertFalse( plugin.supports( plugin.getMaxVersion() ) ); + } + + + /** + * Test + */ + @Test + public void testEmitEdges() { + + //mock up an initial system state + final int version = 0; + + final ConnectionService connectionService = mock( ConnectionService.class ); + + + final AllApplicationsObservable allApplicationsObservable = mock( AllApplicationsObservable.class ); + + + final int count = 3000; + + final Observable<ConnectionScope> edgeEmitter = Observable.create( new DupConnectionEmitter( count ) ); + + when( connectionService.deDupeConnections( any( Observable.class ) ) ).thenReturn( edgeEmitter ); + + + final DeDupConnectionDataMigration plugin = + new DeDupConnectionDataMigration( connectionService, allApplicationsObservable ); + + + final TestProgressObserver testProgressObserver = new TestProgressObserver(); + + + plugin.migrate( version, testProgressObserver ); + + final List<String> updates = testProgressObserver.getUpdates(); + + assertEquals( "Expected 3 updates", 3, updates.size() ); + + + assertEquals( "De duped 1000 edges", updates.get( 0 ) ); + assertEquals( "De duped 2000 edges", updates.get( 1 ) ); + assertEquals( "De duped 3000 edges", updates.get( 2 ) ); + + + assertTrue( "Should complete", testProgressObserver.isComplete() ); + assertTrue( "Should start", testProgressObserver.isStarted() ); + assertFalse( "Should not fail", testProgressObserver.isFailed() ); + } + + + private final class DupConnectionEmitter implements Observable.OnSubscribe<ConnectionScope> { + + + private final int count; + + + private DupConnectionEmitter( final int count ) {this.count = count;} + + + @Override + public void call( final Subscriber<? super ConnectionScope> subscriber ) { + + final ApplicationScope applicationScope = new ApplicationScopeImpl( createId( "application" ) ); + final Edge edge = CpNamingUtils.createConnectionEdge( createId( "source" ), "test", createId( "target" ) ); + + + final ConnectionScope scope = new ConnectionScope( applicationScope, edge ); + + + subscriber.onStart(); + + for ( int i = 0; i < count; i++ ) { + subscriber.onNext( scope ); + } + + subscriber.onCompleted(); + } + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/2689a015/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java ---------------------------------------------------------------------- diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java deleted file mode 100644 index 7d22abe..0000000 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.usergrid.corepersistence.migration; - - -import org.junit.Test; - -import org.apache.usergrid.AbstractCoreIT; -import org.apache.usergrid.corepersistence.ManagerCache; -import org.apache.usergrid.corepersistence.util.CpNamingUtils; -import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; -import org.apache.usergrid.persistence.core.migration.data.TestMigrationDataProvider; -import org.apache.usergrid.persistence.core.migration.data.TestProgressObserver; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl; -import org.apache.usergrid.persistence.map.MapManager; -import org.apache.usergrid.persistence.map.MapScope; -import org.apache.usergrid.persistence.map.impl.MapScopeImpl; -import org.apache.usergrid.persistence.model.entity.Id; - -import rx.Observable; - -import static org.apache.usergrid.persistence.core.util.IdGenerator.createId; -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - - -/** - * Test for our entity type mapping - */ -public class EntityTypeMappingMigrationIT { - - - @Test - public void testIdMapping() throws Throwable { - - final Id applicationId = createId("application"); - - final ApplicationScope scope1 = new ApplicationScopeImpl( applicationId ); - - final Id entityId1 = createId("thing"); - - final EntityIdScope idScope1 = new EntityIdScope(scope1, entityId1 ); - - final MapScope mapScope1 = new MapScopeImpl(applicationId, CpNamingUtils.TYPES_BY_UUID_MAP ); - - - - final ApplicationScope scope2 = new ApplicationScopeImpl( applicationId); - - final Id entityId2 = createId("foo"); - - final EntityIdScope idScope2 = new EntityIdScope( scope2, entityId2 ); - - final MapScope mapScope2 = new MapScopeImpl(applicationId, CpNamingUtils.TYPES_BY_UUID_MAP ); - - - final Observable<EntityIdScope> scopes = Observable.just(idScope1, idScope2); - - final TestMigrationDataProvider<EntityIdScope> migrationDataProvider = new TestMigrationDataProvider<>(); - - //set our scopes - migrationDataProvider.setObservable( scopes ); - - - - - //mock up returning our map manager - final MapManager mapManager = mock(MapManager.class); - final ManagerCache managerCache = mock(ManagerCache.class); - - when(managerCache.getMapManager( eq( mapScope1 ) )).thenReturn( mapManager ); - - when(managerCache.getMapManager( eq( mapScope2 ) )).thenReturn( mapManager ); - - final TestProgressObserver progressObserver = new TestProgressObserver(); - - - //wire it up - final EntityTypeMappingMigration migration = new EntityTypeMappingMigration( managerCache, migrationDataProvider ); - - //run it - - final int returnedVersion = migration.migrate(CoreDataVersions.INITIAL.getVersion(), progressObserver ); - - - assertEquals(CoreDataVersions.ID_MAP_FIX.getVersion(), returnedVersion); - - //verify we saved it - - verify(mapManager).putString(entityId1.getUuid().toString(), entityId1.getType() ); - - verify(mapManager).putString(entityId2.getUuid().toString(), entityId2.getType() ); - - - - - - - - - - - } -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/2689a015/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/TestProgressObserver.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/TestProgressObserver.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/TestProgressObserver.java index 7f59151..21d6013 100644 --- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/TestProgressObserver.java +++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/TestProgressObserver.java @@ -30,19 +30,21 @@ public class TestProgressObserver implements ProgressObserver { private boolean started = false; + private boolean complete = false; + private List<String> updates = new ArrayList<>( 100 ); @Override public void start() { - started = true; + started = true; } @Override public void complete() { - started = false; + complete = true; } @@ -66,7 +68,6 @@ public class TestProgressObserver implements ProgressObserver { /** * Get if we failed - * @return */ public boolean isFailed() { return failed; @@ -78,10 +79,11 @@ public class TestProgressObserver implements ProgressObserver { } + public boolean isComplete() { return complete;} + /** * Get update messages - * @return */ public List<String> getUpdates() { return updates;