[FLINK-3405] [nifi] Extend NiFiSource with interface StoppableFunction. This closes #2047
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/38362c40 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/38362c40 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/38362c40 Branch: refs/heads/master Commit: 38362c40bdd2e9a350630988c534b2859854d379 Parents: 6afb2b0 Author: smarthi <smar...@apache.org> Authored: Sun May 29 02:15:16 2016 -0400 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Jun 8 15:17:10 2016 +0200 ---------------------------------------------------------------------- .../flink-connector-nifi/pom.xml | 2 +- .../streaming/connectors/nifi/NiFiSource.java | 24 ++++++++++++++------ 2 files changed, 18 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/38362c40/flink-streaming-connectors/flink-connector-nifi/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-nifi/pom.xml b/flink-streaming-connectors/flink-connector-nifi/pom.xml index d93bce7..a18d7b9 100644 --- a/flink-streaming-connectors/flink-connector-nifi/pom.xml +++ b/flink-streaming-connectors/flink-connector-nifi/pom.xml @@ -37,7 +37,7 @@ under the License. 
<!-- Allow users to pass custom connector versions --> <properties> - <nifi.version>0.3.0</nifi.version> + <nifi.version>0.6.1</nifi.version> </properties> <dependencies> http://git-wip-us.apache.org/repos/asf/flink/blob/38362c40/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java index a213bb4..00b6921 100644 --- a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java +++ b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java @@ -17,6 +17,7 @@ */ package org.apache.flink.streaming.connectors.nifi; +import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.nifi.remote.Transaction; @@ -37,7 +38,7 @@ import java.util.Map; * A source that pulls data from Apache NiFi using the NiFi Site-to-Site client. This source * produces NiFiDataPackets which encapsulate the content and attributes of a NiFi FlowFile. 
*/ -public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> { +public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> implements StoppableFunction{ private static final Logger LOG = LoggerFactory.getLogger(NiFiSource.class); @@ -46,7 +47,7 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> { private long waitTimeMs; private SiteToSiteClient client; private SiteToSiteClientConfig clientConfig; - private transient volatile boolean running; + private volatile boolean isRunning = true; /** * Constructs a new NiFiSource using the given client config and the default wait time of 1000 ms. @@ -72,19 +73,19 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> { public void open(Configuration parameters) throws Exception { super.open(parameters); client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build(); - running = true; + isRunning = true; } @Override public void run(SourceContext<NiFiDataPacket> ctx) throws Exception { try { - while (running) { + while (isRunning) { final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); if (transaction == null) { LOG.warn("A transaction could not be created, waiting and will try again..."); try { Thread.sleep(waitTimeMs); - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { } continue; @@ -98,7 +99,7 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> { LOG.debug("No data available to pull, waiting and will try again..."); try { Thread.sleep(waitTimeMs); - } catch (InterruptedException e) { + } catch (InterruptedException ignored) { } continue; @@ -134,7 +135,7 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> { @Override public void cancel() { - running = false; + isRunning = false; } @Override @@ -143,4 +144,13 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> { client.close(); } + /** + * 
{@inheritDoc} + * <p> + * Sets the {@link #isRunning} flag to {@code false}. + */ + @Override + public void stop() { + this.isRunning = false; + } }