This is an automated email from the ASF dual-hosted git repository.

kdoran pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b5e61109f6 NIFI-9538: Add C2 heartbeat capability to minifi-c2-service
b5e61109f6 is described below

commit b5e61109f6d5d7331d7cd0c7027ad9250c347852
Author: Matthew Burgess <mattyb...@apache.org>
AuthorDate: Wed Feb 9 13:49:23 2022 -0500

    NIFI-9538: Add C2 heartbeat capability to minifi-c2-service
    
    - Added content hash code to avoid repeatedly updating with the same flow
    - Gracefully handle agent classes and provide update URL to /config
    - Fixed JDK 8 build issue with ConfigService
    
    This closes #5755.
    
    Signed-off-by: Kevin Doran <kdo...@apache.org>
---
 .../apache/nifi/minifi/c2/api/Configuration.java   |   8 +
 minifi/minifi-c2/minifi-c2-assembly/pom.xml        |   2 +-
 .../src/main/resources/conf/authorizations.yaml    |  28 ++++
 .../FileSystemWritableConfiguration.java           |  11 ++
 .../c2/cache/s3/S3WritableConfiguration.java       |   8 +-
 .../minifi-c2-provider-nifi-rest/pom.xml           |   7 +-
 minifi/minifi-c2/minifi-c2-service/pom.xml         |  22 ++-
 .../minifi/c2/configuration/C2ResourceConfig.java  |   3 +
 .../C2JsonProvider.java}                           |  32 ++--
 .../C2JsonProviderFeature.java}                    |  23 ++-
 .../nifi/minifi/c2/service/C2ProtocolContext.java  |  96 +++++++++++
 .../C2ProtocolService.java}                        |  21 +--
 .../nifi/minifi/c2/service/ConfigService.java      | 182 +++++++++++++++++++--
 .../minifi/c2/service/SimpleC2ProtocolService.java | 127 ++++++++++++++
 minifi/pom.xml                                     |   2 +-
 15 files changed, 513 insertions(+), 59 deletions(-)

diff --git 
a/minifi/minifi-c2/minifi-c2-api/src/main/java/org/apache/nifi/minifi/c2/api/Configuration.java
 
b/minifi/minifi-c2/minifi-c2-api/src/main/java/org/apache/nifi/minifi/c2/api/Configuration.java
index 90fbdba346..3f78b52269 100644
--- 
a/minifi/minifi-c2/minifi-c2-api/src/main/java/org/apache/nifi/minifi/c2/api/Configuration.java
+++ 
b/minifi/minifi-c2/minifi-c2-api/src/main/java/org/apache/nifi/minifi/c2/api/Configuration.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.minifi.c2.api;
 
 import java.io.InputStream;
+import java.net.URL;
 
 /**
  * Represents a MiNiFi configuration of a given version, format matches the 
format of the ConfigurationProvider
@@ -44,4 +45,11 @@ public interface Configuration {
      * @return an input stream to read the configuration with
      */
     InputStream getInputStream() throws ConfigurationProviderException;
+
+    /**
+     * Gets the URL of the resource
+     *
+     * @return the URL of the resource
+     */
+    URL getURL() throws ConfigurationProviderException;
 }
diff --git a/minifi/minifi-c2/minifi-c2-assembly/pom.xml 
b/minifi/minifi-c2/minifi-c2-assembly/pom.xml
index ba2553e395..148a6efe7b 100644
--- a/minifi/minifi-c2/minifi-c2-assembly/pom.xml
+++ b/minifi/minifi-c2/minifi-c2-assembly/pom.xml
@@ -26,7 +26,7 @@ limitations under the License.
     <packaging>pom</packaging>
     <description>This is the assembly of Apache MiNiFi's - Command And Control 
Server</description>
     <properties>
-        <minifi.c2.server.port>10080</minifi.c2.server.port>
+        <minifi.c2.server.port>10090</minifi.c2.server.port>
 
         <minifi.c2.server.secure>false</minifi.c2.server.secure>
         
<minifi.c2.server.keystore>./conf/keystore.jks</minifi.c2.server.keystore>
diff --git 
a/minifi/minifi-c2/minifi-c2-assembly/src/main/resources/conf/authorizations.yaml
 
b/minifi/minifi-c2/minifi-c2-assembly/src/main/resources/conf/authorizations.yaml
index 5669451940..14386e5c17 100644
--- 
a/minifi/minifi-c2/minifi-c2-assembly/src/main/resources/conf/authorizations.yaml
+++ 
b/minifi/minifi-c2/minifi-c2-assembly/src/main/resources/conf/authorizations.yaml
@@ -37,3 +37,31 @@ Paths:
     # Default authorization lets anonymous pull any config.  Remove below to change that.
     - Authorization: ROLE_ANONYMOUS
       Action: allow
+
+  /c2/config/heartbeat:
+    Default Action: deny
+    Actions:
+      - Authorization: CLASS_RASPI_3
+        Query Parameters:
+          class: raspi3
+        Action: allow
+      - Authorization: ROLE_SUPERUSER
+        Action: allow
+
+      # Default authorization lets anonymous pull any config.  Remove below to change that.
+      - Authorization: ROLE_ANONYMOUS
+        Action: allow
+
+  /c2/config/acknowledge:
+    Default Action: deny
+    Actions:
+      - Authorization: CLASS_RASPI_3
+        Query Parameters:
+          class: raspi3
+        Action: allow
+      - Authorization: ROLE_SUPERUSER
+        Action: allow
+
+      # Default authorization lets anonymous pull any config.  Remove below to change that.
+      - Authorization: ROLE_ANONYMOUS
+        Action: allow
diff --git 
a/minifi/minifi-c2/minifi-c2-cache/minifi-c2-cache-filesystem/src/main/java/org/apache/nifi/minifi/c2/cache/filesystem/FileSystemWritableConfiguration.java
 
b/minifi/minifi-c2/minifi-c2-cache/minifi-c2-cache-filesystem/src/main/java/org/apache/nifi/minifi/c2/cache/filesystem/FileSystemWritableConfiguration.java
index bb48dcc3df..65afe5c1fd 100644
--- 
a/minifi/minifi-c2/minifi-c2-cache/minifi-c2-cache-filesystem/src/main/java/org/apache/nifi/minifi/c2/cache/filesystem/FileSystemWritableConfiguration.java
+++ 
b/minifi/minifi-c2/minifi-c2-cache/minifi-c2-cache-filesystem/src/main/java/org/apache/nifi/minifi/c2/cache/filesystem/FileSystemWritableConfiguration.java
@@ -25,6 +25,8 @@ import 
org.apache.nifi.minifi.c2.api.util.DelegatingOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
@@ -82,6 +84,15 @@ public class FileSystemWritableConfiguration implements 
WriteableConfiguration {
         }
     }
 
+    @Override
+    public URL getURL() throws ConfigurationProviderException {
+        try {
+            return path.toUri().toURL();
+        } catch (MalformedURLException murle) {
+            throw new ConfigurationProviderException("Could not determine URL of " + path, murle);
+        }
+    }
+
     @Override
     public String getName() {
         return path.getFileName().toString();
diff --git 
a/minifi/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3WritableConfiguration.java
 
b/minifi/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3WritableConfiguration.java
index 4ab62c9885..7bcf0a668b 100644
--- 
a/minifi/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3WritableConfiguration.java
+++ 
b/minifi/minifi-c2/minifi-c2-cache/minifi-c2-cache-s3/src/main/java/org/apache/nifi/minifi/c2/cache/s3/S3WritableConfiguration.java
@@ -23,13 +23,14 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
 
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.URL;
 
 import org.apache.nifi.minifi.c2.api.ConfigurationProviderException;
 import org.apache.nifi.minifi.c2.api.cache.WriteableConfiguration;
 
 public class S3WritableConfiguration implements WriteableConfiguration {
 
-  private AmazonS3 s3;
+  private final AmazonS3 s3;
   private final S3Object s3Object;
   private final String version;
 
@@ -81,6 +82,11 @@ public class S3WritableConfiguration implements 
WriteableConfiguration {
     return s3Object.getObjectContent();
   }
 
+  @Override
+  public URL getURL() throws ConfigurationProviderException {
+    return s3.getUrl(s3Object.getBucketName(), s3Object.getKey());
+  }
+
   @Override
   public String getName() {
     return s3Object.getKey();
diff --git 
a/minifi/minifi-c2/minifi-c2-provider/minifi-c2-provider-nifi-rest/pom.xml 
b/minifi/minifi-c2/minifi-c2-provider/minifi-c2-provider-nifi-rest/pom.xml
index cec43376aa..bd5517fc71 100644
--- a/minifi/minifi-c2/minifi-c2-provider/minifi-c2-provider-nifi-rest/pom.xml
+++ b/minifi/minifi-c2/minifi-c2-provider/minifi-c2-provider-nifi-rest/pom.xml
@@ -15,7 +15,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 
or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <artifactId>minifi-c2-provider</artifactId>
@@ -53,6 +54,10 @@ limitations under the License.
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-log4j12</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.glassfish.jersey.media</groupId>
+                    <artifactId>jersey-media-json-jackson</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
diff --git a/minifi/minifi-c2/minifi-c2-service/pom.xml 
b/minifi/minifi-c2/minifi-c2-service/pom.xml
index 6fcd217313..97de3fb79e 100644
--- a/minifi/minifi-c2/minifi-c2-service/pom.xml
+++ b/minifi/minifi-c2/minifi-c2-service/pom.xml
@@ -29,9 +29,29 @@ limitations under the License.
         <dependency>
             <groupId>org.apache.nifi.minifi</groupId>
             <artifactId>minifi-c2-api</artifactId>
-            <version>${project.version}</version>
+            <version>1.17.0-SNAPSHOT</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>c2-protocol-api</artifactId>
+            <version>1.17.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-jaxb-annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.jaxrs</groupId>
+            <artifactId>jackson-jaxrs-json-provider</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>org.springframework</groupId>
             <artifactId>spring-context</artifactId>
diff --git 
a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/configuration/C2ResourceConfig.java
 
b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/configuration/C2ResourceConfig.java
index 5ed07b23be..95dc5f620c 100644
--- 
a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/configuration/C2ResourceConfig.java
+++ 
b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/configuration/C2ResourceConfig.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.minifi.c2.configuration;
 
+import org.apache.nifi.minifi.c2.service.C2JsonProviderFeature;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.springframework.context.ApplicationContext;
 import org.springframework.web.context.support.WebApplicationContextUtils;
@@ -29,6 +30,8 @@ public class C2ResourceConfig extends ResourceConfig {
     public C2ResourceConfig(@Context ServletContext servletContext) {
         final ApplicationContext appCtx = 
WebApplicationContextUtils.getWebApplicationContext(servletContext);
 
+        // register Jackson Object Mapper Resolver
+        register(C2JsonProviderFeature.class);
         register(appCtx.getBean("configService"));
     }
 }
diff --git 
a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/configuration/C2ResourceConfig.java
 
b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2JsonProvider.java
similarity index 53%
copy from 
minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/configuration/C2ResourceConfig.java
copy to 
minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2JsonProvider.java
index 5ed07b23be..c12cfc9e74 100644
--- 
a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/configuration/C2ResourceConfig.java
+++ 
b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2JsonProvider.java
@@ -14,21 +14,31 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.nifi.minifi.c2.service;
 
-package org.apache.nifi.minifi.c2.configuration;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider;
 
-import org.glassfish.jersey.server.ResourceConfig;
-import org.springframework.context.ApplicationContext;
-import org.springframework.web.context.support.WebApplicationContextUtils;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.ext.Provider;
 
-import javax.servlet.ServletContext;
-import javax.ws.rs.core.Context;
+@Provider
+@Produces(MediaType.APPLICATION_JSON)
+public class C2JsonProvider extends JacksonJaxbJsonProvider {
 
-public class C2ResourceConfig extends ResourceConfig {
+    private static final ObjectMapper objectMapper = new ObjectMapper();
 
-    public C2ResourceConfig(@Context ServletContext servletContext) {
-        final ApplicationContext appCtx = 
WebApplicationContextUtils.getWebApplicationContext(servletContext);
+    static {
+        
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+    }
 
-        register(appCtx.getBean("configService"));
+    public C2JsonProvider() {
+        super();
+        setMapper(objectMapper);
     }
-}
+
+
+
+}
\ No newline at end of file
diff --git 
a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/configuration/C2ResourceConfig.java
 
b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2JsonProviderFeature.java
similarity index 57%
copy from 
minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/configuration/C2ResourceConfig.java
copy to 
minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2JsonProviderFeature.java
index 5ed07b23be..e40e8201ad 100644
--- 
a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/configuration/C2ResourceConfig.java
+++ 
b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2JsonProviderFeature.java
@@ -14,21 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.nifi.minifi.c2.service;
 
-package org.apache.nifi.minifi.c2.configuration;
+import javax.ws.rs.core.Feature;
+import javax.ws.rs.core.FeatureContext;
+import javax.ws.rs.ext.MessageBodyReader;
+import javax.ws.rs.ext.MessageBodyWriter;
 
-import org.glassfish.jersey.server.ResourceConfig;
-import org.springframework.context.ApplicationContext;
-import org.springframework.web.context.support.WebApplicationContextUtils;
+public class C2JsonProviderFeature implements Feature {
 
-import javax.servlet.ServletContext;
-import javax.ws.rs.core.Context;
-
-public class C2ResourceConfig extends ResourceConfig {
-
-    public C2ResourceConfig(@Context ServletContext servletContext) {
-        final ApplicationContext appCtx = 
WebApplicationContextUtils.getWebApplicationContext(servletContext);
-
-        register(appCtx.getBean("configService"));
+    @Override
+    public boolean configure(FeatureContext context) {
+        context.register(C2JsonProvider.class, MessageBodyReader.class, 
MessageBodyWriter.class);
+        return true;
     }
 }
diff --git 
a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2ProtocolContext.java
 
b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2ProtocolContext.java
new file mode 100644
index 0000000000..e09af8e7b2
--- /dev/null
+++ 
b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2ProtocolContext.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.service;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+
+public class C2ProtocolContext {
+    private static final Logger logger = 
LoggerFactory.getLogger(C2ProtocolContext.class);
+
+    private static final C2ProtocolContext EMPTY = builder().build();
+
+    private final URI baseUri;
+    private final Long contentLength;
+    private final String sha256;
+
+    C2ProtocolContext(final Builder builder) {
+        this.baseUri = builder.baseUri;
+        this.contentLength = builder.contentLength;
+        this.sha256 = builder.sha256;
+    }
+
+    public URI getBaseUri() {
+        return baseUri;
+    }
+
+    public Long getContentLength() {
+        return contentLength;
+    }
+
+    public String getSha256() {
+        return sha256;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static C2ProtocolContext empty() {
+        return EMPTY;
+    }
+
+    public static class Builder {
+
+        private URI baseUri;
+        private Long contentLength;
+        private String sha256;
+
+        private Builder() {
+        }
+
+        public Builder baseUri(final URI baseUri) {
+            this.baseUri = baseUri;
+            return this;
+        }
+
+        public Builder contentLength(final Long contentLength) {
+            this.contentLength = contentLength;
+            return this;
+        }
+
+        public Builder contentLength(final String contentLength) {
+            try {
+                this.contentLength = Long.valueOf(contentLength);
+            } catch (final NumberFormatException e) {
+                logger.debug("Could not parse content length string: " + 
contentLength, e);
+            }
+            return this;
+        }
+
+        public Builder sha256(final String sha256) {
+            this.sha256 = sha256;
+            return this;
+        }
+
+        public C2ProtocolContext build() {
+            return new C2ProtocolContext(this);
+        }
+    }
+}
diff --git 
a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/configuration/C2ResourceConfig.java
 
b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2ProtocolService.java
similarity index 57%
copy from 
minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/configuration/C2ResourceConfig.java
copy to 
minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2ProtocolService.java
index 5ed07b23be..756c92065a 100644
--- 
a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/configuration/C2ResourceConfig.java
+++ 
b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/C2ProtocolService.java
@@ -14,21 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.nifi.minifi.c2.service;
 
-package org.apache.nifi.minifi.c2.configuration;
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
 
-import org.glassfish.jersey.server.ResourceConfig;
-import org.springframework.context.ApplicationContext;
-import org.springframework.web.context.support.WebApplicationContextUtils;
+public interface C2ProtocolService {
+    C2HeartbeatResponse processHeartbeat(C2Heartbeat heartbeat, 
C2ProtocolContext context);
 
-import javax.servlet.ServletContext;
-import javax.ws.rs.core.Context;
-
-public class C2ResourceConfig extends ResourceConfig {
-
-    public C2ResourceConfig(@Context ServletContext servletContext) {
-        final ApplicationContext appCtx = 
WebApplicationContextUtils.getWebApplicationContext(servletContext);
-
-        register(appCtx.getBean("configService"));
-    }
+    void processOperationAck(C2OperationAck operationAck, C2ProtocolContext 
context);
 }
diff --git 
a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/ConfigService.java
 
b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/ConfigService.java
index 0732befcd7..9b70fba527 100644
--- 
a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/ConfigService.java
+++ 
b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/ConfigService.java
@@ -18,14 +18,19 @@
 package org.apache.nifi.minifi.c2.service;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import io.swagger.annotations.ApiModel;
-import org.apache.nifi.minifi.c2.api.Configuration;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
 import org.apache.nifi.minifi.c2.api.ConfigurationProvider;
 import org.apache.nifi.minifi.c2.api.ConfigurationProviderException;
 import org.apache.nifi.minifi.c2.api.InvalidParameterException;
@@ -35,10 +40,13 @@ import org.apache.nifi.minifi.c2.api.util.Pair;
 import org.apache.nifi.minifi.c2.util.HttpRequestUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Configuration;
 import org.springframework.security.core.context.SecurityContextHolder;
 
 import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
+import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.WebApplicationException;
@@ -50,10 +58,11 @@ import javax.ws.rs.core.UriInfo;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URI;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -61,36 +70,53 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import static javax.ws.rs.core.HttpHeaders.CONTENT_LENGTH;
+import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
+
+@Configuration
 @Path("/config")
 @ApiModel(
         value = "/config",
-        description = "Provides configuration for MiNiFi instances"
+        description = "Provides configuration and heartbeat/acknowledge capabilities for MiNiFi instances"
 )
 public class ConfigService {
+
+    public static final String MESSAGE_400 = "MiNiFi C2 server was unable to complete the request because it was invalid. The request should not be retried without modification.";
+
     private static final Logger logger = 
LoggerFactory.getLogger(ConfigService.class);
     private final Authorizer authorizer;
     private final ObjectMapper objectMapper;
     private final Supplier<ConfigurationProviderInfo> 
configurationProviderInfo;
     private final LoadingCache<ConfigurationProviderKey, 
ConfigurationProviderValue> configurationCache;
+    private final C2ProtocolService c2ProtocolService;
+
+    @Context
+    protected HttpServletRequest httpServletRequest;
+
+    @Context
+    protected UriInfo uriInfo;
 
     public ConfigService(List<ConfigurationProvider> configurationProviders, 
Authorizer authorizer) {
         this(configurationProviders, authorizer, 1000, 300_000);
     }
+
     public ConfigService(List<ConfigurationProvider> configurationProviders, 
Authorizer authorizer, long maximumCacheSize, long cacheTtlMillis) {
-        this.authorizer = authorizer;
         this.objectMapper = new ObjectMapper();
+
+        this.authorizer = authorizer;
         if (configurationProviders == null || configurationProviders.size() == 
0) {
             throw new IllegalArgumentException("Expected at least one 
configuration provider");
         }
         this.configurationProviderInfo = Suppliers.memoizeWithExpiration(() -> 
initContentTypeInfo(configurationProviders), cacheTtlMillis, 
TimeUnit.MILLISECONDS);
         CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder();
         if (maximumCacheSize >= 0) {
-            cacheBuilder = cacheBuilder.maximumSize(maximumCacheSize);
+            cacheBuilder.maximumSize(maximumCacheSize);
         }
         if (cacheTtlMillis >= 0) {
-            cacheBuilder = cacheBuilder.refreshAfterWrite(cacheTtlMillis, 
TimeUnit.MILLISECONDS);
+            cacheBuilder.refreshAfterWrite(cacheTtlMillis, 
TimeUnit.MILLISECONDS);
         }
         this.configurationCache = cacheBuilder
                 .build(new CacheLoader<ConfigurationProviderKey, 
ConfigurationProviderValue>() {
@@ -99,6 +125,7 @@ public class ConfigService {
                         return initConfigurationProviderValue(key);
                     }
                 });
+        this.c2ProtocolService = new SimpleC2ProtocolService();
     }
 
     public ConfigurationProviderValue 
initConfigurationProviderValue(ConfigurationProviderKey key) {
@@ -147,6 +174,127 @@ public class ConfigService {
         return new ConfigurationProviderInfo(mediaTypeList, contentTypes, 
null);
     }
 
+    @POST
+    @Path("/heartbeat")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(
+            value = "An endpoint for a MiNiFi Agent to send a heartbeat to the C2 server",
+            response = C2HeartbeatResponse.class
+    )
+    @ApiResponses({
+            @ApiResponse(code = 400, message = MESSAGE_400)})
+    public Response heartbeat(
+            @Context HttpServletRequest request, @Context HttpHeaders 
httpHeaders, @Context UriInfo uriInfo,
+            @ApiParam(required = true) final C2Heartbeat heartbeat) {
+
+        try {
+            
authorizer.authorize(SecurityContextHolder.getContext().getAuthentication(), 
uriInfo);
+        } catch (AuthorizationException e) {
+            logger.warn(HttpRequestUtil.getClientString(request) + " not authorized to access " + uriInfo, e);
+            return Response.status(403).build();
+        }
+
+        List<MediaType> acceptValues = httpHeaders.getAcceptableMediaTypes();
+        boolean defaultAccept = false;
+        if (acceptValues.size() == 0) {
+            acceptValues = Collections.singletonList(MediaType.WILDCARD_TYPE);
+            defaultAccept = true;
+        }
+        if (logger.isDebugEnabled()) {
+            StringBuilder builder = new StringBuilder("Handling request from ")
+                    .append(HttpRequestUtil.getClientString(request))
+                    .append(" with Accept");
+            if (defaultAccept) {
+                builder.append(" default value");
+            }
+            builder.append(": ")
+                    .append(acceptValues.stream().map(Object::toString).collect(Collectors.joining(", ")));
+            logger.debug(builder.toString());
+        }
+
+        try {
+            final String flowId;
+            Response response;
+            final String agentClass = heartbeat.getAgentClass();
+            if (agentClass == null || agentClass.equals("")) {
+                logger.warn("No agent class provided, returning OK (200)");
+                response = Response.ok().build();
+                return response;
+            } else {
+                Map<String, List<String>> parameters = 
Collections.singletonMap("class", Collections.singletonList(agentClass));
+                ConfigurationProviderValue configurationProviderValue = 
configurationCache.get(new ConfigurationProviderKey(acceptValues, parameters));
+                org.apache.nifi.minifi.c2.api.Configuration configuration;
+                try {
+                    configuration = 
configurationProviderValue.getConfiguration();
+                } catch (ConfigurationProviderException cpe) {
+                    logger.warn("No flow available for agent class " + 
agentClass + ", returning No Content (204)");
+                    response = Response.noContent().build();
+                    return response;
+                }
+                try (InputStream inputStream = configuration.getInputStream();
+                     ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream()) {
+                    MessageDigest sha256 = 
MessageDigest.getInstance("SHA-256");
+                    byte[] buffer = new byte[1024];
+                    int read;
+                    while ((read = inputStream.read(buffer)) >= 0) {
+                        outputStream.write(buffer, 0, read);
+                        sha256.update(buffer, 0, read);
+                    }
+                    flowId = bytesToHex(sha256.digest());
+
+                } catch (ConfigurationProviderException | IOException | 
NoSuchAlgorithmException e) {
+                    logger.error("Error reading or checksumming configuration file", e);
+                    throw new WebApplicationException(500);
+                }
+                final C2ProtocolContext heartbeatContext = 
C2ProtocolContext.builder()
+                        
.baseUri(uriInfo.getBaseUriBuilder().path("/config").queryParam("class", 
agentClass).build())
+                        
.contentLength(httpServletRequest.getHeader(CONTENT_LENGTH))
+                        .sha256(flowId)
+                        .build();
+
+                try {
+                    final C2HeartbeatResponse heartbeatResponse = 
c2ProtocolService.processHeartbeat(heartbeat, heartbeatContext);
+                    response = Response.ok(heartbeatResponse).build();
+                } catch (Exception e) {
+                    logger.error("Heartbeat processing failed", e);
+                    response = 
Response.status(BAD_REQUEST).entity(e.getMessage()).build();
+                }
+            }
+            return response;
+        } catch (ExecutionException | UncheckedExecutionException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof WebApplicationException) {
+                throw (WebApplicationException) cause;
+            }
+            logger.error(HttpRequestUtil.getClientString(request) + " made request with " + HttpRequestUtil.getQueryString(request) + " that caused error.", cause);
+            return Response.status(500).entity("Internal error").build();
+        }
+    }
+
+    @POST
+    @Path("/acknowledge")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(
+            value = "An endpoint for a MiNiFi Agent to send an operation acknowledgement to the C2 server"
+    )
+    @ApiResponses({
+            @ApiResponse(code = 400, message = MESSAGE_400)})
+    public Response acknowledge(
+            @ApiParam(required = true) final C2OperationAck operationAck) {
+
+        final C2ProtocolContext ackContext = C2ProtocolContext.builder()
+                .baseUri(getBaseUri())
+                .contentLength(httpServletRequest.getHeader(CONTENT_LENGTH))
+                .build();
+
+        c2ProtocolService.processOperationAck(operationAck, ackContext);
+
+        return Response.ok().build();
+
+    }
+
     @GET
     @Path("/contentTypes")
     @Produces(MediaType.APPLICATION_JSON)
@@ -182,14 +330,11 @@ public class ConfigService {
             logger.warn(HttpRequestUtil.getClientString(request) + " not 
authorized to access " + uriInfo, e);
             return Response.status(403).build();
         }
-        Map<String, List<String>> parameters = new HashMap<>();
-        for (Map.Entry<String, List<String>> entry : 
uriInfo.getQueryParameters().entrySet()) {
-            parameters.put(entry.getKey(), entry.getValue());
-        }
+        Map<String, List<String>> parameters = new 
HashMap<>(uriInfo.getQueryParameters());
         List<MediaType> acceptValues = httpHeaders.getAcceptableMediaTypes();
         boolean defaultAccept = false;
         if (acceptValues.size() == 0) {
-            acceptValues = Arrays.asList(MediaType.WILDCARD_TYPE);
+            acceptValues = Collections.singletonList(MediaType.WILDCARD_TYPE);
             defaultAccept = true;
         }
         if (logger.isDebugEnabled()) {
@@ -199,16 +344,16 @@ public class ConfigService {
                     .append(parameters)
                     .append(" and Accept");
             if (defaultAccept) {
-                builder = builder.append(" default value");
+                builder.append(" default value");
             }
-            builder = builder.append(": ")
+            builder.append(": ")
                     
.append(acceptValues.stream().map(Object::toString).collect(Collectors.joining(",
 ")));
             logger.debug(builder.toString());
         }
 
         try {
             ConfigurationProviderValue configurationProviderValue = 
configurationCache.get(new ConfigurationProviderKey(acceptValues, parameters));
-            Configuration configuration = 
configurationProviderValue.getConfiguration();
+            org.apache.nifi.minifi.c2.api.Configuration configuration = 
configurationProviderValue.getConfiguration();
             Response.ResponseBuilder ok = Response.ok();
             ok = ok.header("X-Content-Version", configuration.getVersion());
             ok = ok.type(configurationProviderValue.getMediaType());
@@ -240,7 +385,7 @@ public class ConfigService {
         } catch (ConfigurationProviderException e) {
             logger.warn("Unable to get configuration.", e);
             return Response.status(500).build();
-        } catch (ExecutionException|UncheckedExecutionException e) {
+        } catch (ExecutionException | UncheckedExecutionException e) {
             Throwable cause = e.getCause();
             if (cause instanceof WebApplicationException) {
                 throw (WebApplicationException) cause;
@@ -279,4 +424,9 @@ public class ConfigService {
                 "\"Accept: " + 
acceptValues.stream().map(Object::toString).collect(Collectors.joining(", ")) + 
"\" supported media types are " +
                 
mediaTypeList.stream().map(Pair::getFirst).map(Object::toString).collect(Collectors.joining(",
 "))).build());
     }
+
+    /**
+     * Looks up the base URI of the application for the request currently being served.
+     *
+     * @return the base URI reported by {@code uriInfo}
+     */
+    private URI getBaseUri() {
+        // Forwarded Headers are expected to have been applied as part of servlet filter chain
+        final URI baseUri = uriInfo.getBaseUri();
+        return baseUri;
+    }
 }
diff --git 
a/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/SimpleC2ProtocolService.java
 
b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/SimpleC2ProtocolService.java
new file mode 100644
index 0000000000..c26e0449bf
--- /dev/null
+++ 
b/minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/SimpleC2ProtocolService.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.service;
+
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationState;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Minimal in-memory {@link C2ProtocolService}.
+ *
+ * <p>For every agent it remembers the flow hash (the context's sha256) that was last offered
+ * to that agent. A heartbeat is answered with a single UPDATE/CONFIGURATION operation whenever
+ * the hash in the context differs from the remembered one. Identifiers of issued operations
+ * are kept until the agent acknowledges them.
+ *
+ * <p>All state lives in memory only. The collections are wrapped in synchronized views so that
+ * individual reads and writes from concurrent requests cannot corrupt them.
+ */
+@Service
+public class SimpleC2ProtocolService implements C2ProtocolService {
+
+    private static final Logger logger = LoggerFactory.getLogger(SimpleC2ProtocolService.class);
+
+    // Identifiers of operations handed out in heartbeat responses and not yet acknowledged.
+    // Synchronized wrapper (rather than a concurrent set) so a null ID in an ack is still tolerated.
+    private static final Set<String> issuedOperationIds = Collections.synchronizedSet(new HashSet<>());
+
+    // Agent ID -> flow hash most recently offered to that agent.
+    private final Map<String, String> currentFlowIds;
+
+    public SimpleC2ProtocolService() {
+        // Synchronized wrapper keeps HashMap's tolerance for null keys/values.
+        currentFlowIds = Collections.synchronizedMap(new HashMap<>(1000));
+    }
+
+    /**
+     * Records an agent's acknowledgement of a previously issued operation.
+     *
+     * <p>The operation ID is removed from the set of outstanding operations and the reported
+     * state is logged. Failures while processing are logged and not propagated.
+     *
+     * @param operationAck the acknowledgement sent by the agent; a null value is logged and ignored
+     * @param context      request-level information accompanying the acknowledgement
+     */
+    @Override
+    public void processOperationAck(final C2OperationAck operationAck, final C2ProtocolContext context) {
+        // This service assumes there is a single Operation UPDATE to pass over the updated flow
+        logger.debug("Received operation acknowledgement: {}; {}", operationAck, context);
+        if (operationAck == null) {
+            logger.warn("Ignoring operation acknowledgement without a payload");
+            return;
+        }
+
+        // Remove the operation ID from the set of issued operations and log the state
+        final String operationId = operationAck.getOperationId();
+        try {
+            OperationState opState = OperationState.DONE;
+            String details = null;
+
+            /* Partial applications are rare and only happen when an operation consists of updating multiple config
+             * items and some succeed (we don't yet have the concept of rollback in agents).
+             * Fully Applied yields an operation success.
+             * Operation Not Understood and Not Applied give little detail but will also result in Operation Failure.
+             * We should explore providing textual details. */
+            final C2OperationState c2OperationState = operationAck.getOperationState();
+            if (null != c2OperationState) {
+                details = c2OperationState.getDetails();
+                if (c2OperationState.getState() != C2OperationState.OperationState.FULLY_APPLIED) {
+                    opState = OperationState.FAILED;
+                }
+            }
+
+            if (!issuedOperationIds.remove(operationId)) {
+                logger.warn("Operation with ID {} has either already been acknowledged or is unknown to this server", operationId);
+            } else if (null != c2OperationState) {
+                // The reported state may be absent; do not let logging abort the rest of the processing.
+                final C2OperationState.OperationState operationState = c2OperationState.getState();
+                logger.debug("Operation with ID {} acknowledged with a state of {}({}), details = {}",
+                        operationId,
+                        operationState == null ? null : operationState.name(),
+                        opState.name(),
+                        details == null ? "" : details);
+            }
+
+            // Optionally, an acknowledgement can include some of the info normally passed in a heartbeat.
+            // If this info is present, process it as a heartbeat, so we update our latest known state of the agent.
+            if (operationAck.getAgentInfo() != null
+                    || operationAck.getDeviceInfo() != null
+                    || operationAck.getFlowInfo() != null) {
+                if (context.getSha256() == null) {
+                    // Without a flow hash in the context, heartbeat processing would overwrite the
+                    // agent's recorded hash with null and register an operation whose response is
+                    // never sent back, causing the next real heartbeat to re-issue an update.
+                    logger.trace("Operation acknowledgement contains additional info but the context has no flow hash; not processing as heartbeat");
+                } else {
+                    final C2Heartbeat heartbeatInfo = toHeartbeat(operationAck);
+                    logger.trace("Operation acknowledgement contains additional info. Processing as heartbeat: {}", heartbeatInfo);
+                    processHeartbeat(heartbeatInfo, context);
+                }
+            }
+
+        } catch (final Exception e) {
+            logger.warn("Encountered exception while processing operation ack", e);
+        }
+    }
+
+    /**
+     * Answers an agent heartbeat.
+     *
+     * <p>When no flow hash is recorded for the agent, or the recorded hash differs from the
+     * context's sha256, the response carries one UPDATE/CONFIGURATION operation whose
+     * {@code location} argument is the context's base URI, and the context's hash is recorded
+     * for the agent. Otherwise the response carries no operations.
+     *
+     * @param heartbeat the heartbeat sent by the agent
+     * @param context   request-level information, including the base URI and the current flow hash
+     * @return the heartbeat response, with at most one requested operation
+     */
+    @Override
+    public C2HeartbeatResponse processHeartbeat(final C2Heartbeat heartbeat, final C2ProtocolContext context) {
+
+        final C2HeartbeatResponse c2HeartbeatResponse = new C2HeartbeatResponse();
+        final String agentId = heartbeat.getAgentId();
+        final String currentFlowId = currentFlowIds.get(agentId);
+        if (currentFlowId == null || !currentFlowId.equals(context.getSha256())) {
+            // Create a single UPDATE operation to fetch the flow from the specified URL
+            final C2Operation c2Operation = new C2Operation();
+            final String operationID = UUID.randomUUID().toString();
+            issuedOperationIds.add(operationID);
+            c2Operation.setIdentifier(operationID);
+            c2Operation.setOperation(OperationType.UPDATE);
+            c2Operation.setOperand(OperandType.CONFIGURATION);
+            c2Operation.setArgs(Collections.singletonMap("location", context.getBaseUri().toString()));
+            final List<C2Operation> requestedOperations = Collections.singletonList(c2Operation);
+            c2HeartbeatResponse.setRequestedOperations(requestedOperations);
+            currentFlowIds.put(agentId, context.getSha256());
+        }
+
+        return c2HeartbeatResponse;
+    }
+
+    /**
+     * Copies the device, agent and flow info of an acknowledgement into a new heartbeat.
+     *
+     * @param ack the acknowledgement to copy from
+     * @return a heartbeat carrying only those three pieces of information
+     */
+    private static C2Heartbeat toHeartbeat(final C2OperationAck ack) {
+        final C2Heartbeat heartbeat = new C2Heartbeat();
+        heartbeat.setDeviceInfo(ack.getDeviceInfo());
+        heartbeat.setAgentInfo(ack.getAgentInfo());
+        heartbeat.setFlowInfo(ack.getFlowInfo());
+        return heartbeat;
+    }
+}
diff --git a/minifi/pom.xml b/minifi/pom.xml
index 7a55bc8cdd..796d4003a7 100644
--- a/minifi/pom.xml
+++ b/minifi/pom.xml
@@ -41,7 +41,7 @@ limitations under the License.
     </modules>
     <properties>
         <jersey.version>2.29</jersey.version>
-        <system.rules.version>1.16.1</system.rules.version>
+        <system.rules.version>1.19.0</system.rules.version>
         <aws.sdk.version>1.11.172</aws.sdk.version>
         <yammer.metrics.version>2.2.0</yammer.metrics.version>
     </properties>

Reply via email to