Repository: nifi-minifi Updated Branches: refs/heads/minifi-c2-server 631b59926 -> 7ab8fe7db
MINIFI-447 - Adding FlowMapper and FlowRetrievalService This closes #119. Signed-off-by: Kevin Doran <kdo...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/7ab8fe7d Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/7ab8fe7d Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/7ab8fe7d Branch: refs/heads/minifi-c2-server Commit: 7ab8fe7db92a1ad1afd714364219376df5d81d01 Parents: 631b599 Author: Bryan Bende <bbe...@apache.org> Authored: Fri Mar 16 11:08:16 2018 -0400 Committer: Kevin Doran <kdo...@apache.org> Committed: Fri Mar 23 14:19:31 2018 -0400 ---------------------------------------------------------------------- minifi-c2/minifi-c2-assembly/pom.xml | 3 + .../main/resources/conf/minifi-c2.properties | 4 + .../nifi/minifi/c2/properties/C2Properties.java | 11 ++ minifi-c2/minifi-c2-framework/pom.xml | 7 +- .../flow/client/NiFiRegistryClientFactory.java | 126 ++++++++++++ .../core/service/flow/mapping/FlowMapper.java | 37 ++++ .../flow/mapping/FlowMapperException.java | 31 +++ .../service/flow/mapping/FlowMapperFactory.java | 69 +++++++ .../service/flow/mapping/SimpleFlowMapper.java | 74 +++++++ .../flow/retrieval/ClassNotMappedException.java | 31 +++ .../flow/retrieval/FlowRetrievalException.java | 31 +++ .../flow/retrieval/FlowRetrievalService.java | 66 +++++++ .../retrieval/StandardFlowRetrievalService.java | 160 +++++++++++++++ .../flow/mapping/TestSimpleFlowMapper.java | 109 +++++++++++ .../StandardFlowRetrievalServiceIT.java | 157 +++++++++++++++ .../TestStandardFlowRetrievalService.java | 194 +++++++++++++++++++ .../src/test/resources/minifi-c2.properties | 35 ++++ minifi-c2/minifi-c2-web-api/pom.xml | 2 +- minifi-c2/pom.xml | 1 + 19 files changed, 1146 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-c2/minifi-c2-assembly/pom.xml b/minifi-c2/minifi-c2-assembly/pom.xml index 3f55ac2..02f07f0 100644 --- a/minifi-c2/minifi-c2-assembly/pom.xml +++ b/minifi-c2/minifi-c2-assembly/pom.xml @@ -38,6 +38,9 @@ limitations under the License. <minifi.c2.server.truststorePasswd /> <minifi.c2.server.provider.nifi.rest.api.url>http://localhost:8080/nifi-api</minifi.c2.server.provider.nifi.rest.api.url> + + <minifi.c2.server.nifi.registry.url /> + <minifi.c2.server.nifi.registry.bucket.id /> </properties> <build> <plugins> http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-assembly/src/main/resources/conf/minifi-c2.properties ---------------------------------------------------------------------- diff --git a/minifi-c2/minifi-c2-assembly/src/main/resources/conf/minifi-c2.properties b/minifi-c2/minifi-c2-assembly/src/main/resources/conf/minifi-c2.properties index 2836f03..fcdf0b5 100644 --- a/minifi-c2/minifi-c2-assembly/src/main/resources/conf/minifi-c2.properties +++ b/minifi-c2/minifi-c2-assembly/src/main/resources/conf/minifi-c2.properties @@ -29,3 +29,7 @@ minifi.c2.server.security.keyPasswd= minifi.c2.server.security.truststore= minifi.c2.server.security.truststoreType= minifi.c2.server.security.truststorePasswd= + +# The base URL of a NiFi Registry instance, such as https://localhost:18443 +minifi.c2.server.nifi.registry.url=${minifi.c2.server.nifi.registry.url} +minifi.c2.server.nifi.registry.bucket.id=${minifi.c2.server.nifi.registry.bucket.id} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-commons/src/main/java/org/apache/nifi/minifi/c2/properties/C2Properties.java ---------------------------------------------------------------------- diff --git 
a/minifi-c2/minifi-c2-commons/src/main/java/org/apache/nifi/minifi/c2/properties/C2Properties.java b/minifi-c2/minifi-c2-commons/src/main/java/org/apache/nifi/minifi/c2/properties/C2Properties.java index e240139..55fc02e 100644 --- a/minifi-c2/minifi-c2-commons/src/main/java/org/apache/nifi/minifi/c2/properties/C2Properties.java +++ b/minifi-c2/minifi-c2-commons/src/main/java/org/apache/nifi/minifi/c2/properties/C2Properties.java @@ -51,6 +51,10 @@ public class C2Properties extends Properties { public static final String SECURITY_IDENTITY_MAPPING_PATTERN_PREFIX = "minifi.c2.server.security.identity.mapping.pattern."; public static final String SECURITY_IDENTITY_MAPPING_VALUE_PREFIX = "minifi.c2.server.security.identity.mapping.value."; + // NiFi Registry properties + public static final String NIFI_REGISTRY_URL = "minifi.c2.server.nifi.registry.url"; + public static final String NIFI_REGISTRY_BUCKET_ID = "minifi.c2.server.nifi.registry.bucket.id"; + // Default Values public static final String DEFAULT_WEB_WORKING_DIR = "./work/jetty"; public static final String DEFAULT_WAR_DIR = "./lib"; @@ -142,6 +146,13 @@ public class C2Properties extends Properties { return getProperty(SECURITY_TLS_TRUSTSTORE_PASSWD); } + public String getNifiRegistryUrl() { + return getProperty(NIFI_REGISTRY_URL); + } + + public String getNifiRegistryBucketId() { + return getProperty(NIFI_REGISTRY_BUCKET_ID); + } // Helper functions for common ways of interpreting property values http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-c2/minifi-c2-framework/pom.xml b/minifi-c2/minifi-c2-framework/pom.xml index 464dd0e..f0d7198 100644 --- a/minifi-c2/minifi-c2-framework/pom.xml +++ b/minifi-c2/minifi-c2-framework/pom.xml @@ -46,7 +46,12 @@ limitations under the License. 
<dependency> <groupId>org.apache.nifi.registry</groupId> <artifactId>nifi-registry-security-api</artifactId> - <version>0.1.0</version> + <version>${nifi.registry.version}</version> + </dependency> + <dependency> + <groupId>org.apache.nifi.registry</groupId> + <artifactId>nifi-registry-client</artifactId> + <version>${nifi.registry.version}</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/client/NiFiRegistryClientFactory.java ---------------------------------------------------------------------- diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/client/NiFiRegistryClientFactory.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/client/NiFiRegistryClientFactory.java new file mode 100644 index 0000000..b521bb6 --- /dev/null +++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/client/NiFiRegistryClientFactory.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.minifi.c2.core.service.flow.client; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.Validate; +import org.apache.nifi.minifi.c2.properties.C2Properties; +import org.apache.nifi.registry.client.NiFiRegistryClient; +import org.apache.nifi.registry.client.NiFiRegistryClientConfig; +import org.apache.nifi.registry.client.impl.JerseyNiFiRegistryClient; +import org.apache.nifi.registry.security.util.KeystoreType; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * This class does not follow the typical "factory bean" pattern of having a method annotated with @Bean because + * we want to support running a C2 server that may not be configured to do flow deployments, which means we don't want + * to create a NiFiRegistryClient during start-up because the registry URL may not be populated. + * + * An instance of this class can be injected into other services that need a NiFiRegistryClient, and if those services + * get called then they can use this class to lazily obtain an instance, which can then throw an exception if the + * appropriate configuration is not provided. + */ +@Component +public class NiFiRegistryClientFactory { + + private volatile NiFiRegistryClient client; + + private final C2Properties c2Properties; + + @Autowired + public NiFiRegistryClientFactory(final C2Properties c2Properties) { + this.c2Properties = c2Properties; + Validate.notNull(this.c2Properties); + } + + public String getNiFiRegistryUrl() { + return c2Properties.getNifiRegistryUrl(); + } + + public String getNiFiRegistryBucketId() { + return c2Properties.getNifiRegistryBucketId(); + } + + /** + * Lazily creates a NiFiRegistryClient the first time this method is called. 
+ * + * @return the NiFiRegistryClient held by this factory + */ + public NiFiRegistryClient getClient() { + if (client == null) { + initializeClient(); + } + return client; + } + + private synchronized void initializeClient() { + // make sure another thread hasn't initialized the client before we got into this synchronized method + if (client != null) { + return; + } + + final String url = c2Properties.getNifiRegistryUrl(); + if (StringUtils.isBlank(url)) { + throw new IllegalStateException("Unable to create NiFi Registry Client because NiFi Registry URL was not provided"); + } + + final String keystore = c2Properties.getKeyStorePath(); + final String keystoreType = c2Properties.getKeyStoreType(); + final String keystorePasswd = c2Properties.getKeyStorePassword(); + final String keyPasswd = c2Properties.getKeyPassword(); + + final String truststore = c2Properties.getTrustStorePath(); + final String truststoreType = c2Properties.getTrustStoreType(); + final String truststorePasswd = c2Properties.getTrustStorePassword(); + + final boolean secureUrl = url.startsWith("https"); + if (secureUrl && (StringUtils.isBlank(keystore) || StringUtils.isBlank(truststore))) { + throw new IllegalStateException("Keystore and truststore must be provided when NiFi Registry URL is secure"); + } + + final NiFiRegistryClientConfig.Builder clientConfigBuilder = new NiFiRegistryClientConfig.Builder().baseUrl(url); + + if (secureUrl) { + if (!StringUtils.isBlank(keystore)) { + clientConfigBuilder.keystoreFilename(keystore); + } + if (!StringUtils.isBlank(keystoreType)) { + clientConfigBuilder.keystoreType(KeystoreType.valueOf(keystoreType.toUpperCase())); + } + if (!StringUtils.isBlank(keystorePasswd)) { + clientConfigBuilder.keystorePassword(keystorePasswd); + } + if (!StringUtils.isBlank(keyPasswd)) { + clientConfigBuilder.keyPassword(keyPasswd); + } + if (!StringUtils.isBlank(truststore)) { + clientConfigBuilder.truststoreFilename(truststore); + } + if 
(!StringUtils.isBlank(truststoreType)) { + clientConfigBuilder.truststoreType(KeystoreType.valueOf(truststoreType.toUpperCase())); + } + if (!StringUtils.isBlank(truststorePasswd)) { + clientConfigBuilder.truststorePassword(truststorePasswd); + } + } + + final NiFiRegistryClientConfig clientConfig = clientConfigBuilder.build(); + this.client = new JerseyNiFiRegistryClient.Builder().config(clientConfig).build(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapper.java ---------------------------------------------------------------------- diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapper.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapper.java new file mode 100644 index 0000000..ed01e26 --- /dev/null +++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapper.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.minifi.c2.core.service.flow.mapping; + +import org.apache.nifi.minifi.c2.model.FlowUri; + +import java.util.Optional; + +/** + * Provides mappings from an agent class to the URI of a versioned flow. + */ +public interface FlowMapper { + + /** + * Gets the flow mapping information for the given agent class. + * + * @param agentClassName the name of an agent class + * @return the flow mapping information for the given agent class + * @throws FlowMapperException if an error occurs getting the mapping + */ + Optional<FlowUri> getFlowMapping(String agentClassName) throws FlowMapperException; + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapperException.java ---------------------------------------------------------------------- diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapperException.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapperException.java new file mode 100644 index 0000000..db99b12 --- /dev/null +++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapperException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.c2.core.service.flow.mapping; + +/** + * Exception produced by a FlowMapper if an error occurs retrieving flow mappings. + */ +public class FlowMapperException extends Exception { + + public FlowMapperException(String message) { + super(message); + } + + public FlowMapperException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapperFactory.java ---------------------------------------------------------------------- diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapperFactory.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapperFactory.java new file mode 100644 index 0000000..2888ab7 --- /dev/null +++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapperFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.c2.core.service.flow.mapping; + +import org.apache.commons.lang3.Validate; +import org.apache.nifi.minifi.c2.core.service.flow.client.NiFiRegistryClientFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * Factory that controls the creation of the FlowMapper bean. This factory doesn't follow the standard factory approach + * of having a method annotated with @Bean because we want to lazily create the FlowMapper and not fail start-up if + * the C2 server wasn't configured with a registry URL or bucket id. + * + * If we want to change implementations, or provide wrapped impls, the initializeFlowMapper() method can be modified. + */ +@Component +public class FlowMapperFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(FlowMapperFactory.class); + + private final NiFiRegistryClientFactory niFiRegistryClientFactory; + + private volatile FlowMapper flowMapper; + + @Autowired + public FlowMapperFactory(final NiFiRegistryClientFactory niFiRegistryClientFactory) { + this.niFiRegistryClientFactory = niFiRegistryClientFactory; + Validate.notNull(this.niFiRegistryClientFactory); + } + + /** + * Lazily creates a FlowMapper. 
+ * + * @return the FlowMapper instance + */ + public FlowMapper getFlowMapper() { + if (flowMapper == null) { + initializeFlowMapper(); + } + + return this.flowMapper; + } + + private synchronized void initializeFlowMapper() { + if (this.flowMapper != null) { + return; + } + + this.flowMapper = new SimpleFlowMapper(niFiRegistryClientFactory); + } + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/SimpleFlowMapper.java ---------------------------------------------------------------------- diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/SimpleFlowMapper.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/SimpleFlowMapper.java new file mode 100644 index 0000000..e5bfb80 --- /dev/null +++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/SimpleFlowMapper.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.minifi.c2.core.service.flow.mapping; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.Validate; +import org.apache.nifi.minifi.c2.core.service.flow.client.NiFiRegistryClientFactory; +import org.apache.nifi.minifi.c2.model.FlowUri; +import org.apache.nifi.registry.client.FlowClient; +import org.apache.nifi.registry.client.NiFiRegistryClient; +import org.apache.nifi.registry.flow.VersionedFlow; + +import java.util.List; +import java.util.Optional; + +/** + * A simple FlowMapper implementation that will return FlowMappings where the registry URL and bucket are fixed + * values that are passed in from C2 properties, and the flow name follows a convention of being the agent class name. + */ +public class SimpleFlowMapper implements FlowMapper { + + private final NiFiRegistryClientFactory clientFactory; + + public SimpleFlowMapper(final NiFiRegistryClientFactory clientFactory) { + this.clientFactory = clientFactory; + Validate.notNull(this.clientFactory); + } + + @Override + public Optional<FlowUri> getFlowMapping(final String agentClass) throws FlowMapperException { + if (StringUtils.isBlank(agentClass)) { + throw new IllegalArgumentException("Agent class cannot be null or blank"); + } + + try { + final String registryUrl = clientFactory.getNiFiRegistryUrl(); + final String bucketId = clientFactory.getNiFiRegistryBucketId(); + + final NiFiRegistryClient client = clientFactory.getClient(); + final FlowClient flowClient = client.getFlowClient(); + final List<VersionedFlow> flows = flowClient.getByBucket(bucketId); + + final Optional<String> flowId = flows.stream() + .filter(f -> f.getName().equals(agentClass)) + .map(f -> f.getIdentifier()) + .findFirst(); + + if (flowId.isPresent()) { + final FlowUri flowUri = new FlowUri(registryUrl, bucketId, flowId.get()); + return Optional.of(flowUri); + } else { + return Optional.empty(); + } + + } catch (final Exception e) { + throw new FlowMapperException("Unable to get 
flow mapping for " + agentClass + " due to " + e.getMessage(), e); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/ClassNotMappedException.java ---------------------------------------------------------------------- diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/ClassNotMappedException.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/ClassNotMappedException.java new file mode 100644 index 0000000..1be5b8f --- /dev/null +++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/ClassNotMappedException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.c2.core.service.flow.retrieval; + +/** + * Exception thrown when there is no flow mapping for a requested agent class. 
+ */ +public class ClassNotMappedException extends Exception { + + public ClassNotMappedException(String message) { + super(message); + } + + public ClassNotMappedException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/FlowRetrievalException.java ---------------------------------------------------------------------- diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/FlowRetrievalException.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/FlowRetrievalException.java new file mode 100644 index 0000000..7c15201 --- /dev/null +++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/FlowRetrievalException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.c2.core.service.flow.retrieval; + +/** + * Exception that is thrown for general errors encountered by {@link FlowRetrievalService}. 
+ */ +public class FlowRetrievalException extends Exception { + + public FlowRetrievalException(String message) { + super(message); + } + + public FlowRetrievalException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/FlowRetrievalService.java ---------------------------------------------------------------------- diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/FlowRetrievalService.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/FlowRetrievalService.java new file mode 100644 index 0000000..157b6e2 --- /dev/null +++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/FlowRetrievalService.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.minifi.c2.core.service.flow.retrieval; + +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; + +import java.io.IOException; +import java.util.List; + +/** + * A service used to retrieve versioned flows from NiFi registries based on a given agent class. + * + * The implementations of this service know about a given set of mappings from agent class to versioned flow. + */ +public interface FlowRetrievalService { + + /** + * Gets the available versions for the given agent class. + * + * @param agentClassName the name of the agent class + * @return the list of available versions + * @throws IOException if an I/O error occurs retrieving the flow versions + * @throws ClassNotMappedException if there is no flow mapping found for the given agent class + * @throws FlowRetrievalException if any other error is encountered attempting to retrieve the flow versions + */ + List<VersionedFlowSnapshotMetadata> getVersions(String agentClassName) throws IOException, ClassNotMappedException, FlowRetrievalException; + + /** + * Gets the specific version of a flow for the given agent class. + * + * @param agentClassName the name of an agent class + * @param version the version of the flow to get + * @return the flow for the given class and version + * @throws IOException if an I/O error occurs retrieving the flow + * @throws ClassNotMappedException if there is no flow mapping found for the given agent class + * @throws FlowRetrievalException if any other error is encountered attempting to retrieve the flow version + */ + VersionedFlowSnapshot getFlow(String agentClassName, int version) throws IOException, ClassNotMappedException, FlowRetrievalException; + + /** + * Gets the latest flow for the given agent class. 
+ * + * @param agentClassName the name of an agent class + * @return the latest flow + * @throws IOException if an I/O error occurs retrieving the flow + * @throws ClassNotMappedException if there is no flow mapping found for the given agent class + * @throws FlowRetrievalException if any other error is encountered attempting to retrieve the flow version + */ + VersionedFlowSnapshot getLatestFlow(String agentClassName) throws IOException, ClassNotMappedException, FlowRetrievalException; + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/StandardFlowRetrievalService.java ---------------------------------------------------------------------- diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/StandardFlowRetrievalService.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/StandardFlowRetrievalService.java new file mode 100644 index 0000000..e726035 --- /dev/null +++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/StandardFlowRetrievalService.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.c2.core.service.flow.retrieval; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.Validate; +import org.apache.nifi.minifi.c2.core.service.flow.client.NiFiRegistryClientFactory; +import org.apache.nifi.minifi.c2.core.service.flow.mapping.FlowMapper; +import org.apache.nifi.minifi.c2.core.service.flow.mapping.FlowMapperException; +import org.apache.nifi.minifi.c2.core.service.flow.mapping.FlowMapperFactory; +import org.apache.nifi.minifi.c2.model.FlowUri; +import org.apache.nifi.registry.client.NiFiRegistryClient; +import org.apache.nifi.registry.client.NiFiRegistryException; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +/** + * Standard implementation of FlowRetrievalService. + * + * NOTE: Currently there is an assumption that a given C2 server only interacts with a single NiFi Registry instance, so + * even though the FlowMapping instances coming from the FlowMapper have registry URL, it is assumed for now that this + * URL will always be the same and thus we can avoid maintaining multiple registry clients for now. 
+ */ +@Service +public class StandardFlowRetrievalService implements FlowRetrievalService { + + private static final Logger LOGGER = LoggerFactory.getLogger(StandardFlowRetrievalService.class); + + private final FlowMapperFactory flowMapperFactory; + private final NiFiRegistryClientFactory clientFactory; + + @Autowired + public StandardFlowRetrievalService(final FlowMapperFactory flowMapperFactory, + final NiFiRegistryClientFactory clientFactory) { + this.flowMapperFactory = flowMapperFactory; + this.clientFactory = clientFactory; + Validate.notNull(this.flowMapperFactory); + Validate.notNull(this.clientFactory); + } + + @Override + public List<VersionedFlowSnapshotMetadata> getVersions(final String agentClass) + throws IOException, ClassNotMappedException, FlowRetrievalException { + + if (StringUtils.isBlank(agentClass)) { + throw new IllegalArgumentException("Agent class cannot be null or blank"); + } + + final FlowUri flowUri = getFlowUri(agentClass); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Getting flow versions for {} using flow mapping {}", new Object[] {agentClass, flowUri}); + } + + final NiFiRegistryClient client = clientFactory.getClient(); + try { + return client.getFlowSnapshotClient().getSnapshotMetadata(flowUri.getBucketId(), flowUri.getFlowId()); + } catch (IOException ioe) { + throw ioe; + } catch (NiFiRegistryException nre) { + throw new FlowRetrievalException("Error retrieving flow versions for " + agentClass + ": " + nre.getMessage(), nre); + } + } + + @Override + public VersionedFlowSnapshot getFlow(final String agentClass, final int version) + throws IOException, ClassNotMappedException, FlowRetrievalException { + + if (StringUtils.isBlank(agentClass)) { + throw new IllegalArgumentException("Agent class cannot be null or blank"); + } + + if (version < 1) { + throw new IllegalArgumentException("Version must be greater than or equal to 1"); + } + + final FlowUri flowUri = getFlowUri(agentClass); + if (LOGGER.isTraceEnabled()) { + 
LOGGER.trace("Getting flow for {} using flow mapping {} and version {}", new Object[] {agentClass, flowUri, Integer.valueOf(version)}); + } + + final NiFiRegistryClient client = clientFactory.getClient(); + try { + return client.getFlowSnapshotClient().get(flowUri.getBucketId(), flowUri.getFlowId(), version); + } catch (IOException ioe) { + throw ioe; + } catch (NiFiRegistryException nre) { + throw new FlowRetrievalException("Error retrieving flow for " + agentClass + ": " + nre.getMessage(), nre); + } + } + + @Override + public VersionedFlowSnapshot getLatestFlow(final String agentClass) + throws IOException, ClassNotMappedException, FlowRetrievalException { + + if (StringUtils.isBlank(agentClass)) { + throw new IllegalArgumentException("Agent class cannot be null or blank"); + } + + final FlowUri flowUri = getFlowUri(agentClass); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Getting latest flow for {} using flow mapping {}", new Object[] {agentClass, flowUri}); + } + + final NiFiRegistryClient client = clientFactory.getClient(); + try { + return client.getFlowSnapshotClient().getLatest(flowUri.getBucketId(), flowUri.getFlowId()); + } catch (IOException ioe) { + throw ioe; + } catch (NiFiRegistryException nre) { + throw new FlowRetrievalException("Error retrieving latest flow for " + agentClass + ": " + nre.getMessage(), nre); + } + } + + /** + * Gets the FlowMapping for the given agent class. 
+ * + * @param agentClass the class to get the flow mapping for + * @return the FlowUri for the agent class if one exists + * @throws FlowRetrievalException if an error occurs obtaining the FlowMapping from the FlowMapper + * @throws ClassNotMappedException if the FlowMapper does not contain a mapping for the given agent class + */ + private FlowUri getFlowUri(final String agentClass) throws FlowRetrievalException, ClassNotMappedException { + final Optional<FlowUri> flowUri; + try { + final FlowMapper flowMapper = flowMapperFactory.getFlowMapper(); + flowUri = flowMapper.getFlowMapping(agentClass); + } catch (FlowMapperException fme) { + throw new FlowRetrievalException("Error retrieving flow mapping for " + agentClass, fme); + } + + if (flowUri.isPresent()) { + return flowUri.get(); + } else { + throw new ClassNotMappedException("No flow mapping exists for " + agentClass); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/TestSimpleFlowMapper.java ---------------------------------------------------------------------- diff --git a/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/TestSimpleFlowMapper.java b/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/TestSimpleFlowMapper.java new file mode 100644 index 0000000..81ff781 --- /dev/null +++ b/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/TestSimpleFlowMapper.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.minifi.c2.core.service.flow.mapping; + +import org.apache.nifi.minifi.c2.core.service.flow.client.NiFiRegistryClientFactory; +import org.apache.nifi.minifi.c2.model.FlowUri; +import org.apache.nifi.registry.client.FlowClient; +import org.apache.nifi.registry.client.NiFiRegistryClient; +import org.apache.nifi.registry.client.NiFiRegistryException; +import org.apache.nifi.registry.flow.VersionedFlow; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.Optional; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestSimpleFlowMapper { + + static final String REGISTRY_URL = "http://localhost:18080"; + static final String BUCKET_ID = UUID.randomUUID().toString(); + + private NiFiRegistryClient client; + private FlowClient flowClient; + + private NiFiRegistryClientFactory clientFactory; + + private FlowMapper flowMapper; + + @Before + public void setup() { + flowClient = mock(FlowClient.class); + + client = mock(NiFiRegistryClient.class); + when(client.getFlowClient()).thenReturn(flowClient); + + clientFactory = 
mock(NiFiRegistryClientFactory.class); + when(clientFactory.getClient()).thenReturn(client); + when(clientFactory.getNiFiRegistryUrl()).thenReturn(REGISTRY_URL); + when(clientFactory.getNiFiRegistryBucketId()).thenReturn(BUCKET_ID); + + flowMapper = new SimpleFlowMapper(clientFactory); + } + + @Test + public void testGetFlowMappingWhenAgentClassExists() throws FlowMapperException, IOException, NiFiRegistryException { + final String agentClass = "Class A"; + + final VersionedFlow versionedFlow = new VersionedFlow(); + versionedFlow.setName(agentClass); + versionedFlow.setIdentifier(UUID.randomUUID().toString()); + versionedFlow.setBucketIdentifier(BUCKET_ID); + + when(flowClient.getByBucket(BUCKET_ID)).thenReturn(Collections.singletonList(versionedFlow)); + + final Optional<FlowUri> flowUri = flowMapper.getFlowMapping(agentClass); + assertNotNull(flowUri); + assertTrue(flowUri.isPresent()); + assertEquals(REGISTRY_URL, flowUri.get().getRegistryUrl()); + assertEquals(BUCKET_ID, flowUri.get().getBucketId()); + assertEquals(versionedFlow.getIdentifier(), flowUri.get().getFlowId()); + } + + @Test + public void testGetFlowMappingWhenAgentClassDoesNotExist() throws FlowMapperException, IOException, NiFiRegistryException { + final String agentClass = "Class A"; + + final VersionedFlow versionedFlow = new VersionedFlow(); + versionedFlow.setName("SOME-OTHER-FLOW"); + versionedFlow.setIdentifier(UUID.randomUUID().toString()); + versionedFlow.setBucketIdentifier(BUCKET_ID); + + when(flowClient.getByBucket(BUCKET_ID)).thenReturn(Collections.singletonList(versionedFlow)); + + final Optional<FlowUri> flowUri = flowMapper.getFlowMapping(agentClass); + assertNotNull(flowUri); + assertFalse(flowUri.isPresent()); + } + + @Test(expected = FlowMapperException.class) + public void testGetFlowMappingWhenExceptionHappens() throws FlowMapperException, IOException, NiFiRegistryException { + final String agentClass = "Class A"; + when(flowClient.getByBucket(BUCKET_ID)).thenThrow(new 
NiFiRegistryException("Bucket not found")); + flowMapper.getFlowMapping(agentClass); + } + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/StandardFlowRetrievalServiceIT.java ---------------------------------------------------------------------- diff --git a/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/StandardFlowRetrievalServiceIT.java b/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/StandardFlowRetrievalServiceIT.java new file mode 100644 index 0000000..1f709aa --- /dev/null +++ b/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/StandardFlowRetrievalServiceIT.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.minifi.c2.core.service.flow.retrieval; + +import org.apache.nifi.minifi.c2.core.service.flow.client.NiFiRegistryClientFactory; +import org.apache.nifi.minifi.c2.core.service.flow.mapping.FlowMapperFactory; +import org.apache.nifi.minifi.c2.properties.C2Properties; +import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.client.NiFiRegistryClient; +import org.apache.nifi.registry.client.NiFiRegistryException; +import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata; +import org.apache.nifi.registry.flow.VersionedProcessGroup; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +public class StandardFlowRetrievalServiceIT { + + private static final String CLASS_A = "Class A"; + private static final String CLASS_B = "Class B"; + + private static StandardFlowRetrievalService flowRetrievalService; + + @BeforeClass + public static void setup() throws IOException, NiFiRegistryException { + final C2Properties c2Properties = new C2Properties(); + c2Properties.setProperty(C2Properties.NIFI_REGISTRY_URL, "http://localhost:18080"); + + final NiFiRegistryClientFactory niFiRegistryClientFactory = new NiFiRegistryClientFactory(c2Properties); + final NiFiRegistryClient niFiRegistryClient = niFiRegistryClientFactory.getClient(); + + // Create a bucket to use for this test + final Bucket bucket = new Bucket(); + bucket.setName("C2 Flow Retrieval IT - " + System.currentTimeMillis()); + + // Set the id of the created bucket into the C2 properties before we create the FlowMapperFactory + final Bucket createdBucket = niFiRegistryClient.getBucketClient().create(bucket); + c2Properties.setProperty(C2Properties.NIFI_REGISTRY_BUCKET_ID, createdBucket.getIdentifier()); + + final FlowMapperFactory flowMapperFactory = new 
FlowMapperFactory(niFiRegistryClientFactory); + flowRetrievalService = new StandardFlowRetrievalService(flowMapperFactory, niFiRegistryClientFactory); + + // Create a flow for Class A + final VersionedFlow classAFlow = new VersionedFlow(); + classAFlow.setBucketIdentifier(createdBucket.getIdentifier()); + classAFlow.setName(CLASS_A); + + final VersionedFlow createdClassAFlow = niFiRegistryClient.getFlowClient().create(classAFlow); + + // Create a snapshot #1 for Class A + final VersionedFlowSnapshot classASnapshot1 = createSnapshot(createdClassAFlow, 1); + niFiRegistryClient.getFlowSnapshotClient().create(classASnapshot1); + + // Create a snapshot #2 for Class A + final VersionedFlowSnapshot classASnapshot2 = createSnapshot(createdClassAFlow, 2); + niFiRegistryClient.getFlowSnapshotClient().create(classASnapshot2); + + // Create a flow for Class B + final VersionedFlow classBFlow = new VersionedFlow(); + classBFlow.setBucketIdentifier(createdBucket.getIdentifier()); + classBFlow.setName(CLASS_B); + niFiRegistryClient.getFlowClient().create(classBFlow); + + // Don't create any versions for Class B so we can test what happens when no versions exist + } + + @Test + public void testRetrieveLatestWhenVersionsExist() throws ClassNotMappedException, IOException, FlowRetrievalException { + final VersionedFlowSnapshot versionedFlowSnapshot = flowRetrievalService.getLatestFlow(CLASS_A); + Assert.assertNotNull(versionedFlowSnapshot); + Assert.assertEquals(2, versionedFlowSnapshot.getSnapshotMetadata().getVersion()); + } + + @Test(expected = FlowRetrievalException.class) + public void testRetrieveLatestWhenNoVersions() throws ClassNotMappedException, IOException, FlowRetrievalException { + flowRetrievalService.getLatestFlow(CLASS_B); + } + + @Test(expected = ClassNotMappedException.class) + public void testRetrieveLatestWhenNoFlowForClass() throws ClassNotMappedException, IOException, FlowRetrievalException { + flowRetrievalService.getLatestFlow("DOES-NOT-EXIST"); + } + + 
@Test + public void testRetrieveSpecificWhenVersionsExist() throws ClassNotMappedException, IOException, FlowRetrievalException { + final VersionedFlowSnapshot versionedFlowSnapshot = flowRetrievalService.getFlow(CLASS_A, 1); + Assert.assertNotNull(versionedFlowSnapshot); + Assert.assertEquals(1, versionedFlowSnapshot.getSnapshotMetadata().getVersion()); + } + + @Test(expected = FlowRetrievalException.class) + public void testRetrieveSpecificWhenNoVersions() throws ClassNotMappedException, IOException, FlowRetrievalException { + flowRetrievalService.getFlow(CLASS_B, 1); + } + + @Test(expected = ClassNotMappedException.class) + public void testRetrieveSpecificWhenNoFlowForClass() throws ClassNotMappedException, IOException, FlowRetrievalException { + flowRetrievalService.getFlow("DOES-NOT-EXIST", 1); + } + + @Test + public void testRetrieveVersionsListWhenVersionsExist() throws ClassNotMappedException, IOException, FlowRetrievalException { + final List<VersionedFlowSnapshotMetadata> versions = flowRetrievalService.getVersions(CLASS_A); + Assert.assertNotNull(versions); + Assert.assertEquals(2, versions.size()); + } + + @Test + public void testRetrieveVersionsListWhenNoVersions() throws ClassNotMappedException, IOException, FlowRetrievalException { + final List<VersionedFlowSnapshotMetadata> versions = flowRetrievalService.getVersions(CLASS_B); + Assert.assertNotNull(versions); + Assert.assertEquals(0, versions.size()); + } + + @Test(expected = ClassNotMappedException.class) + public void testRetrieveVersionsListWhenNoFlowForClass() throws ClassNotMappedException, IOException, FlowRetrievalException { + flowRetrievalService.getVersions("DOES-NOT-EXIST"); + } + + private static VersionedFlowSnapshot createSnapshot(final VersionedFlow versionedFlow, int num) { + final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata(); + snapshotMetadata.setBucketIdentifier(versionedFlow.getBucketIdentifier()); + 
snapshotMetadata.setFlowIdentifier(versionedFlow.getIdentifier()); + snapshotMetadata.setVersion(num); + snapshotMetadata.setComments("This is snapshot #" + num); + + final VersionedProcessGroup rootProcessGroup = new VersionedProcessGroup(); + rootProcessGroup.setIdentifier("root-pg"); + rootProcessGroup.setName("Root Process Group"); + + final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot(); + snapshot.setSnapshotMetadata(snapshotMetadata); + snapshot.setFlowContents(rootProcessGroup); + return snapshot; + } + + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/TestStandardFlowRetrievalService.java ---------------------------------------------------------------------- diff --git a/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/TestStandardFlowRetrievalService.java b/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/TestStandardFlowRetrievalService.java new file mode 100644 index 0000000..7bee09e --- /dev/null +++ b/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/TestStandardFlowRetrievalService.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.nifi.minifi.c2.core.service.flow.retrieval;

import org.apache.nifi.minifi.c2.core.service.flow.client.NiFiRegistryClientFactory;
import org.apache.nifi.minifi.c2.core.service.flow.mapping.FlowMapper;
import org.apache.nifi.minifi.c2.core.service.flow.mapping.FlowMapperException;
import org.apache.nifi.minifi.c2.core.service.flow.mapping.FlowMapperFactory;
import org.apache.nifi.minifi.c2.model.FlowUri;
import org.apache.nifi.registry.client.FlowSnapshotClient;
import org.apache.nifi.registry.client.NiFiRegistryClient;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Unit tests for StandardFlowRetrievalService that run against a mocked FlowMapper and a mocked
 * NiFi Registry snapshot client.
 */
public class TestStandardFlowRetrievalService {

    private static final String CLASS_A = "Class A";
    private static final String CLASS_B = "Class B";

    private FlowMapperFactory flowMapperFactory;
    private FlowMapper flowMapper;

    private NiFiRegistryClientFactory clientFactory;
    private NiFiRegistryClient client;
    private FlowSnapshotClient flowSnapshotClient;

    private FlowRetrievalService flowRetrievalService;

    @Before
    public void setup() {
        // mapper side
        flowMapper = mock(FlowMapper.class);
        flowMapperFactory = mock(FlowMapperFactory.class);
        when(flowMapperFactory.getFlowMapper()).thenReturn(flowMapper);

        // registry client side
        flowSnapshotClient = mock(FlowSnapshotClient.class);
        client = mock(NiFiRegistryClient.class);
        when(client.getFlowSnapshotClient()).thenReturn(flowSnapshotClient);
        clientFactory = mock(NiFiRegistryClientFactory.class);
        when(clientFactory.getClient()).thenReturn(client);

        flowRetrievalService = new StandardFlowRetrievalService(flowMapperFactory, clientFactory);
    }

    @Test
    public void testRetrieveLatestWhenVersionsExist() throws ClassNotMappedException, IOException, FlowRetrievalException, FlowMapperException, NiFiRegistryException {
        final FlowUri mappedUri = mapClassAToRandomFlowUri();

        final VersionedFlowSnapshot expected = createSnapshot(mappedUri.getBucketId(), mappedUri.getFlowId(), 2);
        when(flowSnapshotClient.getLatest(mappedUri.getBucketId(), mappedUri.getFlowId())).thenReturn(expected);

        final VersionedFlowSnapshot actual = flowRetrievalService.getLatestFlow(CLASS_A);
        assertSameSnapshot(expected, actual);
    }

    @Test(expected = FlowRetrievalException.class)
    public void testRetrieveLatestWhenNoVersions() throws ClassNotMappedException, IOException, FlowRetrievalException, FlowMapperException, NiFiRegistryException {
        final FlowUri mappedUri = mapClassAToRandomFlowUri();

        when(flowSnapshotClient.getLatest(mappedUri.getBucketId(), mappedUri.getFlowId())).thenThrow(new NiFiRegistryException("No Versions"));

        flowRetrievalService.getLatestFlow(CLASS_A);
    }

    @Test(expected = ClassNotMappedException.class)
    public void testRetrieveLatestWhenNoFlowForClass() throws ClassNotMappedException, IOException, FlowRetrievalException, FlowMapperException {
        when(flowMapper.getFlowMapping(CLASS_A)).thenReturn(Optional.empty());
        flowRetrievalService.getLatestFlow(CLASS_A);
    }

    @Test
    public void testRetrieveSpecificWhenVersionsExist() throws ClassNotMappedException, IOException, FlowRetrievalException, NiFiRegistryException, FlowMapperException {
        final FlowUri mappedUri = mapClassAToRandomFlowUri();

        final int version = 2;
        final VersionedFlowSnapshot expected = createSnapshot(mappedUri.getBucketId(), mappedUri.getFlowId(), version);
        when(flowSnapshotClient.get(mappedUri.getBucketId(), mappedUri.getFlowId(), version)).thenReturn(expected);

        final VersionedFlowSnapshot actual = flowRetrievalService.getFlow(CLASS_A, version);
        assertSameSnapshot(expected, actual);
    }

    @Test(expected = FlowRetrievalException.class)
    public void testRetrieveSpecificWhenNoVersions() throws ClassNotMappedException, IOException, FlowRetrievalException, NiFiRegistryException, FlowMapperException {
        final FlowUri mappedUri = mapClassAToRandomFlowUri();

        final int version = 2;
        when(flowSnapshotClient.get(mappedUri.getBucketId(), mappedUri.getFlowId(), version)).thenThrow(new NiFiRegistryException("No versions"));

        flowRetrievalService.getFlow(CLASS_A, version);
    }

    @Test(expected = ClassNotMappedException.class)
    public void testRetrieveSpecificWhenNoFlowForClass() throws ClassNotMappedException, IOException, FlowRetrievalException, FlowMapperException {
        when(flowMapper.getFlowMapping(CLASS_A)).thenReturn(Optional.empty());
        flowRetrievalService.getFlow(CLASS_A, 2);
    }

    @Test
    public void testRetrieveVersionsListWhenVersionsExist() throws ClassNotMappedException, IOException, FlowRetrievalException, FlowMapperException, NiFiRegistryException {
        final FlowUri mappedUri = mapClassAToRandomFlowUri();

        final VersionedFlowSnapshot first = createSnapshot(mappedUri.getBucketId(), mappedUri.getFlowId(), 1);
        final VersionedFlowSnapshot second = createSnapshot(mappedUri.getBucketId(), mappedUri.getFlowId(), 2);

        when(flowSnapshotClient.getSnapshotMetadata(mappedUri.getBucketId(), mappedUri.getFlowId()))
                .thenReturn(Arrays.asList(
                        first.getSnapshotMetadata(),
                        second.getSnapshotMetadata()));

        final List<VersionedFlowSnapshotMetadata> actualVersions = flowRetrievalService.getVersions(CLASS_A);
        assertNotNull(actualVersions);
        assertEquals(2, actualVersions.size());
    }

    @Test
    public void testRetrieveVersionsListWhenNoVersions() throws ClassNotMappedException, IOException, FlowRetrievalException, NiFiRegistryException, FlowMapperException {
        final FlowUri mappedUri = mapClassAToRandomFlowUri();

        when(flowSnapshotClient.getSnapshotMetadata(mappedUri.getBucketId(), mappedUri.getFlowId())).thenReturn(Collections.emptyList());

        final List<VersionedFlowSnapshotMetadata> actualVersions = flowRetrievalService.getVersions(CLASS_A);
        assertNotNull(actualVersions);
        assertEquals(0, actualVersions.size());
    }

    @Test(expected = ClassNotMappedException.class)
    public void testRetrieveVersionsListWhenNoFlowForClass() throws ClassNotMappedException, IOException, FlowRetrievalException, FlowMapperException {
        when(flowMapper.getFlowMapping(CLASS_A)).thenReturn(Optional.empty());
        flowRetrievalService.getVersions(CLASS_A);
    }

    /**
     * Makes the mocked FlowMapper resolve CLASS_A to a FlowUri with random bucket and flow ids.
     *
     * @return the FlowUri the mocked mapper will return
     */
    private FlowUri mapClassAToRandomFlowUri() throws FlowMapperException {
        final FlowUri flowUri = new FlowUri("http://localhost:18080", UUID.randomUUID().toString(), UUID.randomUUID().toString());
        when(flowMapper.getFlowMapping(CLASS_A)).thenReturn(Optional.of(flowUri));
        return flowUri;
    }

    /**
     * Asserts that the returned snapshot is non-null and carries the same bucket id, flow id and version
     * as the expected one.
     */
    private static void assertSameSnapshot(final VersionedFlowSnapshot expected, final VersionedFlowSnapshot actual) {
        assertNotNull(actual);
        assertEquals(expected.getSnapshotMetadata().getBucketIdentifier(), actual.getSnapshotMetadata().getBucketIdentifier());
        assertEquals(expected.getSnapshotMetadata().getFlowIdentifier(), actual.getSnapshotMetadata().getFlowIdentifier());
        assertEquals(expected.getSnapshotMetadata().getVersion(), actual.getSnapshotMetadata().getVersion());
    }

    /**
     * Builds a snapshot with the given bucket id, flow id and version and a minimal root process group.
     */
    private static VersionedFlowSnapshot createSnapshot(final String bucketId, final String flowId, int version) {
        final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
        metadata.setBucketIdentifier(bucketId);
        metadata.setFlowIdentifier(flowId);
        metadata.setVersion(version);
        metadata.setComments("This is snapshot #" + version);

        final VersionedProcessGroup contents = new VersionedProcessGroup();
        contents.setIdentifier("root-pg");
        contents.setName("Root Process Group");

        final VersionedFlowSnapshot result = new VersionedFlowSnapshot();
        result.setSnapshotMetadata(metadata);
        result.setFlowContents(contents);
        return result;
    }

}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/test/resources/minifi-c2.properties
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-framework/src/test/resources/minifi-c2.properties
b/minifi-c2/minifi-c2-framework/src/test/resources/minifi-c2.properties new file mode 100644 index 0000000..97aed2a --- /dev/null +++ b/minifi-c2/minifi-c2-framework/src/test/resources/minifi-c2.properties @@ -0,0 +1,35 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +minifi.c2.server.web.war.directory=./lib +minifi.c2.server.web.host=localhost +minifi.c2.server.web.port=10080 +minifi.c2.server.web.jetty.working.directory=./work/jetty +minifi.c2.server.web.jetty.threads=200 + +minifi.c2.server.security.tls.enabled=false +minifi.c2.server.security.keystore= +minifi.c2.server.security.keystoreType= +minifi.c2.server.security.keystorePasswd= +minifi.c2.server.security.keyPasswd= +minifi.c2.server.security.truststore= +minifi.c2.server.security.truststoreType= +minifi.c2.server.security.truststorePasswd= + +# The base URL of a NiFi Registry instance, such as https://localhost:18443 +minifi.c2.server.nifi.registry.url= +minifi.c2.server.nifi.registry.bucket.id= http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-web-api/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-c2/minifi-c2-web-api/pom.xml b/minifi-c2/minifi-c2-web-api/pom.xml index 046984a..f4132a4 100644 --- a/minifi-c2/minifi-c2-web-api/pom.xml +++ b/minifi-c2/minifi-c2-web-api/pom.xml @@ -210,7 +210,7 @@ limitations under the License. <dependency> <groupId>org.apache.nifi.registry</groupId> <artifactId>nifi-registry-security-utils</artifactId> - <version>0.1.0</version> + <version>${nifi.registry.version}</version> </dependency> <!-- Spring dependencies --> http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/pom.xml ---------------------------------------------------------------------- diff --git a/minifi-c2/pom.xml b/minifi-c2/pom.xml index 904a50d..0b81fc0 100644 --- a/minifi-c2/pom.xml +++ b/minifi-c2/pom.xml @@ -42,6 +42,7 @@ limitations under the License. <properties> <io.swagger.version>1.5.18</io.swagger.version> + <nifi.registry.version>0.1.0</nifi.registry.version> </properties> <dependencyManagement>