[hotfix] [nifi] Minor style cleanups in NiFi source
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b08b64ab Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b08b64ab Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b08b64ab Branch: refs/heads/master Commit: b08b64abdb7c9bd7946e9c36e63ec368a1ac5032 Parents: 38362c4 Author: Stephan Ewen <se...@apache.org> Authored: Tue Jun 7 19:23:56 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Jun 8 15:17:10 2016 +0200 ---------------------------------------------------------------------- .../streaming/connectors/nifi/NiFiSource.java | 99 ++++++++++---------- 1 file changed, 49 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b08b64ab/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java index 00b6921..57c59ec 100644 --- a/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java +++ b/flink-streaming-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.connectors.nifi; import org.apache.flink.api.common.functions.StoppableFunction; @@ -26,6 +27,7 @@ import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.client.SiteToSiteClientConfig; import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.stream.io.StreamUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,13 +42,20 @@ import java.util.Map; */ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> implements StoppableFunction{ + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(NiFiSource.class); private static final long DEFAULT_WAIT_TIME_MS = 1000; - private long waitTimeMs; - private SiteToSiteClient client; - private SiteToSiteClientConfig clientConfig; + // ------------------------------------------------------------------------ + + private final SiteToSiteClientConfig clientConfig; + + private final long waitTimeMs; + + private transient SiteToSiteClient client; + private volatile boolean isRunning = true; /** @@ -73,63 +82,58 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> imple public void open(Configuration parameters) throws Exception { super.open(parameters); client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build(); - isRunning = true; } @Override public void run(SourceContext<NiFiDataPacket> ctx) throws Exception { - try { - while (isRunning) { - final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); - if (transaction == null) { - LOG.warn("A transaction could not be created, waiting and will try again..."); - try { - Thread.sleep(waitTimeMs); - } catch (InterruptedException ignored) { - - } - continue; + while (isRunning) { + final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); + if (transaction == null) { + LOG.warn("A transaction could not be created, waiting and will try again..."); + try { + Thread.sleep(waitTimeMs); + } catch (InterruptedException ignored) { + } + continue; + } - DataPacket dataPacket = transaction.receive(); - if (dataPacket == null) { - transaction.confirm(); - transaction.complete(); + DataPacket dataPacket = transaction.receive(); + if (dataPacket == null) { + transaction.confirm(); + transaction.complete(); - LOG.debug("No data available to pull, waiting and will try again..."); - try { - Thread.sleep(waitTimeMs); - } catch (InterruptedException ignored) { + LOG.debug("No data available to pull, waiting and will try again..."); + try { + Thread.sleep(waitTimeMs); + } catch (InterruptedException ignored) { - } - continue; } + continue; + } - final List<NiFiDataPacket> niFiDataPackets = new ArrayList<>(); - do { - // Read the data into a byte array and wrap it along with the attributes - // into a NiFiDataPacket. - final InputStream inStream = dataPacket.getData(); - final byte[] data = new byte[(int) dataPacket.getSize()]; - StreamUtils.fillBuffer(inStream, data); - - final Map<String, String> attributes = dataPacket.getAttributes(); + final List<NiFiDataPacket> niFiDataPackets = new ArrayList<>(); + do { + // Read the data into a byte array and wrap it along with the attributes + // into a NiFiDataPacket. + final InputStream inStream = dataPacket.getData(); + final byte[] data = new byte[(int) dataPacket.getSize()]; + StreamUtils.fillBuffer(inStream, data); - niFiDataPackets.add(new StandardNiFiDataPacket(data, attributes)); - dataPacket = transaction.receive(); - } while (dataPacket != null); + final Map<String, String> attributes = dataPacket.getAttributes(); - // Confirm transaction to verify the data - transaction.confirm(); + niFiDataPackets.add(new StandardNiFiDataPacket(data, attributes)); + dataPacket = transaction.receive(); + } while (dataPacket != null); - for (NiFiDataPacket dp : niFiDataPackets) { - ctx.collect(dp); - } + // Confirm transaction to verify the data + transaction.confirm(); - transaction.complete(); + for (NiFiDataPacket dp : niFiDataPackets) { + ctx.collect(dp); } - } finally { - ctx.close(); + + transaction.complete(); } } @@ -144,11 +148,6 @@ public class NiFiSource extends RichParallelSourceFunction<NiFiDataPacket> imple client.close(); } - /** - * {@inheritDoc} - * <p> - * Sets the {@link #isRunning} flag to {@code false}. - */ @Override public void stop() { this.isRunning = false;