This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 209c448 Added Pulsar IO connector for local files (#2869) 209c448 is described below commit 209c44883ad30b13b1a693a02aff8858c287401e Author: David Kjerrumgaard <35466513+david-stream...@users.noreply.github.com> AuthorDate: Thu Dec 27 09:49:04 2018 -0800 Added Pulsar IO connector for local files (#2869) ### Motivation Added a Pulsar IO connector for consuming files from the local filesystem ### Modifications Added a new module to the pulsar-io module that includes the Pulsar file connector and its associated classes & tests ### Result After your change, users will be able to consume files from the local filesystem, and have the contents directly published to a Pulsar topic. --- pulsar-io/file/pom.xml | 70 ++++++ .../apache/pulsar/io/file/FileConsumerThread.java | 108 +++++++++ .../apache/pulsar/io/file/FileListingThread.java | 189 +++++++++++++++ .../java/org/apache/pulsar/io/file/FileRecord.java | 75 ++++++ .../java/org/apache/pulsar/io/file/FileSource.java | 70 ++++++ .../apache/pulsar/io/file/FileSourceConfig.java | 177 ++++++++++++++ .../apache/pulsar/io/file/ProcessedFileThread.java | 60 +++++ .../org/apache/pulsar/io/file/package-info.java | 19 ++ .../org/apache/pulsar/io/file/utils/GZipFiles.java | 96 ++++++++ .../org/apache/pulsar/io/file/utils/ZipFiles.java | 94 ++++++++ .../apache/pulsar/io/file/utils/package-info.java | 19 ++ .../resources/META-INF/services/pulsar-io.yaml | 22 ++ .../apache/pulsar/io/file/AbstractFileTests.java | 138 +++++++++++ .../pulsar/io/file/FileConsumerThreadTests.java | 148 ++++++++++++ .../pulsar/io/file/FileListingThreadTests.java | 238 +++++++++++++++++++ .../pulsar/io/file/FileSourceConfigTests.java | 121 ++++++++++ .../pulsar/io/file/ProcessedFileThreadTests.java | 259 +++++++++++++++++++++ .../apache/pulsar/io/file/TestFileGenerator.java | 91 ++++++++ .../pulsar/io/file/utils/GZipFilesTests.java | 68 ++++++ 
.../apache/pulsar/io/file/utils/ZipFilesTests.java | 68 ++++++ .../org/apache/pulsar/io/file/mislabelled.gz | 1 + .../org/apache/pulsar/io/file/nonGzipFile.txt | 20 ++ .../org/apache/pulsar/io/file/validGzip.gz | Bin 0 -> 60 bytes .../org/apache/pulsar/io/file/validZip.zip | Bin 0 -> 200 bytes pulsar-io/file/src/test/resources/sinkConfig.yaml | 33 +++ pulsar-io/pom.xml | 1 + 26 files changed, 2185 insertions(+) diff --git a/pulsar-io/file/pom.xml b/pulsar-io/file/pom.xml new file mode 100644 index 0000000..c2d892f --- /dev/null +++ b/pulsar-io/file/pom.xml @@ -0,0 +1,70 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-io</artifactId> + <version>2.3.0-SNAPSHOT</version> + </parent> + + <artifactId>pulsar-io-file</artifactId> + <name>Pulsar IO :: File</name> + + <dependencies> + + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-io-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-yaml</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileConsumerThread.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileConsumerThread.java new file mode 100644 index 0000000..8c9803a --- /dev/null +++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileConsumerThread.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.file; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + +import org.apache.pulsar.io.core.PushSource; +import org.apache.pulsar.io.file.utils.GZipFiles; +import org.apache.pulsar.io.file.utils.ZipFiles; + +/** + * Worker thread that consumes the contents of the files + * and publishes them to a Pulsar topic. 
+ */ +public class FileConsumerThread extends Thread { + + private final PushSource<byte[]> source; + private final BlockingQueue<File> workQueue; + private final BlockingQueue<File> inProcess; + private final BlockingQueue<File> recentlyProcessed; + + public FileConsumerThread(PushSource<byte[]> source, + BlockingQueue<File> workQueue, + BlockingQueue<File> inProcess, + BlockingQueue<File> recentlyProcessed) { + this.source = source; + this.workQueue = workQueue; + this.inProcess = inProcess; + this.recentlyProcessed = recentlyProcessed; + } + + public void run() { + try { + while (true) { + File file = workQueue.take(); + + boolean added = false; + do { + added = inProcess.add(file); + } while (!added); + + consumeFile(file); + } + } catch (InterruptedException ie) { + // just terminate + } + } + + private void consumeFile(File file) { + final AtomicInteger idx = new AtomicInteger(1); + try (Stream<String> lines = getLines(file)) { + lines.forEachOrdered(line -> process(file, idx.getAndIncrement(), line)); + } catch (IOException e) { + e.printStackTrace(); + } finally { + + boolean removed = false; + do { + removed = inProcess.remove(file); + } while (!removed); + + boolean added = false; + do { + added = recentlyProcessed.add(file); + } while (!added); + } + } + + private Stream<String> getLines(File file) throws IOException { + if (file == null) { + return null; + } else if (GZipFiles.isGzip(file)) { + return GZipFiles.lines(Paths.get(file.getAbsolutePath())); + } else if (ZipFiles.isZip(file)) { + return ZipFiles.lines(Paths.get(file.getAbsolutePath())); + } else { + return Files.lines(Paths.get(file.getAbsolutePath()), Charset.defaultCharset()); + } + } + + private void process(File srcFile, int lineNumber, String line) { + source.consume(new FileRecord(srcFile, lineNumber, line.getBytes())); + } + +} diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java 
b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java new file mode 100644 index 0000000..a13b923 --- /dev/null +++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileListingThread.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.file; + +import java.io.File; +import java.io.FileFilter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.regex.Pattern; + +/** + * Worker thread that checks the configured input directory for + * files that meet the provided filtering criteria, and publishes + * them to a work queue for processing by the FileConsumerThreads. 
+ */ +public class FileListingThread extends Thread { + + private final AtomicLong queueLastUpdated = new AtomicLong(0L); + private final Lock listingLock = new ReentrantLock(); + private final AtomicReference<FileFilter> fileFilterRef = new AtomicReference<>(); + private final BlockingQueue<File> workQueue; + private final BlockingQueue<File> inProcess; + private final BlockingQueue<File> recentlyProcessed; + + private final String inputDir; + private final boolean recurseDirs; + private final boolean keepOriginal; + private final long pollingInterval; + + public FileListingThread(FileSourceConfig fileConfig, + BlockingQueue<File> workQueue, + BlockingQueue<File> inProcess, + BlockingQueue<File> recentlyProcessed) { + this.workQueue = workQueue; + this.inProcess = inProcess; + this.recentlyProcessed = recentlyProcessed; + + inputDir = fileConfig.getInputDirectory(); + recurseDirs = Optional.ofNullable(fileConfig.getRecurse()).orElse(true); + keepOriginal = Optional.ofNullable(fileConfig.getKeepFile()).orElse(false); + pollingInterval = Optional.ofNullable(fileConfig.getPollingInterval()).orElse(10000L); + fileFilterRef.set(createFileFilter(fileConfig)); + } + + public void run() { + while (true) { + if ((queueLastUpdated.get() < System.currentTimeMillis() - pollingInterval) && listingLock.tryLock()) { + try { + final File directory = new File(inputDir); + final Set<File> listing = performListing(directory, fileFilterRef.get(), recurseDirs); + + if (listing != null && !listing.isEmpty()) { + + // Remove any files that have been or are currently being processed. 
+ listing.removeAll(inProcess); + if (!keepOriginal) { + listing.removeAll(recentlyProcessed); + } + + for (File f: listing) { + if (!workQueue.contains(f)) { + workQueue.offer(f); + } + } + queueLastUpdated.set(System.currentTimeMillis()); + } + + } finally { + listingLock.unlock(); + } + } + + try { + sleep(pollingInterval - 1); + } catch (InterruptedException e) { + // Just ignore + } + } + } + + private Set<File> performListing(final File directory, final FileFilter filter, + final boolean recurseSubdirectories) { + Path p = directory.toPath(); + if (!Files.isWritable(p) || !Files.isReadable(p)) { + throw new IllegalStateException("Directory '" + directory + + "' does not have sufficient permissions (i.e., not writable and readable)"); + } + final Set<File> queue = new HashSet<>(); + if (!directory.exists()) { + return queue; + } + + final File[] children = directory.listFiles(); + if (children == null) { + return queue; + } + + for (final File child : children) { + if (child.isDirectory()) { + if (recurseSubdirectories) { + queue.addAll(performListing(child, filter, recurseSubdirectories)); + } + } else if (filter.accept(child)) { + queue.add(child); + } + } + + return queue; + } + + private FileFilter createFileFilter(FileSourceConfig fileConfig) { + final long minSize = Optional.ofNullable(fileConfig.getMinimumSize()).orElse(1); + final Double maxSize = Optional.ofNullable(fileConfig.getMaximumSize()).orElse(Double.MAX_VALUE); + final long minAge = Optional.ofNullable(fileConfig.getMinimumFileAge()).orElse(0); + final Long maxAge = Optional.ofNullable(fileConfig.getMaximumFileAge()).orElse(Long.MAX_VALUE); + final boolean ignoreHidden = Optional.ofNullable(fileConfig.getIgnoreHiddenFiles()).orElse(true); + final Pattern filePattern = Pattern.compile(Optional.ofNullable(fileConfig.getFileFilter()).orElse("[^\\.].*")); + final String indir = fileConfig.getInputDirectory(); + final String pathPatternStr = fileConfig.getPathFilter(); + final Pattern pathPattern 
= (!recurseDirs || pathPatternStr == null) ? null : Pattern.compile(pathPatternStr); + + return new FileFilter() { + @Override + public boolean accept(final File file) { + if (minSize > file.length()) { + return false; + } + if (maxSize != null && maxSize < file.length()) { + return false; + } + final long fileAge = System.currentTimeMillis() - file.lastModified(); + if (minAge > fileAge) { + return false; + } + if (maxAge != null && maxAge < fileAge) { + return false; + } + if (ignoreHidden && file.isHidden()) { + return false; + } + if (pathPattern != null) { + Path reldir = Paths.get(indir).relativize(file.toPath()).getParent(); + if (reldir != null && !reldir.toString().isEmpty()) { + if (!pathPattern.matcher(reldir.toString()).matches()) { + return false; + } + } + } + //Verify that we have at least read permissions on the file we're considering grabbing + if (!Files.isReadable(file.toPath())) { + return false; + } + + /* Verify that if we're not keeping original that we have write + * permissions on the directory the file is in + */ + if (!keepOriginal && !Files.isWritable(file.toPath().getParent())) { + return false; + } + return filePattern.matcher(file.getName()).matches(); + } + }; + } +} diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileRecord.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileRecord.java new file mode 100644 index 0000000..0fb9448 --- /dev/null +++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileRecord.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.file; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import lombok.Data; + +import org.apache.pulsar.functions.api.Record; + +/** + * Implementation of the Record interface for File Source data. + * - The key is set to the source file name + the line number of the record. + * - The value is set to the file contents for the given line number (in bytes) + * - The following user properties are also set: + * - The source file name + * - The absolute path of the source file + * - The last modified time of the source file. 
+ */ +@Data +public class FileRecord implements Record<byte[]> { + + public static final String FILE_NAME = "file.name"; + public static final String FILE_ABSOLUTE_PATH = "file.path"; + public static final String FILE_MODIFIED_TIME = "file.modified.time"; + + private final Optional<String> key; + private final byte[] value; + private final HashMap<String, String> userProperties = new HashMap<String, String> (); + + public FileRecord(File srcFile, int lineNumber, byte[] value) { + this.key = Optional.of(srcFile.getName() + "_" + lineNumber); + this.value = value; + this.setProperty(FILE_NAME, srcFile.getName()); + this.setProperty(FILE_ABSOLUTE_PATH, srcFile.getAbsolutePath()); + this.setProperty(FILE_MODIFIED_TIME, srcFile.lastModified() + ""); + } + + @Override + public Optional<String> getKey() { + return key; + } + + @Override + public byte[] getValue() { + return value; + } + + public Map<String, String> getProperties() { + return userProperties; + } + + public void setProperty(String key, String value) { + userProperties.put(key, value); + } +} diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSource.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSource.java new file mode 100644 index 0000000..bc09c97 --- /dev/null +++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSource.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.file; + +import java.io.File; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.io.core.PushSource; +import org.apache.pulsar.io.core.SourceContext; + +/** + * A simple connector to consume messages from the local file system. + * It can be configured to consume files recursively from a given + * directory, and can handle plain text, gzip, and zip formatted files. 
+ */ +public class FileSource extends PushSource<byte[]> { + + private ExecutorService executor; + private final BlockingQueue<File> workQueue = new LinkedBlockingQueue<>(); + private final BlockingQueue<File> inProcess = new LinkedBlockingQueue<>(); + private final BlockingQueue<File> recentlyProcessed = new LinkedBlockingQueue<>(); + + @Override + public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception { + FileSourceConfig fileConfig = FileSourceConfig.load(config); + fileConfig.validate(); + + // One extra for the File listing thread, and another for the cleanup thread + executor = Executors.newFixedThreadPool(fileConfig.getNumWorkers() + 2); + executor.execute(new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed)); + executor.execute(new ProcessedFileThread(fileConfig, recentlyProcessed)); + + for (int idx = 0; idx < fileConfig.getNumWorkers(); idx++) { + executor.execute(new FileConsumerThread(this, workQueue, inProcess, recentlyProcessed)); + } + } + + @Override + public void close() throws Exception { + executor.shutdown(); + try { + if (!executor.awaitTermination(800, TimeUnit.MILLISECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException e) { + executor.shutdownNow(); + } + } +} diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java new file mode 100644 index 0000000..efc7f63 --- /dev/null +++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/FileSourceConfig.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.file; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.LinkOption; +import java.nio.file.Paths; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.experimental.Accessors; +import org.apache.commons.lang3.StringUtils; + +/** + * Configuration class for the File Source Connector. + */ +@Data +@Setter +@Getter +@EqualsAndHashCode +@ToString +@Accessors(chain = true) +public class FileSourceConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * The input directory from which to pull files. + */ + private String inputDirectory; + + /** + * Indicates whether or not to pull files from sub-directories. + */ + private Boolean recurse; + + /** + * If true, the file is not deleted after it has been processed and + * causes the file to be picked up continually. + */ + private Boolean keepFile = Boolean.FALSE; + + /** + * Only files whose names match the given regular expression will be picked up. 
+ */ + private String fileFilter = "[^\\.].*"; + + /** + * When the 'recurse' property is true, only sub-directories whose + * path matches the given regular expression will be scanned. + */ + private String pathFilter; + + /** + * The minimum age (in milliseconds) that a file must be in order to be processed; any file younger + * than this amount of time (according to last modification date) will be ignored. + */ + private Integer minimumFileAge; + + /** + * The maximum age (in milliseconds) that a file can be in order to be processed; any file older + * than this amount of time (according to last modification date) will be ignored. + */ + private Long maximumFileAge; + + /** + * The minimum size (in bytes) that a file must be in order to be processed. + */ + private Integer minimumSize; + + /** + * The maximum size (in bytes) that a file can be in order to be processed. + */ + private Double maximumSize; + + /** + * Indicates whether hidden files should be ignored. + */ + private Boolean ignoreHiddenFiles; + + /** + * Indicates how long to wait (in milliseconds) before performing a directory listing. + */ + private Long pollingInterval; + + /** + * The number of worker threads that will be processing the files. + * This allows you to process a larger number of files concurrently. + * However, setting this to a value greater than 1 will result in the data + * from multiple files being "intermingled" in the target topic. 
+ */ + private Integer numWorkers = 1; + + public static FileSourceConfig load(String yamlFile) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return mapper.readValue(new File(yamlFile), FileSourceConfig.class); + } + + public static FileSourceConfig load(Map<String, Object> map) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(new ObjectMapper().writeValueAsString(map), FileSourceConfig.class); + } + + public void validate() { + if (StringUtils.isBlank(inputDirectory)) { + throw new IllegalArgumentException("Required property not set."); + } else if (Files.notExists(Paths.get(inputDirectory), LinkOption.NOFOLLOW_LINKS)) { + throw new IllegalArgumentException("Specified input directory does not exist"); + } else if (!Files.isReadable(Paths.get(inputDirectory))) { + throw new IllegalArgumentException("Specified input directory is not readable"); + } else if (Optional.ofNullable(keepFile).orElse(false) && !Files.isWritable(Paths.get(inputDirectory))) { + throw new IllegalArgumentException("You have requested the consumed files to be deleted, but the " + + "source directory is not writeable."); + } + + if (StringUtils.isNotBlank(fileFilter)) { + try { + Pattern.compile(fileFilter); + } catch (final PatternSyntaxException psEx) { + throw new IllegalArgumentException("Invalid Regex pattern provided for fileFilter"); + } + } + + if (minimumFileAge != null && Math.signum(minimumFileAge) < 0) { + throw new IllegalArgumentException("The property minimumFileAge must be non-negative"); + } + + if (maximumFileAge != null && Math.signum(maximumFileAge) < 0) { + throw new IllegalArgumentException("The property maximumFileAge must be non-negative"); + } + + if (minimumSize != null && Math.signum(minimumSize) < 0) { + throw new IllegalArgumentException("The property minimumSize must be non-negative"); + } + + if (maximumSize != null && Math.signum(maximumSize) < 0) { + throw new 
IllegalArgumentException("The property maximumSize must be non-negative"); + } + + if (pollingInterval != null && pollingInterval <= 0) { + throw new IllegalArgumentException("The property pollingInterval must be greater than zero"); + } + + if (numWorkers != null && numWorkers <= 0) { + throw new IllegalArgumentException("The property numWorkers must be greater than zero"); + } + } +} \ No newline at end of file diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/ProcessedFileThread.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/ProcessedFileThread.java new file mode 100644 index 0000000..65153e3 --- /dev/null +++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/ProcessedFileThread.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.file; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; + +/** + * Worker thread that cleans up all the files that have been processed. 
+ */ +public class ProcessedFileThread extends Thread { + + private final BlockingQueue<File> recentlyProcessed; + private final boolean keepOriginal; + + public ProcessedFileThread(FileSourceConfig fileConfig, BlockingQueue<File> recentlyProcessed) { + keepOriginal = Optional.ofNullable(fileConfig.getKeepFile()).orElse(false); + this.recentlyProcessed = recentlyProcessed; + } + + public void run() { + try { + while (true) { + File file = recentlyProcessed.take(); + handle(file); + } + } catch (InterruptedException ie) { + // just terminate + } + } + + private void handle(File f) { + if (!keepOriginal) { + try { + Files.deleteIfExists(f.toPath()); + } catch (IOException e) { + e.printStackTrace(); + } + } + } +} diff --git a/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/package-info.java b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/package-info.java new file mode 100644 index 0000000..0795343 --- /dev/null +++ b/pulsar-io/file/src/main/java/org/apache/pulsar/io/file/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
 * Helper class that provides helper methods for working with
 * gzip-formatted files.
 */
public class GZipFiles {

    /**
     * Returns true if the given file is a gzip file, i.e. its first two bytes
     * are the gzip magic number {@code 0x1f 0x8b}.
     *
     * @param f
     *            the file to inspect
     * @return {@code true} when the magic number matches; {@code false} when it
     *         does not, when the file is shorter than two bytes, or when the
     *         file cannot be read for any reason
     */
    public static boolean isGzip(File f) {
        // try-with-resources closes the stream on every path.
        try (InputStream input = new FileInputStream(f)) {
            int first = input.read();
            int second = input.read();
            // read() yields -1 at end of file, which never equals a magic byte.
            return first == 0x1f && second == 0x8b;
        } catch (final Exception e) {
            // Unreadable, missing or null file: treat as "not gzip".
            return false;
        }
    }

    /**
     * Get a lazily loaded stream of lines from a gzipped file, similar to
     * {@link Files#lines(java.nio.file.Path)}. The returned stream owns the
     * underlying file handle, so callers should close it.
     *
     * @param path
     *          The path to the gzipped file.
     * @return stream with lines.
     * @throws UncheckedIOException
     *          if the file cannot be opened or does not start with a valid gzip header
     */
    public static Stream<String> lines(Path path) {
        InputStream raw = null;
        final GZIPInputStream gzipStream;

        try {
            raw = Files.newInputStream(path);
            // The constructor reads the gzip header and may throw.
            gzipStream = new GZIPInputStream(raw);
        } catch (IOException e) {
            // Release the file handle that was opened before the header check failed.
            closeSafely(raw);
            throw new UncheckedIOException(e);
        }

        BufferedReader reader = new BufferedReader(new InputStreamReader(gzipStream));
        return reader.lines().onClose(() -> closeSafely(reader));
    }

    /**
     * Closes the given resource, ignoring a {@code null} argument and any
     * {@link IOException} raised by the close itself.
     */
    private static void closeSafely(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (IOException e) {
                // Ignore
            }
        }
    }
}
/**
 * Helper class that provides helper methods for working with
 * zip-formatted files.
 */
public class ZipFiles {

    /**
     * Returns true if the given file is a zip file, i.e. its first four bytes
     * are the local-file-header signature {@code 0x504b0304}.
     *
     * @param f
     *            the file to inspect
     * @return {@code true} when the signature matches; {@code false} when it
     *         does not, when the file is shorter than four bytes, or when the
     *         file cannot be read for any reason
     */
    public static boolean isZip(File f) {
        // try-with-resources closes the stream even when readInt() hits end of file.
        try (DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(f)))) {
            return in.readInt() == 0x504b0304;
        } catch (final Exception e) {
            // Unreadable, missing, too-short or null file: treat as "not zip".
            return false;
        }
    }

    /**
     * Get a lazily loaded stream of lines from a zipped file, similar to
     * {@link Files#lines(java.nio.file.Path)}. The lines come from the first
     * non-directory entry of the archive; when there is no such entry the
     * stream is empty. The returned stream owns the underlying file handle,
     * so callers should close it.
     *
     * @param path
     *          The path to the zipped file.
     * @return stream with lines.
     * @throws UncheckedIOException
     *          if the file cannot be opened or its first entry header cannot be read
     */
    public static Stream<String> lines(Path path) {
        InputStream raw = null;
        final ZipInputStream zipStream;

        try {
            raw = Files.newInputStream(path);
            zipStream = new ZipInputStream(raw);
            // A ZipInputStream yields no data until it is positioned on an entry,
            // so advance to the first entry that is an actual file.
            java.util.zip.ZipEntry entry = zipStream.getNextEntry();
            while (entry != null && entry.isDirectory()) {
                entry = zipStream.getNextEntry();
            }
        } catch (IOException e) {
            // Release the file handle before reporting the failure.
            closeSafely(raw);
            throw new UncheckedIOException(e);
        }

        BufferedReader reader = new BufferedReader(new InputStreamReader(zipStream));
        return reader.lines().onClose(() -> closeSafely(reader));
    }

    /**
     * Closes the given resource, ignoring a {@code null} argument and any
     * {@link IOException} raised by the close itself.
     */
    private static void closeSafely(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (IOException e) {
                // Ignore
            }
        }
    }
}
+ */ +package org.apache.pulsar.io.file.utils; \ No newline at end of file diff --git a/pulsar-io/file/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/file/src/main/resources/META-INF/services/pulsar-io.yaml new file mode 100644 index 0000000..df455c2 --- /dev/null +++ b/pulsar-io/file/src/main/resources/META-INF/services/pulsar-io.yaml @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +name: file +description: Reads data from local filesystem +sourceClass: org.apache.pulsar.io.file.FileSource diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java new file mode 100644 index 0000000..5934151 --- /dev/null +++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/AbstractFileTests.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.file; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.LinkOption; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.nio.file.attribute.FileAttribute; +import java.nio.file.attribute.PosixFilePermission; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.mockito.Mockito; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; + +public abstract class AbstractFileTests { + + public static final String TMP_DIR = "/tmp/foo"; + + protected BlockingQueue<File> workQueue; + protected BlockingQueue<File> inProcess; + protected BlockingQueue<File> recentlyProcessed; + protected BlockingQueue<File> producedFiles; + + protected TestFileGenerator generatorThread; + protected FileListingThread listingThread; + protected ExecutorService executor; + + @BeforeMethod + public void init() 
throws IOException { + + // Create the directory we are going to read from + Path directory = Paths.get(TMP_DIR); + + if (!Files.exists(directory, LinkOption.NOFOLLOW_LINKS)) { + Files.createDirectory(directory, getPermissions()); + } + + workQueue = Mockito.spy(new LinkedBlockingQueue<>()); + inProcess = Mockito.spy(new LinkedBlockingQueue<>()); + recentlyProcessed = Mockito.spy(new LinkedBlockingQueue<>()); + producedFiles = Mockito.spy(new LinkedBlockingQueue<>()); + executor = Executors.newFixedThreadPool(10); + } + + @AfterMethod + public void tearDown() throws Exception { + // Shutdown all of the processing threads + stopThreads(); + + // Delete the directory and all the files + cleanUp(); + } + + protected static final void cleanUp() throws IOException { + Path directory = Paths.get(TMP_DIR); + + if (!Files.exists(directory, LinkOption.NOFOLLOW_LINKS)) { + return; + } + + Files.walkFileTree(directory, new SimpleFileVisitor<Path>() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + Files.delete(file); + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { + Files.delete(dir); + return FileVisitResult.CONTINUE; + } + }); + } + + protected void stopThreads() throws Exception { + executor.shutdown(); + try { + if (!executor.awaitTermination(800, TimeUnit.MILLISECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException e) { + executor.shutdownNow(); + } + } + + protected final void generateFiles(int numFiles) throws IOException, InterruptedException, ExecutionException { + generateFiles(numFiles, 1, TMP_DIR); + } + + protected final void generateFiles(int numFiles, int numLines) throws IOException, InterruptedException, ExecutionException { + generateFiles(numFiles, numLines, TMP_DIR); + } + + protected final void generateFiles(int numFiles, int numLines, String directory) throws IOException, 
InterruptedException, ExecutionException { + generatorThread = new TestFileGenerator(producedFiles, numFiles, 1, numLines, directory, "prefix", ".txt", getPermissions()); + Future<?> f = executor.submit(generatorThread); + f.get(); + } + + protected static final FileAttribute<Set<PosixFilePermission>> getPermissions() { + Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rwxrwxrwx"); + return PosixFilePermissions.asFileAttribute(perms); + } + +} diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java new file mode 100644 index 0000000..a21633d --- /dev/null +++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileConsumerThreadTests.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.file; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.PushSource; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +@SuppressWarnings("unchecked") +public class FileConsumerThreadTests extends AbstractFileTests { + + private PushSource<byte[]> consumer; + private FileConsumerThread consumerThread; + + @Test + public final void singleFileTest() throws IOException { + + consumer = Mockito.mock(PushSource.class); + Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class)); + + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", TMP_DIR); + + try { + generateFiles(1); + listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); + executor.execute(listingThread); + executor.execute(consumerThread); + Thread.sleep(2000); + + for (File produced : producedFiles) { + verify(workQueue, times(1)).offer(produced); + verify(inProcess, times(1)).add(produced); + verify(inProcess, times(1)).remove(produced); + verify(recentlyProcessed, times(1)).add(produced); + } + + verify(workQueue, times(1)).offer(any(File.class)); + verify(workQueue, atLeast(1)).take(); + verify(inProcess, times(1)).add(any(File.class)); + verify(inProcess, times(1)).remove(any(File.class)); + verify(recentlyProcessed, times(1)).add(any(File.class)); + verify(consumer, times(1)).consume((Record<byte[]>) any(Record.class)); + } catch (InterruptedException | 
ExecutionException e) { + fail("Unable to generate files" + e.getLocalizedMessage()); + } + } + + @Test + public final void mulitpleFileTest() throws IOException { + + consumer = Mockito.mock(PushSource.class); + Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class)); + + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", TMP_DIR); + + try { + generateFiles(50, 2); + listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); + executor.execute(listingThread); + executor.execute(consumerThread); + Thread.sleep(2000); + + for (File produced : producedFiles) { + verify(workQueue, times(1)).offer(produced); + verify(inProcess, times(1)).add(produced); + verify(inProcess, times(1)).remove(produced); + verify(recentlyProcessed, times(1)).add(produced); + } + + verify(workQueue, times(50)).offer(any(File.class)); + verify(workQueue, atLeast(50)).take(); + verify(inProcess, times(50)).add(any(File.class)); + verify(inProcess, times(50)).remove(any(File.class)); + verify(recentlyProcessed, times(50)).add(any(File.class)); + verify(consumer, times(100)).consume((Record<byte[]>) any(Record.class)); + } catch (InterruptedException | ExecutionException e) { + fail("Unable to generate files" + e.getLocalizedMessage()); + } + } + + @Test + public final void multiLineFileTest() throws IOException { + + consumer = Mockito.mock(PushSource.class); + Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class)); + + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", TMP_DIR); + + try { + generateFiles(1, 10); + listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); + 
executor.execute(listingThread); + executor.execute(consumerThread); + Thread.sleep(2000); + + for (File produced : producedFiles) { + verify(workQueue, times(1)).offer(produced); + verify(inProcess, times(1)).add(produced); + verify(inProcess, times(1)).remove(produced); + verify(recentlyProcessed, times(1)).add(produced); + } + + verify(workQueue, times(1)).offer(any(File.class)); + verify(workQueue, atLeast(1)).take(); + verify(inProcess, times(1)).add(any(File.class)); + verify(inProcess, times(1)).remove(any(File.class)); + verify(recentlyProcessed, times(1)).add(any(File.class)); + verify(consumer, times(10)).consume((Record<byte[]>) any(Record.class)); + } catch (InterruptedException | ExecutionException e) { + fail("Unable to generate files" + e.getLocalizedMessage()); + } + } +} diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java new file mode 100644 index 0000000..855498e --- /dev/null +++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileListingThreadTests.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.file; + +import static org.testng.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.testng.annotations.Test; + + +public class FileListingThreadTests extends AbstractFileTests { + + @Test + public final void singleFileTest() throws IOException { + + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", TMP_DIR); + + try { + generateFiles(1); + listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingThread); + Thread.sleep(2000); + verify(producedFiles, times(1)).put(any(File.class)); + verify(workQueue, times(1)).offer(any(File.class)); + + for (File produced : producedFiles) { + verify(workQueue, times(1)).offer(produced); + } + + } catch (InterruptedException | ExecutionException e) { + fail("Unable to generate files" + e.getLocalizedMessage()); + } + } + + @Test + public final void fiftyFileTest() throws IOException { + + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", TMP_DIR); + + try { + generateFiles(50); + listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingThread); + Thread.sleep(2000); + verify(workQueue, times(50)).offer(any(File.class)); + + for (File produced : producedFiles) { + verify(workQueue, times(1)).offer(produced); + } + + } catch (InterruptedException | ExecutionException e) { + fail("Unable to generate files" + e.getLocalizedMessage()); + } + } + + @Test + public final void minimumSizeTest() throws IOException { + + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", TMP_DIR); + + try { + // Create 
50 zero size files + generateFiles(50, 0); + listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingThread); + Thread.sleep(2000); + verify(workQueue, times(0)).offer(any(File.class)); + } catch (InterruptedException | ExecutionException e) { + fail("Unable to generate files" + e.getLocalizedMessage()); + } + } + + @Test + public final void maximumSizeTest() throws IOException { + + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", TMP_DIR); + map.put("maximumSize", "1000"); + + try { + // Create 5 files that exceed the limit and 45 that don't + generateFiles(5, 1000); + generateFiles(45, 10); + listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingThread); + Thread.sleep(2000); + verify(workQueue, times(45)).offer(any(File.class)); + } catch (InterruptedException | ExecutionException e) { + fail("Unable to generate files" + e.getLocalizedMessage()); + } finally { + cleanUp(); + } + } + + @Test + public final void minimumAgeTest() throws IOException { + + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", TMP_DIR); + map.put("minimumFileAge", "5000"); + + try { + // Create 5 files that will be too "new" for processing + generateFiles(5); + listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingThread); + Thread.sleep(2000); + verify(workQueue, times(0)).offer(any(File.class)); + } catch (InterruptedException | ExecutionException e) { + fail("Unable to generate files" + e.getLocalizedMessage()); + } finally { + cleanUp(); + } + } + + @Test + public final void maximumAgeTest() throws IOException { + + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", TMP_DIR); + map.put("maximumFileAge", "5000"); + + try { + // Create 5 files 
that will be processed + generateFiles(5); + Thread.sleep(5000); + + // Create 5 files that will be too "old" for processing + generateFiles(5); + listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingThread); + Thread.sleep(2000); + verify(workQueue, times(5)).offer(any(File.class)); + } catch (InterruptedException | ExecutionException e) { + fail("Unable to generate files" + e.getLocalizedMessage()); + } finally { + cleanUp(); + } + } + + @Test + public final void doRecurseTest() throws IOException { + + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", TMP_DIR); + map.put("recurse", Boolean.TRUE); + + try { + // Create 5 files in the root folder + generateFiles(5); + + // Create 5 files in a sub-folder + generateFiles(5, 1, TMP_DIR + File.separator + "sub-dir"); + listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingThread); + Thread.sleep(2000); + verify(workQueue, times(10)).offer(any(File.class)); + } catch (InterruptedException | ExecutionException e) { + fail("Unable to generate files" + e.getLocalizedMessage()); + } finally { + cleanUp(); + } + } + + @Test + public final void doNotRecurseTest() throws IOException { + + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", TMP_DIR); + map.put("recurse", Boolean.FALSE); + + try { + // Create 5 files in the root folder + generateFiles(5); + + // Create 5 files in a sub-folder + generateFiles(5, 1, TMP_DIR + File.separator + "sub-dir"); + listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingThread); + Thread.sleep(2000); + verify(workQueue, times(5)).offer(any(File.class)); + } catch (InterruptedException | ExecutionException e) { + fail("Unable to generate files" + e.getLocalizedMessage()); + } finally { 
+ cleanUp(); + } + } + + @Test + public final void pathFilterTest() throws IOException { + + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", TMP_DIR); + map.put("recurse", Boolean.TRUE); + map.put("pathFilter", "sub-.*"); + + try { + // Create 5 files in a sub-folder + generateFiles(5, 1, TMP_DIR + File.separator + "sub-dir-a"); + generateFiles(5, 1, TMP_DIR + File.separator + "dir-b"); + listingThread = new FileListingThread(FileSourceConfig.load(map), workQueue, inProcess, recentlyProcessed); + executor.execute(listingThread); + Thread.sleep(2000); + verify(workQueue, times(5)).offer(any(File.class)); + } catch (InterruptedException | ExecutionException e) { + fail("Unable to generate files" + e.getLocalizedMessage()); + } finally { + cleanUp(); + } + } +} diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java new file mode 100644 index 0000000..64144e6 --- /dev/null +++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/FileSourceConfigTests.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.file; + +import static org.testng.Assert.assertNotNull; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.testng.annotations.Test; + +public class FileSourceConfigTests { + + @Test + public final void loadFromYamlFileTest() throws IOException { + File yamlFile = getFile("sinkConfig.yaml"); + FileSourceConfig config = FileSourceConfig.load(yamlFile.getAbsolutePath()); + assertNotNull(config); + } + + @Test + public final void loadFromMapTest() throws IOException { + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", "/tmp"); + map.put("keepFile", false); + + FileSourceConfig config = FileSourceConfig.load(map); + assertNotNull(config); + } + + @Test + public final void validValidateTest() throws IOException { + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", "/tmp"); + + FileSourceConfig config = FileSourceConfig.load(map); + assertNotNull(config); + config.validate(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Required property not set.") + public final void missingRequiredPropertiesTest() throws IOException { + Map<String, Object> map = new HashMap<String, Object> (); + map.put("pathFilter", "/"); + + FileSourceConfig config = FileSourceConfig.load(map); + assertNotNull(config); + config.validate(); + } + + @Test(expectedExceptions = com.fasterxml.jackson.databind.exc.InvalidFormatException.class) + public final void InvalidBooleanPropertyTest() throws IOException { + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", "/"); + map.put("recurse", "not a boolean"); + + FileSourceConfig config = FileSourceConfig.load(map); + assertNotNull(config); + config.validate(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "The property pollingInterval 
must be greater than zero") + public final void ZeroValueTest() throws IOException { + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", "/"); + map.put("pollingInterval", 0); + + FileSourceConfig config = FileSourceConfig.load(map); + assertNotNull(config); + config.validate(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "The property minimumFileAge must be non-negative") + public final void NegativeValueTest() throws IOException { + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", "/"); + map.put("minimumFileAge", "-50"); + + FileSourceConfig config = FileSourceConfig.load(map); + assertNotNull(config); + config.validate(); + } + + @Test(expectedExceptions = IllegalArgumentException.class, + expectedExceptionsMessageRegExp = "Invalid Regex pattern provided for fileFilter") + public final void invalidFileFilterTest() throws IOException { + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", "/"); + map.put("fileFilter", "\\"); // Results in a single '\' being sent. + + FileSourceConfig config = FileSourceConfig.load(map); + assertNotNull(config); + config.validate(); + } + + private File getFile(String name) { + ClassLoader classLoader = getClass().getClassLoader(); + return new File(classLoader.getResource(name).getFile()); + } +} diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java new file mode 100644 index 0000000..98d35a9 --- /dev/null +++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/ProcessedFileThreadTests.java @@ -0,0 +1,259 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.file; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.testng.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.PushSource; +import org.mockito.Mockito; +import org.testng.annotations.Test; + +@SuppressWarnings("unchecked") +public class ProcessedFileThreadTests extends AbstractFileTests { + + private PushSource<byte[]> consumer; + private FileConsumerThread consumerThread; + private ProcessedFileThread cleanupThread; + private FileSourceConfig fileConfig; + + @Test + public final void singleFileTest() throws IOException { + + consumer = Mockito.mock(PushSource.class); + Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class)); + + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", TMP_DIR); + map.put("keepFile", Boolean.FALSE); + + try { + generateFiles(1); + fileConfig = FileSourceConfig.load(map); + listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed); + consumerThread = new FileConsumerThread(consumer, workQueue, 
inProcess, recentlyProcessed); + cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed); + executor.execute(listingThread); + executor.execute(consumerThread); + executor.execute(cleanupThread); + Thread.sleep(2000); + + for (File produced : producedFiles) { + verify(workQueue, times(1)).offer(produced); + verify(inProcess, times(1)).add(produced); + verify(inProcess, times(1)).remove(produced); + verify(recentlyProcessed, times(1)).add(produced); + } + + verify(workQueue, times(1)).offer(any(File.class)); + verify(workQueue, atLeast(1)).take(); + verify(inProcess, times(1)).add(any(File.class)); + verify(inProcess, times(1)).remove(any(File.class)); + verify(recentlyProcessed, times(1)).add(any(File.class)); + verify(recentlyProcessed, times(2)).take(); + } catch (InterruptedException | ExecutionException e) { + fail("Unable to generate files" + e.getLocalizedMessage()); + } + } + + @Test + public final void mulitpleFileTest() throws IOException { + + consumer = Mockito.mock(PushSource.class); + Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class)); + + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", TMP_DIR); + map.put("keepFile", Boolean.FALSE); + + try { + generateFiles(50); + fileConfig = FileSourceConfig.load(map); + listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed); + consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); + cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed); + executor.execute(listingThread); + executor.execute(consumerThread); + executor.execute(cleanupThread); + Thread.sleep(2000); + + for (File produced : producedFiles) { + verify(workQueue, times(1)).offer(produced); + verify(inProcess, times(1)).add(produced); + verify(inProcess, times(1)).remove(produced); + verify(recentlyProcessed, times(1)).add(produced); + } + + verify(workQueue, 
times(50)).offer(any(File.class)); + verify(workQueue, atLeast(50)).take(); + verify(inProcess, times(50)).add(any(File.class)); + verify(inProcess, times(50)).remove(any(File.class)); + verify(recentlyProcessed, times(50)).add(any(File.class)); + verify(recentlyProcessed, times(50)).add(any(File.class)); + verify(recentlyProcessed, times(51)).take(); + } catch (InterruptedException | ExecutionException e) { + fail("Unable to generate files" + e.getLocalizedMessage()); + } + } + + @Test + public final void keepFileTest() throws IOException { + + consumer = Mockito.mock(PushSource.class); + Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class)); + + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", TMP_DIR); + map.put("keepFile", Boolean.TRUE); + map.put("pollingInterval", 1000L); + + try { + generateFiles(1); + fileConfig = FileSourceConfig.load(map); + listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed); + consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); + cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed); + executor.execute(listingThread); + executor.execute(consumerThread); + executor.execute(cleanupThread); + Thread.sleep(7900); // Should pull the same file 5 times? 
+ + for (File produced : producedFiles) { + verify(workQueue, atLeast(4)).offer(produced); + verify(inProcess, atLeast(4)).add(produced); + verify(inProcess, atLeast(4)).remove(produced); + verify(recentlyProcessed, atLeast(4)).add(produced); + } + + verify(recentlyProcessed, atLeast(5)).take(); + } catch (InterruptedException | ExecutionException e) { + fail("Unable to generate files" + e.getLocalizedMessage()); + } + } + + @Test + public final void continuousRunTest() throws IOException { + + consumer = Mockito.mock(PushSource.class); + Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class)); + + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", TMP_DIR); + map.put("keepFile", Boolean.FALSE); + map.put("pollingInterval", 100); + fileConfig = FileSourceConfig.load(map); + + try { + // Start producing files, with a .1 sec delay between + generatorThread = new TestFileGenerator(producedFiles, 5000, 100, 1, TMP_DIR, "continuous", ".txt", getPermissions()); + executor.execute(generatorThread); + + listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed); + consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); + cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed); + executor.execute(listingThread); + executor.execute(consumerThread); + executor.execute(cleanupThread); + + // Run for 30 seconds + Thread.sleep(30000); + + // Stop producing files + generatorThread.halt(); + + // Let the consumer catch up + while (!workQueue.isEmpty() && !inProcess.isEmpty() && !recentlyProcessed.isEmpty()) { + Thread.sleep(2000); + } + + // Make sure every single file was processed. 
+ for (File produced : producedFiles) { + verify(workQueue, times(1)).offer(produced); + verify(inProcess, times(1)).add(produced); + verify(inProcess, times(1)).remove(produced); + verify(recentlyProcessed, times(1)).add(produced); + } + + } catch (InterruptedException e) { + fail("Unable to generate files" + e.getLocalizedMessage()); + } + } + + @Test + public final void multipleConsumerTest() throws IOException { + + consumer = Mockito.mock(PushSource.class); + Mockito.doNothing().when(consumer).consume((Record<byte[]>) any(Record.class)); + + Map<String, Object> map = new HashMap<String, Object> (); + map.put("inputDirectory", TMP_DIR); + map.put("keepFile", Boolean.FALSE); + map.put("pollingInterval", 100); + fileConfig = FileSourceConfig.load(map); + + try { + // Start producing files, with a .1 sec delay between + generatorThread = new TestFileGenerator(producedFiles, 5000, 100, 1, TMP_DIR, "continuous", ".txt", getPermissions()); + executor.execute(generatorThread); + + listingThread = new FileListingThread(fileConfig, workQueue, inProcess, recentlyProcessed); + consumerThread = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); + FileConsumerThread consumerThread2 = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); + FileConsumerThread consumerThread3 = new FileConsumerThread(consumer, workQueue, inProcess, recentlyProcessed); + cleanupThread = new ProcessedFileThread(fileConfig, recentlyProcessed); + executor.execute(listingThread); + executor.execute(consumerThread); + executor.execute(consumerThread2); + executor.execute(consumerThread3); + executor.execute(cleanupThread); + + // Run for 30 seconds + Thread.sleep(30000); + + // Stop producing files + generatorThread.halt(); + + // Let the consumer catch up + while (!workQueue.isEmpty() && !inProcess.isEmpty() && !recentlyProcessed.isEmpty()) { + Thread.sleep(2000); + } + + // Make sure every single file was processed exactly once. 
+ for (File produced : producedFiles) { + verify(workQueue, times(1)).offer(produced); + verify(inProcess, times(1)).add(produced); + verify(inProcess, times(1)).remove(produced); + verify(recentlyProcessed, times(1)).add(produced); + } + + } catch (InterruptedException e) { + fail("Unable to generate files" + e.getLocalizedMessage()); + } + } +} diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/TestFileGenerator.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/TestFileGenerator.java new file mode 100644 index 0000000..21ad852 --- /dev/null +++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/TestFileGenerator.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.file; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.nio.file.attribute.FileAttribute; +import java.util.concurrent.BlockingQueue; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.RandomStringUtils; + +public class TestFileGenerator extends Thread { + + // Allows us to communicate back which files we generated + private final BlockingQueue<File> producedFiles; + private final int numFiles; + private final long delay; + private final int numLines; + private final String prefix; + private final String suffix; + private final FileAttribute<?>[] attrs; + private final Path tempDir; + private boolean keepRunning = true; + + public TestFileGenerator(BlockingQueue<File> producedFiles, int numFiles, long delay, int numLines, + String dir, String prefix, String suffix, FileAttribute<?>... 
attrs) throws IOException { + this.numFiles = numFiles; + this.delay = delay; + this.numLines = numLines; + this.producedFiles = producedFiles; + this.prefix = prefix; + this.suffix = suffix; + this.attrs = attrs; + tempDir = Files.createDirectories(Paths.get(dir), attrs); + } + + public void run() { + int counter = 0; + while ( keepRunning && (counter++ < numFiles)) { + createFile(); + try { + sleep(delay); + } catch (InterruptedException e) { + return; + } + } + } + + public void halt() { + keepRunning = false; + } + + private final void createFile() { + try { + Path path = Files.createTempFile(tempDir, prefix, suffix, attrs); + try(OutputStream out = Files.newOutputStream(path, StandardOpenOption.APPEND)) { + for (int idx = 0; idx < numLines; idx++) { + IOUtils.write(RandomStringUtils.random(50, true, false) + "\n", out, "UTF-8"); + } + } + + producedFiles.put(path.toFile()); + + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } + +} diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/GZipFilesTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/GZipFilesTests.java new file mode 100644 index 0000000..b08bb00 --- /dev/null +++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/GZipFilesTests.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.pulsar.io.file.utils;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import java.io.File;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;
import java.util.stream.Stream;

import org.testng.annotations.Test;

/** Tests for {@code GZipFiles.isGzip} and {@code GZipFiles.lines} using classpath test resources. */
public class GZipFilesTests {

    /** A genuine gzip resource must be detected as gzip. */
    @Test
    public final void validGzipFileTest() {
        assertTrue(GZipFiles.isGzip(getFile("org/apache/pulsar/io/file/validGzip.gz")));
    }

    /** A plain text resource must not be detected as gzip. */
    @Test
    public final void nonGzipFileTest() {
        assertFalse(GZipFiles.isGzip(getFile("org/apache/pulsar/io/file/nonGzipFile.txt")));
    }

    /** A text resource carrying a {@code .gz} extension must not be detected as gzip. */
    @Test
    public final void mislabelledGzipFileTest() {
        assertFalse(GZipFiles.isGzip(getFile("org/apache/pulsar/io/file/mislabelled.gz")));
    }

    /** A {@code null} file must be reported as "not gzip". */
    @Test
    public final void nonExistantGzipFileTest() {
        assertFalse(GZipFiles.isGzip(null));
    }

    /**
     * Every line streamed out of the valid gzip resource must start with {@code "Line "}.
     *
     * <p>Failures while opening or reading the file now propagate and fail the test; the
     * original caught {@code Exception} and only printed it, so an unreadable file passed.
     *
     * @throws Exception if the resource cannot be opened or read
     */
    @Test
    public final void streamGzipFileTest() throws Exception {
        Path path = Paths.get(getFile("org/apache/pulsar/io/file/validGzip.gz").getAbsolutePath(), "");

        try (Stream<String> lines = GZipFiles.lines(path)) {
            lines.forEachOrdered(line -> assertTrue(line.startsWith("Line ")));
        }
    }

    /**
     * Resolves a classpath resource to a {@link File}.
     *
     * <p>Goes through {@link URL#toURI()} so that percent-encoded characters in the resource
     * URL (for example a space in the checkout path) are decoded, and reports a missing
     * resource by name instead of an anonymous {@code NullPointerException}.
     *
     * @param name classpath-relative resource name
     * @return the resource as a file
     * @throws NullPointerException  if the resource is not on the classpath
     * @throws IllegalStateException if the resource URL is not a valid URI
     */
    private File getFile(String name) {
        URL resource = Objects.requireNonNull(getClass().getClassLoader().getResource(name),
                "Test resource not found on classpath: " + name);
        try {
            return new File(resource.toURI());
        } catch (URISyntaxException e) {
            throw new IllegalStateException("Invalid URI for test resource: " + name, e);
        }
    }
}
diff --git a/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/ZipFilesTests.java b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/ZipFilesTests.java new file mode 100644 index 0000000..2fc7286
--- /dev/null +++ b/pulsar-io/file/src/test/java/org/apache/pulsar/io/file/utils/ZipFilesTests.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.file.utils; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import java.io.File; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.stream.Stream; + +import org.testng.annotations.Test; + +public class ZipFilesTests { + + @Test + public final void validZipFileTest() { + assertTrue(ZipFiles.isZip(getFile("org/apache/pulsar/io/file/validZip.zip"))); + } + + @Test + public final void nonZipFileTest() { + assertFalse(ZipFiles.isZip(getFile("org/apache/pulsar/io/file/nonGzipFile.txt"))); + } + + @Test + public final void mislabelledZipFileTest() { + assertFalse(ZipFiles.isZip(getFile("org/apache/pulsar/io/file/mislabelled.gz"))); + } + + @Test + public final void nonExistantGzipFileTest() { + assertFalse(ZipFiles.isZip(null)); + } + + @Test + public final void streamZipFileTest() { + Path path = Paths.get(getFile("org/apache/pulsar/io/file/validZip.zip").getAbsolutePath(), ""); + + try (Stream<String> lines = ZipFiles.lines(path)) { + 
lines.forEachOrdered(line -> assertTrue(line.startsWith("Line "))); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private File getFile(String name) { + ClassLoader classLoader = getClass().getClassLoader(); + return new File(classLoader.getResource(name).getFile()); + } +} diff --git a/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/mislabelled.gz b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/mislabelled.gz new file mode 100644 index 0000000..529587e --- /dev/null +++ b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/mislabelled.gz @@ -0,0 +1 @@ +This file isn't gzipped. \ No newline at end of file diff --git a/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/nonGzipFile.txt b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/nonGzipFile.txt new file mode 100644 index 0000000..fbd35c6 --- /dev/null +++ b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/nonGzipFile.txt @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +This file is not gzipped \ No newline at end of file diff --git a/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validGzip.gz b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validGzip.gz new file mode 100644 index 0000000..f7d098d Binary files /dev/null and b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validGzip.gz differ diff --git a/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validZip.zip b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validZip.zip new file mode 100644 index 0000000..55c28e3 Binary files /dev/null and b/pulsar-io/file/src/test/resources/org/apache/pulsar/io/file/validZip.zip differ diff --git a/pulsar-io/file/src/test/resources/sinkConfig.yaml b/pulsar-io/file/src/test/resources/sinkConfig.yaml new file mode 100644 index 0000000..554b0f9 --- /dev/null +++ b/pulsar-io/file/src/test/resources/sinkConfig.yaml @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +{ + "inputDirectory": "/Users/david", + "recurse": true, + "keepFile": true, + "fileFilter": "[^\\.].*", + "pathFilter": "*", + "minimumFileAge": 0, + "maximumFileAge": 9999999999, + "minimumSize": 1, + "maximumSize": 5000000, + "ignoreHiddenFiles": true, + "pollingInterval": 5000, + "numWorkers": 1 +} \ No newline at end of file diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml index 6503b66..628686b 100644 --- a/pulsar-io/pom.xml +++ b/pulsar-io/pom.xml @@ -48,6 +48,7 @@ <module>debezium</module> <module>hdfs2</module> <module>canal</module> + <module>file</module> <module>netty</module> </modules>