This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 3216159  Support aws s3 with Parquet in pinot-tools (#4556)
3216159 is described below

commit 3216159261d7bbca28beba9dd3458520f9b40af5
Author: Xiang Fu <fx19880...@gmail.com>
AuthorDate: Fri Aug 30 10:54:50 2019 -0700

    Support aws s3 with Parquet in pinot-tools (#4556)

    * Support aws s3 with Parquet in pinot-tools

    * resolve dependency issue

    * Address comments
---
 .../apache/pinot/core/data/readers/FileFormat.java |  2 +-
 pinot-tools/pom.xml                                | 20 ++++++
 .../tools/admin/command/CreateSegmentCommand.java  | 74 ++++++++++++++++++----
 pom.xml                                            |  5 ++
 4 files changed, 86 insertions(+), 15 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
index 5f7120d..43f8391 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
@@ -19,5 +19,5 @@
 package org.apache.pinot.core.data.readers;
 
 public enum FileFormat {
-  AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, OTHER
+  AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, PARQUET, OTHER
 }
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 55c2ecd..0799c97 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -56,6 +56,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-parquet</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-connector-kafka-base</artifactId>
       <version>${project.version}</version>
     </dependency>
@@ -98,6 +103,21 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-math3</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-annotations</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
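The hadoop-aws runtime dependency added above is what lets the Hadoop FileSystem API used in CreateSegmentCommand below resolve s3a:// URIs. A minimal sketch of that resolution, assuming a hypothetical bucket and credentials supplied through the standard fs.s3a.* properties (none of this is part of the commit itself):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class S3ListingSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical bucket/path; any s3a:// URI works once hadoop-aws is on the classpath.
        String dataDir = "s3a://example-bucket/pinot/input";
        Configuration conf = new Configuration();
        // s3a credentials are usually picked up from the environment or core-site.xml,
        // but can also be set explicitly:
        conf.set("fs.s3a.access.key", System.getenv("AWS_ACCESS_KEY_ID"));
        conf.set("fs.s3a.secret.key", System.getenv("AWS_SECRET_ACCESS_KEY"));
        FileSystem fs = FileSystem.get(URI.create(dataDir), conf);
        for (FileStatus status : fs.listStatus(new Path(dataDir))) {
          System.out.println(status.getPath());
        }
      }
    }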
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
index 95c1371..5ce2c79 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
@@ -19,20 +19,29 @@
 package org.apache.pinot.tools.admin.command;
 
 import java.io.File;
-import java.io.FilenameFilter;
 import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.pinot.common.data.StarTreeIndexSpec;
 import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.core.data.readers.FileFormat;
+import org.apache.pinot.core.data.readers.RecordReader;
+import org.apache.pinot.core.data.readers.RecordReaderFactory;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.parquet.data.readers.ParquetRecordReader;
 import org.apache.pinot.startree.hll.HllConfig;
 import org.apache.pinot.startree.hll.HllConstants;
 import org.apache.pinot.tools.Command;
@@ -283,24 +292,22 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
     }
 
     // Filter out all input files.
-    File dir = new File(_dataDir);
-    if (!dir.exists() || !dir.isDirectory()) {
+    final Path dataDirPath = new Path(_dataDir);
+    FileSystem fileSystem = FileSystem.get(URI.create(_dataDir), new Configuration());
+
+    if (!fileSystem.exists(dataDirPath) || !fileSystem.isDirectory(dataDirPath)) {
       throw new RuntimeException("Data directory " + _dataDir + " not found.");
     }
 
-    File[] files = dir.listFiles(new FilenameFilter() {
-      @Override
-      public boolean accept(File dir, String name) {
-        return name.toLowerCase().endsWith(_format.toString().toLowerCase());
-      }
-    });
+    // Gather all data files
+    List<Path> dataFilePaths = getDataFilePaths(dataDirPath);
 
-    if ((files == null) || (files.length == 0)) {
+    if ((dataFilePaths == null) || (dataFilePaths.size() == 0)) {
       throw new RuntimeException(
           "Data directory " + _dataDir + " does not contain " + _format.toString().toUpperCase() + " files.");
     }
 
-    LOGGER.info("Accepted files: {}", Arrays.toString(files));
+    LOGGER.info("Accepted files: {}", Arrays.toString(dataFilePaths.toArray()));
 
     // Make sure output directory does not already exist, or can be overwritten.
     File outDir = new File(_outDir);
@@ -362,7 +369,7 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
     ExecutorService executor = Executors.newFixedThreadPool(_numThreads);
     int cnt = 0;
-    for (final File file : files) {
+    for (final Path dataFilePath : dataFilePaths) {
       final int segCnt = cnt;
 
       executor.execute(new Runnable() {
@@ -370,12 +377,24 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
         public void run() {
          try {
            SegmentGeneratorConfig config = new SegmentGeneratorConfig(segmentGeneratorConfig);
-            config.setInputFilePath(file.getAbsolutePath());
+
+            String localFile = dataFilePath.getName();
+            Path localFilePath = new Path(localFile);
+            dataDirPath.getFileSystem(new Configuration()).copyToLocalFile(dataFilePath, localFilePath);
+            config.setInputFilePath(localFile);
             config.setSegmentName(_segmentName + "_" + segCnt);
             config.loadConfigFiles();
 
             final SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
-            driver.init(config);
+            switch (config.getFormat()) {
+              case PARQUET:
+                RecordReader parquetRecordReader = new ParquetRecordReader();
+                parquetRecordReader.init(config);
+                driver.init(config, parquetRecordReader);
+                break;
+              default:
+                driver.init(config);
+            }
             driver.build();
           } catch (Exception e) {
             throw new RuntimeException(e);
@@ -388,4 +407,31 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
     executor.shutdown();
     return executor.awaitTermination(1, TimeUnit.HOURS);
   }
+
+  protected List<Path> getDataFilePaths(Path pathPattern)
+      throws IOException {
+    List<Path> tarFilePaths = new ArrayList<>();
+    FileSystem fileSystem = FileSystem.get(pathPattern.toUri(), new Configuration());
+    getDataFilePathsHelper(fileSystem, fileSystem.globStatus(pathPattern), tarFilePaths);
+    return tarFilePaths;
+  }
+
+  protected void getDataFilePathsHelper(FileSystem fileSystem, FileStatus[] fileStatuses, List<Path> tarFilePaths)
+      throws IOException {
+    for (FileStatus fileStatus : fileStatuses) {
+      Path path = fileStatus.getPath();
+      if (fileStatus.isDirectory()) {
+        getDataFilePathsHelper(fileSystem, fileSystem.listStatus(path), tarFilePaths);
+      } else {
+        if (isDataFile(path.getName())) {
+          tarFilePaths.add(path);
+        }
+      }
+    }
+  }
+
+  protected boolean isDataFile(String fileName) {
+    return fileName.endsWith(".avro") || fileName.endsWith(".csv") || fileName.endsWith(".json") || fileName
+        .endsWith(".thrift") || fileName.endsWith(".parquet");
+  }
 }
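Of the supported formats, only PARQUET goes through the two-argument driver.init(config, recordReader) overload added above; the other formats still let the driver construct a reader internally. A condensed sketch of that branch in isolation, assuming a SegmentGeneratorConfig already populated with schema, a local input path, and an output directory (the wrapper class and method names here are illustrative, not from the commit):

    import org.apache.pinot.core.data.readers.FileFormat;
    import org.apache.pinot.core.data.readers.RecordReader;
    import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
    import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
    import org.apache.pinot.parquet.data.readers.ParquetRecordReader;

    final class ParquetSegmentSketch {
      // Hypothetical helper mirroring the switch in CreateSegmentCommand.
      static void buildSegment(SegmentGeneratorConfig config) throws Exception {
        SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
        if (config.getFormat() == FileFormat.PARQUET) {
          // Parquet needs an explicitly constructed reader; init(config)
          // picks up the input path and schema from the config.
          RecordReader reader = new ParquetRecordReader();
          reader.init(config);
          driver.init(config, reader);
        } else {
          driver.init(config);  // other formats resolve their reader internally
        }
        driver.build();
      }
    }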
diff --git a/pom.xml b/pom.xml
index cc8767c..d770f29 100644
--- a/pom.xml
+++ b/pom.xml
@@ -657,6 +657,11 @@
         </exclusions>
       </dependency>
       <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-aws</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.orc</groupId>
         <artifactId>orc-core</artifactId>
         <version>1.5.2</version>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org