This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 209c448  Added Pulsar IO connector for local files (#2869)
209c448 is described below

commit 209c44883ad30b13b1a693a02aff8858c287401e
Author: David Kjerrumgaard <35466513+david-stream...@users.noreply.github.com>
AuthorDate: Thu Dec 27 09:49:04 2018 -0800

    Added Pulsar IO connector for local files (#2869)
    
    ### Motivation
    
    Added a Pulsar IO connector for consuming files from the local filesystem
    
    ### Modifications
    
    Added a new module to the pulsar-io module that includes the Pulsar file 
connector and its associated classes & tests
    
    ### Result
    
    After your change, users will be able to consume files from the local 
filesystem, and have the contents directly published to a Pulsar topic.
---
 pulsar-io/file/pom.xml                             |  70 ++++++
 .../apache/pulsar/io/file/FileConsumerThread.java  | 108 +++++++++
 .../apache/pulsar/io/file/FileListingThread.java   | 189 +++++++++++++++
 .../java/org/apache/pulsar/io/file/FileRecord.java |  75 ++++++
 .../java/org/apache/pulsar/io/file/FileSource.java |  70 ++++++
 .../apache/pulsar/io/file/FileSourceConfig.java    | 177 ++++++++++++++
 .../apache/pulsar/io/file/ProcessedFileThread.java |  60 +++++
 .../org/apache/pulsar/io/file/package-info.java    |  19 ++
 .../org/apache/pulsar/io/file/utils/GZipFiles.java |  96 ++++++++
 .../org/apache/pulsar/io/file/utils/ZipFiles.java  |  94 ++++++++
 .../apache/pulsar/io/file/utils/package-info.java  |  19 ++
 .../resources/META-INF/services/pulsar-io.yaml     |  22 ++
 .../apache/pulsar/io/file/AbstractFileTests.java   | 138 +++++++++++
 .../pulsar/io/file/FileConsumerThreadTests.java    | 148 ++++++++++++
 .../pulsar/io/file/FileListingThreadTests.java     | 238 +++++++++++++++++++
 .../pulsar/io/file/FileSourceConfigTests.java      | 121 ++++++++++
 .../pulsar/io/file/ProcessedFileThreadTests.java   | 259 +++++++++++++++++++++
 .../apache/pulsar/io/file/TestFileGenerator.java   |  91 ++++++++
 .../pulsar/io/file/utils/GZipFilesTests.java       |  68 ++++++
 .../apache/pulsar/io/file/utils/ZipFilesTests.java |  68 ++++++
 .../org/apache/pulsar/io/file/mislabelled.gz       |   1 +
 .../org/apache/pulsar/io/file/nonGzipFile.txt      |  20 ++
 .../org/apache/pulsar/io/file/validGzip.gz         | Bin 0 -> 60 bytes
 .../org/apache/pulsar/io/file/validZip.zip         | Bin 0 -> 200 bytes
 pulsar-io/file/src/test/resources/sinkConfig.yaml  |  33 +++
 pulsar-io/pom.xml                                  |   1 +
 26 files changed, 2185 insertions(+)

diff --git a/pulsar-io/file/pom.xml b/pulsar-io/file/pom.xml
new file mode 100644
index 0000000..c2d892f
--- /dev/null
+++ b/pulsar-io/file/pom.xml
@@ -0,0 +1,70 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-io</artifactId>
+    <version>2.3.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-io-file</artifactId>
+  <name>Pulsar IO :: File</name>
+  
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+    </dependency>
+
+    <dependency>
+       <groupId>com.fasterxml.jackson.core</groupId>
+       <artifactId>jackson-databind</artifactId>
+    </dependency>
+    
+    <dependency>
+       <groupId>commons-io</groupId>
+       <artifactId>commons-io</artifactId>
+    </dependency>
+
+    <dependency>
+       <groupId>org.apache.commons</groupId>
+       <artifactId>commons-lang3</artifactId>
+    </dependency>
+  </dependencies>
+  
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+  
+</project>
\ No newline at end of file
diff --git 
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileConsumerThread.java
 
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileConsumerThread.java
new file mode 100644
index 0000000..8c9803a
--- /dev/null
+++ 
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileConsumerThread.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+
+import org.apache.pulsar.io.core.PushSource;
+import org.apache.pulsar.io.file.utils.GZipFiles;
+import org.apache.pulsar.io.file.utils.ZipFiles;
+
+/**
+ * Worker thread that consumes the contents of the files
+ * and publishes them to a Pulsar topic.
+ */
+public class FileConsumerThread extends Thread {
+
+    private final PushSource<byte[]> source;
+    private final BlockingQueue<File> workQueue;
+    private final BlockingQueue<File> inProcess;
+    private final BlockingQueue<File> recentlyProcessed;
+
+    public FileConsumerThread(PushSource<byte[]> source,
+            BlockingQueue<File> workQueue,
+            BlockingQueue<File> inProcess,
+            BlockingQueue<File> recentlyProcessed) {
+        this.source = source;
+        this.workQueue = workQueue;
+        this.inProcess = inProcess;
+        this.recentlyProcessed = recentlyProcessed;
+    }
+
+    public void run() {
+        try {
+            while (true) {
+                File file = workQueue.take();
+
+                boolean added = false;
+                do {
+                    added = inProcess.add(file);
+                } while (!added);
+
+                consumeFile(file);
+            }
+        } catch (InterruptedException ie) {
+            // just terminate
+        }
+    }
+
+    private void consumeFile(File file) {
+        final AtomicInteger idx = new AtomicInteger(1);
+        try (Stream<String> lines = getLines(file)) {
+             lines.forEachOrdered(line -> process(file, idx.getAndIncrement(), 
line));
+        } catch (IOException e) {
+            e.printStackTrace();
+        } finally {
+
+            boolean removed = false;
+            do {
+                removed = inProcess.remove(file);
+            } while (!removed);
+
+            boolean added = false;
+            do {
+                added = recentlyProcessed.add(file);
+            } while (!added);
+        }
+    }
+
+    private Stream<String> getLines(File file) throws IOException {
+        if (file == null) {
+            return null;
+        } else if (GZipFiles.isGzip(file)) {
+            return GZipFiles.lines(Paths.get(file.getAbsolutePath()));
+        } else if (ZipFiles.isZip(file)) {
+            return ZipFiles.lines(Paths.get(file.getAbsolutePath()));
+        } else {
+            return Files.lines(Paths.get(file.getAbsolutePath()), 
Charset.defaultCharset());
+        }
+    }
+
+    private void process(File srcFile, int lineNumber, String line) {
+        source.consume(new FileRecord(srcFile, lineNumber, line.getBytes()));
+    }
+
+}
diff --git 
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java 
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java
new file mode 100644
index 0000000..a13b923
--- /dev/null
+++ 
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
+/**
+ * Worker thread that checks the configured input directory for
+ * files that meet the provided filtering criteria, and publishes
+ * them to a work queue for processing by the FileConsumerThreads.
+ */
+public class FileListingThread extends Thread {
+
+    private final AtomicLong queueLastUpdated = new AtomicLong(0L);
+    private final Lock listingLock = new ReentrantLock();
+    private final AtomicReference<FileFilter> fileFilterRef = new 
AtomicReference<>();
+    private final BlockingQueue<File> workQueue;
+    private final BlockingQueue<File> inProcess;
+    private final BlockingQueue<File> recentlyProcessed;
+
+    private final String inputDir;
+    private final boolean recurseDirs;
+    private final boolean keepOriginal;
+    private final long pollingInterval;
+
+    public FileListingThread(FileSourceConfig fileConfig,
+            BlockingQueue<File> workQueue,
+            BlockingQueue<File> inProcess,
+            BlockingQueue<File> recentlyProcessed) {
+        this.workQueue = workQueue;
+        this.inProcess = inProcess;
+        this.recentlyProcessed = recentlyProcessed;
+
+        inputDir = fileConfig.getInputDirectory();
+        recurseDirs = 
Optional.ofNullable(fileConfig.getRecurse()).orElse(true);
+        keepOriginal = 
Optional.ofNullable(fileConfig.getKeepFile()).orElse(false);
+        pollingInterval = 
Optional.ofNullable(fileConfig.getPollingInterval()).orElse(10000L);
+        fileFilterRef.set(createFileFilter(fileConfig));
+    }
+
+    public void run() {
+        while (true) {
+            if ((queueLastUpdated.get() < System.currentTimeMillis() - 
pollingInterval) && listingLock.tryLock()) {
+                try {
+                    final File directory = new File(inputDir);
+                    final Set<File> listing = performListing(directory, 
fileFilterRef.get(), recurseDirs);
+
+                    if (listing != null && !listing.isEmpty()) {
+
+                        // Remove any files that have been or are currently 
being processed.
+                        listing.removeAll(inProcess);
+                        if (!keepOriginal) {
+                            listing.removeAll(recentlyProcessed);
+                        }
+
+                        for (File f: listing) {
+                            if (!workQueue.contains(f)) {
+                                workQueue.offer(f);
+                            }
+                        }
+                        queueLastUpdated.set(System.currentTimeMillis());
+                    }
+
+                 } finally {
+                    listingLock.unlock();
+                 }
+            }
+
+            try {
+                sleep(pollingInterval - 1);
+            } catch (InterruptedException e) {
+                // Just ignore
+            }
+        }
+    }
+
+    private Set<File> performListing(final File directory, final FileFilter 
filter,
+            final boolean recurseSubdirectories) {
+        Path p = directory.toPath();
+        if (!Files.isWritable(p) || !Files.isReadable(p)) {
+            throw new IllegalStateException("Directory '" + directory
+                    + "' does not have sufficient permissions (i.e., not 
writable and readable)");
+        }
+        final Set<File> queue = new HashSet<>();
+        if (!directory.exists()) {
+            return queue;
+        }
+
+        final File[] children = directory.listFiles();
+        if (children == null) {
+            return queue;
+        }
+
+        for (final File child : children) {
+            if (child.isDirectory()) {
+                if (recurseSubdirectories) {
+                    queue.addAll(performListing(child, filter, 
recurseSubdirectories));
+                }
+            } else if (filter.accept(child)) {
+                queue.add(child);
+            }
+        }
+
+        return queue;
+    }
+
+    private FileFilter createFileFilter(FileSourceConfig fileConfig) {
+        final long minSize = 
Optional.ofNullable(fileConfig.getMinimumSize()).orElse(1);
+        final Double maxSize = 
Optional.ofNullable(fileConfig.getMaximumSize()).orElse(Double.MAX_VALUE);
+        final long minAge = 
Optional.ofNullable(fileConfig.getMinimumFileAge()).orElse(0);
+        final Long maxAge = 
Optional.ofNullable(fileConfig.getMaximumFileAge()).orElse(Long.MAX_VALUE);
+        final boolean ignoreHidden = 
Optional.ofNullable(fileConfig.getIgnoreHiddenFiles()).orElse(true);
+        final Pattern filePattern = 
Pattern.compile(Optional.ofNullable(fileConfig.getFileFilter()).orElse("[^\\.].*"));
+        final String indir = fileConfig.getInputDirectory();
+        final String pathPatternStr = fileConfig.getPathFilter();
+        final Pattern pathPattern = (!recurseDirs || pathPatternStr == null) ? 
null : Pattern.compile(pathPatternStr);
+
+        return new FileFilter() {
+            @Override
+            public boolean accept(final File file) {
+                if (minSize > file.length()) {
+                    return false;
+                }
+                if (maxSize != null && maxSize < file.length()) {
+                    return false;
+                }
+                final long fileAge = System.currentTimeMillis() - 
file.lastModified();
+                if (minAge > fileAge) {
+                    return false;
+                }
+                if (maxAge != null && maxAge < fileAge) {
+                    return false;
+                }
+                if (ignoreHidden && file.isHidden()) {
+                    return false;
+                }
+                if (pathPattern != null) {
+                    Path reldir = 
Paths.get(indir).relativize(file.toPath()).getParent();
+                    if (reldir != null && !reldir.toString().isEmpty()) {
+                        if (!pathPattern.matcher(reldir.toString()).matches()) 
{
+                            return false;
+                        }
+                    }
+                }
+                //Verify that we have at least read permissions on the file 
we're considering grabbing
+                if (!Files.isReadable(file.toPath())) {
+                    return false;
+                }
+
+                /* Verify that if we're not keeping original that we have write
+                 * permissions on the directory the file is in
+                 */
+                if (!keepOriginal && 
!Files.isWritable(file.toPath().getParent())) {
+                    return false;
+                }
+                return filePattern.matcher(file.getName()).matches();
+            }
+        };
+    }
+}
diff --git 
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileRecord.java 
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileRecord.java
new file mode 100644
index 0000000..0fb9448
--- /dev/null
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileRecord.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import lombok.Data;
+
+import org.apache.pulsar.functions.api.Record;
+
+/**
+ * Implementation of the Record interface for File Source data.
+ *    - The key is set to the source file name + the line number of the record.
+ *    - The value is set to the file contents for the given line number (in 
bytes)
+ *    - The following user properties are also set:
+ *      - The source file name
+ *      - The absolute path of the source file
+ *      - The last modified time of the source file.
+ */
+@Data
+public class FileRecord implements Record<byte[]> {
+
+    public static final String FILE_NAME = "file.name";
+    public static final String FILE_ABSOLUTE_PATH = "file.path";
+    public static final String FILE_MODIFIED_TIME = "file.modified.time";
+
+    private final Optional<String> key;
+    private final byte[] value;
+    private final HashMap<String, String> userProperties = new HashMap<String, 
String> ();
+
+    public FileRecord(File srcFile, int lineNumber, byte[] value) {
+        this.key = Optional.of(srcFile.getName() + "_" + lineNumber);
+        this.value = value;
+        this.setProperty(FILE_NAME, srcFile.getName());
+        this.setProperty(FILE_ABSOLUTE_PATH, srcFile.getAbsolutePath());
+        this.setProperty(FILE_MODIFIED_TIME, srcFile.lastModified() + "");
+    }
+
+    @Override
+    public Optional<String> getKey() {
+        return key;
+    }
+
+    @Override
+    public byte[] getValue() {
+        return value;
+    }
+
+    public Map<String, String> getProperties() {
+        return userProperties;
+    }
+
+    public void setProperty(String key, String value) {
+        userProperties.put(key, value);
+    }
+}
diff --git 
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSource.java 
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSource.java
new file mode 100644
index 0000000..bc09c97
--- /dev/null
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSource.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.io.core.PushSource;
+import org.apache.pulsar.io.core.SourceContext;
+
+/**
+ * A simple connector to consume messages from the local file system.
+ * It can be configured to consume files recursively from a given
+ * directory, and can handle plain text, gzip, and zip formatted files.
+ */
+public class FileSource extends PushSource<byte[]> {
+
+    private ExecutorService executor;
+    private final BlockingQueue<File> workQueue = new LinkedBlockingQueue<>();
+    private final BlockingQueue<File> inProcess = new LinkedBlockingQueue<>();
+    private final BlockingQueue<File> recentlyProcessed = new 
LinkedBlockingQueue<>();
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
+        FileSourceConfig fileConfig = FileSourceConfig.load(config);
+        fileConfig.validate();
+
+        // One extra for the File listing thread, and another for the cleanup 
thread
+        executor = Executors.newFixedThreadPool(fileConfig.getNumWorkers() + 
2);
+        executor.execute(new FileListingThread(fileConfig, workQueue, 
inProcess, recentlyProcessed));
+        executor.execute(new ProcessedFileThread(fileConfig, 
recentlyProcessed));
+
+        for (int idx = 0; idx < fileConfig.getNumWorkers(); idx++) {
+            executor.execute(new FileConsumerThread(this, workQueue, 
inProcess, recentlyProcessed));
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(800, TimeUnit.MILLISECONDS)) {
+                executor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            executor.shutdownNow();
+        }
+    }
+}
diff --git 
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java 
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java
new file mode 100644
index 0000000..efc7f63
--- /dev/null
+++ 
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Paths;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Configuration class for the File Source Connector.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class FileSourceConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * The input directory from which to pull files.
+     */
+    private String inputDirectory;
+
+    /**
+     * Indicates whether or not to pull files from sub-directories.
+     */
+    private Boolean recurse;
+
+    /**
+     * If true, the file is not deleted after it has been processed and
+     * causes the file to be picked up continually.
+     */
+    private Boolean keepFile = Boolean.FALSE;
+
+    /**
+     * Only files whose names match the given regular expression will be 
picked up.
+     */
+    private String fileFilter = "[^\\.].*";
+
+    /**
+     * When 'recurse' property is true, then only sub-directories whose
+     * path matches the given regular expression will be scanned.
+     */
+    private String pathFilter;
+
+    /**
+     * The minimum age that a file must be in order to be processed; any file 
younger
+     * than this amount of time (according to last modification date) will be 
ignored.
+     */
+    private Integer minimumFileAge;
+
+    /**
+     * The maximum age that a file must be in order to be processed; any file 
older
+     * than this amount of time (according to last modification date) will be 
ignored.
+     */
+    private Long maximumFileAge;
+
+    /**
+     * The minimum size (in bytes) that a file must be in order to be 
processed.
+     */
+    private Integer minimumSize;
+
+    /**
+     * The maximum size (in bytes) that a file can be in order to be processed.
+     */
+    private Double maximumSize;
+
+    /**
+     * Indicates whether or not hidden files should be ignored or not.
+     */
+    private Boolean ignoreHiddenFiles;
+
+    /**
+     * Indicates how long to wait before performing a directory listing.
+     */
+    private Long pollingInterval;
+
+    /**
+     * The number of worker threads that will be processing the files.
+     * This allows you to process a larger number of files concurrently.
+     * However, setting this to a value greater than 1 will result in the data
+     * from multiple files being "intermingled" in the target topic.
+     */
+    private Integer numWorkers = 1;
+
+    public static FileSourceConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), FileSourceConfig.class);
+    }
+
+    public static FileSourceConfig load(Map<String, Object> map) throws 
IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
FileSourceConfig.class);
+    }
+
+    public void validate() {
+        if (StringUtils.isBlank(inputDirectory)) {
+            throw new IllegalArgumentException("Required property not set.");
+        } else if (Files.notExists(Paths.get(inputDirectory), 
LinkOption.NOFOLLOW_LINKS)) {
+            throw new IllegalArgumentException("Specified input directory does 
not exist");
+        } else if (!Files.isReadable(Paths.get(inputDirectory))) {
+            throw new IllegalArgumentException("Specified input directory is 
not readable");
+        } else if (Optional.ofNullable(keepFile).orElse(false) && 
!Files.isWritable(Paths.get(inputDirectory))) {
+            throw new IllegalArgumentException("You have requested the 
consumed files to be deleted, but the "
+                    + "source directory is not writeable.");
+        }
+
+        if (StringUtils.isNotBlank(fileFilter)) {
+            try {
+                Pattern.compile(fileFilter);
+            } catch (final PatternSyntaxException psEx) {
+                throw new IllegalArgumentException("Invalid Regex pattern 
provided for fileFilter");
+            }
+        }
+
+        if (minimumFileAge != null &&  Math.signum(minimumFileAge) < 0) {
+            throw new IllegalArgumentException("The property minimumFileAge 
must be non-negative");
+        }
+
+        if (maximumFileAge != null && Math.signum(maximumFileAge) < 0) {
+            throw new IllegalArgumentException("The property maximumFileAge 
must be non-negative");
+        }
+
+        if (minimumSize != null && Math.signum(minimumSize) < 0) {
+            throw new IllegalArgumentException("The property minimumSize must 
be non-negative");
+        }
+
+        if (maximumSize != null && Math.signum(maximumSize) < 0) {
+            throw new IllegalArgumentException("The property maximumSize must 
be non-negative");
+        }
+
+        if (pollingInterval != null && pollingInterval <= 0) {
+            throw new IllegalArgumentException("The property pollingInterval 
must be greater than zero");
+        }
+
+        if (numWorkers != null && numWorkers <= 0) {
+            throw new IllegalArgumentException("The property numWorkers must 
be greater than zero");
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/ProcessedFileThread.java
 
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/ProcessedFileThread.java
new file mode 100644
index 0000000..65153e3
--- /dev/null
+++ 
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/ProcessedFileThread.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Worker thread that cleans up all the files that have been processed.
+ */
+public class ProcessedFileThread extends Thread {
+
+    private final BlockingQueue<File> recentlyProcessed;
+    private final boolean keepOriginal;
+
+    public ProcessedFileThread(FileSourceConfig fileConfig, 
BlockingQueue<File> recentlyProcessed) {
+        keepOriginal = 
Optional.ofNullable(fileConfig.getKeepFile()).orElse(false);
+        this.recentlyProcessed = recentlyProcessed;
+    }
+
+    public void run() {
+        try {
+            while (true) {
+                File file = recentlyProcessed.take();
+                handle(file);
+            }
+        } catch (InterruptedException ie) {
+            // just terminate
+        }
+    }
+
+    private void handle(File f) {
+        if (!keepOriginal) {
+            try {
+                Files.deleteIfExists(f.toPath());
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}
diff --git 
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/package-info.java 
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/package-info.java
new file mode 100644
index 0000000..0795343
--- /dev/null
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
\ No newline at end of file
diff --git 
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/GZipFiles.java 
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/GZipFiles.java
new file mode 100644
index 0000000..aa38356
--- /dev/null
+++ 
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/GZipFiles.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file.utils;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PushbackInputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.stream.Stream;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.commons.io.IOUtils;
+
+/**
+ * Helper class that provides helper methods for working with
+ * gzip-formatted files.
+ */
+public class GZipFiles {
+
+    /**
+     * Returns true if the given file is a gzip file.
+     */
+   @SuppressWarnings("deprecation")
+   public static boolean isGzip(File f) {
+
+       InputStream input = null;
+        try {
+            input = new FileInputStream(f);
+            PushbackInputStream pb = new PushbackInputStream(input, 2);
+            byte [] signature = new byte[2];
+            int len = pb.read(signature); //read the signature
+            pb.unread(signature, 0, len); //push back the signature to the 
stream
+            // check if matches standard gzip magic number
+            return (signature[ 0 ] == (byte) 0x1f && signature[1] == (byte) 
0x8b);
+        } catch (final Exception e) {
+            return false;
+        } finally {
+            IOUtils.closeQuietly(input);
+        }
+    }
+
+    /**
+     * Get a lazily loaded stream of lines from a gzipped file, similar to
+     * {@link Files#lines(java.nio.file.Path)}.
+     *
+     * @param path
+     *          The path to the gzipped file.
+     * @return stream with lines.
+     */
+    public static Stream<String> lines(Path path) {
+      GZIPInputStream gzipStream = null;
+
+      try {
+        gzipStream = new GZIPInputStream(Files.newInputStream(path));
+      } catch (IOException e) {
+        closeSafely(gzipStream);
+        throw new UncheckedIOException(e);
+      }
+
+      BufferedReader reader = new BufferedReader(new 
InputStreamReader(gzipStream));
+      return reader.lines().onClose(() -> closeSafely(reader));
+    }
+
+    private static void closeSafely(Closeable closeable) {
+      if (closeable != null) {
+        try {
+          closeable.close();
+        } catch (IOException e) {
+          // Ignore
+        }
+      }
+    }
+}
diff --git 
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/ZipFiles.java 
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/ZipFiles.java
new file mode 100644
index 0000000..09cd0c2
--- /dev/null
+++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/ZipFiles.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file.utils;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.stream.Stream;
+import java.util.zip.ZipInputStream;
+
+import org.apache.commons.io.IOUtils;
+
+/**
+ * Helper class that provides helper methods for working with
+ * zip-formatted files.
+ */
+public class ZipFiles {
+
+    /**
+     * Returns true if the given file is a gzip file.
+     */
+   @SuppressWarnings("deprecation")
+   public static boolean isZip(File f) {
+
+       InputStream input = null;
+        try {
+            DataInputStream in = new DataInputStream(new 
BufferedInputStream(new FileInputStream(f)));
+            int test = in.readInt();
+            in.close();
+            return test == 0x504b0304;
+        } catch (final Exception e) {
+            return false;
+        } finally {
+            IOUtils.closeQuietly(input);
+        }
+    }
+
+    /**
+     * Get a lazily loaded stream of lines from a gzipped file, similar to
+     * {@link Files#lines(java.nio.file.Path)}.
+     *
+     * @param path
+     *          The path to the zipped file.
+     * @return stream with lines.
+     */
+    public static Stream<String> lines(Path path) {
+        ZipInputStream zipStream = null;
+
+        try {
+          zipStream = new ZipInputStream(Files.newInputStream(path));
+        } catch (IOException e) {
+          closeSafely(zipStream);
+          throw new UncheckedIOException(e);
+        }
+        // Reader decoder = new InputStreamReader(gzipStream, 
Charset.defaultCharset());
+        BufferedReader reader = new BufferedReader(new 
InputStreamReader(zipStream));
+        return reader.lines().onClose(() -> closeSafely(reader));
+    }
+
+    private static void closeSafely(Closeable closeable) {
+        if (closeable != null) {
+          try {
+            closeable.close();
+          } catch (IOException e) {
+            // Ignore
+          }
+        }
+    }
+}
diff --git 
a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/package-info.java
 
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/package-info.java
new file mode 100644
index 0000000..2b21e91
--- /dev/null
+++ 
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/utils/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file.utils;
\ No newline at end of file
diff --git a/pulsar-io/file/src/main/resources/META-INF/services/pulsar-io.yaml 
b/pulsar-io/file/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..df455c2
--- /dev/null
+++ b/pulsar-io/file/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: file
+description: Reads data from local filesystem
+sourceClass: org.apache.pulsar.io.file.FileSource
diff --git 
a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java 
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java
new file mode 100644
index 0000000..5934151
--- /dev/null
+++ 
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.LinkOption;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.nio.file.attribute.FileAttribute;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+
+public abstract class AbstractFileTests {
+
+    public static final String TMP_DIR = "/tmp/foo";
+    
+    protected BlockingQueue<File> workQueue;
+    protected BlockingQueue<File> inProcess;
+    protected BlockingQueue<File> recentlyProcessed;
+    protected BlockingQueue<File> producedFiles;
+    
+    protected TestFileGenerator generatorThread; 
+    protected FileListingThread listingThread;
+    protected ExecutorService executor;
+    
+    @BeforeMethod
+    public void init() throws IOException {
+        
+        // Create the directory we are going to read from
+        Path directory = Paths.get(TMP_DIR);
+        
+        if (!Files.exists(directory, LinkOption.NOFOLLOW_LINKS)) {
+            Files.createDirectory(directory, getPermissions());
+        }
+        
+        workQueue = Mockito.spy(new LinkedBlockingQueue<>());
+        inProcess = Mockito.spy(new LinkedBlockingQueue<>());         
+        recentlyProcessed = Mockito.spy(new LinkedBlockingQueue<>());
+        producedFiles = Mockito.spy(new LinkedBlockingQueue<>());
+        executor = Executors.newFixedThreadPool(10);
+    }
+    
+    @AfterMethod
+    public void tearDown() throws Exception {
+        // Shutdown all of the processing threads
+        stopThreads();
+        
+        // Delete the directory and all the files
+        cleanUp();
+    }
+    
+    protected static final void cleanUp() throws IOException {
+        Path directory = Paths.get(TMP_DIR);
+        
+        if (!Files.exists(directory, LinkOption.NOFOLLOW_LINKS)) {
+            return;
+        }
+        
+        Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
+           @Override
+           public FileVisitResult visitFile(Path file, BasicFileAttributes 
attrs) throws IOException {
+               Files.delete(file);
+               return FileVisitResult.CONTINUE;
+           }
+
+           @Override
+           public FileVisitResult postVisitDirectory(Path dir, IOException 
exc) throws IOException {
+               Files.delete(dir);
+               return FileVisitResult.CONTINUE;
+           }
+        });
+    }
+    
+    protected void stopThreads() throws Exception {
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(800, TimeUnit.MILLISECONDS)) {
+                executor.shutdownNow();
+            } 
+        } catch (InterruptedException e) {
+            executor.shutdownNow();
+        }
+    }
+    
+    protected final void generateFiles(int numFiles) throws IOException, 
InterruptedException, ExecutionException {
+        generateFiles(numFiles, 1, TMP_DIR);
+    }
+    
+    protected final void generateFiles(int numFiles, int numLines) throws 
IOException, InterruptedException, ExecutionException {
+        generateFiles(numFiles, numLines, TMP_DIR);
+    }
+    
+    protected final void generateFiles(int numFiles, int numLines, String 
directory) throws IOException, InterruptedException, ExecutionException {
+        generatorThread = new TestFileGenerator(producedFiles, numFiles, 1, 
numLines, directory, "prefix", ".txt", getPermissions());
+        Future<?> f = executor.submit(generatorThread);
+        f.get();
+    }
+   
+    protected static final FileAttribute<Set<PosixFilePermission>> 
getPermissions() {
+        Set<PosixFilePermission> perms = 
PosixFilePermissions.fromString("rwxrwxrwx");
+        return PosixFilePermissions.asFileAttribute(perms);
+    }
+    
+}
diff --git 
a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
 
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
new file mode 100644
index 0000000..a21633d
--- /dev/null
+++ 
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.PushSource;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+@SuppressWarnings("unchecked")
+public class FileConsumerThreadTests extends AbstractFileTests {
+    
+    private PushSource<byte[]> consumer;
+    private FileConsumerThread consumerThread;
+
+    @Test
+    public final void singleFileTest() throws IOException {
+        
+        consumer = Mockito.mock(PushSource.class);
+        Mockito.doNothing().when(consumer).consume((Record<byte[]>) 
any(Record.class));
+        
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", TMP_DIR);
+        
+        try {
+            generateFiles(1);
+            listingThread = new FileListingThread(FileSourceConfig.load(map), 
workQueue, inProcess, recentlyProcessed);
+            consumerThread = new FileConsumerThread(consumer, workQueue, 
inProcess, recentlyProcessed);
+            executor.execute(listingThread);
+            executor.execute(consumerThread);
+            Thread.sleep(2000);
+            
+            for (File produced : producedFiles) {
+                verify(workQueue, times(1)).offer(produced);
+                verify(inProcess, times(1)).add(produced);
+                verify(inProcess, times(1)).remove(produced);
+                verify(recentlyProcessed, times(1)).add(produced);
+            }
+            
+            verify(workQueue, times(1)).offer(any(File.class));
+            verify(workQueue, atLeast(1)).take();
+            verify(inProcess, times(1)).add(any(File.class));
+            verify(inProcess, times(1)).remove(any(File.class));
+            verify(recentlyProcessed, times(1)).add(any(File.class));
+            verify(consumer, times(1)).consume((Record<byte[]>) 
any(Record.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } 
+    }
+    
+    @Test
+    public final void mulitpleFileTest() throws IOException {
+        
+        consumer = Mockito.mock(PushSource.class);
+        Mockito.doNothing().when(consumer).consume((Record<byte[]>) 
any(Record.class));
+        
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", TMP_DIR);
+        
+        try {
+            generateFiles(50, 2);
+            listingThread = new FileListingThread(FileSourceConfig.load(map), 
workQueue, inProcess, recentlyProcessed);
+            consumerThread = new FileConsumerThread(consumer, workQueue, 
inProcess, recentlyProcessed);
+            executor.execute(listingThread);
+            executor.execute(consumerThread);
+            Thread.sleep(2000);
+            
+            for (File produced : producedFiles) {
+                verify(workQueue, times(1)).offer(produced);
+                verify(inProcess, times(1)).add(produced);
+                verify(inProcess, times(1)).remove(produced);
+                verify(recentlyProcessed, times(1)).add(produced);
+            }
+            
+            verify(workQueue, times(50)).offer(any(File.class));
+            verify(workQueue, atLeast(50)).take();
+            verify(inProcess, times(50)).add(any(File.class));
+            verify(inProcess, times(50)).remove(any(File.class));
+            verify(recentlyProcessed, times(50)).add(any(File.class));
+            verify(consumer, times(100)).consume((Record<byte[]>) 
any(Record.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } 
+    }
+    
+    @Test
+    public final void multiLineFileTest() throws IOException {
+        
+        consumer = Mockito.mock(PushSource.class);
+        Mockito.doNothing().when(consumer).consume((Record<byte[]>) 
any(Record.class));
+        
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", TMP_DIR);
+        
+        try {
+            generateFiles(1, 10);
+            listingThread = new FileListingThread(FileSourceConfig.load(map), 
workQueue, inProcess, recentlyProcessed);
+            consumerThread = new FileConsumerThread(consumer, workQueue, 
inProcess, recentlyProcessed);
+            executor.execute(listingThread);
+            executor.execute(consumerThread);
+            Thread.sleep(2000);
+            
+            for (File produced : producedFiles) {
+                verify(workQueue, times(1)).offer(produced);
+                verify(inProcess, times(1)).add(produced);
+                verify(inProcess, times(1)).remove(produced);
+                verify(recentlyProcessed, times(1)).add(produced);
+            }
+            
+            verify(workQueue, times(1)).offer(any(File.class));
+            verify(workQueue, atLeast(1)).take();
+            verify(inProcess, times(1)).add(any(File.class));
+            verify(inProcess, times(1)).remove(any(File.class));
+            verify(recentlyProcessed, times(1)).add(any(File.class));
+            verify(consumer, times(10)).consume((Record<byte[]>) 
any(Record.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } 
+    }
+}
diff --git 
a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
 
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
new file mode 100644
index 0000000..855498e
--- /dev/null
+++ 
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import static org.testng.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.testng.annotations.Test;
+
+
+public class FileListingThreadTests extends AbstractFileTests {
+     
+    @Test
+    public final void singleFileTest() throws IOException {  
+        
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", TMP_DIR);
+      
+        try {
+            generateFiles(1);
+            listingThread = new FileListingThread(FileSourceConfig.load(map), 
workQueue, inProcess, recentlyProcessed);
+            executor.execute(listingThread);
+            Thread.sleep(2000);
+            verify(producedFiles, times(1)).put(any(File.class));
+            verify(workQueue, times(1)).offer(any(File.class));
+            
+            for (File produced : producedFiles) {
+                verify(workQueue, times(1)).offer(produced);
+            }
+            
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } 
+    }
+    
+    @Test
+    public final void fiftyFileTest() throws IOException {
+        
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", TMP_DIR);
+        
+        try {
+            generateFiles(50);
+            listingThread = new FileListingThread(FileSourceConfig.load(map), 
workQueue, inProcess, recentlyProcessed);
+            executor.execute(listingThread);
+            Thread.sleep(2000);
+            verify(workQueue, times(50)).offer(any(File.class));
+            
+            for (File produced : producedFiles) {
+                verify(workQueue, times(1)).offer(produced);
+            }
+            
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } 
+    }
+    
+    @Test
+    public final void minimumSizeTest() throws IOException {
+        
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", TMP_DIR);
+        
+        try {
+            // Create 50 zero size files
+            generateFiles(50, 0);
+            listingThread = new FileListingThread(FileSourceConfig.load(map), 
workQueue, inProcess, recentlyProcessed);
+            executor.execute(listingThread);
+            Thread.sleep(2000);
+            verify(workQueue, times(0)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } 
+    }
+    
+    @Test
+    public final void maximumSizeTest() throws IOException {
+        
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("maximumSize", "1000");
+        
+        try {
+            // Create 5 files that exceed the limit and 45 that don't
+            generateFiles(5, 1000);
+            generateFiles(45, 10);
+            listingThread = new FileListingThread(FileSourceConfig.load(map), 
workQueue, inProcess, recentlyProcessed);
+            executor.execute(listingThread);
+            Thread.sleep(2000);
+            verify(workQueue, times(45)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } finally {
+            cleanUp();
+        }
+    }
+    
+    @Test
+    public final void minimumAgeTest() throws IOException {
+        
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("minimumFileAge", "5000");
+        
+        try {
+            // Create 5 files that will be too "new" for processing
+            generateFiles(5);
+            listingThread = new FileListingThread(FileSourceConfig.load(map), 
workQueue, inProcess, recentlyProcessed);
+            executor.execute(listingThread);
+            Thread.sleep(2000);
+            verify(workQueue, times(0)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } finally {
+            cleanUp();
+        }
+    }
+    
+    @Test
+    public final void maximumAgeTest() throws IOException {
+        
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("maximumFileAge", "5000");
+        
+        try {
+            // Create 5 files that will be processed
+            generateFiles(5);
+            Thread.sleep(5000);
+            
+            // Create 5 files that will be too "old" for processing
+            generateFiles(5);
+            listingThread = new FileListingThread(FileSourceConfig.load(map), 
workQueue, inProcess, recentlyProcessed);
+            executor.execute(listingThread);
+            Thread.sleep(2000);
+            verify(workQueue, times(5)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } finally {
+            cleanUp();
+        }
+    }
+    
+    @Test
+    public final void doRecurseTest() throws IOException {
+       
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("recurse", Boolean.TRUE);
+        
+        try {
+            // Create 5 files in the root folder
+            generateFiles(5);
+            
+            // Create 5 files in a sub-folder
+            generateFiles(5, 1, TMP_DIR + File.separator + "sub-dir");
+            listingThread = new FileListingThread(FileSourceConfig.load(map), 
workQueue, inProcess, recentlyProcessed);
+            executor.execute(listingThread);
+            Thread.sleep(2000);
+            verify(workQueue, times(10)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } finally {
+            cleanUp();
+        }       
+    }
+    
+    @Test
+    public final void doNotRecurseTest() throws IOException {
+        
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("recurse", Boolean.FALSE);
+        
+        try {
+            // Create 5 files in the root folder
+            generateFiles(5);
+            
+            // Create 5 files in a sub-folder
+            generateFiles(5, 1, TMP_DIR + File.separator + "sub-dir");
+            listingThread = new FileListingThread(FileSourceConfig.load(map), 
workQueue, inProcess, recentlyProcessed);
+            executor.execute(listingThread);
+            Thread.sleep(2000);
+            verify(workQueue, times(5)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } finally {
+            cleanUp();
+        }    
+    }
+    
+    @Test
+    public final void pathFilterTest() throws IOException {
+         
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("recurse", Boolean.TRUE);
+        map.put("pathFilter", "sub-.*");
+        
+        try {
+            // Create 5 files in a sub-folder
+            generateFiles(5, 1, TMP_DIR + File.separator + "sub-dir-a");
+            generateFiles(5, 1, TMP_DIR + File.separator + "dir-b");
+            listingThread = new FileListingThread(FileSourceConfig.load(map), 
workQueue, inProcess, recentlyProcessed);
+            executor.execute(listingThread);
+            Thread.sleep(2000);
+            verify(workQueue, times(5)).offer(any(File.class));
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } finally {
+            cleanUp();
+        }       
+    }
+}
diff --git 
a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java
 
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java
new file mode 100644
index 0000000..64144e6
--- /dev/null
+++ 
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import static org.testng.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.testng.annotations.Test;
+
+public class FileSourceConfigTests {
+
+    @Test
+    public final void loadFromYamlFileTest() throws IOException {
+        File yamlFile = getFile("sinkConfig.yaml");
+        FileSourceConfig config = 
FileSourceConfig.load(yamlFile.getAbsolutePath());
+        assertNotNull(config);
+    }
+    
+    @Test
+    public final void loadFromMapTest() throws IOException {
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", "/tmp");
+        map.put("keepFile", false);
+        
+        FileSourceConfig config = FileSourceConfig.load(map);
+        assertNotNull(config);
+    }
+    
+    @Test
+    public final void validValidateTest() throws IOException {
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", "/tmp");
+        
+        FileSourceConfig config = FileSourceConfig.load(map);
+        assertNotNull(config);
+        config.validate();
+    }
+    
+    @Test(expectedExceptions = IllegalArgumentException.class, 
+            expectedExceptionsMessageRegExp = "Required property not set.")
+    public final void missingRequiredPropertiesTest() throws IOException {
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("pathFilter", "/");
+        
+        FileSourceConfig config = FileSourceConfig.load(map);
+        assertNotNull(config);
+        config.validate();
+    }
+    
+    @Test(expectedExceptions = 
com.fasterxml.jackson.databind.exc.InvalidFormatException.class)
+    public final void InvalidBooleanPropertyTest() throws IOException {
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", "/");
+        map.put("recurse", "not a boolean");
+        
+        FileSourceConfig config = FileSourceConfig.load(map);
+        assertNotNull(config);
+        config.validate();
+    }
+    
+    @Test(expectedExceptions = IllegalArgumentException.class, 
+            expectedExceptionsMessageRegExp = "The property pollingInterval 
must be greater than zero")
+    public final void ZeroValueTest() throws IOException {
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", "/");
+        map.put("pollingInterval", 0);
+        
+        FileSourceConfig config = FileSourceConfig.load(map);
+        assertNotNull(config);
+        config.validate();
+    }
+    
+    @Test(expectedExceptions = IllegalArgumentException.class, 
+            expectedExceptionsMessageRegExp = "The property minimumFileAge 
must be non-negative")
+    public final void NegativeValueTest() throws IOException {
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", "/");
+        map.put("minimumFileAge", "-50");
+        
+        FileSourceConfig config = FileSourceConfig.load(map);
+        assertNotNull(config);
+        config.validate();
+    }
+    
+    @Test(expectedExceptions = IllegalArgumentException.class, 
+            expectedExceptionsMessageRegExp = "Invalid Regex pattern provided 
for fileFilter")
+    public final void invalidFileFilterTest() throws IOException {
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", "/");
+        map.put("fileFilter", "\\");  // Results in a single '\' being sent.
+        
+        FileSourceConfig config = FileSourceConfig.load(map);
+        assertNotNull(config);
+        config.validate();
+    }
+    
+    private File getFile(String name) {
+        ClassLoader classLoader = getClass().getClassLoader();
+        return new File(classLoader.getResource(name).getFile());
+    }
+}
diff --git 
a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
 
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
new file mode 100644
index 0000000..98d35a9
--- /dev/null
+++ 
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.PushSource;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+@SuppressWarnings("unchecked")
+public class ProcessedFileThreadTests extends AbstractFileTests {
+
+    private PushSource<byte[]> consumer;
+    private FileConsumerThread consumerThread;
+    private ProcessedFileThread cleanupThread;
+    private FileSourceConfig fileConfig;
+
+    @Test
+    public final void singleFileTest() throws IOException {
+        
+        consumer = Mockito.mock(PushSource.class);
+        Mockito.doNothing().when(consumer).consume((Record<byte[]>) 
any(Record.class));
+        
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("keepFile", Boolean.FALSE);
+        
+        try {
+            generateFiles(1);
+            fileConfig = FileSourceConfig.load(map);
+            listingThread = new FileListingThread(fileConfig, workQueue, 
inProcess, recentlyProcessed);
+            consumerThread = new FileConsumerThread(consumer, workQueue, 
inProcess, recentlyProcessed);
+            cleanupThread = new ProcessedFileThread(fileConfig, 
recentlyProcessed);
+            executor.execute(listingThread);
+            executor.execute(consumerThread);
+            executor.execute(cleanupThread);
+            Thread.sleep(2000);
+            
+            for (File produced : producedFiles) {
+                verify(workQueue, times(1)).offer(produced);
+                verify(inProcess, times(1)).add(produced);
+                verify(inProcess, times(1)).remove(produced);
+                verify(recentlyProcessed, times(1)).add(produced);
+            }
+            
+            verify(workQueue, times(1)).offer(any(File.class));
+            verify(workQueue, atLeast(1)).take();
+            verify(inProcess, times(1)).add(any(File.class));
+            verify(inProcess, times(1)).remove(any(File.class));
+            verify(recentlyProcessed, times(1)).add(any(File.class));
+            verify(recentlyProcessed, times(2)).take(); 
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } 
+    }
+    
+    @Test
+    public final void mulitpleFileTest() throws IOException {
+        
+        consumer = Mockito.mock(PushSource.class);
+        Mockito.doNothing().when(consumer).consume((Record<byte[]>) 
any(Record.class));
+       
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("keepFile", Boolean.FALSE);
+        
+        try {
+            generateFiles(50);
+            fileConfig = FileSourceConfig.load(map);
+            listingThread = new FileListingThread(fileConfig, workQueue, 
inProcess, recentlyProcessed);
+            consumerThread = new FileConsumerThread(consumer, workQueue, 
inProcess, recentlyProcessed);
+            cleanupThread = new ProcessedFileThread(fileConfig, 
recentlyProcessed);
+            executor.execute(listingThread);
+            executor.execute(consumerThread);
+            executor.execute(cleanupThread);
+            Thread.sleep(2000);
+            
+            for (File produced : producedFiles) {
+                verify(workQueue, times(1)).offer(produced);
+                verify(inProcess, times(1)).add(produced);
+                verify(inProcess, times(1)).remove(produced);
+                verify(recentlyProcessed, times(1)).add(produced);
+            }
+            
+            verify(workQueue, times(50)).offer(any(File.class));
+            verify(workQueue, atLeast(50)).take();
+            verify(inProcess, times(50)).add(any(File.class));
+            verify(inProcess, times(50)).remove(any(File.class));
+            verify(recentlyProcessed, times(50)).add(any(File.class));
+            verify(recentlyProcessed, times(50)).add(any(File.class));
+            verify(recentlyProcessed, times(51)).take(); 
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } 
+    }
+    
+    @Test
+    public final void keepFileTest() throws IOException {
+        
+        consumer = Mockito.mock(PushSource.class);
+        Mockito.doNothing().when(consumer).consume((Record<byte[]>) 
any(Record.class));
+       
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("keepFile", Boolean.TRUE);
+        map.put("pollingInterval", 1000L);
+        
+        try {
+            generateFiles(1);
+            fileConfig = FileSourceConfig.load(map);
+            listingThread = new FileListingThread(fileConfig, workQueue, 
inProcess, recentlyProcessed);
+            consumerThread = new FileConsumerThread(consumer, workQueue, 
inProcess, recentlyProcessed);
+            cleanupThread = new ProcessedFileThread(fileConfig, 
recentlyProcessed);
+            executor.execute(listingThread);
+            executor.execute(consumerThread);
+            executor.execute(cleanupThread);
+            Thread.sleep(7900);  // Should pull the same file 5 times?
+            
+            for (File produced : producedFiles) {
+                verify(workQueue, atLeast(4)).offer(produced);
+                verify(inProcess, atLeast(4)).add(produced);
+                verify(inProcess, atLeast(4)).remove(produced);
+                verify(recentlyProcessed, atLeast(4)).add(produced);
+            }
+            
+            verify(recentlyProcessed, atLeast(5)).take(); 
+        } catch (InterruptedException | ExecutionException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } 
+    }
+    
+    @Test
+    public final void continuousRunTest() throws IOException {
+        
+        consumer = Mockito.mock(PushSource.class);
+        Mockito.doNothing().when(consumer).consume((Record<byte[]>) 
any(Record.class));
+       
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("keepFile", Boolean.FALSE);
+        map.put("pollingInterval", 100);
+        fileConfig = FileSourceConfig.load(map);
+        
+        try {
+            // Start producing files, with a .1 sec delay between
+            generatorThread = new TestFileGenerator(producedFiles, 5000, 100, 
1, TMP_DIR, "continuous", ".txt", getPermissions());
+            executor.execute(generatorThread);
+            
+            listingThread = new FileListingThread(fileConfig, workQueue, 
inProcess, recentlyProcessed);
+            consumerThread = new FileConsumerThread(consumer, workQueue, 
inProcess, recentlyProcessed);
+            cleanupThread = new ProcessedFileThread(fileConfig, 
recentlyProcessed);
+            executor.execute(listingThread);
+            executor.execute(consumerThread);
+            executor.execute(cleanupThread);
+            
+            // Run for 30 seconds
+            Thread.sleep(30000);
+            
+            // Stop producing files
+            generatorThread.halt();
+            
+            // Let the consumer catch up
+            while (!workQueue.isEmpty() && !inProcess.isEmpty() && 
!recentlyProcessed.isEmpty()) {
+                Thread.sleep(2000);
+            }
+            
+            // Make sure every single file was processed.
+            for (File produced : producedFiles) {
+                verify(workQueue, times(1)).offer(produced);
+                verify(inProcess, times(1)).add(produced);
+                verify(inProcess, times(1)).remove(produced);
+                verify(recentlyProcessed, times(1)).add(produced);
+            }
+            
+        } catch (InterruptedException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } 
+    }
+    
+    @Test
+    public final void multipleConsumerTest() throws IOException {
+        
+        consumer = Mockito.mock(PushSource.class);
+        Mockito.doNothing().when(consumer).consume((Record<byte[]>) 
any(Record.class));
+       
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("inputDirectory", TMP_DIR);
+        map.put("keepFile", Boolean.FALSE);
+        map.put("pollingInterval", 100);
+        fileConfig = FileSourceConfig.load(map);
+        
+        try {
+            // Start producing files, with a .1 sec delay between
+            generatorThread = new TestFileGenerator(producedFiles, 5000, 100, 
1, TMP_DIR, "continuous", ".txt", getPermissions());
+            executor.execute(generatorThread);
+            
+            listingThread = new FileListingThread(fileConfig, workQueue, 
inProcess, recentlyProcessed);
+            consumerThread = new FileConsumerThread(consumer, workQueue, 
inProcess, recentlyProcessed);
+            FileConsumerThread consumerThread2 = new 
FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
+            FileConsumerThread consumerThread3 = new 
FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed);
+            cleanupThread = new ProcessedFileThread(fileConfig, 
recentlyProcessed);
+            executor.execute(listingThread);
+            executor.execute(consumerThread);
+            executor.execute(consumerThread2);
+            executor.execute(consumerThread3);
+            executor.execute(cleanupThread);
+            
+            // Run for 30 seconds
+            Thread.sleep(30000);
+            
+            // Stop producing files
+            generatorThread.halt();
+            
+            // Let the consumer catch up
+            while (!workQueue.isEmpty() && !inProcess.isEmpty() && 
!recentlyProcessed.isEmpty()) {
+                Thread.sleep(2000);
+            }
+            
+            // Make sure every single file was processed exactly once.
+            for (File produced : producedFiles) {
+                verify(workQueue, times(1)).offer(produced);
+                verify(inProcess, times(1)).add(produced);
+                verify(inProcess, times(1)).remove(produced);
+                verify(recentlyProcessed, times(1)).add(produced);
+            }
+            
+        } catch (InterruptedException e) {
+            fail("Unable to generate files" + e.getLocalizedMessage());
+        } 
+    }
+}
diff --git 
a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/TestFileGenerator.java 
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/TestFileGenerator.java
new file mode 100644
index 0000000..21ad852
--- /dev/null
+++ 
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/TestFileGenerator.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.nio.file.attribute.FileAttribute;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+
+public class TestFileGenerator extends Thread {
+
+    // Allows us to communicate back which files we generated
+    private final BlockingQueue<File> producedFiles;
+    private final int numFiles;
+    private final long delay;
+    private final int numLines;
+    private final String prefix;
+    private final String suffix;
+    private final FileAttribute<?>[] attrs;
+    private final Path tempDir;
+    private boolean keepRunning = true;
+    
+    public TestFileGenerator(BlockingQueue<File> producedFiles, int numFiles, 
long delay, int numLines, 
+            String dir, String prefix, String suffix, FileAttribute<?>... 
attrs) throws IOException {
+        this.numFiles = numFiles;
+        this.delay = delay;
+        this.numLines = numLines;
+        this.producedFiles = producedFiles;
+        this.prefix = prefix;
+        this.suffix = suffix;
+        this.attrs = attrs;
+        tempDir = Files.createDirectories(Paths.get(dir), attrs);
+    }
+    
+    public void run() {
+        int counter = 0;
+        while  ( keepRunning && (counter++ < numFiles)) {
+            createFile();
+            try {
+                sleep(delay);
+            } catch (InterruptedException e) {
+                return;
+            }
+        }
+    }
+    
+    public void halt() {
+        keepRunning = false;
+    }
+    
+    private final void createFile() {
+        try {
+            Path path = Files.createTempFile(tempDir, prefix, suffix, attrs);
+            try(OutputStream out = Files.newOutputStream(path, 
StandardOpenOption.APPEND)) {
+              for (int idx = 0; idx < numLines; idx++) {
+                 IOUtils.write(RandomStringUtils.random(50, true, false) + 
"\n", out, "UTF-8");
+              }
+            }
+            
+            producedFiles.put(path.toFile());
+            
+        } catch (IOException | InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+}
diff --git 
a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/GZipFilesTests.java
 
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/GZipFilesTests.java
new file mode 100644
index 0000000..b08bb00
--- /dev/null
+++ 
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/GZipFilesTests.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file.utils;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.stream.Stream;
+
+import org.testng.annotations.Test;
+
+public class GZipFilesTests {
+
+    @Test
+    public final void validGzipFileTest() {
+        
assertTrue(GZipFiles.isGzip(getFile("org/apache/pulsar/io/file/validGzip.gz")));
+    }
+    
+    @Test
+    public final void nonGzipFileTest() {
+        
assertFalse(GZipFiles.isGzip(getFile("org/apache/pulsar/io/file/nonGzipFile.txt")));
+    }
+    
+    @Test
+    public final void mislabelledGzipFileTest() {
+        
assertFalse(GZipFiles.isGzip(getFile("org/apache/pulsar/io/file/mislabelled.gz")));
+    }
+    
+    @Test
+    public final void nonExistantGzipFileTest() {
+        assertFalse(GZipFiles.isGzip(null));
+    }
+    
+    @Test
+    public final void streamGzipFileTest() {
+        Path path = 
Paths.get(getFile("org/apache/pulsar/io/file/validGzip.gz").getAbsolutePath(), 
"");
+        
+        try (Stream<String> lines = GZipFiles.lines(path)) {
+            lines.forEachOrdered(line -> assertTrue(line.startsWith("Line ")));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+    
+    private File getFile(String name) {
+        ClassLoader classLoader = getClass().getClassLoader();
+        return new File(classLoader.getResource(name).getFile());
+    }
+}
diff --git 
a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/ZipFilesTests.java
 
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/ZipFilesTests.java
new file mode 100644
index 0000000..2fc7286
--- /dev/null
+++ 
b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/ZipFilesTests.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.file.utils;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.stream.Stream;
+
+import org.testng.annotations.Test;
+
+public class ZipFilesTests {
+
+    @Test
+    public final void validZipFileTest() {
+        
assertTrue(ZipFiles.isZip(getFile("org/apache/pulsar/io/file/validZip.zip")));
+    }
+    
+    @Test
+    public final void nonZipFileTest() {
+        
assertFalse(ZipFiles.isZip(getFile("org/apache/pulsar/io/file/nonGzipFile.txt")));
+    }
+    
+    @Test
+    public final void mislabelledZipFileTest() {
+        
assertFalse(ZipFiles.isZip(getFile("org/apache/pulsar/io/file/mislabelled.gz")));
+    }
+    
+    @Test
+    public final void nonExistantGzipFileTest() {
+        assertFalse(ZipFiles.isZip(null));
+    }
+    
+    @Test
+    public final void streamZipFileTest() {
+        Path path = 
Paths.get(getFile("org/apache/pulsar/io/file/validZip.zip").getAbsolutePath(), 
"");
+        
+        try (Stream<String> lines = ZipFiles.lines(path)) {
+            lines.forEachOrdered(line -> assertTrue(line.startsWith("Line ")));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+    
+    private File getFile(String name) {
+        ClassLoader classLoader = getClass().getClassLoader();
+        return new File(classLoader.getResource(name).getFile());
+    }
+}
diff --git 
a/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/mislabelled.gz 
b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/mislabelled.gz
new file mode 100644
index 0000000..529587e
--- /dev/null
+++ b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/mislabelled.gz
@@ -0,0 +1 @@
+This file isn't gzipped.
\ No newline at end of file
diff --git 
a/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/nonGzipFile.txt 
b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/nonGzipFile.txt
new file mode 100644
index 0000000..fbd35c6
--- /dev/null
+++ 
b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/nonGzipFile.txt
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+This file is not gzipped
\ No newline at end of file
diff --git 
a/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validGzip.gz 
b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validGzip.gz
new file mode 100644
index 0000000..f7d098d
Binary files /dev/null and 
b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validGzip.gz 
differ
diff --git 
a/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validZip.zip 
b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validZip.zip
new file mode 100644
index 0000000..55c28e3
Binary files /dev/null and 
b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validZip.zip 
differ
diff --git a/pulsar-io/file/src/test/resources/sinkConfig.yaml 
b/pulsar-io/file/src/test/resources/sinkConfig.yaml
new file mode 100644
index 0000000..554b0f9
--- /dev/null
+++ b/pulsar-io/file/src/test/resources/sinkConfig.yaml
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+   "inputDirectory": "/Users/david",
+   "recurse": true,
+   "keepFile": true,
+   "fileFilter": "[^\\.].*",
+   "pathFilter": "*",
+   "minimumFileAge": 0,
+   "maximumFileAge": 9999999999,
+   "minimumSize": 1,
+   "maximumSize": 5000000,
+   "ignoreHiddenFiles": true,
+   "pollingInterval": 5000,
+   "numWorkers": 1
+}
\ No newline at end of file
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 6503b66..628686b 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -48,6 +48,7 @@
     <module>debezium</module>
     <module>hdfs2</module>
     <module>canal</module>
+    <module>file</module>
     <module>netty</module>
   </modules>
 

Reply via email to