Repository: nifi-minifi
Updated Branches:
  refs/heads/minifi-c2-server 631b59926 -> 7ab8fe7db


MINIFI-447 - Adding FlowMapper and FlowRetrievalService

This closes #119.

Signed-off-by: Kevin Doran <kdo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/7ab8fe7d
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/7ab8fe7d
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/7ab8fe7d

Branch: refs/heads/minifi-c2-server
Commit: 7ab8fe7db92a1ad1afd714364219376df5d81d01
Parents: 631b599
Author: Bryan Bende <bbe...@apache.org>
Authored: Fri Mar 16 11:08:16 2018 -0400
Committer: Kevin Doran <kdo...@apache.org>
Committed: Fri Mar 23 14:19:31 2018 -0400

----------------------------------------------------------------------
 minifi-c2/minifi-c2-assembly/pom.xml            |   3 +
 .../main/resources/conf/minifi-c2.properties    |   4 +
 .../nifi/minifi/c2/properties/C2Properties.java |  11 ++
 minifi-c2/minifi-c2-framework/pom.xml           |   7 +-
 .../flow/client/NiFiRegistryClientFactory.java  | 126 ++++++++++++
 .../core/service/flow/mapping/FlowMapper.java   |  37 ++++
 .../flow/mapping/FlowMapperException.java       |  31 +++
 .../service/flow/mapping/FlowMapperFactory.java |  69 +++++++
 .../service/flow/mapping/SimpleFlowMapper.java  |  74 +++++++
 .../flow/retrieval/ClassNotMappedException.java |  31 +++
 .../flow/retrieval/FlowRetrievalException.java  |  31 +++
 .../flow/retrieval/FlowRetrievalService.java    |  66 +++++++
 .../retrieval/StandardFlowRetrievalService.java | 160 +++++++++++++++
 .../flow/mapping/TestSimpleFlowMapper.java      | 109 +++++++++++
 .../StandardFlowRetrievalServiceIT.java         | 157 +++++++++++++++
 .../TestStandardFlowRetrievalService.java       | 194 +++++++++++++++++++
 .../src/test/resources/minifi-c2.properties     |  35 ++++
 minifi-c2/minifi-c2-web-api/pom.xml             |   2 +-
 minifi-c2/pom.xml                               |   1 +
 19 files changed, 1146 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-assembly/pom.xml 
b/minifi-c2/minifi-c2-assembly/pom.xml
index 3f55ac2..02f07f0 100644
--- a/minifi-c2/minifi-c2-assembly/pom.xml
+++ b/minifi-c2/minifi-c2-assembly/pom.xml
@@ -38,6 +38,9 @@ limitations under the License.
         <minifi.c2.server.truststorePasswd />
 
         
<minifi.c2.server.provider.nifi.rest.api.url>http://localhost:8080/nifi-api</minifi.c2.server.provider.nifi.rest.api.url>
+
+        <minifi.c2.server.nifi.registry.url />
+        <minifi.c2.server.nifi.registry.bucket.id />
     </properties>
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-assembly/src/main/resources/conf/minifi-c2.properties
----------------------------------------------------------------------
diff --git 
a/minifi-c2/minifi-c2-assembly/src/main/resources/conf/minifi-c2.properties 
b/minifi-c2/minifi-c2-assembly/src/main/resources/conf/minifi-c2.properties
index 2836f03..fcdf0b5 100644
--- a/minifi-c2/minifi-c2-assembly/src/main/resources/conf/minifi-c2.properties
+++ b/minifi-c2/minifi-c2-assembly/src/main/resources/conf/minifi-c2.properties
@@ -29,3 +29,7 @@ minifi.c2.server.security.keyPasswd=
 minifi.c2.server.security.truststore=
 minifi.c2.server.security.truststoreType=
 minifi.c2.server.security.truststorePasswd=
+
+# The base URL of a NiFi Registry instance, such as https://localhost:18443
+minifi.c2.server.nifi.registry.url=${minifi.c2.server.nifi.registry.url}
+minifi.c2.server.nifi.registry.bucket.id=${minifi.c2.server.nifi.registry.bucket.id}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-commons/src/main/java/org/apache/nifi/minifi/c2/properties/C2Properties.java
----------------------------------------------------------------------
diff --git 
a/minifi-c2/minifi-c2-commons/src/main/java/org/apache/nifi/minifi/c2/properties/C2Properties.java
 
b/minifi-c2/minifi-c2-commons/src/main/java/org/apache/nifi/minifi/c2/properties/C2Properties.java
index e240139..55fc02e 100644
--- 
a/minifi-c2/minifi-c2-commons/src/main/java/org/apache/nifi/minifi/c2/properties/C2Properties.java
+++ 
b/minifi-c2/minifi-c2-commons/src/main/java/org/apache/nifi/minifi/c2/properties/C2Properties.java
@@ -51,6 +51,10 @@ public class C2Properties extends Properties {
     public static final String SECURITY_IDENTITY_MAPPING_PATTERN_PREFIX = 
"minifi.c2.server.security.identity.mapping.pattern.";
     public static final String SECURITY_IDENTITY_MAPPING_VALUE_PREFIX = 
"minifi.c2.server.security.identity.mapping.value.";
 
+    // NiFi Registry properties
+    public static final String NIFI_REGISTRY_URL = 
"minifi.c2.server.nifi.registry.url";
+    public static final String NIFI_REGISTRY_BUCKET_ID = 
"minifi.c2.server.nifi.registry.bucket.id";
+
     // Default Values
     public static final String DEFAULT_WEB_WORKING_DIR = "./work/jetty";
     public static final String DEFAULT_WAR_DIR = "./lib";
@@ -142,6 +146,13 @@ public class C2Properties extends Properties {
         return getProperty(SECURITY_TLS_TRUSTSTORE_PASSWD);
     }
 
+    public String getNifiRegistryUrl() {
+        return getProperty(NIFI_REGISTRY_URL);
+    }
+
+    public String getNifiRegistryBucketId() {
+        return getProperty(NIFI_REGISTRY_BUCKET_ID);
+    }
 
     // Helper functions for common ways of interpreting property values
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-framework/pom.xml 
b/minifi-c2/minifi-c2-framework/pom.xml
index 464dd0e..f0d7198 100644
--- a/minifi-c2/minifi-c2-framework/pom.xml
+++ b/minifi-c2/minifi-c2-framework/pom.xml
@@ -46,7 +46,12 @@ limitations under the License.
         <dependency>
             <groupId>org.apache.nifi.registry</groupId>
             <artifactId>nifi-registry-security-api</artifactId>
-            <version>0.1.0</version>
+            <version>${nifi.registry.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi.registry</groupId>
+            <artifactId>nifi-registry-client</artifactId>
+            <version>${nifi.registry.version}</version>
         </dependency>
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/client/NiFiRegistryClientFactory.java
----------------------------------------------------------------------
diff --git 
a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/client/NiFiRegistryClientFactory.java
 
b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/client/NiFiRegistryClientFactory.java
new file mode 100644
index 0000000..b521bb6
--- /dev/null
+++ 
b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/client/NiFiRegistryClientFactory.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service.flow.client;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.minifi.c2.properties.C2Properties;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryClientConfig;
+import org.apache.nifi.registry.client.impl.JerseyNiFiRegistryClient;
+import org.apache.nifi.registry.security.util.KeystoreType;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * This class does not follow the typical "factory bean" pattern of having a 
method annotated with @Bean because
+ * we want to support running a C2 server that may not be configured to do 
flow deployments, which means we don't want
+ * to create a NiFiRegistryClient during start-up because the registry URL may 
not be populated.
+ *
+ * An instance of this class can be injected into other services that need a 
NiFiRegistryClient, and if those services
+ * get called then they can use this class to lazily obtain an instance, which 
can then throw an exception if the
+ * appropriate configuration is not provided.
+ */
+@Component
+public class NiFiRegistryClientFactory {
+
+    private volatile NiFiRegistryClient client;
+
+    private final C2Properties c2Properties;
+
+    @Autowired
+    public NiFiRegistryClientFactory(final C2Properties c2Properties) {
+        this.c2Properties = c2Properties;
+        Validate.notNull(this.c2Properties);
+    }
+
+    public String getNiFiRegistryUrl() {
+        return c2Properties.getNifiRegistryUrl();
+    }
+
+    public String getNiFiRegistryBucketId() {
+        return c2Properties.getNifiRegistryBucketId();
+    }
+
+    /**
+     * Lazily creates a NiFiRegistryClient the first time this method is 
called.
+     *
+     * @return the NiFiRegistryClient held by this factory
+     */
+    public NiFiRegistryClient getClient() {
+        if (client == null) {
+            initializeClient();
+        }
+        return client;
+    }
+
+    private synchronized void initializeClient() {
+        // make sure another thread hasn't initialized the client before we 
got into this synchronized method
+        if (client != null) {
+            return;
+        }
+
+        final String url = c2Properties.getNifiRegistryUrl();
+        if (StringUtils.isBlank(url)) {
+            throw new IllegalStateException("Unable to create NiFi Registry 
Client because NiFi Registry URL was not provided");
+        }
+
+        final String keystore = c2Properties.getKeyStorePath();
+        final String keystoreType = c2Properties.getKeyStoreType();
+        final String keystorePasswd = c2Properties.getKeyStorePassword();
+        final String keyPasswd = c2Properties.getKeyPassword();
+
+        final String truststore = c2Properties.getTrustStorePath();
+        final String truststoreType = c2Properties.getTrustStoreType();
+        final String truststorePasswd = c2Properties.getTrustStorePassword();
+
+        final boolean secureUrl = url.startsWith("https");
+        if (secureUrl && (StringUtils.isBlank(keystore) || 
StringUtils.isBlank(truststore))) {
+            throw new IllegalStateException("Keystore and truststore must be 
provided when NiFi Registry URL is secure");
+        }
+
+        final NiFiRegistryClientConfig.Builder clientConfigBuilder = new 
NiFiRegistryClientConfig.Builder().baseUrl(url);
+
+        if (secureUrl) {
+            if (!StringUtils.isBlank(keystore)) {
+                clientConfigBuilder.keystoreFilename(keystore);
+            }
+            if (!StringUtils.isBlank(keystoreType)) {
+                
clientConfigBuilder.keystoreType(KeystoreType.valueOf(keystoreType.toUpperCase()));
+            }
+            if (!StringUtils.isBlank(keystorePasswd)) {
+                clientConfigBuilder.keystorePassword(keystorePasswd);
+            }
+            if (!StringUtils.isBlank(keyPasswd)) {
+                clientConfigBuilder.keyPassword(keyPasswd);
+            }
+            if (!StringUtils.isBlank(truststore)) {
+                clientConfigBuilder.truststoreFilename(truststore);
+            }
+            if (!StringUtils.isBlank(truststoreType)) {
+                
clientConfigBuilder.truststoreType(KeystoreType.valueOf(truststoreType.toUpperCase()));
+            }
+            if (!StringUtils.isBlank(truststorePasswd)) {
+                clientConfigBuilder.truststorePassword(truststorePasswd);
+            }
+        }
+
+        final NiFiRegistryClientConfig clientConfig = 
clientConfigBuilder.build();
+        this.client = new 
JerseyNiFiRegistryClient.Builder().config(clientConfig).build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapper.java
----------------------------------------------------------------------
diff --git 
a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapper.java
 
b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapper.java
new file mode 100644
index 0000000..ed01e26
--- /dev/null
+++ 
b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapper.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service.flow.mapping;
+
+import org.apache.nifi.minifi.c2.model.FlowUri;
+
+import java.util.Optional;
+
+/**
+ * Provides mappings from an agent class to the URI of a versioned flow.
+ */
+public interface FlowMapper {
+
+    /**
+     * Gets the flow mapping information for the given agent class.
+     *
+     * @param agentClassName the name of an agent class
+     * @return the flow mapping information for the given agent class
+     * @throws FlowMapperException if an error occurs getting the mapping
+     */
+    Optional<FlowUri> getFlowMapping(String agentClassName) throws 
FlowMapperException;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapperException.java
----------------------------------------------------------------------
diff --git 
a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapperException.java
 
b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapperException.java
new file mode 100644
index 0000000..db99b12
--- /dev/null
+++ 
b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapperException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service.flow.mapping;
+
+/**
+ * Exception produced by a FlowMapper if an error occurs retrieving flow 
mappings.
+ */
+public class FlowMapperException extends Exception {
+
+    public FlowMapperException(String message) {
+        super(message);
+    }
+
+    public FlowMapperException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapperFactory.java
----------------------------------------------------------------------
diff --git 
a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapperFactory.java
 
b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapperFactory.java
new file mode 100644
index 0000000..2888ab7
--- /dev/null
+++ 
b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/FlowMapperFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service.flow.mapping;
+
+import org.apache.commons.lang3.Validate;
+import 
org.apache.nifi.minifi.c2.core.service.flow.client.NiFiRegistryClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Factory that controls the creation of the FlowMapper bean. This factory 
doesn't follow the standard factory approach
+ * of having a method annotated with @Bean because we want to lazily create 
the FlowMapper and not fail start-up if
+ * the C2 server wasn't configured with a registry URL or bucket id.
+ *
+ * If we want to change implementations, or provide wrapped impls, the 
initializeFlowMapper() method can be modified.
+ */
+@Component
+public class FlowMapperFactory {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FlowMapperFactory.class);
+
+    private final NiFiRegistryClientFactory niFiRegistryClientFactory;
+
+    private volatile FlowMapper flowMapper;
+
+    @Autowired
+    public FlowMapperFactory(final NiFiRegistryClientFactory 
niFiRegistryClientFactory) {
+        this.niFiRegistryClientFactory = niFiRegistryClientFactory;
+        Validate.notNull(this.niFiRegistryClientFactory);
+    }
+
+    /**
+     * Lazily creates a FlowMapper.
+     *
+     * @return the FlowMapper instance
+     */
+    public FlowMapper getFlowMapper() {
+        if (flowMapper == null) {
+            initializeFlowMapper();
+        }
+
+        return this.flowMapper;
+    }
+
+    private synchronized void initializeFlowMapper() {
+        if (this.flowMapper != null) {
+            return;
+        }
+
+        this.flowMapper = new SimpleFlowMapper(niFiRegistryClientFactory);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/SimpleFlowMapper.java
----------------------------------------------------------------------
diff --git 
a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/SimpleFlowMapper.java
 
b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/SimpleFlowMapper.java
new file mode 100644
index 0000000..e5bfb80
--- /dev/null
+++ 
b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/SimpleFlowMapper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service.flow.mapping;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
+import 
org.apache.nifi.minifi.c2.core.service.flow.client.NiFiRegistryClientFactory;
+import org.apache.nifi.minifi.c2.model.FlowUri;
+import org.apache.nifi.registry.client.FlowClient;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.flow.VersionedFlow;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A simple FlowMapper implementation that will return FlowMappings where the 
registry URL and bucket are fixed
+ * values that are passed in from C2 properties, and the flow name follows a 
convention of being the agent class name.
+ */
+public class SimpleFlowMapper implements FlowMapper {
+
+    private final NiFiRegistryClientFactory clientFactory;
+
+    public SimpleFlowMapper(final NiFiRegistryClientFactory clientFactory) {
+        this.clientFactory = clientFactory;
+        Validate.notNull(this.clientFactory);
+    }
+
+    @Override
+    public Optional<FlowUri> getFlowMapping(final String agentClass) throws 
FlowMapperException {
+        if (StringUtils.isBlank(agentClass)) {
+            throw new IllegalArgumentException("Agent class cannot be null or 
blank");
+        }
+
+        try {
+            final String registryUrl = clientFactory.getNiFiRegistryUrl();
+            final String bucketId = clientFactory.getNiFiRegistryBucketId();
+
+            final NiFiRegistryClient client = clientFactory.getClient();
+            final FlowClient flowClient = client.getFlowClient();
+            final List<VersionedFlow> flows = flowClient.getByBucket(bucketId);
+
+            final Optional<String> flowId = flows.stream()
+                    .filter(f -> f.getName().equals(agentClass))
+                    .map(f -> f.getIdentifier())
+                    .findFirst();
+
+            if (flowId.isPresent()) {
+                final FlowUri flowUri = new FlowUri(registryUrl, bucketId, 
flowId.get());
+                return Optional.of(flowUri);
+            } else {
+                return Optional.empty();
+            }
+
+        } catch (final Exception e) {
+            throw new FlowMapperException("Unable to get flow mapping for " + 
agentClass + " due to " + e.getMessage(), e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/ClassNotMappedException.java
----------------------------------------------------------------------
diff --git 
a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/ClassNotMappedException.java
 
b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/ClassNotMappedException.java
new file mode 100644
index 0000000..1be5b8f
--- /dev/null
+++ 
b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/ClassNotMappedException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service.flow.retrieval;
+
+/**
+ * Exception thrown when there is no flow mapping for a requested agent class.
+ */
+public class ClassNotMappedException extends Exception {
+
+    public ClassNotMappedException(String message) {
+        super(message);
+    }
+
+    public ClassNotMappedException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/FlowRetrievalException.java
----------------------------------------------------------------------
diff --git 
a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/FlowRetrievalException.java
 
b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/FlowRetrievalException.java
new file mode 100644
index 0000000..7c15201
--- /dev/null
+++ 
b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/FlowRetrievalException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service.flow.retrieval;
+
+/**
+ * Exception that is thrown for general errors encountered by {@link 
FlowRetrievalService}.
+ */
+public class FlowRetrievalException extends Exception {
+
+    public FlowRetrievalException(String message) {
+        super(message);
+    }
+
+    public FlowRetrievalException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/FlowRetrievalService.java
----------------------------------------------------------------------
diff --git 
a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/FlowRetrievalService.java
 
b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/FlowRetrievalService.java
new file mode 100644
index 0000000..157b6e2
--- /dev/null
+++ 
b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/FlowRetrievalService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service.flow.retrieval;
+
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A service used to retrieve versioned flows from NiFi registries based on a 
given agent class.
+ *
+ * The implementations of this service know about a given set of mappings from 
agent class to versioned flow.
+ */
+public interface FlowRetrievalService {
+
+    /**
+     * Gets the available versions for the given agent class.
+     *
+     * @param agentClassName the name of the agent class
+     * @return the list of available versions
+     * @throws IOException if an I/O error occurs retrieving the flow versions
+     * @throws ClassNotMappedException if there is no flow mapping found for 
the given agent class
+     * @throws FlowRetrievalException if any other error is encountered 
attempting to retrieve the flow versions
+     */
+    List<VersionedFlowSnapshotMetadata> getVersions(String agentClassName) 
throws IOException, ClassNotMappedException, FlowRetrievalException;
+
+    /**
+     * Gets the specific version of a flow for the given agent class.
+     *
+     * @param agentClassName the name of an agent class
+     * @param version the version of the flow to get
+     * @return the flow for the given class and version
+     * @throws IOException if an I/O error occurs retrieving the flow
+     * @throws ClassNotMappedException if there is no flow mapping found for 
the given agent class
+     * @throws FlowRetrievalException if any other error is encountered 
attempting to retrieve the flow version
+     */
+    VersionedFlowSnapshot getFlow(String agentClassName, int version) throws 
IOException, ClassNotMappedException, FlowRetrievalException;
+
+    /**
+     * Gets the latest flow for the given agent class.
+     *
+     * @param agentClassName the name of an agent class
+     * @return the latest flow
+     * @throws IOException if an I/O error occurs retrieving the flow
+     * @throws ClassNotMappedException if there is no flow mapping found for 
the given agent class
+     * @throws FlowRetrievalException if any other error is encountered 
attempting to retrieve the flow version
+     */
+    VersionedFlowSnapshot getLatestFlow(String agentClassName) throws 
IOException, ClassNotMappedException, FlowRetrievalException;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/StandardFlowRetrievalService.java
----------------------------------------------------------------------
diff --git 
a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/StandardFlowRetrievalService.java
 
b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/StandardFlowRetrievalService.java
new file mode 100644
index 0000000..e726035
--- /dev/null
+++ 
b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/StandardFlowRetrievalService.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service.flow.retrieval;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
+import 
org.apache.nifi.minifi.c2.core.service.flow.client.NiFiRegistryClientFactory;
+import org.apache.nifi.minifi.c2.core.service.flow.mapping.FlowMapper;
+import org.apache.nifi.minifi.c2.core.service.flow.mapping.FlowMapperException;
+import org.apache.nifi.minifi.c2.core.service.flow.mapping.FlowMapperFactory;
+import org.apache.nifi.minifi.c2.model.FlowUri;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Standard implementation of FlowRetrievalService.
+ *
+ * NOTE: Currently there is an assumption that a given C2 server only 
interacts with a single NiFi Registry instance, so
+ * even though the FlowMapping instances coming from the FlowMapper have 
registry URL, it is assumed for now that this
+ * URL will always be the same and thus we can avoid maintaining multiple 
registry clients for now.
+ */
+@Service
+public class StandardFlowRetrievalService implements FlowRetrievalService {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(StandardFlowRetrievalService.class);
+
+    private final FlowMapperFactory flowMapperFactory;
+    private final NiFiRegistryClientFactory clientFactory;
+
+    @Autowired
+    public StandardFlowRetrievalService(final FlowMapperFactory 
flowMapperFactory,
+                                        final NiFiRegistryClientFactory 
clientFactory) {
+        this.flowMapperFactory = flowMapperFactory;
+        this.clientFactory = clientFactory;
+        Validate.notNull(this.flowMapperFactory);
+        Validate.notNull(this.clientFactory);
+    }
+
+    @Override
+    public List<VersionedFlowSnapshotMetadata> getVersions(final String 
agentClass)
+            throws IOException, ClassNotMappedException, 
FlowRetrievalException {
+
+        if (StringUtils.isBlank(agentClass)) {
+            throw new IllegalArgumentException("Agent class cannot be null or 
blank");
+        }
+
+        final FlowUri flowUri = getFlowUri(agentClass);
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace("Getting flow versions for {} using flow mapping {}", 
new Object[] {agentClass, flowUri});
+        }
+
+        final NiFiRegistryClient client = clientFactory.getClient();
+        try {
+            return 
client.getFlowSnapshotClient().getSnapshotMetadata(flowUri.getBucketId(), 
flowUri.getFlowId());
+        } catch (IOException ioe) {
+            throw ioe;
+        } catch (NiFiRegistryException nre) {
+            throw new FlowRetrievalException("Error retrieving flow versions 
for " + agentClass + ": " + nre.getMessage(), nre);
+        }
+    }
+
+    @Override
+    public VersionedFlowSnapshot getFlow(final String agentClass, final int 
version)
+            throws IOException, ClassNotMappedException, 
FlowRetrievalException {
+
+        if (StringUtils.isBlank(agentClass)) {
+            throw new IllegalArgumentException("Agent class cannot be null or 
blank");
+        }
+
+        if (version < 1) {
+            throw new IllegalArgumentException("Version must be greater than 
or equal to 1");
+        }
+
+        final FlowUri flowUri = getFlowUri(agentClass);
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace("Getting flow for {} using flow mapping {} and 
version {}", new Object[] {agentClass, flowUri, Integer.valueOf(version)});
+        }
+
+        final NiFiRegistryClient client = clientFactory.getClient();
+        try {
+            return client.getFlowSnapshotClient().get(flowUri.getBucketId(), 
flowUri.getFlowId(), version);
+        } catch (IOException ioe) {
+            throw ioe;
+        } catch (NiFiRegistryException nre) {
+            throw new FlowRetrievalException("Error retrieving flow for " + 
agentClass + ": " + nre.getMessage(), nre);
+        }
+    }
+
+    @Override
+    public VersionedFlowSnapshot getLatestFlow(final String agentClass)
+            throws IOException, ClassNotMappedException, 
FlowRetrievalException {
+
+        if (StringUtils.isBlank(agentClass)) {
+            throw new IllegalArgumentException("Agent class cannot be null or 
blank");
+        }
+
+        final FlowUri flowUri = getFlowUri(agentClass);
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace("Getting latest flow for {} using flow mapping {}", 
new Object[] {agentClass, flowUri});
+        }
+
+        final NiFiRegistryClient client = clientFactory.getClient();
+        try {
+            return 
client.getFlowSnapshotClient().getLatest(flowUri.getBucketId(), 
flowUri.getFlowId());
+        } catch (IOException ioe) {
+            throw ioe;
+        } catch (NiFiRegistryException nre) {
+            throw new FlowRetrievalException("Error retrieving latest flow for 
" + agentClass + ": " + nre.getMessage(), nre);
+        }
+    }
+
+    /**
+     * Gets the FlowMapping for the given agent class.
+     *
+     * @param agentClass the class to get the flow mapping for
+     * @return the FlowUri for the agent class if one exists
+     * @throws FlowRetrievalException if an error occurs obtaining the 
FlowMapping from the FlowMapper
+     * @throws ClassNotMappedException if the FlowMapper does not contain a 
mapping for the given agent class
+     */
+    private FlowUri getFlowUri(final String agentClass) throws 
FlowRetrievalException, ClassNotMappedException {
+        final Optional<FlowUri> flowUri;
+        try {
+            final FlowMapper flowMapper = flowMapperFactory.getFlowMapper();
+            flowUri = flowMapper.getFlowMapping(agentClass);
+        } catch (FlowMapperException fme) {
+            throw new FlowRetrievalException("Error retrieving flow mapping 
for " + agentClass, fme);
+        }
+
+        if (flowUri.isPresent()) {
+            return flowUri.get();
+        } else {
+            throw new ClassNotMappedException("No flow mapping exists for " + 
agentClass);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/TestSimpleFlowMapper.java
----------------------------------------------------------------------
diff --git 
a/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/TestSimpleFlowMapper.java
 
b/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/TestSimpleFlowMapper.java
new file mode 100644
index 0000000..81ff781
--- /dev/null
+++ 
b/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/mapping/TestSimpleFlowMapper.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service.flow.mapping;
+
+import 
org.apache.nifi.minifi.c2.core.service.flow.client.NiFiRegistryClientFactory;
+import org.apache.nifi.minifi.c2.model.FlowUri;
+import org.apache.nifi.registry.client.FlowClient;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestSimpleFlowMapper {
+
+    static final String REGISTRY_URL = "http://localhost:18080";;
+    static final String BUCKET_ID = UUID.randomUUID().toString();
+
+    private NiFiRegistryClient client;
+    private FlowClient flowClient;
+
+    private NiFiRegistryClientFactory clientFactory;
+
+    private FlowMapper flowMapper;
+
+    @Before
+    public void setup() {
+        flowClient = mock(FlowClient.class);
+
+        client = mock(NiFiRegistryClient.class);
+        when(client.getFlowClient()).thenReturn(flowClient);
+
+        clientFactory = mock(NiFiRegistryClientFactory.class);
+        when(clientFactory.getClient()).thenReturn(client);
+        when(clientFactory.getNiFiRegistryUrl()).thenReturn(REGISTRY_URL);
+        when(clientFactory.getNiFiRegistryBucketId()).thenReturn(BUCKET_ID);
+
+        flowMapper = new SimpleFlowMapper(clientFactory);
+    }
+
+    @Test
+    public void testGetFlowMappingWhenAgentClassExists() throws 
FlowMapperException, IOException, NiFiRegistryException {
+        final String agentClass = "Class A";
+
+        final VersionedFlow versionedFlow = new VersionedFlow();
+        versionedFlow.setName(agentClass);
+        versionedFlow.setIdentifier(UUID.randomUUID().toString());
+        versionedFlow.setBucketIdentifier(BUCKET_ID);
+
+        
when(flowClient.getByBucket(BUCKET_ID)).thenReturn(Collections.singletonList(versionedFlow));
+
+        final Optional<FlowUri> flowUri = 
flowMapper.getFlowMapping(agentClass);
+        assertNotNull(flowUri);
+        assertTrue(flowUri.isPresent());
+        assertEquals(REGISTRY_URL, flowUri.get().getRegistryUrl());
+        assertEquals(BUCKET_ID, flowUri.get().getBucketId());
+        assertEquals(versionedFlow.getIdentifier(), flowUri.get().getFlowId());
+    }
+
+    @Test
+    public void testGetFlowMappingWhenAgentClassDoesNotExist() throws 
FlowMapperException, IOException, NiFiRegistryException {
+        final String agentClass = "Class A";
+
+        final VersionedFlow versionedFlow = new VersionedFlow();
+        versionedFlow.setName("SOME-OTHER-FLOW");
+        versionedFlow.setIdentifier(UUID.randomUUID().toString());
+        versionedFlow.setBucketIdentifier(BUCKET_ID);
+
+        
when(flowClient.getByBucket(BUCKET_ID)).thenReturn(Collections.singletonList(versionedFlow));
+
+        final Optional<FlowUri> flowUri = 
flowMapper.getFlowMapping(agentClass);
+        assertNotNull(flowUri);
+        assertFalse(flowUri.isPresent());
+    }
+
+    @Test(expected = FlowMapperException.class)
+    public void testGetFlowMappingWhenExceptionHappens() throws 
FlowMapperException, IOException, NiFiRegistryException {
+        final String agentClass = "Class A";
+        when(flowClient.getByBucket(BUCKET_ID)).thenThrow(new 
NiFiRegistryException("Bucket not found"));
+        flowMapper.getFlowMapping(agentClass);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/StandardFlowRetrievalServiceIT.java
----------------------------------------------------------------------
diff --git 
a/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/StandardFlowRetrievalServiceIT.java
 
b/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/StandardFlowRetrievalServiceIT.java
new file mode 100644
index 0000000..1f709aa
--- /dev/null
+++ 
b/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/StandardFlowRetrievalServiceIT.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service.flow.retrieval;
+
+import 
org.apache.nifi.minifi.c2.core.service.flow.client.NiFiRegistryClientFactory;
+import org.apache.nifi.minifi.c2.core.service.flow.mapping.FlowMapperFactory;
+import org.apache.nifi.minifi.c2.properties.C2Properties;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+public class StandardFlowRetrievalServiceIT {
+
+    private static final String CLASS_A = "Class A";
+    private static final String CLASS_B = "Class B";
+
+    private static StandardFlowRetrievalService flowRetrievalService;
+
+    @BeforeClass
+    public static void setup() throws IOException, NiFiRegistryException {
+        final C2Properties c2Properties = new C2Properties();
+        c2Properties.setProperty(C2Properties.NIFI_REGISTRY_URL, 
"http://localhost:18080";);
+
+        final NiFiRegistryClientFactory niFiRegistryClientFactory = new 
NiFiRegistryClientFactory(c2Properties);
+        final NiFiRegistryClient niFiRegistryClient = 
niFiRegistryClientFactory.getClient();
+
+        // Create a bucket to use for this test
+        final Bucket bucket = new Bucket();
+        bucket.setName("C2 Flow Retrieval IT - " + System.currentTimeMillis());
+
+        // Set the id of the created bucket into the C2 properties before we 
create the FlowMapperFactory
+        final Bucket createdBucket = 
niFiRegistryClient.getBucketClient().create(bucket);
+        c2Properties.setProperty(C2Properties.NIFI_REGISTRY_BUCKET_ID, 
createdBucket.getIdentifier());
+
+        final FlowMapperFactory flowMapperFactory = new 
FlowMapperFactory(niFiRegistryClientFactory);
+        flowRetrievalService = new 
StandardFlowRetrievalService(flowMapperFactory, niFiRegistryClientFactory);
+
+        // Create a flow for Class A
+        final VersionedFlow classAFlow = new VersionedFlow();
+        classAFlow.setBucketIdentifier(createdBucket.getIdentifier());
+        classAFlow.setName(CLASS_A);
+
+        final VersionedFlow createdClassAFlow = 
niFiRegistryClient.getFlowClient().create(classAFlow);
+
+        // Create a snapshot #1 for Class A
+        final VersionedFlowSnapshot classASnapshot1 = 
createSnapshot(createdClassAFlow, 1);
+        niFiRegistryClient.getFlowSnapshotClient().create(classASnapshot1);
+
+        // Create a snapshot #2 for Class A
+        final VersionedFlowSnapshot classASnapshot2 = 
createSnapshot(createdClassAFlow, 2);
+        niFiRegistryClient.getFlowSnapshotClient().create(classASnapshot2);
+
+        // Create a flow for Class B
+        final VersionedFlow classBFlow = new VersionedFlow();
+        classBFlow.setBucketIdentifier(createdBucket.getIdentifier());
+        classBFlow.setName(CLASS_B);
+        niFiRegistryClient.getFlowClient().create(classBFlow);
+
+        // Don't create any versions for Class B so we can test what happens 
when no versions exist
+    }
+
+    @Test
+    public void testRetrieveLatestWhenVersionsExist() throws 
ClassNotMappedException, IOException, FlowRetrievalException {
+        final VersionedFlowSnapshot versionedFlowSnapshot = 
flowRetrievalService.getLatestFlow(CLASS_A);
+        Assert.assertNotNull(versionedFlowSnapshot);
+        Assert.assertEquals(2, 
versionedFlowSnapshot.getSnapshotMetadata().getVersion());
+    }
+
+    @Test(expected = FlowRetrievalException.class)
+    public void testRetrieveLatestWhenNoVersions() throws 
ClassNotMappedException, IOException, FlowRetrievalException {
+        flowRetrievalService.getLatestFlow(CLASS_B);
+    }
+
+    @Test(expected = ClassNotMappedException.class)
+    public void testRetrieveLatestWhenNoFlowForClass() throws 
ClassNotMappedException, IOException, FlowRetrievalException {
+        flowRetrievalService.getLatestFlow("DOES-NOT-EXIST");
+    }
+
+    @Test
+    public void testRetrieveSpecificWhenVersionsExist() throws 
ClassNotMappedException, IOException, FlowRetrievalException {
+        final VersionedFlowSnapshot versionedFlowSnapshot = 
flowRetrievalService.getFlow(CLASS_A, 1);
+        Assert.assertNotNull(versionedFlowSnapshot);
+        Assert.assertEquals(1, 
versionedFlowSnapshot.getSnapshotMetadata().getVersion());
+    }
+
+    @Test(expected = FlowRetrievalException.class)
+    public void testRetrieveSpecificWhenNoVersions() throws 
ClassNotMappedException, IOException, FlowRetrievalException {
+        flowRetrievalService.getFlow(CLASS_B, 1);
+    }
+
+    @Test(expected = ClassNotMappedException.class)
+    public void testRetrieveSpecificWhenNoFlowForClass() throws 
ClassNotMappedException, IOException, FlowRetrievalException {
+        flowRetrievalService.getFlow("DOES-NOT-EXIST", 1);
+    }
+
+    @Test
+    public void testRetrieveVersionsListWhenVersionsExist() throws 
ClassNotMappedException, IOException, FlowRetrievalException {
+        final List<VersionedFlowSnapshotMetadata> versions = 
flowRetrievalService.getVersions(CLASS_A);
+        Assert.assertNotNull(versions);
+        Assert.assertEquals(2, versions.size());
+    }
+
+    @Test
+    public void testRetrieveVersionsListWhenNoVersions() throws 
ClassNotMappedException, IOException, FlowRetrievalException {
+        final List<VersionedFlowSnapshotMetadata> versions = 
flowRetrievalService.getVersions(CLASS_B);
+        Assert.assertNotNull(versions);
+        Assert.assertEquals(0, versions.size());
+    }
+
+    @Test(expected = ClassNotMappedException.class)
+    public void testRetrieveVersionsListWhenNoFlowForClass() throws 
ClassNotMappedException, IOException, FlowRetrievalException {
+        flowRetrievalService.getVersions("DOES-NOT-EXIST");
+    }
+
+    private static VersionedFlowSnapshot createSnapshot(final VersionedFlow 
versionedFlow, int num) {
+        final VersionedFlowSnapshotMetadata snapshotMetadata = new 
VersionedFlowSnapshotMetadata();
+        
snapshotMetadata.setBucketIdentifier(versionedFlow.getBucketIdentifier());
+        snapshotMetadata.setFlowIdentifier(versionedFlow.getIdentifier());
+        snapshotMetadata.setVersion(num);
+        snapshotMetadata.setComments("This is snapshot #" + num);
+
+        final VersionedProcessGroup rootProcessGroup = new 
VersionedProcessGroup();
+        rootProcessGroup.setIdentifier("root-pg");
+        rootProcessGroup.setName("Root Process Group");
+
+        final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
+        snapshot.setSnapshotMetadata(snapshotMetadata);
+        snapshot.setFlowContents(rootProcessGroup);
+        return snapshot;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/TestStandardFlowRetrievalService.java
----------------------------------------------------------------------
diff --git 
a/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/TestStandardFlowRetrievalService.java
 
b/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/TestStandardFlowRetrievalService.java
new file mode 100644
index 0000000..7bee09e
--- /dev/null
+++ 
b/minifi-c2/minifi-c2-framework/src/test/java/org/apache/nifi/minifi/c2/core/service/flow/retrieval/TestStandardFlowRetrievalService.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service.flow.retrieval;
+
+import 
org.apache.nifi.minifi.c2.core.service.flow.client.NiFiRegistryClientFactory;
+import org.apache.nifi.minifi.c2.core.service.flow.mapping.FlowMapper;
+import org.apache.nifi.minifi.c2.core.service.flow.mapping.FlowMapperException;
+import org.apache.nifi.minifi.c2.core.service.flow.mapping.FlowMapperFactory;
+import org.apache.nifi.minifi.c2.model.FlowUri;
+import org.apache.nifi.registry.client.FlowSnapshotClient;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestStandardFlowRetrievalService {
+
+    private static final String CLASS_A = "Class A";
+    private static final String CLASS_B = "Class B";
+
+    private FlowMapperFactory flowMapperFactory;
+    private FlowMapper flowMapper;
+
+    private NiFiRegistryClientFactory clientFactory;
+    private NiFiRegistryClient client;
+    private FlowSnapshotClient flowSnapshotClient;
+
+    private FlowRetrievalService flowRetrievalService;
+
+    @Before
+    public void setup() {
+        flowMapper = mock(FlowMapper.class);
+
+        flowMapperFactory = mock(FlowMapperFactory.class);
+        when(flowMapperFactory.getFlowMapper()).thenReturn(flowMapper);
+
+        flowSnapshotClient = mock(FlowSnapshotClient.class);
+
+        client = mock(NiFiRegistryClient.class);
+        when(client.getFlowSnapshotClient()).thenReturn(flowSnapshotClient);
+
+        clientFactory = mock(NiFiRegistryClientFactory.class);
+        when(clientFactory.getClient()).thenReturn(client);
+
+        flowRetrievalService = new 
StandardFlowRetrievalService(flowMapperFactory, clientFactory);
+    }
+
+    @Test
+    public void testRetrieveLatestWhenVersionsExist() throws 
ClassNotMappedException, IOException, FlowRetrievalException, 
FlowMapperException, NiFiRegistryException {
+        final FlowUri flowUri = new FlowUri("http://localhost:18080";, 
UUID.randomUUID().toString(), UUID.randomUUID().toString());
+        
when(flowMapper.getFlowMapping(CLASS_A)).thenReturn(Optional.of(flowUri));
+
+        final VersionedFlowSnapshot flowSnapshot = 
createSnapshot(flowUri.getBucketId(), flowUri.getFlowId(), 2);
+        when(flowSnapshotClient.getLatest(flowUri.getBucketId(), 
flowUri.getFlowId())).thenReturn(flowSnapshot);
+
+        final VersionedFlowSnapshot returnedFlowSnapshot = 
flowRetrievalService.getLatestFlow(CLASS_A);
+        assertNotNull(returnedFlowSnapshot);
+        assertEquals(flowSnapshot.getSnapshotMetadata().getBucketIdentifier(), 
returnedFlowSnapshot.getSnapshotMetadata().getBucketIdentifier());
+        assertEquals(flowSnapshot.getSnapshotMetadata().getFlowIdentifier(), 
returnedFlowSnapshot.getSnapshotMetadata().getFlowIdentifier());
+        assertEquals(flowSnapshot.getSnapshotMetadata().getVersion(), 
returnedFlowSnapshot.getSnapshotMetadata().getVersion());
+    }
+
+    @Test(expected = FlowRetrievalException.class)
+    public void testRetrieveLatestWhenNoVersions() throws 
ClassNotMappedException, IOException, FlowRetrievalException, 
FlowMapperException, NiFiRegistryException {
+        final FlowUri flowUri = new FlowUri("http://localhost:18080";, 
UUID.randomUUID().toString(), UUID.randomUUID().toString());
+        
when(flowMapper.getFlowMapping(CLASS_A)).thenReturn(Optional.of(flowUri));
+
+        when(flowSnapshotClient.getLatest(flowUri.getBucketId(), 
flowUri.getFlowId())).thenThrow(new NiFiRegistryException("No Versions"));
+
+        flowRetrievalService.getLatestFlow(CLASS_A);
+    }
+
+    @Test(expected = ClassNotMappedException.class)
+    public void testRetrieveLatestWhenNoFlowForClass() throws 
ClassNotMappedException, IOException, FlowRetrievalException, 
FlowMapperException {
+        when(flowMapper.getFlowMapping(CLASS_A)).thenReturn(Optional.empty());
+        flowRetrievalService.getLatestFlow(CLASS_A);
+    }
+
+    @Test
+    public void testRetrieveSpecificWhenVersionsExist() throws 
ClassNotMappedException, IOException, FlowRetrievalException, 
NiFiRegistryException, FlowMapperException {
+        final FlowUri flowUri = new FlowUri("http://localhost:18080";, 
UUID.randomUUID().toString(), UUID.randomUUID().toString());
+        
when(flowMapper.getFlowMapping(CLASS_A)).thenReturn(Optional.of(flowUri));
+
+        final int version = 2;
+        final VersionedFlowSnapshot flowSnapshot = 
createSnapshot(flowUri.getBucketId(), flowUri.getFlowId(), version);
+        when(flowSnapshotClient.get(flowUri.getBucketId(), 
flowUri.getFlowId(), version)).thenReturn(flowSnapshot);
+
+        final VersionedFlowSnapshot returnedFlowSnapshot = 
flowRetrievalService.getFlow(CLASS_A, version);
+        assertNotNull(returnedFlowSnapshot);
+        assertEquals(flowSnapshot.getSnapshotMetadata().getBucketIdentifier(), 
returnedFlowSnapshot.getSnapshotMetadata().getBucketIdentifier());
+        assertEquals(flowSnapshot.getSnapshotMetadata().getFlowIdentifier(), 
returnedFlowSnapshot.getSnapshotMetadata().getFlowIdentifier());
+        assertEquals(flowSnapshot.getSnapshotMetadata().getVersion(), 
returnedFlowSnapshot.getSnapshotMetadata().getVersion());
+    }
+
+    @Test(expected = FlowRetrievalException.class)
+    public void testRetrieveSpecificWhenNoVersions() throws 
ClassNotMappedException, IOException, FlowRetrievalException, 
NiFiRegistryException, FlowMapperException {
+        final FlowUri flowUri = new FlowUri("http://localhost:18080";, 
UUID.randomUUID().toString(), UUID.randomUUID().toString());
+        
when(flowMapper.getFlowMapping(CLASS_A)).thenReturn(Optional.of(flowUri));
+
+        final int version = 2;
+        when(flowSnapshotClient.get(flowUri.getBucketId(), 
flowUri.getFlowId(), version)).thenThrow(new NiFiRegistryException("No 
versions"));
+
+        flowRetrievalService.getFlow(CLASS_A, version);
+    }
+
+    @Test(expected = ClassNotMappedException.class)
+    public void testRetrieveSpecificWhenNoFlowForClass() throws 
ClassNotMappedException, IOException, FlowRetrievalException, 
FlowMapperException {
+        when(flowMapper.getFlowMapping(CLASS_A)).thenReturn(Optional.empty());
+        flowRetrievalService.getFlow(CLASS_A, 2);
+    }
+
+    @Test
+    public void testRetrieveVersionsListWhenVersionsExist() throws 
ClassNotMappedException, IOException, FlowRetrievalException, 
FlowMapperException, NiFiRegistryException {
+        final FlowUri flowUri = new FlowUri("http://localhost:18080";, 
UUID.randomUUID().toString(), UUID.randomUUID().toString());
+        
when(flowMapper.getFlowMapping(CLASS_A)).thenReturn(Optional.of(flowUri));
+
+        final VersionedFlowSnapshot flowSnapshot1 = 
createSnapshot(flowUri.getBucketId(), flowUri.getFlowId(), 1);
+        final VersionedFlowSnapshot flowSnapshot2 = 
createSnapshot(flowUri.getBucketId(), flowUri.getFlowId(), 2);
+
+        when(flowSnapshotClient.getSnapshotMetadata(flowUri.getBucketId(), 
flowUri.getFlowId()))
+                .thenReturn(Arrays.asList(
+                        flowSnapshot1.getSnapshotMetadata(),
+                        flowSnapshot2.getSnapshotMetadata()));
+
+        final List<VersionedFlowSnapshotMetadata> returnedVersions = 
flowRetrievalService.getVersions(CLASS_A);
+        assertNotNull(returnedVersions);
+        assertEquals(2, returnedVersions.size());
+    }
+
+    @Test
+    public void testRetrieveVersionsListWhenNoVersions() throws 
ClassNotMappedException, IOException, FlowRetrievalException, 
NiFiRegistryException, FlowMapperException {
+        final FlowUri flowUri = new FlowUri("http://localhost:18080";, 
UUID.randomUUID().toString(), UUID.randomUUID().toString());
+        
when(flowMapper.getFlowMapping(CLASS_A)).thenReturn(Optional.of(flowUri));
+
+        when(flowSnapshotClient.getSnapshotMetadata(flowUri.getBucketId(), 
flowUri.getFlowId())).thenReturn(Collections.emptyList());
+
+        final List<VersionedFlowSnapshotMetadata> returnedVersions = 
flowRetrievalService.getVersions(CLASS_A);
+        assertNotNull(returnedVersions);
+        assertEquals(0, returnedVersions.size());
+    }
+
+    @Test(expected = ClassNotMappedException.class)
+    public void testRetrieveVersionsListWhenNoFlowForClass() throws 
ClassNotMappedException, IOException, FlowRetrievalException, 
FlowMapperException {
+        when(flowMapper.getFlowMapping(CLASS_A)).thenReturn(Optional.empty());
+        flowRetrievalService.getVersions(CLASS_A);
+    }
+
+    private static VersionedFlowSnapshot createSnapshot(final String bucketId, 
final String flowId, int version) {
+        final VersionedFlowSnapshotMetadata snapshotMetadata = new 
VersionedFlowSnapshotMetadata();
+        snapshotMetadata.setBucketIdentifier(bucketId);
+        snapshotMetadata.setFlowIdentifier(flowId);
+        snapshotMetadata.setVersion(version);
+        snapshotMetadata.setComments("This is snapshot #" + version);
+
+        final VersionedProcessGroup rootProcessGroup = new 
VersionedProcessGroup();
+        rootProcessGroup.setIdentifier("root-pg");
+        rootProcessGroup.setName("Root Process Group");
+
+        final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
+        snapshot.setSnapshotMetadata(snapshotMetadata);
+        snapshot.setFlowContents(rootProcessGroup);
+        return snapshot;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-framework/src/test/resources/minifi-c2.properties
----------------------------------------------------------------------
diff --git 
a/minifi-c2/minifi-c2-framework/src/test/resources/minifi-c2.properties 
b/minifi-c2/minifi-c2-framework/src/test/resources/minifi-c2.properties
new file mode 100644
index 0000000..97aed2a
--- /dev/null
+++ b/minifi-c2/minifi-c2-framework/src/test/resources/minifi-c2.properties
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+minifi.c2.server.web.war.directory=./lib
+minifi.c2.server.web.host=localhost
+minifi.c2.server.web.port=10080
+minifi.c2.server.web.jetty.working.directory=./work/jetty
+minifi.c2.server.web.jetty.threads=200
+
+minifi.c2.server.security.tls.enabled=false
+minifi.c2.server.security.keystore=
+minifi.c2.server.security.keystoreType=
+minifi.c2.server.security.keystorePasswd=
+minifi.c2.server.security.keyPasswd=
+minifi.c2.server.security.truststore=
+minifi.c2.server.security.truststoreType=
+minifi.c2.server.security.truststorePasswd=
+
+# The base URL of a NiFi Registry instance, such as https://localhost:18443
+minifi.c2.server.nifi.registry.url=
+minifi.c2.server.nifi.registry.bucket.id=

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/minifi-c2-web-api/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-web-api/pom.xml 
b/minifi-c2/minifi-c2-web-api/pom.xml
index 046984a..f4132a4 100644
--- a/minifi-c2/minifi-c2-web-api/pom.xml
+++ b/minifi-c2/minifi-c2-web-api/pom.xml
@@ -210,7 +210,7 @@ limitations under the License.
         <dependency>
             <groupId>org.apache.nifi.registry</groupId>
             <artifactId>nifi-registry-security-utils</artifactId>
-            <version>0.1.0</version>
+            <version>${nifi.registry.version}</version>
         </dependency>
 
         <!-- Spring dependencies -->

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/7ab8fe7d/minifi-c2/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-c2/pom.xml b/minifi-c2/pom.xml
index 904a50d..0b81fc0 100644
--- a/minifi-c2/pom.xml
+++ b/minifi-c2/pom.xml
@@ -42,6 +42,7 @@ limitations under the License.
 
     <properties>
         <io.swagger.version>1.5.18</io.swagger.version>
+        <nifi.registry.version>0.1.0</nifi.registry.version>
     </properties>
 
     <dependencyManagement>

Reply via email to