[FLINK-3405] [nifi] Extend NiFiSource with interface StoppableFunction

This closes #2047


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/38362c40
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/38362c40
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/38362c40

Branch: refs/heads/master
Commit: 38362c40bdd2e9a350630988c534b2859854d379
Parents: 6afb2b0
Author: smarthi <smar...@apache.org>
Authored: Sun May 29 02:15:16 2016 -0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 8 15:17:10 2016 +0200

----------------------------------------------------------------------
 .../flink-connector-nifi/pom.xml                |  2 +-
 .../streaming/connectors/nifi/NiFiSource.java   | 24 ++++++++++++++------
 2 files changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/38362c40/flink-streaming-connectors/flink-connector-nifi/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-nifi/pom.xml b/flink-streaming-connectors/flink-connector-nifi/pom.xml
index d93bce7..a18d7b9 100644
--- a/flink-streaming-connectors/flink-connector-nifi/pom.xml
+++ b/flink-streaming-connectors/flink-connector-nifi/pom.xml
@@ -37,7 +37,7 @@ under the License.
 
        <!-- Allow users to pass custom connector versions -->
        <properties>
-               <nifi.version>0.3.0</nifi.version>
+               <nifi.version>0.6.1</nifi.version>
        </properties>
 
        <dependencies>

http://git-wip-us.apache.org/repos/asf/flink/blob/38362c40/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
index a213bb4..00b6921 100644
--- a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
+++ b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.nifi;
 
+import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.nifi.remote.Transaction;
@@ -37,7 +38,7 @@ import java.util.Map;
  * A source that pulls data from Apache NiFi using the NiFi Site-to-Site client. This source
  * produces NiFiDataPackets which encapsulate the content and attributes of a NiFi FlowFile.
  */
-public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
+public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> implements StoppableFunction{
 
        private static final Logger LOG = LoggerFactory.getLogger(NiFiSource.class);
 
@@ -46,7 +47,7 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
        private long waitTimeMs;
        private SiteToSiteClient client;
        private SiteToSiteClientConfig clientConfig;
-       private transient volatile boolean running;
+       private volatile boolean isRunning = true;
 
        /**
         * Constructs a new NiFiSource using the given client config and the default wait time of 1000 ms.
@@ -72,19 +73,19 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
        public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
-               running = true;
+               isRunning = true;
        }
 
        @Override
        public void run(SourceContext<NiFiDataPacket> ctx) throws Exception {
                try {
-                       while (running) {
+                       while (isRunning) {
                                final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
                                if (transaction == null) {
                                        LOG.warn("A transaction could not be created, waiting and will try again...");
                                        try {
                                                Thread.sleep(waitTimeMs);
-                                       } catch (InterruptedException e) {
+                                       } catch (InterruptedException ignored) {
 
                                        }
                                        continue;
@@ -98,7 +99,7 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
                                        LOG.debug("No data available to pull, waiting and will try again...");
                                        try {
                                                Thread.sleep(waitTimeMs);
-                                       } catch (InterruptedException e) {
+                                       } catch (InterruptedException ignored) {
 
                                        }
                                        continue;
@@ -134,7 +135,7 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
 
        @Override
        public void cancel() {
-               running = false;
+               isRunning = false;
        }
 
        @Override
@@ -143,4 +144,13 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> {
                client.close();
        }
 
+       /**
+       * {@inheritDoc}
+       * <p>
+       * Sets the {@link #isRunning} flag to {@code false}.
+       */
+       @Override
+       public void stop() {
+               this.isRunning = false;
+       }
 }

Reply via email to