http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/FlowSnapshotEntityV1.java ---------------------------------------------------------------------- diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/FlowSnapshotEntityV1.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/FlowSnapshotEntityV1.java new file mode 100644 index 0000000..ec6b9a5 --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/FlowSnapshotEntityV1.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.db.migration; + +import java.util.Date; +import java.util.Objects; + +/** + * FlowSnapshot DB entity from the original database schema in 0.1.0, used for migration purposes. + */ +public class FlowSnapshotEntityV1 { + + private String flowId; + + private Integer version; + + private Date created; + + private String createdBy; + + private String comments; + + public String getFlowId() { + return flowId; + } + + public void setFlowId(String flowId) { + this.flowId = flowId; + } + + public Integer getVersion() { + return version; + } + + public void setVersion(Integer version) { + this.version = version; + } + + public Date getCreated() { + return created; + } + + public void setCreated(Date created) { + this.created = created; + } + + public String getCreatedBy() { + return createdBy; + } + + public void setCreatedBy(String createdBy) { + this.createdBy = createdBy; + } + + public String getComments() { + return comments; + } + + public void setComments(String comments) { + this.comments = comments; + } + + @Override + public int hashCode() { + return Objects.hash(this.flowId, this.version); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (!(obj instanceof FlowSnapshotEntityV1)) { + return false; + } + + final FlowSnapshotEntityV1 other = (FlowSnapshotEntityV1) obj; + return Objects.equals(this.flowId, other.flowId) && Objects.equals(this.version, other.version); + } + +}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDataSourceFactory.java ---------------------------------------------------------------------- diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDataSourceFactory.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDataSourceFactory.java new file mode 100644 index 0000000..72d3acf --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDataSourceFactory.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.db.migration; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.registry.properties.NiFiRegistryProperties; +import org.h2.jdbcx.JdbcConnectionPool; + +import javax.sql.DataSource; +import java.io.File; + +/** + * NOTE: This DataSource factory was used in the original 0.1.0 release and remains to migrate data from the old database. + * This class is intentionally not a Spring bean, and will be used manually in the custom Flyway migration. + */ +public class LegacyDataSourceFactory { + + private static final String DB_USERNAME_PASSWORD = "nifireg"; + private static final int MAX_CONNECTIONS = 5; + + // database file name + private static final String DATABASE_FILE_NAME = "nifi-registry"; + + private final NiFiRegistryProperties properties; + + private JdbcConnectionPool connectionPool; + + public LegacyDataSourceFactory(final NiFiRegistryProperties properties) { + this.properties = properties; + } + + public DataSource getDataSource() { + if (connectionPool == null) { + final String databaseUrl = getDatabaseUrl(properties); + connectionPool = JdbcConnectionPool.create(databaseUrl, DB_USERNAME_PASSWORD, DB_USERNAME_PASSWORD); + connectionPool.setMaxConnections(MAX_CONNECTIONS); + } + + return connectionPool; + } + + public static String getDatabaseUrl(final NiFiRegistryProperties properties) { + // locate the repository directory + final String repositoryDirectoryPath = properties.getLegacyDatabaseDirectory(); + + // ensure the repository directory is specified + if (repositoryDirectoryPath == null) { + throw new NullPointerException("Database directory must be specified."); + } + + // create a handle to the repository directory + final File repositoryDirectory = new File(repositoryDirectoryPath); + + // get a handle to the database file + final File databaseFile = new File(repositoryDirectory, DATABASE_FILE_NAME); + + // format the database url + String databaseUrl = "jdbc:h2:" + databaseFile + ";AUTOCOMMIT=OFF;DB_CLOSE_ON_EXIT=FALSE;LOCK_MODE=3"; + String databaseUrlAppend = properties.getLegacyDatabaseUrlAppend(); + if (StringUtils.isNotBlank(databaseUrlAppend)) { + databaseUrl += databaseUrlAppend; + } + + return databaseUrl; + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDatabaseService.java ---------------------------------------------------------------------- diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDatabaseService.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDatabaseService.java new file mode 100644 index 0000000..533fadd --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDatabaseService.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.db.migration; + +import org.springframework.jdbc.core.JdbcTemplate; + +import javax.sql.DataSource; +import java.util.List; + +/** + * Service used to load data from original database used in the 0.1.0 release. + */ +public class LegacyDatabaseService { + + private final JdbcTemplate jdbcTemplate; + + public LegacyDatabaseService(final DataSource dataSource) { + this.jdbcTemplate = new JdbcTemplate(dataSource); + } + + public List<BucketEntityV1> getAllBuckets() { + final String sql = "SELECT * FROM bucket ORDER BY name ASC"; + + return jdbcTemplate.query(sql, (rs, i) -> { + final BucketEntityV1 b = new BucketEntityV1(); + b.setId(rs.getString("ID")); + b.setName(rs.getString("NAME")); + b.setDescription(rs.getString("DESCRIPTION")); + b.setCreated(rs.getTimestamp("CREATED")); + return b; + }); + } + + public List<FlowEntityV1> getAllFlows() { + final String sql = "SELECT * FROM flow f, bucket_item item WHERE item.id = f.id"; + + return jdbcTemplate.query(sql, (rs, i) -> { + final FlowEntityV1 flowEntity = new FlowEntityV1(); + flowEntity.setId(rs.getString("ID")); + flowEntity.setName(rs.getString("NAME")); + flowEntity.setDescription(rs.getString("DESCRIPTION")); + flowEntity.setCreated(rs.getTimestamp("CREATED")); + flowEntity.setModified(rs.getTimestamp("MODIFIED")); + flowEntity.setBucketId(rs.getString("BUCKET_ID")); + return flowEntity; + }); + } + + public List<FlowSnapshotEntityV1> getAllFlowSnapshots() { + final String sql = "SELECT * FROM flow_snapshot fs"; + + return jdbcTemplate.query(sql, (rs, i) -> { + final FlowSnapshotEntityV1 fs = new FlowSnapshotEntityV1(); + fs.setFlowId(rs.getString("FLOW_ID")); + fs.setVersion(rs.getInt("VERSION")); + fs.setCreated(rs.getTimestamp("CREATED")); + fs.setCreatedBy(rs.getString("CREATED_BY")); + fs.setComments(rs.getString("COMMENTS")); + return fs; + }); + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyEntityMapper.java ---------------------------------------------------------------------- diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyEntityMapper.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyEntityMapper.java new file mode 100644 index 0000000..bf82aae --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyEntityMapper.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.db.migration; + +import org.apache.nifi.registry.db.entity.BucketEntity; +import org.apache.nifi.registry.db.entity.BucketItemEntityType; +import org.apache.nifi.registry.db.entity.FlowEntity; +import org.apache.nifi.registry.db.entity.FlowSnapshotEntity; + +/** + * Utility methods to map legacy DB entities to current DB entities. + * + * The initial implementations of these mappings will be almost a direct translation, but if future changes are made + * to the original tables these methods will handle the translation from old entity to new entity. + */ +public class LegacyEntityMapper { + + public static BucketEntity createBucketEntity(final BucketEntityV1 bucketEntityV1) { + final BucketEntity bucketEntity = new BucketEntity(); + bucketEntity.setId(bucketEntityV1.getId()); + bucketEntity.setName(bucketEntityV1.getName()); + bucketEntity.setDescription(bucketEntityV1.getDescription()); + bucketEntity.setCreated(bucketEntityV1.getCreated()); + return bucketEntity; + } + + public static FlowEntity createFlowEntity(final FlowEntityV1 flowEntityV1) { + final FlowEntity flowEntity = new FlowEntity(); + flowEntity.setId(flowEntityV1.getId()); + flowEntity.setName(flowEntityV1.getName()); + flowEntity.setDescription(flowEntityV1.getDescription()); + flowEntity.setCreated(flowEntityV1.getCreated()); + flowEntity.setModified(flowEntityV1.getModified()); + flowEntity.setBucketId(flowEntityV1.getBucketId()); + flowEntity.setType(BucketItemEntityType.FLOW); + return flowEntity; + } + + public static FlowSnapshotEntity createFlowSnapshotEntity(final FlowSnapshotEntityV1 flowSnapshotEntityV1) { + final FlowSnapshotEntity flowSnapshotEntity = new FlowSnapshotEntity(); + flowSnapshotEntity.setFlowId(flowSnapshotEntityV1.getFlowId()); + flowSnapshotEntity.setVersion(flowSnapshotEntityV1.getVersion()); + flowSnapshotEntity.setComments(flowSnapshotEntityV1.getComments()); + flowSnapshotEntity.setCreated(flowSnapshotEntityV1.getCreated()); + flowSnapshotEntity.setCreatedBy(flowSnapshotEntityV1.getCreatedBy()); + return flowSnapshotEntity; + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventFactory.java ---------------------------------------------------------------------- diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventFactory.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventFactory.java new file mode 100644 index 0000000..b837d6d --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventFactory.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.event; + +import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.hook.Event; +import org.apache.nifi.registry.hook.EventFieldName; +import org.apache.nifi.registry.hook.EventType; +import org.apache.nifi.registry.security.authorization.user.NiFiUserUtils; + +/** + * Factory to create Events from domain objects. + */ +public class EventFactory { + + public static Event bucketCreated(final Bucket bucket) { + return new StandardEvent.Builder() + .eventType(EventType.CREATE_BUCKET) + .addField(EventFieldName.BUCKET_ID, bucket.getIdentifier()) + .addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity()) + .build(); + } + + public static Event bucketUpdated(final Bucket bucket) { + return new StandardEvent.Builder() + .eventType(EventType.UPDATE_BUCKET) + .addField(EventFieldName.BUCKET_ID, bucket.getIdentifier()) + .addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity()) + .build(); + } + + public static Event bucketDeleted(final Bucket bucket) { + return new StandardEvent.Builder() + .eventType(EventType.DELETE_BUCKET) + .addField(EventFieldName.BUCKET_ID, bucket.getIdentifier()) + .addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity()) + .build(); + } + + public static Event flowCreated(final VersionedFlow versionedFlow) { + return new StandardEvent.Builder() + .eventType(EventType.CREATE_FLOW) + .addField(EventFieldName.BUCKET_ID, versionedFlow.getBucketIdentifier()) + .addField(EventFieldName.FLOW_ID, versionedFlow.getIdentifier()) + .addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity()) + .build(); + } + + public static Event flowUpdated(final VersionedFlow versionedFlow) { + return new StandardEvent.Builder() + .eventType(EventType.UPDATE_FLOW) + .addField(EventFieldName.BUCKET_ID, versionedFlow.getBucketIdentifier()) + .addField(EventFieldName.FLOW_ID, versionedFlow.getIdentifier()) + .addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity()) + .build(); + } + + public static Event flowDeleted(final VersionedFlow versionedFlow) { + return new StandardEvent.Builder() + .eventType(EventType.DELETE_FLOW) + .addField(EventFieldName.BUCKET_ID, versionedFlow.getBucketIdentifier()) + .addField(EventFieldName.FLOW_ID, versionedFlow.getIdentifier()) + .addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity()) + .build(); + } + + public static Event flowVersionCreated(final VersionedFlowSnapshot versionedFlowSnapshot) { + final String versionComments = versionedFlowSnapshot.getSnapshotMetadata().getComments() == null + ? "" : versionedFlowSnapshot.getSnapshotMetadata().getComments(); + + return new StandardEvent.Builder() + .eventType(EventType.CREATE_FLOW_VERSION) + .addField(EventFieldName.BUCKET_ID, versionedFlowSnapshot.getSnapshotMetadata().getBucketIdentifier()) + .addField(EventFieldName.FLOW_ID, versionedFlowSnapshot.getSnapshotMetadata().getFlowIdentifier()) + .addField(EventFieldName.VERSION, String.valueOf(versionedFlowSnapshot.getSnapshotMetadata().getVersion())) + .addField(EventFieldName.USER, versionedFlowSnapshot.getSnapshotMetadata().getAuthor()) + .addField(EventFieldName.COMMENT, versionComments) + .build(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventService.java ---------------------------------------------------------------------- diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventService.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventService.java new file mode 100644 index 0000000..8a11493 --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventService.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.event; + +import org.apache.nifi.registry.hook.Event; +import org.apache.nifi.registry.hook.EventHookProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Service used for publishing events and passing events to the hook providers. + */ +@Service +public class EventService implements DisposableBean { + + private static final Logger LOGGER = LoggerFactory.getLogger(EventService.class); + + // Should only be a few events in the queue at a time, but setting a capacity just so it isn't unbounded + static final int EVENT_QUEUE_SIZE = 10_000; + + private final BlockingQueue<Event> eventQueue; + private final ExecutorService scheduledExecutorService; + private final List<EventHookProvider> eventHookProviders; + + @Autowired + public EventService(final List<EventHookProvider> eventHookProviders) { + this.eventQueue = new LinkedBlockingQueue<>(EVENT_QUEUE_SIZE); + this.scheduledExecutorService = Executors.newSingleThreadExecutor(); + this.eventHookProviders = new ArrayList<>(eventHookProviders); + } + + @PostConstruct + public void postConstruct() { + LOGGER.info("Starting event consumer..."); + + this.scheduledExecutorService.execute(() -> { + while (!Thread.interrupted()) { + try { + final Event event = eventQueue.poll(1000, TimeUnit.MILLISECONDS); + if (event == null) { + continue; + } + + // event was available so notify each provider, contain errors per-provider + for(final EventHookProvider provider : eventHookProviders) { + try { + if (event.getEventType() == null + || (event.getEventType() != null && provider.shouldHandle(event.getEventType()))) { + provider.handle(event); + } + } catch (Exception e) { + LOGGER.error("Error handling event hook", e); + } + } + } catch (InterruptedException e) { + LOGGER.warn("Interrupted while polling event queue"); + return; + } + } + }); + + LOGGER.info("Event consumer started!"); + } + + @Override + public void destroy() throws Exception { + LOGGER.info("Shutting down event consumer..."); + this.scheduledExecutorService.shutdownNow(); + LOGGER.info("Event consumer shutdown!"); + } + + public void publish(final Event event) { + if (event == null) { + return; + } + + try { + event.validate(); + + final boolean queued = eventQueue.offer(event); + if (!queued) { + LOGGER.error("Unable to queue event because queue is full"); + } + } catch (IllegalStateException e) { + LOGGER.error("Invalid event due to: " + e.getMessage(), e); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEvent.java ---------------------------------------------------------------------- diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEvent.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEvent.java new file mode 100644 index 0000000..4ad459d --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEvent.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.event; + +import org.apache.commons.lang3.Validate; +import org.apache.nifi.registry.hook.Event; +import org.apache.nifi.registry.hook.EventField; +import org.apache.nifi.registry.hook.EventFieldName; +import org.apache.nifi.registry.hook.EventType; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * Standard implementation of Event. + */ +public class StandardEvent implements Event { + + private final EventType eventType; + + private final List<EventField> eventFields; + + private StandardEvent(final Builder builder) { + this.eventType = builder.eventType; + this.eventFields = Collections.unmodifiableList(builder.eventFields == null + ? Collections.emptyList() : new ArrayList<>(builder.eventFields)); + Validate.notNull(this.eventType); + } + + @Override + public EventType getEventType() { + return eventType; + } + + @Override + public List<EventField> getFields() { + return eventFields; + } + + @Override + public EventField getField(final EventFieldName fieldName) { + if (fieldName == null) { + return null; + } + + return eventFields.stream().filter(e -> fieldName.equals(e.getName())).findFirst().orElse(null); + } + + @Override + public void validate() throws IllegalStateException { + final int numProvidedFields = eventFields.size(); + final int numRequiredFields = eventType.getFieldNames().size(); + + if (numProvidedFields != numRequiredFields) { + throw new IllegalStateException(numRequiredFields + " fields were required, but only " + numProvidedFields + " were provided"); + } + + for (int i=0; i < numRequiredFields; i++) { + final EventFieldName required = eventType.getFieldNames().get(i); + final EventFieldName provided = eventFields.get(i).getName(); + if (!required.equals(provided)) { + throw new IllegalStateException("Expected " + required.name() + ", but found " + provided.name()); + } + } + } + + /** + * Builder for Events. + */ + public static class Builder { + + private EventType eventType; + private List<EventField> eventFields = new ArrayList<>(); + + public Builder eventType(final EventType eventType) { + this.eventType = eventType; + return this; + } + + public Builder addField(final EventFieldName name, final String value) { + this.eventFields.add(new StandardEventField(name, value)); + return this; + } + + public Builder addField(final EventField arg) { + if (arg != null) { + this.eventFields.add(arg); + } + return this; + } + + public Builder addFields(final Collection<EventField> fields) { + if (fields != null) { + this.eventFields.addAll(fields); + } + return this; + } + + public Builder clearFields() { + this.eventFields.clear(); + return this; + } + + public Event build() { + return new StandardEvent(this); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEventField.java ---------------------------------------------------------------------- diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEventField.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEventField.java new file mode 100644 index 0000000..21266bb --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEventField.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.event; + +import org.apache.commons.lang3.Validate; +import org.apache.nifi.registry.hook.EventField; +import org.apache.nifi.registry.hook.EventFieldName; + +/** + * Standard implementation of EventField. + */ +public class StandardEventField implements EventField { + + private final EventFieldName name; + + private final String value; + + public StandardEventField(final EventFieldName name, final String value) { + this.name = name; + this.value = value; + Validate.notNull(this.name); + Validate.notNull(this.value); + } + + @Override + public EventFieldName getName() { + return name; + } + + @Override + public String getValue() { + return value; + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/AdministrationException.java ---------------------------------------------------------------------- diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/AdministrationException.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/AdministrationException.java new file mode 100644 index 0000000..8f9180c --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/AdministrationException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.exception; + +/** + * + */ +public class AdministrationException extends RuntimeException { + + public AdministrationException(Throwable cause) { + super(cause); + } + + public AdministrationException(String message, Throwable cause) { + super(message, cause); + } + + public AdministrationException(String message) { + super(message); + } + + public AdministrationException() { + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java ---------------------------------------------------------------------- diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java new file mode 100644 index 0000000..a83e9e2 --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.exception; + +/** + * An exception that is thrown when an entity is not found. + */ +public class ResourceNotFoundException extends RuntimeException { + + public ResourceNotFoundException(String message) { + super(message); + } + + public ResourceNotFoundException(String message, Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionCloseable.java ---------------------------------------------------------------------- diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionCloseable.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionCloseable.java new file mode 100644 index 0000000..b24f950 --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionCloseable.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.extension; + +import java.io.Closeable; +import java.io.IOException; + +public class ExtensionCloseable implements Closeable { + private final ClassLoader toSet; + + private ExtensionCloseable(ClassLoader toSet) { + this.toSet = toSet; + } + + public static ExtensionCloseable withComponentClassLoader(final ExtensionManager manager, final Class componentClass) { + + final ClassLoader current = Thread.currentThread().getContextClassLoader(); + final ExtensionCloseable closeable = new ExtensionCloseable(current); + + ClassLoader componentClassLoader = manager.getExtensionClassLoader(componentClass.getName()); + if (componentClassLoader == null) { + componentClassLoader = componentClass.getClassLoader(); + } + + Thread.currentThread().setContextClassLoader(componentClassLoader); + return closeable; + } + + @Override + public void close() throws IOException { + if (toSet != null) { + Thread.currentThread().setContextClassLoader(toSet); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java ---------------------------------------------------------------------- diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java new file mode 100644 index 0000000..ca3259d --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.extension; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.registry.hook.EventHookProvider; +import org.apache.nifi.registry.security.authentication.IdentityProvider; +import org.apache.nifi.registry.security.authorization.AccessPolicyProvider; +import org.apache.nifi.registry.security.authorization.Authorizer; +import org.apache.nifi.registry.security.authorization.UserGroupProvider; +import org.apache.nifi.registry.flow.FlowPersistenceProvider; +import org.apache.nifi.registry.properties.NiFiRegistryProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +@Component +public class ExtensionManager { + + static final Logger LOGGER = LoggerFactory.getLogger(ExtensionManager.class); + + private static final List<Class> EXTENSION_CLASSES; + static { + final List<Class> classes = new ArrayList<>(); + classes.add(FlowPersistenceProvider.class); + classes.add(UserGroupProvider.class); + classes.add(AccessPolicyProvider.class); + classes.add(Authorizer.class); + classes.add(IdentityProvider.class); + classes.add(EventHookProvider.class); + EXTENSION_CLASSES = Collections.unmodifiableList(classes); + } + + private final NiFiRegistryProperties properties; + private final Map<String,ExtensionClassLoader> classLoaderMap = new HashMap<>(); + private final AtomicBoolean loaded = new AtomicBoolean(false); + + @Autowired + public ExtensionManager(final NiFiRegistryProperties properties) { + this.properties = properties; + } + + @PostConstruct + public synchronized void discoverExtensions() { + if (!loaded.get()) { + // get the list of class loaders to consider + final List<ExtensionClassLoader> classLoaders = getClassLoaders(); + + // for each class loader, attempt to load each extension class using the ServiceLoader + for (final ExtensionClassLoader extensionClassLoader : classLoaders) { + for (final Class extensionClass : EXTENSION_CLASSES) { + loadExtensions(extensionClass, extensionClassLoader); + } + } + + loaded.set(true); + } + } + + public ClassLoader getExtensionClassLoader(final String canonicalClassName) { + if (StringUtils.isBlank(canonicalClassName)) { + throw new IllegalArgumentException("Class name can not be null"); + } + + return classLoaderMap.get(canonicalClassName); + } + + /** + * Loads implementations of the given extension class from the given class loader. + * + * @param extensionClass the extension/service class + * @param extensionClassLoader the class loader to search + */ + private void loadExtensions(final Class extensionClass, final ExtensionClassLoader extensionClassLoader) { + final ServiceLoader<?> serviceLoader = ServiceLoader.load(extensionClass, extensionClassLoader); + for (final Object o : serviceLoader) { + final String extensionClassName = o.getClass().getCanonicalName(); + if (classLoaderMap.containsKey(extensionClassName)) { + final String currDir = extensionClassLoader.getRootDir(); + final String existingDir = classLoaderMap.get(extensionClassName).getRootDir(); + LOGGER.warn("Skipping {} from {} which was already found in {}", new Object[]{extensionClassName, currDir, existingDir}); + } else { + classLoaderMap.put(o.getClass().getCanonicalName(), extensionClassLoader); + } + } + } + + /** + * Gets all of the class loaders to consider for loading extensions. + * + * Includes the class loader of the web-app running the framework, plus a class loader for each additional + * directory specified in nifi-registry.properties. + * + * @return a list of extension class loaders + */ + private List<ExtensionClassLoader> getClassLoaders() { + final List<ExtensionClassLoader> classLoaders = new ArrayList<>(); + + // start with the class loader that loaded ExtensionManager, should be WebAppClassLoader for API WAR + final ExtensionClassLoader frameworkClassLoader = new ExtensionClassLoader("web-api", new URL[0], this.getClass().getClassLoader()); + classLoaders.add(frameworkClassLoader); + + // we want to use the system class loader as the parent of the extension class loaders + ClassLoader systemClassLoader = FlowPersistenceProvider.class.getClassLoader(); + + // add a class loader for each extension dir + final Set<String> extensionDirs = properties.getExtensionsDirs(); + for (final String dir : extensionDirs) { + if (!StringUtils.isBlank(dir)) { + final ExtensionClassLoader classLoader = createClassLoader(dir, systemClassLoader); + if (classLoader != null) { + classLoaders.add(classLoader); + } + } + } + + return classLoaders; + } + + /** + * Creates a class loader for the given directory of resources. + * + * @param dir the dir of resources to add to the class loader + * @param parentClassLoader the parent class loader + * @return a class loader including all of the resources in the given dir, with the specified parent class loader + */ + private ExtensionClassLoader createClassLoader(final String dir, final ClassLoader parentClassLoader) { + final File dirFile = new File(dir); + + if (!dirFile.exists()) { + LOGGER.warn("Skipping extension directory that does not exist: " + dir); + return null; + } + + if (!dirFile.canRead()) { + LOGGER.warn("Skipping extension directory that can not be read: " + dir); + return null; + } + + final List<URL> resources = new LinkedList<>(); + + try { + resources.add(dirFile.toURI().toURL()); + } catch (final MalformedURLException mfe) { + LOGGER.warn("Unable to add {} to classpath due to {}", + new Object[]{ dirFile.getAbsolutePath(), mfe.getMessage()}, mfe); + } + + if (dirFile.isDirectory()) { + final File[] files = dirFile.listFiles(); + if (files != null) { + for (final File resource : files) { + if (resource.isDirectory()) { + LOGGER.warn("Recursive directories are not supported, skipping " + resource.getAbsolutePath()); + } else { + try { + resources.add(resource.toURI().toURL()); + } catch (final MalformedURLException mfe) { + LOGGER.warn("Unable to add {} to classpath due to {}", + new Object[]{ resource.getAbsolutePath(), mfe.getMessage()}, mfe); + } + } + } + } + } + + final URL[] urls = resources.toArray(new URL[resources.size()]); + return new ExtensionClassLoader(dir, urls, parentClassLoader); + } + + /** + * Extend URLClassLoader to keep track of the root directory. + */ + private static class ExtensionClassLoader extends URLClassLoader { + + private final String rootDir; + + public ExtensionClassLoader(final String rootDir, final URL[] urls, final ClassLoader parent) { + super(urls, parent); + this.rootDir = rootDir; + } + + public String getRootDir() { + return rootDir; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java ---------------------------------------------------------------------- diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java new file mode 100644 index 0000000..a3f3276 --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider; + +import java.util.List; + +import org.apache.nifi.registry.flow.FlowPersistenceProvider; +import org.apache.nifi.registry.hook.EventHookProvider; + +/** + * A factory for obtaining the configured providers. + */ +public interface ProviderFactory { + + /** + * Initialize the factory. + * + * @throws ProviderFactoryException if an error occurs during initialization + */ + void initialize() throws ProviderFactoryException; + + /** + * @return the configured FlowPersistenceProvider + */ + FlowPersistenceProvider getFlowPersistenceProvider(); + + /** + * @return the configured FlowHookProviders + */ + List<EventHookProvider> getEventHookProviders(); + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactoryException.java ---------------------------------------------------------------------- diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactoryException.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactoryException.java new file mode 100644 index 0000000..3842b9e --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactoryException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider; + +/** + * An error that occurs while initializing a ProviderFactory. + */ +public class ProviderFactoryException extends RuntimeException { + + public ProviderFactoryException() { + } + + public ProviderFactoryException(String message) { + super(message); + } + + public ProviderFactoryException(String message, Throwable cause) { + super(message, cause); + } + + public ProviderFactoryException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java ---------------------------------------------------------------------- diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java new file mode 100644 index 0000000..8f186fd --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Standard configuration context to be passed to onConfigured method of Providers. + */ +public class StandardProviderConfigurationContext implements ProviderConfigurationContext { + + private final Map<String,String> properties; + + public StandardProviderConfigurationContext(final Map<String, String> properties) { + this.properties = Collections.unmodifiableMap(new HashMap<>(properties)); + } + + @Override + public Map<String, String> getProperties() { + return properties; + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java ---------------------------------------------------------------------- diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java new file mode 100644 index 0000000..65ba914 --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider; + +import org.apache.nifi.registry.extension.ExtensionManager; +import org.apache.nifi.registry.flow.FlowPersistenceProvider; +import org.apache.nifi.registry.hook.EventHookProvider; +import org.apache.nifi.registry.properties.NiFiRegistryProperties; +import org.apache.nifi.registry.provider.generated.Property; +import org.apache.nifi.registry.provider.generated.Providers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.xml.sax.SAXException; + +import javax.annotation.PostConstruct; +import javax.xml.XMLConstants; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBElement; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import javax.xml.transform.stream.StreamSource; +import javax.xml.validation.Schema; +import javax.xml.validation.SchemaFactory; +import java.io.File; +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Standard implementation of ProviderFactory. + */ +@Configuration +public class StandardProviderFactory implements ProviderFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(StandardProviderFactory.class); + + private static final String PROVIDERS_XSD = "/providers.xsd"; + private static final String JAXB_GENERATED_PATH = "org.apache.nifi.registry.provider.generated"; + private static final JAXBContext JAXB_CONTEXT = initializeJaxbContext(); + + /** + * Load the JAXBContext. + */ + private static JAXBContext initializeJaxbContext() { + try { + return JAXBContext.newInstance(JAXB_GENERATED_PATH, StandardProviderFactory.class.getClassLoader()); + } catch (JAXBException e) { + throw new RuntimeException("Unable to create JAXBContext.", e); + } + } + + private final NiFiRegistryProperties properties; + private final ExtensionManager extensionManager; + private final AtomicReference<Providers> providersHolder = new AtomicReference<>(null); + + private FlowPersistenceProvider flowPersistenceProvider; + private List<EventHookProvider> eventHookProviders; + + @Autowired + public StandardProviderFactory(final NiFiRegistryProperties properties, final ExtensionManager extensionManager) { + this.properties = properties; + this.extensionManager = extensionManager; + + if (this.properties == null) { + throw new IllegalStateException("NiFiRegistryProperties cannot be null"); + } + + if (this.extensionManager == null) { + throw new IllegalStateException("ExtensionManager cannot be null"); + } + } + + @PostConstruct + @Override + public synchronized void initialize() throws ProviderFactoryException { + if (providersHolder.get() == null) { + final File providersConfigFile = properties.getProvidersConfigurationFile(); + if (providersConfigFile.exists()) { + try { + // find the schema + final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); + final Schema schema = schemaFactory.newSchema(StandardProviderFactory.class.getResource(PROVIDERS_XSD)); + + // attempt to unmarshal + final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); + unmarshaller.setSchema(schema); + + // set the holder for later use + final JAXBElement<Providers> element = unmarshaller.unmarshal(new StreamSource(providersConfigFile), Providers.class); + providersHolder.set(element.getValue()); + } catch (SAXException | JAXBException e) { + throw new ProviderFactoryException("Unable to load the providers configuration file at: " + providersConfigFile.getAbsolutePath(), e); + } + } else { + throw new ProviderFactoryException("Unable to find the providers configuration file at " + providersConfigFile.getAbsolutePath()); + } + } + } + + @Bean + @Override + public synchronized FlowPersistenceProvider getFlowPersistenceProvider() { + if (flowPersistenceProvider == null) { + if (providersHolder.get() == null) { + throw new ProviderFactoryException("ProviderFactory must be initialized before obtaining a Provider"); + } + + final Providers providers = providersHolder.get(); + final org.apache.nifi.registry.provider.generated.Provider jaxbFlowProvider = providers.getFlowPersistenceProvider(); + final String flowProviderClassName = jaxbFlowProvider.getClazz(); + + try { + final ClassLoader classLoader = extensionManager.getExtensionClassLoader(flowProviderClassName); + if (classLoader == null) { + throw new IllegalStateException("Extension not found in any of the configured class loaders: " + flowProviderClassName); + } + + final Class<?> rawFlowProviderClass = Class.forName(flowProviderClassName, true, classLoader); + final Class<? extends FlowPersistenceProvider> flowProviderClass = rawFlowProviderClass.asSubclass(FlowPersistenceProvider.class); + + final Constructor constructor = flowProviderClass.getConstructor(); + flowPersistenceProvider = (FlowPersistenceProvider) constructor.newInstance(); + + LOGGER.info("Instantiated FlowPersistenceProvider with class name {}", new Object[] {flowProviderClassName}); + } catch (Exception e) { + throw new ProviderFactoryException("Error creating FlowPersistenceProvider with class name: " + flowProviderClassName, e); + } + + final ProviderConfigurationContext configurationContext = createConfigurationContext(jaxbFlowProvider.getProperty()); + flowPersistenceProvider.onConfigured(configurationContext); + LOGGER.info("Configured FlowPersistenceProvider with class name {}", new Object[] {flowProviderClassName}); + } + + return flowPersistenceProvider; + } + + @Bean + @Override + public List<EventHookProvider> getEventHookProviders() { + if (eventHookProviders == null) { + eventHookProviders = new ArrayList<>(); + + if (providersHolder.get() == null) { + throw new ProviderFactoryException("ProviderFactory must be initialized before obtaining a Provider"); + } + + final Providers providers = providersHolder.get(); + final List<org.apache.nifi.registry.provider.generated.Provider> jaxbHookProvider = providers.getEventHookProvider(); + + if(jaxbHookProvider == null || jaxbHookProvider.isEmpty()) { + // no hook provided + return eventHookProviders; + } + + for (org.apache.nifi.registry.provider.generated.Provider hookProvider : jaxbHookProvider) { + + final String hookProviderClassName = hookProvider.getClazz(); + EventHookProvider hook; + + try { + final ClassLoader classLoader = extensionManager.getExtensionClassLoader(hookProviderClassName); + if (classLoader == null) { + throw new IllegalStateException("Extension not found in any of the configured class loaders: " + hookProviderClassName); + } + + final Class<?> rawHookProviderClass = Class.forName(hookProviderClassName, true, classLoader); + final Class<? extends EventHookProvider> hookProviderClass = rawHookProviderClass.asSubclass(EventHookProvider.class); + + final Constructor constructor = hookProviderClass.getConstructor(); + hook = (EventHookProvider) constructor.newInstance(); + + LOGGER.info("Instantiated EventHookProvider with class name {}", new Object[] {hookProviderClassName}); + } catch (Exception e) { + throw new ProviderFactoryException("Error creating EventHookProvider with class name: " + hookProviderClassName, e); + } + + final ProviderConfigurationContext configurationContext = createConfigurationContext(hookProvider.getProperty()); + hook.onConfigured(configurationContext); + eventHookProviders.add(hook); + LOGGER.info("Configured EventHookProvider with class name {}", new Object[] {hookProviderClassName}); + } + } + + return eventHookProviders; + } + + private ProviderConfigurationContext createConfigurationContext(final List<Property> configProperties) { + final Map<String,String> properties = new HashMap<>(); + + if (configProperties != null) { + configProperties.stream().forEach(p -> properties.put(p.getName(), p.getValue())); + } + + return new StandardProviderConfigurationContext(properties); + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java new file mode 100644 index 0000000..071656d --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider.flow; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.registry.flow.FlowPersistenceException; +import org.apache.nifi.registry.flow.FlowPersistenceProvider; +import org.apache.nifi.registry.flow.FlowSnapshotContext; +import org.apache.nifi.registry.provider.ProviderConfigurationContext; +import org.apache.nifi.registry.provider.ProviderCreationException; +import org.apache.nifi.registry.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; + +/** + * A FlowPersistenceProvider that uses the local filesystem for storage. + */ +public class FileSystemFlowPersistenceProvider implements FlowPersistenceProvider { + + static final Logger LOGGER = LoggerFactory.getLogger(FileSystemFlowPersistenceProvider.class); + + static final String FLOW_STORAGE_DIR_PROP = "Flow Storage Directory"; + + static final String SNAPSHOT_EXTENSION = ".snapshot"; + + private File flowStorageDir; + + @Override + public void onConfigured(final ProviderConfigurationContext configurationContext) throws ProviderCreationException { + final Map<String,String> props = configurationContext.getProperties(); + if (!props.containsKey(FLOW_STORAGE_DIR_PROP)) { + throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " must be provided"); + } + + final String flowStorageDirValue = props.get(FLOW_STORAGE_DIR_PROP); + if (StringUtils.isBlank(flowStorageDirValue)) { + throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " cannot be null or blank"); + } + + try { + flowStorageDir = new File(flowStorageDirValue); + FileUtils.ensureDirectoryExistAndCanReadAndWrite(flowStorageDir); + LOGGER.info("Configured FileSystemFlowPersistenceProvider with Flow Storage Directory {}", new Object[] {flowStorageDir.getAbsolutePath()}); + } catch (IOException e) { + throw new ProviderCreationException(e); + } + } + + @Override + public synchronized void saveFlowContent(final FlowSnapshotContext context, final byte[] content) throws FlowPersistenceException { + final File bucketDir = new File(flowStorageDir, context.getBucketId()); + try { + FileUtils.ensureDirectoryExistAndCanReadAndWrite(bucketDir); + } catch (IOException e) { + throw new FlowPersistenceException("Error accessing bucket directory at " + bucketDir.getAbsolutePath(), e); + } + + final File flowDir = new File(bucketDir, context.getFlowId()); + try { + FileUtils.ensureDirectoryExistAndCanReadAndWrite(flowDir); + } catch (IOException e) { + throw new FlowPersistenceException("Error accessing flow directory at " + flowDir.getAbsolutePath(), e); + } + + final String versionString = String.valueOf(context.getVersion()); + final File versionDir = new File(flowDir, versionString); + try { + FileUtils.ensureDirectoryExistAndCanReadAndWrite(versionDir); + } catch (IOException e) { + throw new FlowPersistenceException("Error accessing version directory at " + versionDir.getAbsolutePath(), e); + } + + final File versionFile = new File(versionDir, versionString + SNAPSHOT_EXTENSION); + if (versionFile.exists()) { + throw new FlowPersistenceException("Unable to save, a snapshot already exists with version " + versionString); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Saving snapshot with filename {}", new Object[] {versionFile.getAbsolutePath()}); + } + + try (final OutputStream out = new FileOutputStream(versionFile)) { + out.write(content); + out.flush(); + } catch (Exception e) { + throw new FlowPersistenceException("Unable to write snapshot to disk due to " + e.getMessage(), e); + } + } + + @Override + public synchronized byte[] getFlowContent(final String bucketId, final String flowId, final int version) throws FlowPersistenceException { + final File snapshotFile = getSnapshotFile(bucketId, flowId, version); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Retrieving snapshot with filename {}", new Object[] {snapshotFile.getAbsolutePath()}); + } + + if (!snapshotFile.exists()) { + return null; + } + + try (final InputStream in = new FileInputStream(snapshotFile)){ + return IOUtils.toByteArray(in); + } catch (IOException e) { + throw new FlowPersistenceException("Error reading snapshot file: " + snapshotFile.getAbsolutePath(), e); + } + } + + @Override + public synchronized void deleteAllFlowContent(final String bucketId, final String flowId) throws FlowPersistenceException { + final File flowDir = new File(flowStorageDir, bucketId + "/" + flowId); + if (!flowDir.exists()) { + LOGGER.debug("Snapshot directory does not exist at {}", new Object[] {flowDir.getAbsolutePath()}); + return; + } + + // delete everything under the flow directory + try { + org.apache.commons.io.FileUtils.cleanDirectory(flowDir); + } catch (IOException e) { + throw new FlowPersistenceException("Error deleting snapshots at " + flowDir.getAbsolutePath(), e); + } + + // delete the directory for the flow + final boolean flowDirDeleted = flowDir.delete(); + if (!flowDirDeleted) { + LOGGER.error("Unable to delete flow directory: " + flowDir.getAbsolutePath()); + } + + // delete the directory for the bucket if there is nothing left + final File bucketDir = new File(flowStorageDir, bucketId); + final File[] bucketFiles = bucketDir.listFiles(); + if (bucketFiles.length == 0) { + final boolean deletedBucket = bucketDir.delete(); + if (!deletedBucket) { + LOGGER.error("Unable to delete bucket directory: " + flowDir.getAbsolutePath()); + } + } + } + + @Override + public synchronized void deleteFlowContent(final String bucketId, final String flowId, final int version) throws FlowPersistenceException { + final File snapshotFile = getSnapshotFile(bucketId, flowId, version); + if (!snapshotFile.exists()) { + LOGGER.debug("Snapshot file does not exist at {}", new Object[] {snapshotFile.getAbsolutePath()}); + return; + } + + final boolean deleted = snapshotFile.delete(); + if (!deleted) { + throw new FlowPersistenceException("Unable to delete snapshot at " + snapshotFile.getAbsolutePath()); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Deleted snapshot at {}", new Object[] {snapshotFile.getAbsolutePath()}); + } + } + + protected File getSnapshotFile(final String bucketId, final String flowId, final int version) { + final String snapshotFilename = bucketId + "/" + flowId + "/" + version + "/" + version + SNAPSHOT_EXTENSION; + return new File(flowStorageDir, snapshotFilename); + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java ---------------------------------------------------------------------- diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java new file mode 100644 index 0000000..1728513 --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider.flow; + +import org.apache.commons.lang3.Validate; +import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.flow.FlowSnapshotContext; +import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; + +/** + * Standard implementation of FlowSnapshotContext. + */ +public class StandardFlowSnapshotContext implements FlowSnapshotContext { + + private final String bucketId; + private final String bucketName; + private final String flowId; + private final String flowName; + private final int version; + private final String comments; + private final String author; + private final long snapshotTimestamp; + + private StandardFlowSnapshotContext(final Builder builder) { + this.bucketId = builder.bucketId; + this.bucketName = builder.bucketName; + this.flowId = builder.flowId; + this.flowName = builder.flowName; + this.version = builder.version; + this.comments = builder.comments; + this.author = builder.author; + this.snapshotTimestamp = builder.snapshotTimestamp; + + Validate.notBlank(bucketId); + Validate.notBlank(bucketName); + Validate.notBlank(flowId); + Validate.notBlank(flowName); + Validate.isTrue(version > 0); + Validate.isTrue(snapshotTimestamp > 0); + } + + @Override + public String getBucketId() { + return bucketId; + } + + @Override + public String getBucketName() { + return bucketName; + } + + @Override + public String getFlowId() { + return flowId; + } + + @Override + public String getFlowName() { + return flowName; + } + + @Override + public int getVersion() { + return version; + } + + @Override + public String getComments() { + return comments; + } + + @Override + public long getSnapshotTimestamp() { + return snapshotTimestamp; + } + + @Override + public String getAuthor() { + return author; + } + + /** + * Builder for creating instances of StandardFlowSnapshotContext. + */ + public static class Builder { + + private String bucketId; + private String bucketName; + private String flowId; + private String flowName; + private int version; + private String comments; + private String author; + private long snapshotTimestamp; + + public Builder() { + + } + + public Builder(final Bucket bucket, final VersionedFlow versionedFlow, final VersionedFlowSnapshotMetadata snapshotMetadata) { + bucketId(bucket.getIdentifier()); + bucketName(bucket.getName()); + flowId(snapshotMetadata.getFlowIdentifier()); + flowName(versionedFlow.getName()); + version(snapshotMetadata.getVersion()); + comments(snapshotMetadata.getComments()); + author(snapshotMetadata.getAuthor()); + snapshotTimestamp(snapshotMetadata.getTimestamp()); + } + + public Builder bucketId(final String bucketId) { + this.bucketId = bucketId; + return this; + } + + public Builder bucketName(final String bucketName) { + this.bucketName = bucketName; + return this; + } + + public Builder flowId(final String flowId) { + this.flowId = flowId; + return this; + } + + public Builder flowName(final String flowName) { + this.flowName = flowName; + return this; + } + + public Builder version(final int version) { + this.version = version; + return this; + } + + public Builder comments(final String comments) { + this.comments = comments; + return this; + } + + public Builder author(final String author) { + this.author = author; + return this; + } + + public Builder snapshotTimestamp(final long snapshotTimestamp) { + this.snapshotTimestamp = snapshotTimestamp; + return this; + } + + public StandardFlowSnapshotContext build() { + return new StandardFlowSnapshotContext(this); + } + + } + +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java ---------------------------------------------------------------------- diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java new file mode 100644 index 0000000..3595d84 --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider.flow.git; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +class Bucket { + private final String bucketId; + private String bucketDirName; + + /** + * Flow ID to Flow. + */ + private Map<String, Flow> flows = new HashMap<>(); + + public Bucket(String bucketId) { + this.bucketId = bucketId; + } + + public String getBucketId() { + return bucketId; + } + + /** + * Returns the directory name of this bucket. + * @return can be different from original bucket name if it contained sanitized character. + */ + public String getBucketDirName() { + return bucketDirName; + } + + /** + * Set the name of bucket directory. + * @param bucketDirName The directory name must be sanitized, use {@link org.apache.nifi.registry.util.FileUtils#sanitizeFilename(String)} to do so. + */ + public void setBucketDirName(String bucketDirName) { + this.bucketDirName = bucketDirName; + } + + public Flow getFlowOrCreate(String flowId) { + return this.flows.computeIfAbsent(flowId, k -> new Flow(flowId)); + } + + public Optional<Flow> getFlow(String flowId) { + return Optional.ofNullable(flows.get(flowId)); + } + + public void removeFlow(String flowId) { + flows.remove(flowId); + } + + public boolean isEmpty() { + return flows.isEmpty(); + } + + /** + * Serialize the latest version of this Bucket meta data. + * @return serialized bucket + */ + Map<String, Object> serialize() { + final Map<String, Object> map = new HashMap<>(); + + map.put(GitFlowMetaData.LAYOUT_VERSION, GitFlowMetaData.CURRENT_LAYOUT_VERSION); + map.put(GitFlowMetaData.BUCKET_ID, bucketId); + map.put(GitFlowMetaData.FLOWS, + flows.keySet().stream().collect(Collectors.toMap(k -> k, k -> flows.get(k).serialize()))); + + return map; + } +} http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java ---------------------------------------------------------------------- diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java new file mode 100644 index 0000000..1bc7f3f --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider.flow.git; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +class Flow { + /** + * The ID of a Flow. It never changes. + */ + private final String flowId; + + /** + * A version to a Flow pointer. + */ + private final Map<Integer, FlowPointer> versions = new HashMap<>(); + + public Flow(String flowId) { + this.flowId = flowId; + } + + public boolean hasVersion(int version) { + return versions.containsKey(version); + } + + public FlowPointer getFlowVersion(int version) { + return versions.get(version); + } + + public void putVersion(int version, FlowPointer pointer) { + versions.put(version, pointer); + } + + public static class FlowPointer { + private String gitRev; + private String objectId; + private final String fileName; + + /** + * Create new FlowPointer instance. + * @param fileName The filename must be sanitized, use {@link org.apache.nifi.registry.util.FileUtils#sanitizeFilename(String)} to do so. + */ + public FlowPointer(String fileName) { + this.fileName = fileName; + } + + public void setGitRev(String gitRev) { + this.gitRev = gitRev; + } + + public String getGitRev() { + return gitRev; + } + + public String getFileName() { + return fileName; + } + + public String getObjectId() { + return objectId; + } + + public void setObjectId(String objectId) { + this.objectId = objectId; + } + } + + /** + * Serialize the latest version of this Flow meta data. + * @return serialized flow + */ + Map<String, Object> serialize() { + final Map<String, Object> map = new HashMap<>(); + final Optional<Integer> latestVerOpt = getLatestVersion(); + if (!latestVerOpt.isPresent()) { + throw new IllegalStateException("Flow version is not added yet, can not be serialized."); + } + final Integer latestVer = latestVerOpt.get(); + map.put(GitFlowMetaData.VER, latestVer); + map.put(GitFlowMetaData.FILE, versions.get(latestVer).fileName); + + return map; + } + + Optional<Integer> getLatestVersion() { + return versions.keySet().stream().reduce(Integer::max); + } + +}