This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch new_segment_creation
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 2930748924b88abbf2564a6884d073fc1aa6e50b
Author: Xiang Fu <fx19880...@gmail.com>
AuthorDate: Tue Sep 17 09:48:49 2019 +0800

    Adding CreateSegmentCommandV2 command
---
 .../generator/SegmentGeneratorConfig.java          |   1 +
 .../pinot/tools/admin/PinotAdministrator.java      |   2 +
 .../admin/command/CreateSegmentCommandV2.java      | 246 +++++++++++++++++++++
 3 files changed, 249 insertions(+)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index e840c10..49bda62 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -177,6 +177,7 @@ public class SegmentGeneratorConfig {
     }
 
     IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+    _tableName = tableConfig.getTableName();
     List<String> noDictionaryColumns = indexingConfig.getNoDictionaryColumns();
     Map<String, String> noDictionaryColumnMap = 
indexingConfig.getNoDictionaryConfig();
 
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
index fa94806..e66fe50 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
@@ -28,6 +28,7 @@ import 
org.apache.pinot.tools.admin.command.AvroSchemaToPinotSchema;
 import org.apache.pinot.tools.admin.command.BackfillDateTimeColumnCommand;
 import org.apache.pinot.tools.admin.command.ChangeNumReplicasCommand;
 import org.apache.pinot.tools.admin.command.ChangeTableState;
+import org.apache.pinot.tools.admin.command.CreateSegmentCommandV2;
 import org.apache.pinot.tools.admin.command.CreateSegmentCommand;
 import org.apache.pinot.tools.admin.command.DeleteClusterCommand;
 import org.apache.pinot.tools.admin.command.GenerateDataCommand;
@@ -86,6 +87,7 @@ public class PinotAdministrator {
   @SubCommands({
       @SubCommand(name = "GenerateData", impl = GenerateDataCommand.class),
       @SubCommand(name = "CreateSegment", impl = CreateSegmentCommand.class),
+      @SubCommand(name = "CreateSegmentV2", impl = 
CreateSegmentCommandV2.class),
       @SubCommand(name = "StartZookeeper", impl = StartZookeeperCommand.class),
       @SubCommand(name = "StartKafka", impl = StartKafkaCommand.class),
       @SubCommand(name = "StreamAvroIntoKafka", impl = 
StreamAvroIntoKafkaCommand.class),
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommandV2.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommandV2.java
new file mode 100644
index 0000000..c26f8bc
--- /dev/null
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommandV2.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.admin.command;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.pinot.common.config.TableConfig;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.core.data.readers.FileFormat;
+import org.apache.pinot.core.data.readers.RecordReader;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.parquet.data.readers.ParquetRecordReader;
+import org.apache.pinot.tools.Command;
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class to implement CreateSegment command.
+ *
+ * CreateSegmentCommandV2 reuses configurations from existing table configs 
and table schema, which will
+ * be simpler for users to try out.
+ *
+ */
+public class CreateSegmentCommandV2 extends AbstractBaseAdminCommand 
implements Command {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CreateSegmentCommandV2.class);
+
+  @Option(name = "-schemaFile", metaVar = "<string>", usage = "File containing 
schema for data.", required = true)
+  private String _schemaFile;
+
+  @Option(name = "-tableConfig", metaVar = "<string>", usage = "File 
containing schema for data.", required = true)
+  private String _tableConfigFile;
+
+  @Option(name = "-dataDir", metaVar = "<string>", usage = "Directory 
containing the input data.", required = true)
+  private String _dataDir;
+
+  @Option(name = "-inputFormat", metaVar = "<string>", usage = "Input File 
format (CSV/JSON/AVRO/Thrift/Parquet).")
+  private FileFormat _inputFormat;
+
+  @Option(name = "-outDir", metaVar = "<string>", usage = "Name of output 
directory.", required = true)
+  private String _outDir;
+
+  @Option(name = "-overwrite", usage = "Overwrite existing output directory.")
+  private boolean _overwrite = false;
+
+  @Option(name = "-numThreads", metaVar = "<int>", usage = "Parallelism while 
generating segments, default is 1.")
+  private int _numThreads = 1;
+
+  @SuppressWarnings("FieldCanBeLocal")
+  @Option(name = "-help", help = true, aliases = {"-h", "--h", "--help"}, 
usage = "Print this message.")
+  private boolean _help = false;
+
+  public CreateSegmentCommandV2 setDataDir(String dataDir) {
+    _dataDir = dataDir;
+    return this;
+  }
+
+  public CreateSegmentCommandV2 setOutDir(String outDir) {
+    _outDir = outDir;
+    return this;
+  }
+
+  public CreateSegmentCommandV2 setOverwrite(boolean overwrite) {
+    _overwrite = overwrite;
+    return this;
+  }
+
+  public CreateSegmentCommandV2 setNumThreads(int numThreads) {
+    _numThreads = numThreads;
+    return this;
+  }
+
+  @Override
+  public final String getName() {
+    return "CreateNewSegment";
+  }
+
+  @Override
+  public String description() {
+    return "Create pinot segments from provided avro/csv/json/thrift/parquet 
input data.";
+  }
+
+  @Override
+  public boolean getHelp() {
+    return _help;
+  }
+
+  @Override
+  public String toString() {
+    return ("CreateSegment " + " -dataDir " + _dataDir + " -inputFormat " + 
_inputFormat + " -outDir " + _outDir
+        + " -overwrite " + _overwrite + " -schemaFile " + _schemaFile + " 
-tableConfigFile " + _tableConfigFile
+        + " -numThreads " + _numThreads);
+  }
+
+  @Override
+  public boolean execute()
+      throws Exception {
+    LOGGER.info("Executing command: {}", toString());
+
+    Schema schema = Schema.fromFile(new File(_schemaFile));
+
+    TableConfig tableConfig = 
TableConfig.fromJsonConfig(JsonUtils.fileToJsonNode(new 
File(_tableConfigFile)));
+
+    // Filter out all input files.
+    final Path dataDirPath = new Path(_dataDir);
+    FileSystem fileSystem = FileSystem.get(URI.create(_dataDir), new 
Configuration());
+
+    if (!fileSystem.exists(dataDirPath) || 
!fileSystem.isDirectory(dataDirPath)) {
+      throw new RuntimeException("Data directory " + _dataDir + " not found.");
+    }
+
+    // Gather all data files
+    List<Path> dataFilePaths = getDataFilePaths(dataDirPath);
+
+    if ((dataFilePaths == null) || (dataFilePaths.size() == 0)) {
+      throw new RuntimeException(
+          "Data directory " + _dataDir + " does not contain " + 
_inputFormat.toString().toUpperCase() + " files.");
+    }
+
+    LOGGER.info("Accepted files: {}", 
Arrays.toString(dataFilePaths.toArray()));
+
+    // Make sure output directory does not already exist, or can be 
overwritten.
+    File outDir = new File(_outDir);
+    if (outDir.exists()) {
+      if (!_overwrite) {
+        throw new IOException("Output directory " + _outDir + " already 
exists.");
+      } else {
+        FileUtils.deleteDirectory(outDir);
+      }
+    }
+
+    // Set other generator configs from command line.
+
+    ExecutorService executor = Executors.newFixedThreadPool(_numThreads);
+    int cnt = 0;
+
+    for (final Path dataFilePath : dataFilePaths) {
+      final int segCnt = cnt;
+
+      executor.execute(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            SegmentGeneratorConfig config = new 
SegmentGeneratorConfig(tableConfig, schema);
+            config.setDataDir(_dataDir);
+            config.setOutDir(_outDir);
+            config.setOverwrite(_overwrite);
+            String localFile = dataFilePath.getName();
+            if (_inputFormat == null) {
+              _inputFormat = 
FileFormat.valueOf(FilenameUtils.getExtension(localFile).toUpperCase());
+            }
+            config.setFormat(_inputFormat);
+            Path localFilePath = new Path(localFile);
+            dataDirPath.getFileSystem(new 
Configuration()).copyToLocalFile(dataFilePath, localFilePath);
+            String md5;
+            try (InputStream is = Files.newInputStream(Paths.get(localFile))) {
+              md5 = org.apache.commons.codec.digest.DigestUtils.shaHex(is);
+            }
+            config.setInputFilePath(localFile);
+            config.setSegmentName(config.getTableName() + "_" + md5 + "_" + 
segCnt);
+
+            final SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+            switch (config.getFormat()) {
+              case PARQUET:
+                RecordReader parquetRecordReader = new ParquetRecordReader();
+                parquetRecordReader.init(config);
+                driver.init(config, parquetRecordReader);
+                break;
+              default:
+                driver.init(config);
+            }
+            driver.build();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+      cnt += 1;
+    }
+
+    executor.shutdown();
+    return executor.awaitTermination(1, TimeUnit.HOURS);
+  }
+
+  protected List<Path> getDataFilePaths(Path pathPattern)
+      throws IOException {
+    List<Path> tarFilePaths = new ArrayList<>();
+    FileSystem fileSystem = FileSystem.get(pathPattern.toUri(), new 
Configuration());
+    getDataFilePathsHelper(fileSystem, fileSystem.globStatus(pathPattern), 
tarFilePaths);
+    return tarFilePaths;
+  }
+
+  protected void getDataFilePathsHelper(FileSystem fileSystem, FileStatus[] 
fileStatuses, List<Path> tarFilePaths)
+      throws IOException {
+    for (FileStatus fileStatus : fileStatuses) {
+      Path path = fileStatus.getPath();
+      if (fileStatus.isDirectory()) {
+        getDataFilePathsHelper(fileSystem, fileSystem.listStatus(path), 
tarFilePaths);
+      } else {
+        if (isDataFile(path.getName())) {
+          tarFilePaths.add(path);
+        }
+      }
+    }
+  }
+
+  protected boolean isDataFile(String fileName) {
+    return fileName.endsWith(".avro") || fileName.endsWith(".csv") || 
fileName.endsWith(".json") || fileName
+        .endsWith(".thrift") || fileName.endsWith(".parquet");
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to