This is an automated email from the ASF dual-hosted git repository. kdoran pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-registry.git
The following commit(s) were added to refs/heads/main by this push: new f1757f5 NIFIREG-409 Refactoring revision management f1757f5 is described below commit f1757f5ece14ca140f3526164e7ae03cf6c218b1 Author: Bryan Bende <bbe...@apache.org> AuthorDate: Tue Aug 4 13:14:12 2020 -0400 NIFIREG-409 Refactoring revision management Make RevisionManager responsible for incrementing revisions rather than caller. This creates consistency between the naive and JDBC implementations. This closes #291. Signed-off-by: Kevin Doran <kdo...@apache.org> --- .../{UpdateRevisionTask.java => UpdateResult.java} | 22 +++++--- .../registry/revision/api/UpdateRevisionTask.java | 8 +-- .../revision/naive/NaiveRevisionManager.java | 42 +++++++++++---- .../revision/standard/StandardUpdateResult.java | 59 ++++++++++++++++++++++ .../entity/StandardRevisableEntityService.java | 50 +++++++----------- .../revision/jdbc/JdbcRevisionManager.java | 26 ++++++++-- .../revision/jdbc/TestJdbcRevisionManager.java | 24 ++++----- 7 files changed, 161 insertions(+), 70 deletions(-) diff --git a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-api/src/main/java/org/apache/nifi/registry/revision/api/UpdateRevisionTask.java b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-api/src/main/java/org/apache/nifi/registry/revision/api/UpdateResult.java similarity index 76% copy from nifi-registry-core/nifi-registry-revision/nifi-registry-revision-api/src/main/java/org/apache/nifi/registry/revision/api/UpdateRevisionTask.java copy to nifi-registry-core/nifi-registry-revision/nifi-registry-revision-api/src/main/java/org/apache/nifi/registry/revision/api/UpdateResult.java index 3db8f9f..4181460 100644 --- a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-api/src/main/java/org/apache/nifi/registry/revision/api/UpdateRevisionTask.java +++ b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-api/src/main/java/org/apache/nifi/registry/revision/api/UpdateResult.java @@ -18,17 +18,27 @@ package org.apache.nifi.registry.revision.api; /** * <p> - * A task that is responsible for updating some entities. + * The result of an update task. * </p> * * NOTE: This API is considered a framework level API for the NiFi ecosystem and may evolve as * the NiFi PMC and committers deem necessary. It is not considered a public extension point. */ -public interface UpdateRevisionTask<T> { +public interface UpdateResult<T> { + + /** + * @return the entity that was updated + */ + T getEntity(); + /** - * Updates one or more entities and returns updated Revisions for those entities. - * - * @return the updated revisions for the entities + * @return the id of the entity that was updated */ - RevisionUpdate<T> update(); + String getEntityId(); + + /** + * @return the identity of the user that updated the entity + */ + String updaterIdentity(); + } diff --git a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-api/src/main/java/org/apache/nifi/registry/revision/api/UpdateRevisionTask.java b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-api/src/main/java/org/apache/nifi/registry/revision/api/UpdateRevisionTask.java index 3db8f9f..c9d5748 100644 --- a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-api/src/main/java/org/apache/nifi/registry/revision/api/UpdateRevisionTask.java +++ b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-api/src/main/java/org/apache/nifi/registry/revision/api/UpdateRevisionTask.java @@ -18,7 +18,7 @@ package org.apache.nifi.registry.revision.api; /** * <p> - * A task that is responsible for updating some entities. + * A task that is responsible for updating an entity. * </p> * * NOTE: This API is considered a framework level API for the NiFi ecosystem and may evolve as @@ -26,9 +26,9 @@ package org.apache.nifi.registry.revision.api; */ public interface UpdateRevisionTask<T> { /** - * Updates one or more entities and returns updated Revisions for those entities. + * Updates an entity and returns the resulting entity. * - * @return the updated revisions for the entities + * @return the update result containing the updated entity */ - RevisionUpdate<T> update(); + UpdateResult<T> update(); } diff --git a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-common/src/main/java/org/apache/nifi/registry/revision/naive/NaiveRevisionManager.java b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-common/src/main/java/org/apache/nifi/registry/revision/naive/NaiveRevisionManager.java index 0d161cd..641a360 100644 --- a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-common/src/main/java/org/apache/nifi/registry/revision/naive/NaiveRevisionManager.java +++ b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-common/src/main/java/org/apache/nifi/registry/revision/naive/NaiveRevisionManager.java @@ -17,22 +17,27 @@ package org.apache.nifi.registry.revision.naive; import org.apache.nifi.registry.revision.api.DeleteRevisionTask; +import org.apache.nifi.registry.revision.api.EntityModification; import org.apache.nifi.registry.revision.api.ExpiredRevisionClaimException; import org.apache.nifi.registry.revision.api.InvalidRevisionException; import org.apache.nifi.registry.revision.api.Revision; import org.apache.nifi.registry.revision.api.RevisionClaim; import org.apache.nifi.registry.revision.api.RevisionManager; import org.apache.nifi.registry.revision.api.RevisionUpdate; +import org.apache.nifi.registry.revision.api.UpdateResult; import org.apache.nifi.registry.revision.api.UpdateRevisionTask; import org.apache.nifi.registry.revision.standard.RevisionComparator; +import org.apache.nifi.registry.revision.standard.StandardRevisionUpdate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -103,7 +108,8 @@ public class NaiveRevisionManager implements RevisionManager { } @Override - public <T> RevisionUpdate<T> updateRevision(final RevisionClaim originalClaim, final UpdateRevisionTask<T> task) throws ExpiredRevisionClaimException { + public <T> RevisionUpdate<T> updateRevision(final RevisionClaim originalClaim, final UpdateRevisionTask<T> task) + throws ExpiredRevisionClaimException { logger.debug("Attempting to update revision using {}", originalClaim); final List<Revision> revisionList = new ArrayList<>(originalClaim.getRevisions()); @@ -123,17 +129,35 @@ public class NaiveRevisionManager implements RevisionManager { logger.debug("Successfully verified Revision Claim for all revisions"); // Perform the update - final RevisionUpdate<T> updatedComponent = task.update(); + // If an exception is thrown we don't want to update revision so it is ok to bounce out of this method + final UpdateResult<T> updateResult = task.update(); + if (updateResult == null) { + return null; + } - // If the update succeeded then put the updated revisions into the revisionMap - // If an exception is thrown during the update we don't want to update revision so it is ok to bounce out of this method - if (updatedComponent != null) { - for (final Revision updatedRevision : updatedComponent.getUpdatedRevisions()) { - revisionMap.put(updatedRevision.getEntityId(), updatedRevision); - } + // The update succeeded so increment the revisions + final Set<Revision> incrementedRevisions = new HashSet<>(); + for (final Revision incomingRevision : revisionList) { + final String entityId = incomingRevision.getEntityId(); + final String clientId = incomingRevision.getClientId(); + + // retrieve the revision from the map here because the incoming revision may have been + // verified based on the client id and may not contain the latest version + final Revision existingRevision = revisionMap.get(entityId); + final Revision incrementedRevision = existingRevision.incrementRevision(clientId); + incrementedRevisions.add(incrementedRevision); + + revisionMap.put(entityId, incrementedRevision); } - return updatedComponent; + // Create the result with the updated entity and updated revisions + final T updatedEntity = updateResult.getEntity(); + final String updaterIdentity = updateResult.updaterIdentity(); + + final Revision updatedEntityRevision = revisionMap.get(updateResult.getEntityId()); + final EntityModification entityModification = new EntityModification(updatedEntityRevision, updaterIdentity); + + return new StandardRevisionUpdate<>(updatedEntity, entityModification, incrementedRevisions); } } diff --git a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-common/src/main/java/org/apache/nifi/registry/revision/standard/StandardUpdateResult.java b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-common/src/main/java/org/apache/nifi/registry/revision/standard/StandardUpdateResult.java new file mode 100644 index 0000000..a09ed0a --- /dev/null +++ b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-common/src/main/java/org/apache/nifi/registry/revision/standard/StandardUpdateResult.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.revision.standard; + +import org.apache.nifi.registry.revision.api.UpdateResult; + +public class StandardUpdateResult<T> implements UpdateResult<T> { + + private final T entity; + private final String entityId; + private final String updaterIdentity; + + public StandardUpdateResult(final T entity, final String entityId, final String updaterIdentity) { + this.entity = entity; + this.entityId = entityId; + this.updaterIdentity = updaterIdentity; + + if (this.entity == null) { + throw new IllegalArgumentException("Entity is required"); + } + + if (this.entityId == null || this.entityId.trim().isEmpty()) { + throw new IllegalArgumentException("Entity id is required"); + } + + if (this.updaterIdentity == null || this.updaterIdentity.trim().isEmpty()) { + throw new IllegalArgumentException("Updater identity is required"); + } + } + + @Override + public T getEntity() { + return entity; + } + + @Override + public String getEntityId() { + return entityId; + } + + @Override + public String updaterIdentity() { + return updaterIdentity; + } +} diff --git a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-entity-service/src/main/java/org/apache/nifi/registry/revision/entity/StandardRevisableEntityService.java b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-entity-service/src/main/java/org/apache/nifi/registry/revision/entity/StandardRevisableEntityService.java index fa3d4f5..a0f0ab7 100644 --- a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-entity-service/src/main/java/org/apache/nifi/registry/revision/entity/StandardRevisableEntityService.java +++ b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-entity-service/src/main/java/org/apache/nifi/registry/revision/entity/StandardRevisableEntityService.java @@ -22,7 +22,7 @@ import org.apache.nifi.registry.revision.api.RevisionClaim; import org.apache.nifi.registry.revision.api.RevisionManager; import org.apache.nifi.registry.revision.api.RevisionUpdate; import org.apache.nifi.registry.revision.standard.StandardRevisionClaim; -import org.apache.nifi.registry.revision.standard.StandardRevisionUpdate; +import org.apache.nifi.registry.revision.standard.StandardUpdateResult; import java.util.Collection; import java.util.List; @@ -54,26 +54,7 @@ public class StandardRevisableEntityService implements RevisableEntityService { throw new IllegalArgumentException("A revision version of 0 must be specified when creating a new entity"); } - if (creatorIdentity == null || creatorIdentity.trim().isEmpty()) { - throw new IllegalArgumentException("Creator identity is required"); - } - - final Revision revision = createRevision(requestEntity.getIdentifier(), requestEntity.getRevision()); - final RevisionClaim claim = new StandardRevisionClaim(revision); - - final RevisionUpdate<T> revisionUpdate = revisionManager.updateRevision(claim, () -> { - final T updatedEntity = createEntity.get(); - - final Revision updatedRevision = revision.incrementRevision(revision.getClientId()); - final EntityModification entityModification = new EntityModification(updatedRevision, creatorIdentity); - - final RevisionInfo updatedRevisionInfo = createRevisionInfo(updatedRevision, entityModification); - updatedEntity.setRevision(updatedRevisionInfo); - - return new StandardRevisionUpdate<>(updatedEntity, entityModification); - }); - - return revisionUpdate.getEntity(); + return createOrUpdate(requestEntity, creatorIdentity, createEntity); } @Override @@ -94,6 +75,10 @@ public class StandardRevisableEntityService implements RevisableEntityService { @Override public <T extends RevisableEntity> T update(final T requestEntity, final String updaterIdentity, final Supplier<T> updateEntity) { + return createOrUpdate(requestEntity, updaterIdentity, updateEntity); + } + + private <T extends RevisableEntity> T createOrUpdate(final T requestEntity, final String userIdentity, final Supplier<T> createOrUpdateEntity) { if (requestEntity == null) { throw new IllegalArgumentException("Request entity is required"); } @@ -102,26 +87,21 @@ public class StandardRevisableEntityService implements RevisableEntityService { throw new IllegalArgumentException("Revision info is required"); } - if (updaterIdentity == null || updaterIdentity.trim().isEmpty()) { - throw new IllegalArgumentException("Updater identity is required"); + if (userIdentity == null || userIdentity.trim().isEmpty()) { + throw new IllegalArgumentException("User identity is required"); } final Revision revision = createRevision(requestEntity.getIdentifier(), requestEntity.getRevision()); final RevisionClaim claim = new StandardRevisionClaim(revision); final RevisionUpdate<T> revisionUpdate = revisionManager.updateRevision(claim, () -> { - final T updatedEntity = updateEntity.get(); - - final Revision updatedRevision = revisionManager.getRevision(requestEntity.getIdentifier()).incrementRevision(revision.getClientId()); - final EntityModification entityModification = new EntityModification(updatedRevision, updaterIdentity); - - final RevisionInfo updatedRevisionInfo = createRevisionInfo(updatedRevision, entityModification); - updatedEntity.setRevision(updatedRevisionInfo); - - return new StandardRevisionUpdate<>(updatedEntity, entityModification); + final T updatedEntity = createOrUpdateEntity.get(); + return new StandardUpdateResult<>(updatedEntity, updatedEntity.getIdentifier(), userIdentity); }); - return revisionUpdate.getEntity(); + final T resultEntity = revisionUpdate.getEntity(); + resultEntity.setRevision(createRevisionInfo(revisionUpdate.getLastModification())); + return resultEntity; } @Override @@ -199,6 +179,10 @@ public class StandardRevisableEntityService implements RevisableEntityService { return createRevisionInfo(revision, null); } + private RevisionInfo createRevisionInfo(final EntityModification entityModification) { + return createRevisionInfo(entityModification.getRevision(), entityModification); + } + private RevisionInfo createRevisionInfo(final Revision revision, final EntityModification entityModification) { final RevisionInfo revisionInfo = new RevisionInfo(); revisionInfo.setVersion(revision.getVersion()); diff --git a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-spring-jdbc/src/main/java/org/apache/nifi/registry/revision/jdbc/JdbcRevisionManager.java b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-spring-jdbc/src/main/java/org/apache/nifi/registry/revision/jdbc/JdbcRevisionManager.java index 74b2393..9aad556 100644 --- a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-spring-jdbc/src/main/java/org/apache/nifi/registry/revision/jdbc/JdbcRevisionManager.java +++ b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-spring-jdbc/src/main/java/org/apache/nifi/registry/revision/jdbc/JdbcRevisionManager.java @@ -17,14 +17,17 @@ package org.apache.nifi.registry.revision.jdbc; import org.apache.nifi.registry.revision.api.DeleteRevisionTask; +import org.apache.nifi.registry.revision.api.EntityModification; import org.apache.nifi.registry.revision.api.ExpiredRevisionClaimException; import org.apache.nifi.registry.revision.api.InvalidRevisionException; import org.apache.nifi.registry.revision.api.Revision; import org.apache.nifi.registry.revision.api.RevisionClaim; import org.apache.nifi.registry.revision.api.RevisionManager; import org.apache.nifi.registry.revision.api.RevisionUpdate; +import org.apache.nifi.registry.revision.api.UpdateResult; import org.apache.nifi.registry.revision.api.UpdateRevisionTask; import org.apache.nifi.registry.revision.standard.RevisionComparator; +import org.apache.nifi.registry.revision.standard.StandardRevisionUpdate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.dao.EmptyResultDataAccessException; @@ -33,9 +36,11 @@ import org.springframework.jdbc.core.JdbcTemplate; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; /** * A database implementation of {@link RevisionManager} that use's Spring's {@link JdbcTemplate}. @@ -105,20 +110,35 @@ public class JdbcRevisionManager implements RevisionManager { // Since we are in transaction these changes won't be committed unless the entire task completes successfully. // It is important this happens first so that the task won't execute unless the revision can be updated. // This prevents any other changes from happening that might not be part of the database transaction. + final Set<Revision> incrementedRevisions = new HashSet<>(); for (final Revision incomingRevision : revisionList) { + final String entityId = incomingRevision.getEntityId(); + // calling getRevision here will lazily create an initial revision - getRevision(incomingRevision.getEntityId()); + getRevision(entityId); updateRevision(incomingRevision); + + // retrieve the updated revision since the incoming revision may have matched on the client id + // and may not have the latest version which we want to return with the result + final Revision incrementedRevision = getRevision(entityId); + incrementedRevisions.add(incrementedRevision); } // We successfully verified all revisions. LOGGER.debug("Successfully verified Revision Claim for all revisions"); // Perform the update - final RevisionUpdate<T> updatedEntity = task.update(); + final UpdateResult<T> updateResult = task.update(); LOGGER.debug("Update task completed"); - return updatedEntity; + // Create the result with the updated entity and updated revisions + final T updatedEntity = updateResult.getEntity(); + final String updaterIdentity = updateResult.updaterIdentity(); + + final Revision updatedEntityRevision = getRevision(updateResult.getEntityId()); + final EntityModification entityModification = new EntityModification(updatedEntityRevision, updaterIdentity); + + return new StandardRevisionUpdate<>(updatedEntity, entityModification, incrementedRevisions); } /* diff --git a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-spring-jdbc/src/test/java/org/apache/nifi/registry/revision/jdbc/TestJdbcRevisionManager.java b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-spring-jdbc/src/test/java/org/apache/nifi/registry/revision/jdbc/TestJdbcRevisionManager.java index 3eecc9a..cbb7e46 100644 --- a/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-spring-jdbc/src/test/java/org/apache/nifi/registry/revision/jdbc/TestJdbcRevisionManager.java +++ b/nifi-registry-core/nifi-registry-revision/nifi-registry-revision-spring-jdbc/src/test/java/org/apache/nifi/registry/revision/jdbc/TestJdbcRevisionManager.java @@ -26,7 +26,7 @@ import org.apache.nifi.registry.revision.api.RevisionManager; import org.apache.nifi.registry.revision.api.RevisionUpdate; import org.apache.nifi.registry.revision.api.UpdateRevisionTask; import org.apache.nifi.registry.revision.standard.StandardRevisionClaim; -import org.apache.nifi.registry.revision.standard.StandardRevisionUpdate; +import org.apache.nifi.registry.revision.standard.StandardUpdateResult; import org.flywaydb.core.internal.jdbc.DatabaseType; import org.junit.Assert; import org.junit.Before; @@ -350,12 +350,7 @@ public class TestJdbcRevisionManager { final RevisableEntity entity = new RevisableEntity(); entity.setId(entityId); - // get the latest revision which has already been incremented - final Revision updatedRevision = revisionManager.getRevision(entity.getId()); - entity.setRevision(updatedRevision); - - final EntityModification entityModification = new EntityModification(updatedRevision, "user1"); - return new StandardRevisionUpdate<>(entity, entityModification); + return new StandardUpdateResult<>(entity, entityId,"user1"); }; } @@ -366,18 +361,17 @@ public class TestJdbcRevisionManager { assertNotNull(updatedEntity); assertEquals(entityId, updatedEntity.getId()); - // verify the revision in the entity is set and is the updated revision (i.e. version of 100, not 99) - final Revision updatedRevision = updatedEntity.getRevision(); - assertNotNull(updatedRevision); - assertEquals(entityId, updatedRevision.getEntityId()); - assertEquals(expectedVersion, updatedRevision.getVersion()); - assertEquals(expectedClientId, updatedRevision.getClientId()); - // verify the entity modification is correctly populated final EntityModification entityModification = revisionUpdate.getLastModification(); assertNotNull(entityModification); Assert.assertEquals("user1", entityModification.getLastModifier()); - assertEquals(updatedRevision, entityModification.getRevision()); + + // verify the revision in the entity modification is set and is the updated revision (i.e. version of 100, not 99) + final Revision updatedRevision = entityModification.getRevision(); + assertNotNull(updatedRevision); + assertEquals(entityId, updatedRevision.getEntityId()); + assertEquals(expectedVersion, updatedRevision.getVersion()); + assertEquals(expectedClientId, updatedRevision.getClientId()); // verify the updated revisions is correctly populated and matches the updated entity revision final Set<Revision> updatedRevisions = revisionUpdate.getUpdatedRevisions();