This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 054faf7 Adding field 'segmentCreationJobParallelism' to allow users to set segment generation job parallelism. Default to the number of input files. (#6012) 054faf7 is described below commit 054faf76cdef5a625d042a4d435d428529c8d798 Author: Xiang Fu <fx19880...@gmail.com> AuthorDate: Tue Sep 15 02:20:12 2020 -0700 Adding field 'segmentCreationJobParallelism' to allow users to set segment generation job parallelism. Default to the number of input files. (#6012) --- .../batch/hadoop/HadoopSegmentGenerationJobRunner.java | 6 +++++- .../batch/spark/SparkSegmentGenerationJobRunner.java | 12 +++++++++--- .../main/resources/segmentCreationAndTarPushJobSpec.yaml | 1 + .../spi/ingestion/batch/spec/SegmentGenerationJobSpec.java | 13 +++++++++++++ .../pinot/spi/ingestion/batch/IngestionJobLauncherTest.java | 2 ++ .../src/test/resources/ingestion_job_spec_template.yaml | 1 + 6 files changed, 31 insertions(+), 4 deletions(-) diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java index dee13e5..59beadc 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java @@ -230,7 +230,11 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge if (hadoopTokenFileLocation != null) { jobConf.set("mapreduce.job.credentials.binary", hadoopTokenFileLocation); } - jobConf.setInt(JobContext.NUM_MAPS, numDataFiles); + int jobParallelism = _spec.getSegmentCreationJobParallelism(); + if (jobParallelism <= 0 || jobParallelism > numDataFiles) { + jobParallelism = numDataFiles; + } + jobConf.setInt(JobContext.NUM_MAPS, jobParallelism); // Pinot plugins are necessary to launch Pinot ingestion job from every mapper. // In order to ensure pinot plugins would be loaded to each worker, this method diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java index c1b3f25..cdee5cf 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentGenerationJobRunner.java @@ -205,7 +205,8 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri } List<String> pathAndIdxList = new ArrayList<>(); - String localDirectorySequenceIdString = _spec.getSegmentNameGeneratorSpec().getConfigs().get(LOCAL_DIRECTORY_SEQUENCE_ID); + String localDirectorySequenceIdString = + _spec.getSegmentNameGeneratorSpec().getConfigs().get(LOCAL_DIRECTORY_SEQUENCE_ID); boolean localDirectorySequenceId = false; if (localDirectorySequenceIdString != null) { localDirectorySequenceId = Boolean.parseBoolean(localDirectorySequenceIdString); @@ -219,7 +220,7 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri } localDirIndex.get(filteredParentPath.toString()).add(filteredFile); } - for (String parentPath: localDirIndex.keySet()){ + for (String parentPath : localDirIndex.keySet()) { List<String> siblingFiles = localDirIndex.get(parentPath); Collections.sort(siblingFiles); for (int i = 0; i < siblingFiles.size(); i++) { @@ -231,7 +232,12 @@ public class SparkSegmentGenerationJobRunner implements IngestionJobRunner, Seri pathAndIdxList.add(String.format("%s %d", filteredFiles.get(i), i)); } } - JavaRDD<String> pathRDD = sparkContext.parallelize(pathAndIdxList, pathAndIdxList.size()); + int numDataFiles = pathAndIdxList.size(); + int jobParallelism = _spec.getSegmentCreationJobParallelism(); + if (jobParallelism <= 0 || jobParallelism > numDataFiles) { + jobParallelism = numDataFiles; + } + JavaRDD<String> pathRDD = sparkContext.parallelize(pathAndIdxList, jobParallelism); final String pluginsInclude = (sparkContext.getConf().contains(PLUGINS_INCLUDE_PROPERTY_NAME)) ? sparkContext.getConf() diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndTarPushJobSpec.yaml b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndTarPushJobSpec.yaml index 9b4da30..f4dd4b2 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndTarPushJobSpec.yaml +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/resources/segmentCreationAndTarPushJobSpec.yaml @@ -28,6 +28,7 @@ includeFileNamePattern: 'glob:**/*.parquet' excludeFileNamePattern: 'glob:**/*.avro' outputDirURI: 'file:///path/to/output' overwriteOutput: true +parallelism: 10000 pinotFSSpecs: - scheme: file className: org.apache.pinot.spi.filesystem.LocalPinotFS diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java index 2e532c2..e41d5a4 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/SegmentGenerationJobSpec.java @@ -70,6 +70,11 @@ public class SegmentGenerationJobSpec implements Serializable { private String _outputDirURI; /** + * Segment creation job parallelism. + */ + private int _segmentCreationJobParallelism; + + /** * Should overwrite output segments if existed. */ private boolean _overwriteOutput; @@ -232,6 +237,14 @@ public class SegmentGenerationJobSpec implements Serializable { public void setPushJobSpec(PushJobSpec pushJobSpec) { _pushJobSpec = pushJobSpec; } + + public int getSegmentCreationJobParallelism() { + return _segmentCreationJobParallelism; + } + + public void setSegmentCreationJobParallelism(int segmentCreationJobParallelism) { + _segmentCreationJobParallelism = segmentCreationJobParallelism; + } } diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java index a00a15f..23110e8 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/ingestion/batch/IngestionJobLauncherTest.java @@ -60,6 +60,7 @@ public class IngestionJobLauncherTest { GroovyTemplateUtils.class.getClassLoader().getResource("job.config").getFile(), context); Assert.assertEquals(spec.getInputDirURI(), "file:///path/to/input/2020/06/07"); Assert.assertEquals(spec.getOutputDirURI(), "file:///path/to/output/2020/06/07"); + Assert.assertEquals(spec.getSegmentCreationJobParallelism(), 100); } @Test @@ -70,5 +71,6 @@ public class IngestionJobLauncherTest { GroovyTemplateUtils.class.getClassLoader().getResource("job_json.config").getFile(), null); Assert.assertEquals(spec.getInputDirURI(), "file:///path/to/input/2020/07/22"); Assert.assertEquals(spec.getOutputDirURI(), "file:///path/to/output/2020/07/22"); + Assert.assertEquals(spec.getSegmentCreationJobParallelism(), 0); } } diff --git a/pinot-spi/src/test/resources/ingestion_job_spec_template.yaml b/pinot-spi/src/test/resources/ingestion_job_spec_template.yaml index c20e88a..a786506 100644 --- a/pinot-spi/src/test/resources/ingestion_job_spec_template.yaml +++ b/pinot-spi/src/test/resources/ingestion_job_spec_template.yaml @@ -28,6 +28,7 @@ includeFileNamePattern: 'glob:**/*.parquet' excludeFileNamePattern: 'glob:**/*.avro' outputDirURI: 'file:///path/to/output/${year}/${month}/${day}' overwriteOutput: true +segmentCreationJobParallelism: 100 pinotFSSpecs: - scheme: file className: org.apache.pinot.spi.filesystem.LocalPinotFS --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org