This is an automated email from the ASF dual-hosted git repository. kdoran pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi-registry.git
The following commit(s) were added to refs/heads/master by this push: new 6f19a73 NIFIREG-285 - Add DatabaseFlowPersistenceProvider 6f19a73 is described below commit 6f19a736c00242affb82a0d326e686be5a2e1b50 Author: Bryan Bende <bbe...@apache.org> AuthorDate: Mon Jul 15 11:49:21 2019 -0400 NIFIREG-285 - Add DatabaseFlowPersistenceProvider - Added method injection of DataSource in StandardProviderFactory - Split out postgres migrations to use BYTEA for blob type This closes #205. Signed-off-by: Kevin Doran <kdo...@apache.org> --- .../src/main/asciidoc/administration-guide.adoc | 23 ++- .../registry/db/CustomFlywayConfiguration.java | 7 + .../registry/provider/StandardProviderFactory.java | 52 +++++- .../flow/DatabaseFlowPersistenceProvider.java | 86 ++++++++++ ...ache.nifi.registry.flow.FlowPersistenceProvider | 3 +- .../migration/default/V6__AddFlowPersistence.sql | 22 +++ .../db/migration/mysql/V6__AddFlowPersistence.sql | 22 +++ .../db/migration/postgres/V2__Initial.sql | 60 +++++++ .../db/migration/postgres/V3__AddExtensions.sql | 105 ++++++++++++ .../migration/postgres/V4__AddCascadeOnDelete.sql | 23 +++ .../postgres/V5__AddBucketPublicFlags.sql | 16 ++ .../migration/postgres/V6__AddFlowPersistence.sql | 22 +++ .../provider/TestStandardProviderFactory.java | 18 ++- .../flow/TestDatabaseFlowPersistenceProvider.java | 100 ++++++++++++ .../provider/hook/TestScriptEventHookProvider.java | 12 +- .../nifi/registry/provider/ProviderContext.java | 34 ++++ .../src/main/resources/conf/providers.xml | 6 + .../nifi/registry/web/api/DBFlowStorageIT.java | 178 +++++++++++++++++++++ .../application-ITDBFlowStorage.properties} | 9 +- .../conf/db-flow-storage/nifi-registry.properties} | 16 +- .../resources/conf/providers-db-flow-storage.xml | 29 ++++ .../FlowPersistenceProviderMigrator.java | 13 +- 22 files changed, 832 insertions(+), 24 deletions(-) diff --git a/nifi-registry-core/nifi-registry-docs/src/main/asciidoc/administration-guide.adoc 
b/nifi-registry-core/nifi-registry-docs/src/main/asciidoc/administration-guide.adoc index 29b5589..a28dc50 100644 --- a/nifi-registry-core/nifi-registry-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-registry-core/nifi-registry-docs/src/main/asciidoc/administration-guide.adoc @@ -1118,7 +1118,7 @@ Currently, NiFi Registry supports using H2, Postgres 9.x, and MySQL (5.6, 5.7, 8 NOTE: NiFi Registry 0.1.0 only supports H2. -== H2 +=== H2 H2 is an embedded database that is pre-configured in the default _nifi-registry.properties_ file. The contents of the H2 database are stored in a file on the local filesystem. @@ -1130,7 +1130,7 @@ For NiFi Registry 0.2.0 and forward, the location of the H2 database is specifie `nifi.registry.db.url=jdbc:h2:./database/nifi-registry-primary;` -== Postgres +=== Postgres Postgres provides the option to use an externally located database that also supports high availability. @@ -1159,7 +1159,7 @@ The following steps are required to use Postgres: nifi.registry.db.username=nifireg nifi.registry.db.password=changeme -== MySQL +=== MySQL MySQL also provides the option to use an externally located database that also supports high availability. @@ -1357,6 +1357,23 @@ Host bitbucket.org IdentityFile ~/.ssh/key-for-bitbucket .... +==== DatabaseFlowPersistenceProvider + +`DatabaseFlowPersistenceProvider` stores flow contents in a database table. + +This provider uses the same database as the metadata database, so it requires no configuration of its own; the +connection details come from the database properties in `nifi-registry.properties`. + +The database table is named `FLOW_PERSISTENCE_PROVIDER` and has the following schema: + +|==== +|*Column*|*Description* +|BUCKET_ID|The identifier of the bucket where the flow is located. +|FLOW_ID|The identifier of the flow. +|VERSION|The version of the flow. +|FLOW_CONTENT|The serialized bytes of the flow content, stored in a binary column (`BLOB` by default, `LONGBLOB` on MySQL, `BYTEA` on Postgres). 
+|==== + ==== Switching from other Flow Persistence Provider In order to switch the Flow Persistence Provider, it is necessary to reset NiFi Registry. diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/CustomFlywayConfiguration.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/CustomFlywayConfiguration.java index 0cbf64f..0288f9d 100644 --- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/CustomFlywayConfiguration.java +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/CustomFlywayConfiguration.java @@ -41,6 +41,9 @@ public class CustomFlywayConfiguration implements FlywayConfigurationCustomizer private static final String LOCATION_MYSQL = "classpath:db/migration/mysql"; private static final String[] LOCATIONS_MYSQL = {LOCATION_COMMON, LOCATION_MYSQL}; + private static final String LOCATION_POSTGRES = "classpath:db/migration/postgres"; + private static final String[] LOCATIONS_POSTGRES = {LOCATION_COMMON, LOCATION_POSTGRES}; + @Override public void customize(final FluentConfiguration configuration) { final DatabaseType databaseType = getDatabaseType(configuration.getDataSource()); @@ -51,6 +54,10 @@ public class CustomFlywayConfiguration implements FlywayConfigurationCustomizer LOGGER.info("Setting migration locations to {}", new Object[] {LOCATIONS_MYSQL}); configuration.locations(LOCATIONS_MYSQL); break; + case POSTGRESQL: + LOGGER.info("Setting migration locations to {}", new Object[] {LOCATIONS_POSTGRES}); + configuration.locations(LOCATIONS_POSTGRES); + break; default: LOGGER.info("Setting migration locations to {}", new Object[] {LOCATIONS_DEFAULT}); configuration.locations(LOCATIONS_DEFAULT); diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java 
b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java index acd7705..c9eb9f5 100644 --- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java @@ -32,6 +32,7 @@ import org.springframework.context.annotation.Configuration; import org.xml.sax.SAXException; import javax.annotation.PostConstruct; +import javax.sql.DataSource; import javax.xml.XMLConstants; import javax.xml.bind.JAXBContext; import javax.xml.bind.JAXBElement; @@ -42,6 +43,8 @@ import javax.xml.validation.Schema; import javax.xml.validation.SchemaFactory; import java.io.File; import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -73,6 +76,7 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean private final NiFiRegistryProperties properties; private final ExtensionManager extensionManager; + private final DataSource dataSource; private final AtomicReference<Providers> providersHolder = new AtomicReference<>(null); private FlowPersistenceProvider flowPersistenceProvider; @@ -80,9 +84,10 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean private BundlePersistenceProvider bundlePersistenceProvider; @Autowired - public StandardProviderFactory(final NiFiRegistryProperties properties, final ExtensionManager extensionManager) { + public StandardProviderFactory(final NiFiRegistryProperties properties, final ExtensionManager extensionManager, final DataSource dataSource) { this.properties = properties; this.extensionManager = extensionManager; + this.dataSource = dataSource; if (this.properties == null) { throw new 
IllegalStateException("NiFiRegistryProperties cannot be null"); @@ -91,6 +96,10 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean if (this.extensionManager == null) { throw new IllegalStateException("ExtensionManager cannot be null"); } + + if (this.dataSource == null) { + throw new IllegalStateException("DataSource cannot be null"); + } } @PostConstruct @@ -144,14 +153,16 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean final Constructor constructor = flowProviderClass.getConstructor(); flowPersistenceProvider = (FlowPersistenceProvider) constructor.newInstance(); - LOGGER.info("Instantiated FlowPersistenceProvider with class name {}", new Object[] {flowProviderClassName}); + performMethodInjection(flowPersistenceProvider, flowProviderClass); + + LOGGER.info("Instantiated FlowPersistenceProvider with class name {}", new Object[]{flowProviderClassName}); } catch (Exception e) { throw new ProviderFactoryException("Error creating FlowPersistenceProvider with class name: " + flowProviderClassName, e); } final ProviderConfigurationContext configurationContext = createConfigurationContext(jaxbFlowProvider.getProperty()); flowPersistenceProvider.onConfigured(configurationContext); - LOGGER.info("Configured FlowPersistenceProvider with class name {}", new Object[] {flowProviderClassName}); + LOGGER.info("Configured FlowPersistenceProvider with class name {}", new Object[]{flowProviderClassName}); } return flowPersistenceProvider; @@ -192,6 +203,8 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean final Constructor constructor = hookProviderClass.getConstructor(); hook = (EventHookProvider) constructor.newInstance(); + performMethodInjection(hook, hookProviderClass); + LOGGER.info("Instantiated EventHookProvider with class name {}", new Object[] {hookProviderClassName}); } catch (Exception e) { throw new ProviderFactoryException("Error creating EventHookProvider with class 
name: " + hookProviderClassName, e); @@ -233,6 +246,8 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean final Constructor constructor = extensionBundleProviderClass.getConstructor(); bundlePersistenceProvider = (BundlePersistenceProvider) constructor.newInstance(); + performMethodInjection(bundlePersistenceProvider, extensionBundleProviderClass); + LOGGER.info("Instantiated BundlePersistenceProvider with class name {}", new Object[] {extensionBundleProviderClassName}); } catch (Exception e) { throw new ProviderFactoryException("Error creating BundlePersistenceProvider with class name: " + extensionBundleProviderClassName, e); @@ -273,4 +288,35 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean return new StandardProviderConfigurationContext(properties); } + private void performMethodInjection(final Object instance, final Class providerClass) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException { + for (final Method method : providerClass.getMethods()) { + if (method.isAnnotationPresent(ProviderContext.class)) { + // make the method accessible + final boolean isAccessible = method.isAccessible(); + method.setAccessible(true); + + try { + final Class<?>[] argumentTypes = method.getParameterTypes(); + + // look for setters (single argument) + if (argumentTypes.length == 1) { + final Class<?> argumentType = argumentTypes[0]; + + // look for well known types, currently we only support injecting the DataSource + if (DataSource.class.isAssignableFrom(argumentType)) { + method.invoke(instance, dataSource); + } + } + } finally { + method.setAccessible(isAccessible); + } + } + } + + final Class parentClass = providerClass.getSuperclass(); + if (parentClass != null && Provider.class.isAssignableFrom(parentClass)) { + performMethodInjection(instance, parentClass); + } + } + } diff --git 
a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/DatabaseFlowPersistenceProvider.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/DatabaseFlowPersistenceProvider.java new file mode 100644 index 0000000..ee38592 --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/DatabaseFlowPersistenceProvider.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider.flow; + +import org.apache.nifi.registry.flow.FlowPersistenceException; +import org.apache.nifi.registry.flow.FlowPersistenceProvider; +import org.apache.nifi.registry.flow.FlowSnapshotContext; +import org.apache.nifi.registry.provider.ProviderConfigurationContext; +import org.apache.nifi.registry.provider.ProviderContext; +import org.apache.nifi.registry.provider.ProviderCreationException; +import org.springframework.jdbc.core.JdbcTemplate; + +import javax.sql.DataSource; +import java.util.ArrayList; +import java.util.List; + +/** + * A FlowPersistenceProvider that uses a database table for storage. 
The intent is to use the same database as the rest + * of the application so that all data can be stored together and benefit from any replication/scaling of the database. + */ +public class DatabaseFlowPersistenceProvider implements FlowPersistenceProvider { + + private DataSource dataSource; + private JdbcTemplate jdbcTemplate; + + @ProviderContext + public void setDataSource(final DataSource dataSource) { + this.dataSource = dataSource; + this.jdbcTemplate = new JdbcTemplate(this.dataSource); + } + + @Override + public void onConfigured(final ProviderConfigurationContext configurationContext) throws ProviderCreationException { + // there is no config since we get the DataSource from the framework + } + + @Override + public void saveFlowContent(final FlowSnapshotContext context, final byte[] content) throws FlowPersistenceException { + final String sql = "INSERT INTO FLOW_PERSISTENCE_PROVIDER (BUCKET_ID, FLOW_ID, VERSION, FLOW_CONTENT) VALUES (?, ?, ?, ?)"; + jdbcTemplate.update(sql, context.getBucketId(), context.getFlowId(), context.getVersion(), content); + } + + @Override + public byte[] getFlowContent(final String bucketId, final String flowId, final int version) throws FlowPersistenceException { + final List<byte[]> results = new ArrayList<>(); + final String sql = "SELECT FLOW_CONTENT FROM FLOW_PERSISTENCE_PROVIDER WHERE BUCKET_ID = ? and FLOW_ID = ? and VERSION = ?"; + + jdbcTemplate.query(sql, new Object[] {bucketId, flowId, version}, (rs) -> { + final byte[] content = rs.getBytes("FLOW_CONTENT"); + results.add(content); + }); + + if (results.isEmpty()) { + return null; + } else { + return results.get(0); + } + } + + @Override + public void deleteAllFlowContent(final String bucketId, final String flowId) throws FlowPersistenceException { + final String sql = "DELETE FROM FLOW_PERSISTENCE_PROVIDER WHERE BUCKET_ID = ? 
and FLOW_ID = ?"; + jdbcTemplate.update(sql, bucketId, flowId); + } + + @Override + public void deleteFlowContent(final String bucketId, final String flowId, final int version) throws FlowPersistenceException { + final String sql = "DELETE FROM FLOW_PERSISTENCE_PROVIDER WHERE BUCKET_ID = ? and FLOW_ID = ? and VERSION = ?"; + jdbcTemplate.update(sql, bucketId, flowId, version); + } + +} diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider b/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider index e456fa2..df57a73 100644 --- a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider +++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider @@ -13,4 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. org.apache.nifi.registry.provider.flow.FileSystemFlowPersistenceProvider -org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider \ No newline at end of file +org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider +org.apache.nifi.registry.provider.flow.DatabaseFlowPersistenceProvider \ No newline at end of file diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/default/V6__AddFlowPersistence.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/default/V6__AddFlowPersistence.sql new file mode 100644 index 0000000..0765b70 --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/default/V6__AddFlowPersistence.sql @@ -0,0 +1,22 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. 
See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +CREATE TABLE FLOW_PERSISTENCE_PROVIDER ( + BUCKET_ID VARCHAR(50) NOT NULL, + FLOW_ID VARCHAR(50) NOT NULL, + VERSION INT NOT NULL, + FLOW_CONTENT BLOB NOT NULL, + CONSTRAINT PK__FLOW_PERSISTENCE_PROVIDER PRIMARY KEY (BUCKET_ID, FLOW_ID, VERSION) +); \ No newline at end of file diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/mysql/V6__AddFlowPersistence.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/mysql/V6__AddFlowPersistence.sql new file mode 100644 index 0000000..ad3d53f --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/mysql/V6__AddFlowPersistence.sql @@ -0,0 +1,22 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. 
You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +CREATE TABLE FLOW_PERSISTENCE_PROVIDER ( + BUCKET_ID VARCHAR(50) NOT NULL, + FLOW_ID VARCHAR(50) NOT NULL, + VERSION INT NOT NULL, + FLOW_CONTENT LONGBLOB NOT NULL, + CONSTRAINT PK__FLOW_PERSISTENCE_PROVIDER PRIMARY KEY (BUCKET_ID, FLOW_ID, VERSION) +); \ No newline at end of file diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V2__Initial.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V2__Initial.sql new file mode 100644 index 0000000..b992d23 --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V2__Initial.sql @@ -0,0 +1,60 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+ +-- NOTE: NAME is VARCHAR(1000) in this Postgres migration; the 768-character maximum is a MySQL limit for columns under a unique constraint and is not applied here. +CREATE TABLE BUCKET ( + ID VARCHAR(50) NOT NULL, + NAME VARCHAR(1000) NOT NULL, + DESCRIPTION TEXT, + CREATED TIMESTAMP NOT NULL, + CONSTRAINT PK__BUCKET_ID PRIMARY KEY (ID), + CONSTRAINT UNIQUE__BUCKET_NAME UNIQUE (NAME) +); + +CREATE TABLE BUCKET_ITEM ( + ID VARCHAR(50) NOT NULL, + NAME VARCHAR(1000) NOT NULL, + DESCRIPTION TEXT, + CREATED TIMESTAMP NOT NULL, + MODIFIED TIMESTAMP NOT NULL, + ITEM_TYPE VARCHAR(50) NOT NULL, + BUCKET_ID VARCHAR(50) NOT NULL, + CONSTRAINT PK__BUCKET_ITEM_ID PRIMARY KEY (ID), + CONSTRAINT FK__BUCKET_ITEM_BUCKET_ID FOREIGN KEY (BUCKET_ID) REFERENCES BUCKET(ID) +); + +CREATE TABLE FLOW ( + ID VARCHAR(50) NOT NULL, + CONSTRAINT PK__FLOW_ID PRIMARY KEY (ID), + CONSTRAINT FK__FLOW_BUCKET_ITEM_ID FOREIGN KEY (ID) REFERENCES BUCKET_ITEM(ID) +); + +CREATE TABLE FLOW_SNAPSHOT ( + FLOW_ID VARCHAR(50) NOT NULL, + VERSION INT NOT NULL, + CREATED TIMESTAMP NOT NULL, + CREATED_BY VARCHAR(4096) NOT NULL, + COMMENTS TEXT, + CONSTRAINT PK__FLOW_SNAPSHOT_FLOW_ID_AND_VERSION PRIMARY KEY (FLOW_ID, VERSION), + CONSTRAINT FK__FLOW_SNAPSHOT_FLOW_ID FOREIGN KEY (FLOW_ID) REFERENCES FLOW(ID) +); + +CREATE TABLE SIGNING_KEY ( + ID VARCHAR(50) NOT NULL, + TENANT_IDENTITY VARCHAR(4096) NOT NULL, + KEY_VALUE VARCHAR(50) NOT NULL, + CONSTRAINT PK__SIGNING_KEY_ID PRIMARY KEY (ID), + CONSTRAINT UNIQUE__SIGNING_KEY_TENANT_IDENTITY UNIQUE (TENANT_IDENTITY) +); \ No newline at end of file diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V3__AddExtensions.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V3__AddExtensions.sql new file mode 100644 index 0000000..3bd9820 --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V3__AddExtensions.sql @@ -0,0 +1,105 @@ +-- Licensed to the Apache Software Foundation 
(ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +CREATE TABLE BUNDLE ( + ID VARCHAR(50) NOT NULL, + BUCKET_ID VARCHAR(50) NOT NULL, + BUNDLE_TYPE VARCHAR(200) NOT NULL, + GROUP_ID VARCHAR(500) NOT NULL, + ARTIFACT_ID VARCHAR(500) NOT NULL, + CONSTRAINT PK__EXTENSION_BUNDLE_ID PRIMARY KEY (ID), + CONSTRAINT FK__EXTENSION_BUNDLE_BUCKET_ITEM_ID FOREIGN KEY (ID) REFERENCES BUCKET_ITEM(ID) ON DELETE CASCADE, + CONSTRAINT FK__EXTENSION_BUNDLE_BUCKET_ID FOREIGN KEY(BUCKET_ID) REFERENCES BUCKET(ID) ON DELETE CASCADE, + CONSTRAINT UNIQUE__EXTENSION_BUNDLE_BUCKET_GROUP_ARTIFACT UNIQUE (BUCKET_ID, GROUP_ID, ARTIFACT_ID) +); + +CREATE TABLE BUNDLE_VERSION ( + ID VARCHAR(50) NOT NULL, + BUNDLE_ID VARCHAR(50) NOT NULL, + VERSION VARCHAR(100) NOT NULL, + CREATED TIMESTAMP NOT NULL, + CREATED_BY VARCHAR(4096) NOT NULL, + DESCRIPTION TEXT, + SHA_256_HEX VARCHAR(512) NOT NULL, + SHA_256_SUPPLIED INT NOT NULL, + CONTENT_SIZE BIGINT NOT NULL, + SYSTEM_API_VERSION VARCHAR(50), + BUILD_TOOL VARCHAR(100), + BUILD_FLAGS VARCHAR(100), + BUILD_BRANCH VARCHAR(200), + BUILD_TAG VARCHAR(200), + BUILD_REVISION VARCHAR(100), + BUILT TIMESTAMP, + BUILT_BY VARCHAR(4096), + CONSTRAINT PK__BUNDLE_VERSION_ID PRIMARY KEY (ID), + CONSTRAINT FK__BUNDLE_VERSION_BUNDLE_ID FOREIGN KEY (BUNDLE_ID) REFERENCES 
BUNDLE(ID) ON DELETE CASCADE, + CONSTRAINT UNIQUE__BUNDLE_VERSION_BUNDLE_ID_VERSION UNIQUE (BUNDLE_ID, VERSION) +); + +CREATE TABLE BUNDLE_VERSION_DEPENDENCY ( + ID VARCHAR(50) NOT NULL, + BUNDLE_VERSION_ID VARCHAR(50) NOT NULL, + GROUP_ID VARCHAR(500) NOT NULL, + ARTIFACT_ID VARCHAR(500) NOT NULL, + VERSION VARCHAR(100) NOT NULL, + CONSTRAINT PK__BUNDLE_VERSION_DEPENDENCY_ID PRIMARY KEY (ID), + CONSTRAINT FK__BUNDLE_VERSION_DEPENDENCY_BUNDLE_VERSION_ID FOREIGN KEY (BUNDLE_VERSION_ID) REFERENCES BUNDLE_VERSION(ID) ON DELETE CASCADE, + CONSTRAINT UNIQUE__BUNDLE_VERSION_DEPENDENCY_BUNDLE_ID_GROUP_ARTIFACT_VERSION UNIQUE (BUNDLE_VERSION_ID, GROUP_ID, ARTIFACT_ID, VERSION) +); + +CREATE TABLE EXTENSION ( + ID VARCHAR(50) NOT NULL, + BUNDLE_VERSION_ID VARCHAR(50) NOT NULL, + NAME VARCHAR(500) NOT NULL, + DISPLAY_NAME VARCHAR(500) NOT NULL, + TYPE VARCHAR(100) NOT NULL, + CONTENT TEXT NOT NULL, + ADDITIONAL_DETAILS TEXT, + HAS_ADDITIONAL_DETAILS INT NOT NULL, + CONSTRAINT PK__EXTENSION_ID PRIMARY KEY (ID), + CONSTRAINT FK__EXTENSION_BUNDLE_VERSION_ID FOREIGN KEY (BUNDLE_VERSION_ID) REFERENCES BUNDLE_VERSION(ID) ON DELETE CASCADE, + CONSTRAINT UNIQUE__EXTENSION_BUNDLE_VERSION_ID_AND_NAME UNIQUE (BUNDLE_VERSION_ID, NAME) +); + +CREATE TABLE EXTENSION_PROVIDED_SERVICE_API ( + ID VARCHAR(50) NOT NULL, + EXTENSION_ID VARCHAR(50) NOT NULL, + CLASS_NAME VARCHAR (500) NOT NULL, + GROUP_ID VARCHAR(500) NOT NULL, + ARTIFACT_ID VARCHAR(500) NOT NULL, + VERSION VARCHAR(100) NOT NULL, + CONSTRAINT PK__EXTENSION_PROVIDED_SERVICE_API_ID PRIMARY KEY (ID), + CONSTRAINT FK__EXTENSION_PROVIDED_SERVICE_API_EXTENSION_ID FOREIGN KEY (EXTENSION_ID) REFERENCES EXTENSION(ID) ON DELETE CASCADE, + CONSTRAINT UNIQUE__EXTENSION_PROVIDED_SERVICE_API UNIQUE (EXTENSION_ID, CLASS_NAME, GROUP_ID, ARTIFACT_ID, VERSION) +); + +CREATE TABLE EXTENSION_RESTRICTION ( + ID VARCHAR(50) NOT NULL, + EXTENSION_ID VARCHAR(50) NOT NULL, + REQUIRED_PERMISSION VARCHAR(200) NOT NULL, + EXPLANATION VARCHAR (4096) NOT 
NULL, + CONSTRAINT PK__EXTENSION_RESTRICTION_ID PRIMARY KEY (ID), + CONSTRAINT FK__EXTENSION_RESTRICTION_EXTENSION_ID FOREIGN KEY (EXTENSION_ID) REFERENCES EXTENSION(ID) ON DELETE CASCADE, + CONSTRAINT UNIQUE__EXTENSION_RESTRICTION_EXTENSION_ID_REQUIRED_PERMISSION UNIQUE (EXTENSION_ID, REQUIRED_PERMISSION) +); + +CREATE TABLE EXTENSION_TAG ( + EXTENSION_ID VARCHAR(50) NOT NULL, + TAG VARCHAR(200) NOT NULL, + CONSTRAINT PK__EXTENSION_TAG_EXTENSION_ID_AND_TAG PRIMARY KEY (EXTENSION_ID, TAG), + CONSTRAINT FK__EXTENSION_TAG_EXTENSION_ID FOREIGN KEY (EXTENSION_ID) REFERENCES EXTENSION(ID) ON DELETE CASCADE +); + +ALTER TABLE BUCKET ADD ALLOW_EXTENSION_BUNDLE_REDEPLOY INT NOT NULL DEFAULT (0); \ No newline at end of file diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V4__AddCascadeOnDelete.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V4__AddCascadeOnDelete.sql new file mode 100644 index 0000000..5b0e6c6 --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V4__AddCascadeOnDelete.sql @@ -0,0 +1,23 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+ +ALTER TABLE BUCKET_ITEM DROP CONSTRAINT FK__BUCKET_ITEM_BUCKET_ID; +ALTER TABLE BUCKET_ITEM ADD CONSTRAINT FK__BUCKET_ITEM_BUCKET_ID FOREIGN KEY (BUCKET_ID) REFERENCES BUCKET(ID) ON DELETE CASCADE; + +ALTER TABLE FLOW DROP CONSTRAINT FK__FLOW_BUCKET_ITEM_ID; +ALTER TABLE FLOW ADD CONSTRAINT FK__FLOW_BUCKET_ITEM_ID FOREIGN KEY (ID) REFERENCES BUCKET_ITEM(ID) ON DELETE CASCADE; + +ALTER TABLE FLOW_SNAPSHOT DROP CONSTRAINT FK__FLOW_SNAPSHOT_FLOW_ID; +ALTER TABLE FLOW_SNAPSHOT ADD CONSTRAINT FK__FLOW_SNAPSHOT_FLOW_ID FOREIGN KEY (FLOW_ID) REFERENCES FLOW(ID) ON DELETE CASCADE; \ No newline at end of file diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V5__AddBucketPublicFlags.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V5__AddBucketPublicFlags.sql new file mode 100644 index 0000000..ef7478b --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V5__AddBucketPublicFlags.sql @@ -0,0 +1,16 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+ +ALTER TABLE BUCKET ADD ALLOW_PUBLIC_READ INT NOT NULL DEFAULT (0); diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V6__AddFlowPersistence.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V6__AddFlowPersistence.sql new file mode 100644 index 0000000..395760b --- /dev/null +++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V6__AddFlowPersistence.sql @@ -0,0 +1,22 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+ +CREATE TABLE FLOW_PERSISTENCE_PROVIDER ( + BUCKET_ID VARCHAR(50) NOT NULL, + FLOW_ID VARCHAR(50) NOT NULL, + VERSION INT NOT NULL, + FLOW_CONTENT BYTEA NOT NULL, + CONSTRAINT PK__FLOW_PERSISTENCE_PROVIDER PRIMARY KEY (BUCKET_ID, FLOW_ID, VERSION) +); \ No newline at end of file diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java index ce374f7..0d23110 100644 --- a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java +++ b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java @@ -23,6 +23,8 @@ import org.apache.nifi.registry.properties.NiFiRegistryProperties; import org.junit.Test; import org.mockito.Mockito; +import javax.sql.DataSource; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.any; @@ -38,7 +40,9 @@ public class TestStandardProviderFactory { final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class); when(extensionManager.getExtensionClassLoader(any(String.class))).thenReturn(this.getClass().getClassLoader()); - final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager); + final DataSource dataSource = Mockito.mock(DataSource.class); + + final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager, dataSource); providerFactory.initialize(); final FlowPersistenceProvider flowPersistenceProvider = providerFactory.getFlowPersistenceProvider(); @@ -66,7 +70,9 @@ public class TestStandardProviderFactory { final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class); 
when(extensionManager.getExtensionClassLoader(any(String.class))).thenReturn(this.getClass().getClassLoader()); - final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager); + final DataSource dataSource = Mockito.mock(DataSource.class); + + final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager, dataSource); providerFactory.getFlowPersistenceProvider(); } @@ -78,7 +84,9 @@ public class TestStandardProviderFactory { final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class); when(extensionManager.getExtensionClassLoader(any(String.class))).thenReturn(this.getClass().getClassLoader()); - final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager); + final DataSource dataSource = Mockito.mock(DataSource.class); + + final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager, dataSource); providerFactory.initialize(); } @@ -90,7 +98,9 @@ public class TestStandardProviderFactory { final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class); when(extensionManager.getExtensionClassLoader(any(String.class))).thenReturn(this.getClass().getClassLoader()); - final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager); + final DataSource dataSource = Mockito.mock(DataSource.class); + + final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager, dataSource); providerFactory.initialize(); providerFactory.getFlowPersistenceProvider(); diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestDatabaseFlowPersistenceProvider.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestDatabaseFlowPersistenceProvider.java new file mode 100644 index 0000000..5314860 --- /dev/null +++ 
b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestDatabaseFlowPersistenceProvider.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.provider.flow; + +import org.apache.nifi.registry.db.DatabaseTestApplication; +import org.apache.nifi.registry.flow.FlowPersistenceProvider; +import org.apache.nifi.registry.flow.FlowSnapshotContext; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.TestExecutionListeners; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.context.support.DependencyInjectionTestExecutionListener; +import org.springframework.test.context.transaction.TransactionalTestExecutionListener; +import org.springframework.transaction.annotation.Transactional; + +import javax.sql.DataSource; +import java.nio.charset.StandardCharsets; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static 
org.junit.Assert.assertNull; +import static org.mockito.Mockito.when; + +@Transactional +@RunWith(SpringRunner.class) +@SpringBootTest(classes = DatabaseTestApplication.class, webEnvironment = SpringBootTest.WebEnvironment.NONE) +@TestExecutionListeners({DependencyInjectionTestExecutionListener.class, TransactionalTestExecutionListener.class}) +public class TestDatabaseFlowPersistenceProvider { + + @Autowired + private DataSource dataSource; + + private FlowPersistenceProvider persistenceProvider; + + @Before + public void setup() { + persistenceProvider = new DatabaseFlowPersistenceProvider(); + ((DatabaseFlowPersistenceProvider)persistenceProvider).setDataSource(dataSource); + } + + @Test + public void testAll() { + // Save two versions of a flow... + final FlowSnapshotContext context1 = getFlowSnapshotContext("b1", "f1", 1); + final byte[] content1 = "f1v1".getBytes(StandardCharsets.UTF_8); + persistenceProvider.saveFlowContent(context1, content1); + + final FlowSnapshotContext context2 = getFlowSnapshotContext("b1", "f1", 2); + final byte[] content2 = "f1v2".getBytes(StandardCharsets.UTF_8); + persistenceProvider.saveFlowContent(context2, content2); + + // Verify we can retrieve both versions and that the content is correct + final byte[] retrievedContent1 = persistenceProvider.getFlowContent(context1.getBucketId(), context1.getFlowId(), context1.getVersion()); + assertNotNull(retrievedContent1); + assertEquals("f1v1", new String(retrievedContent1, StandardCharsets.UTF_8)); + + final byte[] retrievedContent2 = persistenceProvider.getFlowContent(context2.getBucketId(), context2.getFlowId(), context2.getVersion()); + assertNotNull(retrievedContent2); + assertEquals("f1v2", new String(retrievedContent2, StandardCharsets.UTF_8)); + + // Delete a specific version and verify we can no longer retrieve it + persistenceProvider.deleteFlowContent(context1.getBucketId(), context1.getFlowId(), context1.getVersion()); + + final byte[] deletedContent1 =
persistenceProvider.getFlowContent(context1.getBucketId(), context1.getFlowId(), context1.getVersion()); + assertNull(deletedContent1); + + // Delete all content for a flow + persistenceProvider.deleteAllFlowContent(context1.getBucketId(), context1.getFlowId()); + + final byte[] deletedContent2 = persistenceProvider.getFlowContent(context2.getBucketId(), context2.getFlowId(), context2.getVersion()); + assertNull(deletedContent2); + } + + private FlowSnapshotContext getFlowSnapshotContext(final String bucketId, final String flowId, final int version) { + final FlowSnapshotContext context = Mockito.mock(FlowSnapshotContext.class); + when(context.getBucketId()).thenReturn(bucketId); + when(context.getFlowId()).thenReturn(flowId); + when(context.getVersion()).thenReturn(version); + return context; + } + +} diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptEventHookProvider.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptEventHookProvider.java index ab24998..b7d241b 100644 --- a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptEventHookProvider.java +++ b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptEventHookProvider.java @@ -16,9 +16,6 @@ */ package org.apache.nifi.registry.provider.hook; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.when; - import org.apache.nifi.registry.extension.ExtensionManager; import org.apache.nifi.registry.properties.NiFiRegistryProperties; import org.apache.nifi.registry.provider.ProviderCreationException; @@ -27,6 +24,11 @@ import org.apache.nifi.registry.provider.StandardProviderFactory; import org.junit.Test; import org.mockito.Mockito; +import javax.sql.DataSource; + +import static org.mockito.ArgumentMatchers.any; +import static 
org.mockito.Mockito.when; + public class TestScriptEventHookProvider { @Test(expected = ProviderCreationException.class) @@ -37,7 +39,9 @@ public class TestScriptEventHookProvider { final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class); when(extensionManager.getExtensionClassLoader(any(String.class))).thenReturn(this.getClass().getClassLoader()); - final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager); + final DataSource dataSource = Mockito.mock(DataSource.class); + + final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager, dataSource); providerFactory.initialize(); providerFactory.getEventHookProviders(); } diff --git a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderContext.java b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderContext.java new file mode 100644 index 0000000..9729176 --- /dev/null +++ b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderContext.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.registry.provider; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation for setter methods in a provider to indicate the framework should inject the requested resource. + */ +@Documented +@Target({ElementType.FIELD, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface ProviderContext { +} diff --git a/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/providers.xml b/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/providers.xml index f41eee8..3eb138f 100644 --- a/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/providers.xml +++ b/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/providers.xml @@ -37,6 +37,12 @@ --> <!-- + <flowPersistenceProvider> + <class>org.apache.nifi.registry.provider.flow.DatabaseFlowPersistenceProvider</class> + </flowPersistenceProvider> + --> + + <!-- <eventHookProvider> <class>org.apache.nifi.registry.provider.hook.ScriptEventHookProvider</class> <property name="Script Path"></property> diff --git a/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/DBFlowStorageIT.java b/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/DBFlowStorageIT.java new file mode 100644 index 0000000..b438eaa --- /dev/null +++ b/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/DBFlowStorageIT.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.web.api; + +import org.apache.nifi.registry.NiFiRegistryTestApiApplication; +import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.client.NiFiRegistryClient; +import org.apache.nifi.registry.client.NiFiRegistryClientConfig; +import org.apache.nifi.registry.client.NiFiRegistryException; +import org.apache.nifi.registry.client.impl.JerseyNiFiRegistryClient; +import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; +import org.apache.nifi.registry.flow.VersionedProcessGroup; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.jdbc.Sql; +import org.springframework.test.context.junit4.SpringRunner; + +import java.io.IOException; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@RunWith(SpringRunner.class) +@SpringBootTest( + classes = NiFiRegistryTestApiApplication.class, + webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + properties = "spring.profiles.include=ITDBFlowStorage") 
+@Sql(executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD, scripts = "classpath:db/clearDB.sql") +public class DBFlowStorageIT extends IntegrationTestBase { + + static final Logger LOGGER = LoggerFactory.getLogger(UnsecuredNiFiRegistryClientIT.class); + + private NiFiRegistryClient client; + + @Before + public void setup() throws IOException { + final String baseUrl = createBaseURL(); + LOGGER.info("Using base url = " + baseUrl); + + final NiFiRegistryClientConfig clientConfig = new NiFiRegistryClientConfig.Builder() + .baseUrl(baseUrl) + .build(); + assertNotNull(clientConfig); + + final NiFiRegistryClient client = new JerseyNiFiRegistryClient.Builder() + .config(clientConfig) + .build(); + assertNotNull(client); + this.client = client; + } + + @After + public void teardown() { + try { + client.close(); + } catch (Exception e) { + LOGGER.warn(e.getMessage(), e); + } + } + + @Test + public void testAll() throws IOException, NiFiRegistryException { + // Create two buckets... + + final Bucket b1 = new Bucket(); + b1.setName("b1"); + + final Bucket createdB1 = client.getBucketClient().create(b1); + assertNotNull(createdB1); + + final Bucket b2 = new Bucket(); + b2.setName("b2"); + + final Bucket createdB2 = client.getBucketClient().create(b2); + assertNotNull(createdB2); + + // Create two flows... + + final VersionedFlow f1 = new VersionedFlow(); + f1.setName("f1"); + f1.setBucketIdentifier(createdB1.getIdentifier()); + + final VersionedFlow createdF1 = client.getFlowClient().create(f1); + assertNotNull(createdF1); + + final VersionedFlow f2 = new VersionedFlow(); + f2.setName("f2"); + f2.setBucketIdentifier(createdB2.getIdentifier()); + + final VersionedFlow createdF2 = client.getFlowClient().create(f2); + assertNotNull(createdF2); + + // Create some versions for each flow... 
+ + final VersionedFlowSnapshot snapshotF1V1 = createSnapshot(createdB1, createdF1, 1, "f1v1"); + final VersionedFlowSnapshot createdSnapshotF1V1 = client.getFlowSnapshotClient().create(snapshotF1V1); + assertNotNull(createdSnapshotF1V1); + + final VersionedFlowSnapshot snapshotF1V2 = createSnapshot(createdB1, createdF1, 2, "f1v2"); + final VersionedFlowSnapshot createdSnapshotF1V2 = client.getFlowSnapshotClient().create(snapshotF1V2); + assertNotNull(createdSnapshotF1V2); + + final VersionedFlowSnapshot snapshotF2V1 = createSnapshot(createdB2, createdF2, 1, "f2v1"); + final VersionedFlowSnapshot createdSnapshotF2V1 = client.getFlowSnapshotClient().create(snapshotF2V1); + assertNotNull(createdSnapshotF2V1); + + final VersionedFlowSnapshot snapshotF2V2 = createSnapshot(createdB2, createdF2, 2, "f2v2"); + final VersionedFlowSnapshot createdSnapshotF2V2 = client.getFlowSnapshotClient().create(snapshotF2V2); + assertNotNull(createdSnapshotF2V2); + + // Verify retrieving flow versions... + + final VersionedFlowSnapshot retrievedSnapshotF1V1 = client.getFlowSnapshotClient().get(createdF1.getIdentifier(), 1); + assertNotNull(retrievedSnapshotF1V1); + assertNotNull(retrievedSnapshotF1V1.getFlowContents()); + assertEquals("f1v1", retrievedSnapshotF1V1.getFlowContents().getName()); + + final VersionedFlowSnapshot retrievedSnapshotF1V2 = client.getFlowSnapshotClient().get(createdF1.getIdentifier(), 2); + assertNotNull(retrievedSnapshotF1V2); + assertNotNull(retrievedSnapshotF1V2.getFlowContents()); + assertEquals("f1v2", retrievedSnapshotF1V2.getFlowContents().getName()); + + // Verify deleting a flow... + + client.getFlowClient().delete(createdB1.getIdentifier(), createdF1.getIdentifier()); + + // All versions of f1 should be deleted + try { + client.getFlowSnapshotClient().get(createdF1.getIdentifier(), 1); + fail("Should have thrown exception"); + } catch (NiFiRegistryException nre) { + } + + // Versions of f2 should still exist... 
+ final VersionedFlowSnapshot retrievedSnapshotF2V1 = client.getFlowSnapshotClient().get(createdF2.getIdentifier(), 1); + assertNotNull(retrievedSnapshotF2V1); + assertNotNull(retrievedSnapshotF2V1.getFlowContents()); + assertEquals("f2v1", retrievedSnapshotF2V1.getFlowContents().getName()); + } + + private VersionedFlowSnapshot createSnapshot(final Bucket bucket, final VersionedFlow flow, final int version, final String rootPgName) { + final VersionedProcessGroup rootPg = new VersionedProcessGroup(); + rootPg.setName(rootPgName); + + final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata(); + snapshotMetadata.setBucketIdentifier(bucket.getIdentifier()); + snapshotMetadata.setFlowIdentifier(flow.getIdentifier()); + snapshotMetadata.setVersion(version); + snapshotMetadata.setComments("comments"); + + final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot(); + snapshot.setFlowContents(rootPg); + snapshot.setSnapshotMetadata(snapshotMetadata); + return snapshot; + } +} diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider b/nifi-registry-core/nifi-registry-web-api/src/test/resources/application-ITDBFlowStorage.properties similarity index 77% copy from nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider copy to nifi-registry-core/nifi-registry-web-api/src/test/resources/application-ITDBFlowStorage.properties index e456fa2..91e653c 100644 --- a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider +++ b/nifi-registry-core/nifi-registry-web-api/src/test/resources/application-ITDBFlowStorage.properties @@ -1,3 +1,4 @@ +# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. 
See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -12,5 +13,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.registry.provider.flow.FileSystemFlowPersistenceProvider -org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider \ No newline at end of file +# + +# Integration Test Profile for running an unsecured NiFi Registry instance + +# Custom (non-standard to Spring Boot) properties +nifi.registry.properties.file = src/test/resources/conf/db-flow-storage/nifi-registry.properties \ No newline at end of file diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider b/nifi-registry-core/nifi-registry-web-api/src/test/resources/conf/db-flow-storage/nifi-registry.properties similarity index 66% copy from nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider copy to nifi-registry-core/nifi-registry-web-api/src/test/resources/conf/db-flow-storage/nifi-registry.properties index e456fa2..dd50ae6 100644 --- a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider +++ b/nifi-registry-core/nifi-registry-web-api/src/test/resources/conf/db-flow-storage/nifi-registry.properties @@ -1,3 +1,4 @@ +# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. @@ -12,5 +13,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-org.apache.nifi.registry.provider.flow.FileSystemFlowPersistenceProvider -org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider \ No newline at end of file +# + +# web properties # +nifi.registry.web.http.host=localhost + +# providers properties # +nifi.registry.providers.configuration.file=./target/test-classes/conf/providers-db-flow-storage.xml + +# extensions working dir # +nifi.registry.extensions.working.directory=./target/work/extensions + +# database properties +nifi.registry.db.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE \ No newline at end of file diff --git a/nifi-registry-core/nifi-registry-web-api/src/test/resources/conf/providers-db-flow-storage.xml b/nifi-registry-core/nifi-registry-web-api/src/test/resources/conf/providers-db-flow-storage.xml new file mode 100644 index 0000000..2ec132a --- /dev/null +++ b/nifi-registry-core/nifi-registry-web-api/src/test/resources/conf/providers-db-flow-storage.xml @@ -0,0 +1,29 @@ +<?xml version="1.0" encoding="UTF-8" standalone="yes"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> +<providers> + + <flowPersistenceProvider> + <class>org.apache.nifi.registry.provider.flow.DatabaseFlowPersistenceProvider</class> + </flowPersistenceProvider> + + <extensionBundlePersistenceProvider> + <class>org.apache.nifi.registry.provider.extension.FileSystemBundlePersistenceProvider</class> + <property name="Extension Bundle Storage Directory">./target/test-classes/extension_bundles</property> + </extensionBundlePersistenceProvider> + +</providers> \ No newline at end of file diff --git a/nifi-registry-toolkit/nifi-registry-toolkit-persistence/src/main/java/org/apache/nifi/registry/toolkit/persistence/FlowPersistenceProviderMigrator.java b/nifi-registry-toolkit/nifi-registry-toolkit-persistence/src/main/java/org/apache/nifi/registry/toolkit/persistence/FlowPersistenceProviderMigrator.java index a917c19..a510281 100644 --- a/nifi-registry-toolkit/nifi-registry-toolkit-persistence/src/main/java/org/apache/nifi/registry/toolkit/persistence/FlowPersistenceProviderMigrator.java +++ b/nifi-registry-toolkit/nifi-registry-toolkit-persistence/src/main/java/org/apache/nifi/registry/toolkit/persistence/FlowPersistenceProviderMigrator.java @@ -40,6 +40,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jdbc.core.JdbcTemplate; +import javax.sql.DataSource; + public class FlowPersistenceProviderMigrator { private static final Logger log = LoggerFactory.getLogger(FlowPersistenceProviderMigrator.class); public static final int PARSE_EXCEPTION = 1; @@ -81,9 +83,10 @@ public class FlowPersistenceProviderMigrator { NiFiRegistryProperties fromProperties = NiFiRegistry.initializeProperties(NiFiRegistry.getMasterKeyProvider()); - DatabaseMetadataService fromMetadataService = new DatabaseMetadataService(new JdbcTemplate(new DataSourceFactory(fromProperties).getDataSource())); - FlowPersistenceProvider fromPersistenceProvider = createFlowPersistenceProvider(fromProperties); - FlowPersistenceProvider toPersistenceProvider = 
createFlowPersistenceProvider(createToProperties(commandLine, fromProperties)); + DataSource dataSource = new DataSourceFactory(fromProperties).getDataSource(); + DatabaseMetadataService fromMetadataService = new DatabaseMetadataService(new JdbcTemplate(dataSource)); + FlowPersistenceProvider fromPersistenceProvider = createFlowPersistenceProvider(fromProperties, dataSource); + FlowPersistenceProvider toPersistenceProvider = createFlowPersistenceProvider(createToProperties(commandLine, fromProperties), dataSource); new FlowPersistenceProviderMigrator().doMigrate(fromMetadataService, fromPersistenceProvider, toPersistenceProvider); } @@ -97,10 +100,10 @@ public class FlowPersistenceProviderMigrator { return toProperties; } - private static FlowPersistenceProvider createFlowPersistenceProvider(NiFiRegistryProperties niFiRegistryProperties) { + private static FlowPersistenceProvider createFlowPersistenceProvider(NiFiRegistryProperties niFiRegistryProperties, DataSource dataSource) { ExtensionManager fromExtensionManager = new ExtensionManager(niFiRegistryProperties); fromExtensionManager.discoverExtensions(); - StandardProviderFactory fromProviderFactory = new StandardProviderFactory(niFiRegistryProperties, fromExtensionManager); + StandardProviderFactory fromProviderFactory = new StandardProviderFactory(niFiRegistryProperties, fromExtensionManager, dataSource); fromProviderFactory.initialize(); return fromProviderFactory.getFlowPersistenceProvider(); }