Repository: nifi
Updated Branches:
  refs/heads/master 938c7cccb -> be83c0c5b


NIFI-1844 - Added ListFTP and FetchFTP processors

This closes #881.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/be83c0c5
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/be83c0c5
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/be83c0c5

Branch: refs/heads/master
Commit: be83c0c5b2b8a435b4745cbfc43f7c9251561727
Parents: 938c7cc
Author: Nicolas Dupont <contact.nicolas.dup...@gmail.com>
Authored: Wed Aug 17 17:32:05 2016 +0200
Committer: Pierre Villard <pierre.villard...@gmail.com>
Committed: Wed Sep 21 17:40:38 2016 +0200

----------------------------------------------------------------------
 .../nifi/processors/standard/FetchFTP.java      |  82 ++++++++++++++
 .../nifi/processors/standard/ListFTP.java       | 106 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   4 +-
 3 files changed, 191 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/be83c0c5/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
new file mode 100644
index 0000000..2460048
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFTP.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.standard.util.FileTransfer;
+import org.apache.nifi.processors.standard.FetchFileTransfer;
+import org.apache.nifi.processors.standard.GetFTP;
+import org.apache.nifi.processors.standard.GetSFTP;
+import org.apache.nifi.processors.standard.PutFTP;
+import org.apache.nifi.processors.standard.PutSFTP;
+import org.apache.nifi.processors.standard.util.FTPTransfer;
+
+// Note that we do not use @SupportsBatching annotation. This processor cannot support batching
+// because it must ensure that session commits happen before remote files are deleted.
+/**
+ * Processor that fetches the content of a remote file over FTP into an incoming FlowFile.
+ *
+ * <p>This class overrides only two hooks of {@link FetchFileTransfer}: the list of supported
+ * properties and the factory for the {@link FTPTransfer} used to reach the server. Everything
+ * else is inherited from the base class.</p>
+ */
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"ftp", "get", "retrieve", "files", "fetch", "remote", "ingest", "source", "input"})
+@CapabilityDescription("Fetches the content of a file from a remote FTP server and overwrites the contents of an incoming FlowFile "
+    + "with the content of the remote file.")
+@SeeAlso({GetSFTP.class, PutSFTP.class, GetFTP.class, PutFTP.class})
+@WritesAttributes({
+    @WritesAttribute(attribute = "ftp.remote.host", description = "The hostname or IP address from which the file was pulled"),
+    @WritesAttribute(attribute = "ftp.remote.port", description = "The port that was used to communicate with the remote FTP server"),
+    @WritesAttribute(attribute = "ftp.remote.filename", description = "The name of the remote file that was pulled"),
+    @WritesAttribute(attribute = "filename", description = "The filename is updated to point to the filename of the remote file"),
+    @WritesAttribute(attribute = "path", description = "If the Remote File contains a directory name, that directory name will be added "
+        + "to the FlowFile using the 'path' attribute")
+})
+public class FetchFTP extends FetchFileTransfer {
+
+    /**
+     * Builds the list of properties supported by this processor.
+     *
+     * <p>The port property is derived from {@code UNDEFAULTED_PORT} with its default value
+     * set to {@code "21"}. A new list is built on every call.</p>
+     *
+     * @return the supported property descriptors, in the order they are added below
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final PropertyDescriptor port = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(UNDEFAULTED_PORT)
+            .defaultValue("21")
+            .build();
+
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HOSTNAME);
+        properties.add(port);
+        properties.add(USERNAME);
+        properties.add(FTPTransfer.PASSWORD);
+        properties.add(REMOTE_FILENAME);
+        properties.add(COMPLETION_STRATEGY);
+        properties.add(MOVE_DESTINATION_DIR);
+        properties.add(FTPTransfer.CONNECTION_TIMEOUT);
+        properties.add(FTPTransfer.DATA_TIMEOUT);
+        properties.add(FTPTransfer.USE_COMPRESSION);
+        properties.add(FTPTransfer.CONNECTION_MODE);
+        properties.add(FTPTransfer.TRANSFER_MODE);
+        properties.add(FTPTransfer.PROXY_TYPE);
+        properties.add(FTPTransfer.PROXY_HOST);
+        properties.add(FTPTransfer.PROXY_PORT);
+        properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
+        properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
+        return properties;
+    }
+
+    /**
+     * Creates the FTP implementation of {@link FileTransfer} for this processor.
+     *
+     * @param context the process context handed to the new {@link FTPTransfer}
+     * @return a new {@link FTPTransfer} constructed with the given context and this processor's logger
+     */
+    @Override
+    protected FileTransfer createFileTransfer(final ProcessContext context) {
+        return new FTPTransfer(context, getLogger());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/be83c0c5/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
new file mode 100644
index 0000000..02468d3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.standard.util.FileTransfer;
+import org.apache.nifi.processors.standard.ListFileTransfer;
+import org.apache.nifi.processors.standard.util.FTPTransfer;
+import org.apache.nifi.processors.standard.PutFTP;
+import org.apache.nifi.processors.standard.GetFTP;
+
+@TriggerSerially
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@Tags({"list", "ftp", "remote", "ingest", "source", "input", "files"})
+@CapabilityDescription("Performs a listing of the files residing on an FTP 
server. For each file that is found on the remote server, a new FlowFile will 
be created with the filename attribute "
+    + "set to the name of the file on the remote server. This can then be used 
in conjunction with FetchFTP in order to fetch those files.")
+@SeeAlso({FetchFTP.class, GetFTP.class, PutFTP.class})
+@WritesAttributes({
+    @WritesAttribute(attribute = "ftp.remote.host", description = "The 
hostname of the FTP Server"),
+    @WritesAttribute(attribute = "ftp.remote.port", description = "The port 
that was connected to on the FTP Server"),
+    @WritesAttribute(attribute = "ftp.listing.user", description = "The 
username of the user that performed the FTP Listing"),
+    @WritesAttribute(attribute = "file.owner", description = "The numeric 
owner id of the source file"),
+    @WritesAttribute(attribute = "file.group", description = "The numeric 
group id of the source file"),
+    @WritesAttribute(attribute = "file.permissions", description = "The 
read/write/execute permissions of the source file"),
+    @WritesAttribute(attribute = "filename", description = "The name of the 
file on the SFTP Server"),
+    @WritesAttribute(attribute = "path", description = "The fully qualified 
name of the directory on the SFTP Server from which the file was pulled"),
+})
+@Stateful(scopes = {Scope.CLUSTER}, description = "After performing a listing 
of files, the timestamp of the newest file is stored. "
+    + "This allows the Processor to list only files that have been added or 
modified after "
+    + "this date the next time that the Processor is run. State is stored 
across the cluster so that this Processor can be run on Primary Node only and 
if "
+    + "a new Primary Node is selected, the new node will not duplicate the 
data that was listed by the previous Primary Node.")
+public class ListFTP extends ListFileTransfer {
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final PropertyDescriptor port = new 
PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("21").build();
+
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(HOSTNAME);
+        properties.add(port);
+        properties.add(USERNAME);
+        properties.add(FTPTransfer.PASSWORD);
+        properties.add(REMOTE_PATH);
+        properties.add(DISTRIBUTED_CACHE_SERVICE);
+        properties.add(FTPTransfer.RECURSIVE_SEARCH);
+        properties.add(FTPTransfer.FILE_FILTER_REGEX);
+        properties.add(FTPTransfer.PATH_FILTER_REGEX);
+        properties.add(FTPTransfer.IGNORE_DOTTED_FILES);
+        properties.add(FTPTransfer.REMOTE_POLL_BATCH_SIZE);
+        properties.add(FTPTransfer.CONNECTION_TIMEOUT);
+        properties.add(FTPTransfer.DATA_TIMEOUT);
+        properties.add(FTPTransfer.CONNECTION_MODE);
+        properties.add(FTPTransfer.TRANSFER_MODE);
+        properties.add(FTPTransfer.PROXY_TYPE);
+        properties.add(FTPTransfer.PROXY_HOST);
+        properties.add(FTPTransfer.PROXY_PORT);
+        properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
+        properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
+        return properties;
+    }
+
+    @Override
+    protected FileTransfer getFileTransfer(final ProcessContext context) {
+        return new FTPTransfer(context, getLogger());
+    }
+
+    @Override
+    protected String getProtocolName() {
+        return "ftp";
+    }
+
+    @Override
+    protected Scope getStateScope(final ProcessContext context) {
+        // Use cluster scope so that component can be run on Primary Node Only 
and can still
+        // pick up where it left off, even if the Primary Node changes.
+        return Scope.CLUSTER;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/be83c0c5/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 716e736..dc1f012 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -89,4 +89,6 @@ org.apache.nifi.processors.standard.UnpackContent
 org.apache.nifi.processors.standard.ValidateXml
 org.apache.nifi.processors.standard.ValidateCsv
 org.apache.nifi.processors.standard.ExecuteSQL
-org.apache.nifi.processors.standard.FetchDistributedMapCache
\ No newline at end of file
+org.apache.nifi.processors.standard.FetchDistributedMapCache
+org.apache.nifi.processors.standard.ListFTP
+org.apache.nifi.processors.standard.FetchFTP

Reply via email to