This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3216159  Support aws s3 with Parquet in pinot-tools (#4556)
3216159 is described below

commit 3216159261d7bbca28beba9dd3458520f9b40af5
Author: Xiang Fu <fx19880...@gmail.com>
AuthorDate: Fri Aug 30 10:54:50 2019 -0700

    Support aws s3 with Parquet in pinot-tools (#4556)
    
    * Support aws s3 with Parquet in pinot-tools
    
    * resolve dependency issue
    
    * Address comments
---
 .../apache/pinot/core/data/readers/FileFormat.java |  2 +-
 pinot-tools/pom.xml                                | 20 ++++++
 .../tools/admin/command/CreateSegmentCommand.java  | 74 ++++++++++++++++++----
 pom.xml                                            |  5 ++
 4 files changed, 86 insertions(+), 15 deletions(-)

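At a glance: the patch routes CreateSegmentCommand's input-directory handling through the Hadoop FileSystem API, so the data directory may be an S3 location rather than only a local path. A minimal sketch of the resolution step this enables (the s3a:// bucket below is a hypothetical placeholder, not from this commit):

    // Sketch: FileSystem.get() selects the implementation from the URI scheme,
    // e.g. file:// -> local filesystem, s3a:// -> S3AFileSystem (from hadoop-aws).
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ResolveFs {
      public static void main(String[] args) throws Exception {
        String dataDir = "s3a://example-bucket/pinot-input";  // hypothetical location
        FileSystem fs = FileSystem.get(URI.create(dataDir), new Configuration());
        System.out.println(fs.getClass().getName() + " exists=" + fs.exists(new Path(dataDir)));
      }
    }
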
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
index 5f7120d..43f8391 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/FileFormat.java
@@ -19,5 +19,5 @@
 package org.apache.pinot.core.data.readers;
 
 public enum FileFormat {
-  AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, OTHER
+  AVRO, GZIPPED_AVRO, CSV, JSON, PINOT, THRIFT, PARQUET, OTHER
 }
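
The enum above simply gains a PARQUET constant; a format string supplied on the command line maps onto it through the standard valueOf lookup. A minimal illustration (not code from this commit):

    // Hypothetical illustration: valueOf() is case-sensitive and throws
    // IllegalArgumentException for unknown names, so normalize first.
    FileFormat format = FileFormat.valueOf("parquet".toUpperCase());
    assert format == FileFormat.PARQUET;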
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index 55c2ecd..0799c97 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -56,6 +56,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-parquet</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-connector-kafka-base</artifactId>
       <version>${project.version}</version>
     </dependency>
@@ -98,6 +103,21 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-math3</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-annotations</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
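
hadoop-aws supplies the S3A filesystem implementation that FileSystem.get() resolves for s3a:// URIs; without it on the runtime classpath the lookup fails with a "No FileSystem for scheme" error. A hedged sketch of verifying the wiring (on some Hadoop versions the scheme-to-class mapping must be set explicitly, as below; on others it is discovered automatically):

    // Illustrative only; the bucket name is a placeholder.
    Configuration conf = new Configuration();
    conf.setIfUnset("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);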
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
index 95c1371..5ce2c79 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
@@ -19,20 +19,29 @@
 package org.apache.pinot.tools.admin.command;
 
 import java.io.File;
-import java.io.FilenameFilter;
 import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.pinot.common.data.StarTreeIndexSpec;
 import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.core.data.readers.FileFormat;
+import org.apache.pinot.core.data.readers.RecordReader;
+import org.apache.pinot.core.data.readers.RecordReaderFactory;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.parquet.data.readers.ParquetRecordReader;
 import org.apache.pinot.startree.hll.HllConfig;
 import org.apache.pinot.startree.hll.HllConstants;
 import org.apache.pinot.tools.Command;
@@ -283,24 +292,22 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
     }
 
     // Filter out all input files.
-    File dir = new File(_dataDir);
-    if (!dir.exists() || !dir.isDirectory()) {
+    final Path dataDirPath = new Path(_dataDir);
+    FileSystem fileSystem = FileSystem.get(URI.create(_dataDir), new Configuration());
+
+    if (!fileSystem.exists(dataDirPath) || !fileSystem.isDirectory(dataDirPath)) {
       throw new RuntimeException("Data directory " + _dataDir + " not found.");
     }
 
-    File[] files = dir.listFiles(new FilenameFilter() {
-      @Override
-      public boolean accept(File dir, String name) {
-        return name.toLowerCase().endsWith(_format.toString().toLowerCase());
-      }
-    });
+    // Gather all data files
+    List<Path> dataFilePaths = getDataFilePaths(dataDirPath);
 
-    if ((files == null) || (files.length == 0)) {
+    if ((dataFilePaths == null) || (dataFilePaths.size() == 0)) {
       throw new RuntimeException(
           "Data directory " + _dataDir + " does not contain " + 
_format.toString().toUpperCase() + " files.");
     }
 
-    LOGGER.info("Accepted files: {}", Arrays.toString(files));
+    LOGGER.info("Accepted files: {}", 
Arrays.toString(dataFilePaths.toArray()));
 
     // Make sure output directory does not already exist, or can be overwritten.
     File outDir = new File(_outDir);
@@ -362,7 +369,7 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
 
     ExecutorService executor = Executors.newFixedThreadPool(_numThreads);
     int cnt = 0;
-    for (final File file : files) {
+    for (final Path dataFilePath : dataFilePaths) {
       final int segCnt = cnt;
 
       executor.execute(new Runnable() {
@@ -370,12 +377,24 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
         public void run() {
           try {
             SegmentGeneratorConfig config = new SegmentGeneratorConfig(segmentGeneratorConfig);
-            config.setInputFilePath(file.getAbsolutePath());
+
+            String localFile = dataFilePath.getName();
+            Path localFilePath = new Path(localFile);
+            dataDirPath.getFileSystem(new Configuration()).copyToLocalFile(dataFilePath, localFilePath);
+            config.setInputFilePath(localFile);
             config.setSegmentName(_segmentName + "_" + segCnt);
             config.loadConfigFiles();
 
             final SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
-            driver.init(config);
+            switch (config.getFormat()) {
+              case PARQUET:
+                RecordReader parquetRecordReader = new ParquetRecordReader();
+                parquetRecordReader.init(config);
+                driver.init(config, parquetRecordReader);
+                break;
+              default:
+                driver.init(config);
+            }
             driver.build();
           } catch (Exception e) {
             throw new RuntimeException(e);
@@ -388,4 +407,31 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
     executor.shutdown();
     return executor.awaitTermination(1, TimeUnit.HOURS);
   }
+
+  protected List<Path> getDataFilePaths(Path pathPattern)
+      throws IOException {
+    List<Path> tarFilePaths = new ArrayList<>();
+    FileSystem fileSystem = FileSystem.get(pathPattern.toUri(), new Configuration());
+    getDataFilePathsHelper(fileSystem, fileSystem.globStatus(pathPattern), tarFilePaths);
+    return tarFilePaths;
+  }
+
+  protected void getDataFilePathsHelper(FileSystem fileSystem, FileStatus[] fileStatuses, List<Path> tarFilePaths)
+      throws IOException {
+    for (FileStatus fileStatus : fileStatuses) {
+      Path path = fileStatus.getPath();
+      if (fileStatus.isDirectory()) {
+        getDataFilePathsHelper(fileSystem, fileSystem.listStatus(path), tarFilePaths);
+      } else {
+        if (isDataFile(path.getName())) {
+          tarFilePaths.add(path);
+        }
+      }
+    }
+  }
+
+  protected boolean isDataFile(String fileName) {
+    return fileName.endsWith(".avro") || fileName.endsWith(".csv") || 
fileName.endsWith(".json") || fileName
+        .endsWith(".thrift") || fileName.endsWith(".parquet");
+  }
 }
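
The new getDataFilePaths/getDataFilePathsHelper pair first expands the input path with globStatus() (so wildcards work), then recurses into any matched directories, keeping only files whose extension isDataFile() recognizes. A self-contained sketch of the same traversal pattern (the harness and default path are illustrative; the extension check is shortened to .parquet for brevity):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ListDataFiles {
      public static void main(String[] args) throws IOException {
        // A glob pattern such as file:///tmp/pinot-input/* or s3a://bucket/dir/*
        Path pattern = new Path(args.length > 0 ? args[0] : "file:///tmp/pinot-input/*");
        FileSystem fs = FileSystem.get(pattern.toUri(), new Configuration());
        List<Path> found = new ArrayList<>();
        FileStatus[] matches = fs.globStatus(pattern);  // null when nothing matches
        if (matches != null) {
          collect(fs, matches, found);
        }
        found.forEach(System.out::println);
      }

      // Recurse into directories; keep files with a recognized data extension.
      private static void collect(FileSystem fs, FileStatus[] statuses, List<Path> out)
          throws IOException {
        for (FileStatus status : statuses) {
          if (status.isDirectory()) {
            collect(fs, fs.listStatus(status.getPath()), out);
          } else if (status.getPath().getName().endsWith(".parquet")) {
            out.add(status.getPath());
          }
        }
      }
    }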
diff --git a/pom.xml b/pom.xml
index cc8767c..d770f29 100644
--- a/pom.xml
+++ b/pom.xml
@@ -657,6 +657,11 @@
         </exclusions>
       </dependency>
       <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-aws</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.orc</groupId>
         <artifactId>orc-core</artifactId>
         <version>1.5.2</version>
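
Pinning hadoop-aws to ${hadoop.version} in dependencyManagement keeps it in lockstep with the other Hadoop artifacts. Note that S3 access still needs credentials at runtime; the usual route is the standard s3a configuration keys or an environment/instance-profile credential provider. A hedged example with placeholder values:

    // Illustrative only; in practice these come from core-site.xml,
    // environment variables, or an instance profile, not hard-coded strings.
    Configuration conf = new Configuration();
    conf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY");  // placeholder
    conf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY");  // placeholder
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);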

