This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new f49328b Add segment merge-rollup task executor (#5587) f49328b is described below commit f49328bf1d6fcc04eb73ed8a6c4555c231853fe9 Author: Seunghyun Lee <sn...@linkedin.com> AuthorDate: Tue Jun 30 00:08:44 2020 -0700 Add segment merge-rollup task executor (#5587) * Add segment merge-rollup task executor 1. add segment merge-rollup task executor 2. add the unit test for task executor * Updated the code format based on Pinot style --- .../apache/pinot/core/common/MinionConstants.java | 6 + .../apache/pinot/core/minion/SegmentConverter.java | 27 ++--- .../minion/rollup/MergeRollupSegmentConverter.java | 22 ++-- .../impl/SegmentIndexCreationDriverImpl.java | 4 +- .../minion/MergeRollupSegmentConverterTest.java | 18 ++- .../minion/executor/MergeRollupTaskExecutor.java | 80 ++++++++++++ .../executor/MergeRollupTaskExecutorFactory.java | 26 ++++ .../executor/TaskExecutorFactoryRegistry.java | 1 + .../executor/MergeRollupTaskExecutorTest.java | 134 +++++++++++++++++++++ .../segment/converter/SegmentMergeCommand.java | 6 +- 10 files changed, 281 insertions(+), 43 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 4b99b56..b8ff4df 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -55,4 +55,10 @@ public class MinionConstants { public static class PurgeTask { public static final String TASK_TYPE = "PurgeTask"; } + + public static class MergeRollupTask { + public static final String TASK_TYPE = "mergeRollupTask"; + public static final String MERGE_TYPE_KEY = "mergeTypeKey"; + public static final String MERGED_SEGMENT_NAME_KEY = "mergedSegmentNameKey"; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentConverter.java 
b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentConverter.java index c399505..77b5b49 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentConverter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentConverter.java @@ -71,7 +71,6 @@ public class SegmentConverter { private RecordAggregator _recordAggregator; private List<String> _groupByColumns; private boolean _skipTimeValueCheck; - private IndexingConfig _indexingConfig; public SegmentConverter(List<File> inputIndexDirs, File workingDir, String tableName, String segmentName, int totalNumPartition, RecordTransformer recordTransformer, @Nullable RecordPartitioner recordPartitioner, @@ -90,7 +89,6 @@ public class SegmentConverter { _recordAggregator = recordAggregator; _groupByColumns = groupByColumns; _skipTimeValueCheck = skipTimeValueCheck; - _indexingConfig = tableConfig.getIndexingConfig(); } public List<File> convertSegment() @@ -121,20 +119,19 @@ public class SegmentConverter { } // Sorting on sorted column and creating indices - if (_indexingConfig != null) { - List<String> sortedColumn = _indexingConfig.getSortedColumn(); - List<String> invertedIndexColumns = _indexingConfig.getInvertedIndexColumns(); - - // Check if the table config has any index configured - if (CollectionUtils.isNotEmpty(sortedColumn) || CollectionUtils.isNotEmpty(invertedIndexColumns)) { - String indexGenerationOutputPath = _workingDir.getPath() + File.separator + INDEX_PREFIX + currentPartition; - try (PinotSegmentRecordReader pinotSegmentRecordReader = new PinotSegmentRecordReader(outputSegment, null, - sortedColumn)) { - buildSegment(indexGenerationOutputPath, outputSegmentName, pinotSegmentRecordReader, - pinotSegmentRecordReader.getSchema(), _tableConfig); - } - outputSegment = new File(indexGenerationOutputPath + File.separator + outputSegmentName); + IndexingConfig indexingConfig = _tableConfig.getIndexingConfig(); + List<String> sortedColumn = 
indexingConfig.getSortedColumn(); + List<String> invertedIndexColumns = indexingConfig.getInvertedIndexColumns(); + + // Check if the table config has any index configured + if (CollectionUtils.isNotEmpty(sortedColumn) || CollectionUtils.isNotEmpty(invertedIndexColumns)) { + String indexGenerationOutputPath = _workingDir.getPath() + File.separator + INDEX_PREFIX + currentPartition; + try (PinotSegmentRecordReader pinotSegmentRecordReader = new PinotSegmentRecordReader(outputSegment, null, + sortedColumn)) { + buildSegment(indexGenerationOutputPath, outputSegmentName, pinotSegmentRecordReader, + pinotSegmentRecordReader.getSchema(), _tableConfig); } + outputSegment = new File(indexGenerationOutputPath + File.separator + outputSegmentName); } resultFiles.add(outputSegment); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java index 9e79070..ac3230e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java @@ -22,18 +22,15 @@ import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.Map; -import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.core.minion.SegmentConverter; import org.apache.pinot.core.minion.segment.RecordAggregator; import org.apache.pinot.core.minion.segment.RecordTransformer; import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl; -import org.apache.pinot.spi.config.table.IndexingConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DimensionFieldSpec; -import org.apache.pinot.spi.data.FieldSpec; import 
org.apache.pinot.spi.data.Schema; @@ -49,18 +46,17 @@ public class MergeRollupSegmentConverter { private String _tableName; private String _segmentName; private MergeType _mergeType; - private Map<String, String> _rolllupPreAggregateType; + private Map<String, String> _rollupPreAggregateType; - private MergeRollupSegmentConverter(@Nonnull List<File> inputIndexDirs, @Nonnull File workingDir, - @Nonnull String tableName, @Nonnull String segmentName, @Nonnull String mergeType, - @Nullable Map<String, String> rollupPreAggregateType, TableConfig tableConfig) { + private MergeRollupSegmentConverter(List<File> inputIndexDirs, File workingDir, String tableName, String segmentName, + MergeType mergeType, TableConfig tableConfig, @Nullable Map<String, String> rollupPreAggregateType) { _inputIndexDirs = inputIndexDirs; _workingDir = workingDir; _tableName = tableName; _segmentName = segmentName; - _mergeType = MergeType.fromString(mergeType); - _rolllupPreAggregateType = rollupPreAggregateType; + _mergeType = mergeType; _tableConfig = tableConfig; + _rollupPreAggregateType = rollupPreAggregateType; } public List<File> convert() @@ -122,7 +118,7 @@ public class MergeRollupSegmentConverter { RecordTransformer rollupRecordTransformer = (row) -> row; // Initialize roll-up record aggregator - RecordAggregator rollupRecordAggregator = new RollupRecordAggregator(schema, _rolllupPreAggregateType); + RecordAggregator rollupRecordAggregator = new RollupRecordAggregator(schema, _rollupPreAggregateType); SegmentConverter rollupSegmentConverter = new SegmentConverter.Builder().setTableName(_tableName).setSegmentName(_segmentName) @@ -137,7 +133,7 @@ public class MergeRollupSegmentConverter { // Required private List<File> _inputIndexDirs; private File _workingDir; - private String _mergeType; + private MergeType _mergeType; private String _tableName; private String _segmentName; private TableConfig _tableConfig; @@ -155,7 +151,7 @@ public class MergeRollupSegmentConverter { return this; } 
- public Builder setMergeType(String mergeType) { + public Builder setMergeType(MergeType mergeType) { _mergeType = mergeType; return this; } @@ -182,7 +178,7 @@ public class MergeRollupSegmentConverter { public MergeRollupSegmentConverter build() { return new MergeRollupSegmentConverter(_inputIndexDirs, _workingDir, _tableName, _segmentName, _mergeType, - _rollupPreAggregateType, _tableConfig); + _tableConfig, _rollupPreAggregateType); } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java index ebb8c72..10fa29a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java @@ -257,7 +257,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive FileUtils.deleteQuietly(tempIndexDir); // Convert segment format if necessary - convertFormatIfNeeded(segmentOutputDir); + convertFormatIfNecessary(segmentOutputDir); // Build star-tree V2 if necessary buildStarTreeV2IfNecessary(segmentOutputDir); @@ -316,7 +316,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive // Using converter is similar to option (2), plus it's battle-tested code. We will roll out with // this change to keep changes limited. Once we've migrated we can implement approach (1) with option to // copy for indexes for which we don't know sizes upfront. 
- private void convertFormatIfNeeded(File segmentDirectory) + private void convertFormatIfNecessary(File segmentDirectory) throws Exception { SegmentVersion versionToGenerate = config.getSegmentVersion(); if (versionToGenerate.equals(SegmentVersion.v1)) { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/minion/MergeRollupSegmentConverterTest.java b/pinot-core/src/test/java/org/apache/pinot/core/minion/MergeRollupSegmentConverterTest.java index 412f1c1..81430d1 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/minion/MergeRollupSegmentConverterTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/minion/MergeRollupSegmentConverterTest.java @@ -25,13 +25,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; +import org.apache.pinot.core.minion.rollup.MergeType; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.FieldSpec; -import org.apache.pinot.spi.data.MetricFieldSpec; import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.data.TimeFieldSpec; import org.apache.pinot.spi.data.TimeGranularitySpec; import org.apache.pinot.spi.data.readers.GenericRow; import org.apache.pinot.core.data.readers.GenericRowRecordReader; @@ -85,11 +83,11 @@ public class MergeRollupSegmentConverterTest { for (int i = 0; i < NUM_ROWS; i++) { int dimensionValue = i % (NUM_ROWS / REPEAT_ROWS); GenericRow row = new GenericRow(); - row.putField(D1, dimensionValue); - row.putField(D2, Integer.toString(dimensionValue)); - row.putField(M1, (long) dimensionValue); - row.putField(M2, (double) dimensionValue); - row.putField(T, timestamp++); + row.putValue(D1, dimensionValue); + row.putValue(D2, Integer.toString(dimensionValue)); + row.putValue(M1, (long) dimensionValue); + row.putValue(M2, (double) dimensionValue); + row.putValue(T, 
timestamp++); rows.add(row); } @@ -113,7 +111,7 @@ public class MergeRollupSegmentConverterTest { // Run roll-up segment converter with "CONCATENATE" merge type MergeRollupSegmentConverter rollupSegmentConverter = new MergeRollupSegmentConverter.Builder().setInputIndexDirs(_segmentIndexDirList).setWorkingDir(WORKING_DIR) - .setTableName(TABLE_NAME).setSegmentName("TestConcatenate").setMergeType("CONCATENATE") + .setTableName(TABLE_NAME).setSegmentName("TestConcatenate").setMergeType(MergeType.CONCATENATE) .setTableConfig(_tableConfig).build(); List<File> result = rollupSegmentConverter.convert(); Assert.assertEquals(result.size(), 1); @@ -153,7 +151,7 @@ public class MergeRollupSegmentConverterTest { // Run roll-up segment converter with "ROLLUP" merge type MergeRollupSegmentConverter rollupSegmentConverter = new MergeRollupSegmentConverter.Builder().setInputIndexDirs(_segmentIndexDirList).setWorkingDir(WORKING_DIR) - .setTableName(TABLE_NAME).setSegmentName("TestSimpleRollup").setMergeType("ROLLUP") + .setTableName(TABLE_NAME).setSegmentName("TestSimpleRollup").setMergeType(MergeType.ROLLUP) .setRollupPreAggregateType(preAggregateType).setTableConfig(_tableConfig).build(); List<File> result = rollupSegmentConverter.convert(); Assert.assertEquals(result.size(), 1); diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutor.java new file mode 100644 index 0000000..b94cf6a --- /dev/null +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutor.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.minion.executor; + +import com.google.common.base.Preconditions; +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.core.minion.rollup.MergeRollupSegmentConverter; +import org.apache.pinot.core.minion.rollup.MergeType; +import org.apache.pinot.spi.config.table.TableConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Task executor that provides merge and rollup service + * + * TODO: + * 1. Add the support for roll-up + * 2. Add the support for time split to provide backfill support for merged segments + * 3. 
Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment) + */ +public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor { + private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class); + + @Override + protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs, + File workingDir) + throws Exception { + Map<String, String> configs = pinotTaskConfig.getConfigs(); + String mergeTypeString = configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY); + // TODO: add the support for rollup + Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null"); + + MergeType mergeType = MergeType.fromString(mergeTypeString); + Preconditions.checkState(mergeType == MergeType.CONCATENATE, "Only 'CONCATENATE' mode is currently supported."); + + String mergedSegmentName = configs.get(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY); + String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY); + + TableConfig tableConfig = + ZKMetadataProvider.getTableConfig(MINION_CONTEXT.getHelixPropertyStore(), tableNameWithType); + + MergeRollupSegmentConverter rollupSegmentConverter = + new MergeRollupSegmentConverter.Builder().setMergeType(mergeType).setTableName(tableNameWithType) + .setSegmentName(mergedSegmentName).setInputIndexDirs(originalIndexDirs).setWorkingDir(workingDir) + .setTableConfig(tableConfig).build(); + + List<File> resultFiles = rollupSegmentConverter.convert(); + List<SegmentConversionResult> results = new ArrayList<>(); + for (File file : resultFiles) { + String outputSegmentName = file.getName(); + results.add(new SegmentConversionResult.Builder().setFile(file).setSegmentName(outputSegmentName) + .setTableNameWithType(tableNameWithType).build()); + } + return results; + } +} diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorFactory.java 
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorFactory.java new file mode 100644 index 0000000..66d2c86 --- /dev/null +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorFactory.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.minion.executor; + +public class MergeRollupTaskExecutorFactory implements PinotTaskExecutorFactory { + @Override + public PinotTaskExecutor create() { + return new MergeRollupTaskExecutor(); + } +} diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java index 71a35d8..ba3646c 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java @@ -35,6 +35,7 @@ public class TaskExecutorFactoryRegistry { registerTaskExecutorFactory(MinionConstants.ConvertToRawIndexTask.TASK_TYPE, new ConvertToRawIndexTaskExecutorFactory()); registerTaskExecutorFactory(MinionConstants.PurgeTask.TASK_TYPE, new PurgeTaskExecutorFactory()); + registerTaskExecutorFactory(MinionConstants.MergeRollupTask.TASK_TYPE, new MergeRollupTaskExecutorFactory()); } /** diff --git a/pinot-minion/src/test/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorTest.java b/pinot-minion/src/test/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorTest.java new file mode 100644 index 0000000..245a085 --- /dev/null +++ b/pinot-minion/src/test/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorTest.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.minion.executor; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.helix.AccessOption; +import org.apache.helix.ZNRecord; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.pinot.common.utils.config.TableConfigUtils; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.data.readers.GenericRowRecordReader; +import org.apache.pinot.core.data.readers.PinotSegmentRecordReader; +import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.core.minion.rollup.MergeType; +import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.minion.MinionContext; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.data.readers.RecordReader; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.*; + + +public class MergeRollupTaskExecutorTest { + private static final File TEMP_DIR = new 
File(FileUtils.getTempDirectory(), "MergeRollupTaskExecutorTest"); + private static final File ORIGINAL_SEGMENT_DIR = new File(TEMP_DIR, "originalSegment"); + private static final File WORKING_DIR = new File(TEMP_DIR, "workingDir"); + private static final int NUM_SEGMENTS = 10; + private static final int NUM_ROWS = 5; + private static final String MERGED_SEGMENT_NAME = "testMergedSegment"; + private static final String TABLE_NAME = "testTable"; + private static final String D1 = "d1"; + + private List<File> _segmentIndexDirList; + + @BeforeClass + public void setUp() + throws Exception { + FileUtils.deleteDirectory(TEMP_DIR); + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(D1, FieldSpec.DataType.INT).build(); + + List<GenericRow> rows = new ArrayList<>(NUM_ROWS); + for (int i = 0; i < NUM_ROWS; i++) { + GenericRow row = new GenericRow(); + row.putValue(D1, i); + rows.add(row); + } + + _segmentIndexDirList = new ArrayList<>(); + for (int i = 0; i < NUM_SEGMENTS; i++) { + String segmentName = MERGED_SEGMENT_NAME + i; + RecordReader recordReader = new GenericRowRecordReader(rows); + SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema); + config.setOutDir(ORIGINAL_SEGMENT_DIR.getPath()); + config.setTableName(TABLE_NAME); + config.setSegmentName(segmentName); + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(config, recordReader); + driver.build(); + _segmentIndexDirList.add(new File(ORIGINAL_SEGMENT_DIR, segmentName)); + } + + MinionContext minionContext = MinionContext.getInstance(); + @SuppressWarnings("unchecked") + ZkHelixPropertyStore<ZNRecord> helixPropertyStore = mock(ZkHelixPropertyStore.class); + when(helixPropertyStore.get("/CONFIGS/TABLE/testTable_OFFLINE", null, AccessOption.PERSISTENT)) + .thenReturn(TableConfigUtils.toZNRecord(tableConfig)); + 
minionContext.setHelixPropertyStore(helixPropertyStore); + } + + @Test + public void testConvert() + throws Exception { + MergeRollupTaskExecutor mergeRollupTaskExecutor = new MergeRollupTaskExecutor(); + Map<String, String> configs = new HashMap<>(); + configs.put(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY, MergeType.CONCATENATE.toString()); + configs.put(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY, MERGED_SEGMENT_NAME); + configs.put(MinionConstants.TABLE_NAME_KEY, "testTable_OFFLINE"); + + PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, configs); + List<SegmentConversionResult> conversionResults = + mergeRollupTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirList, WORKING_DIR); + + Assert.assertEquals(conversionResults.size(), 1); + Assert.assertEquals(conversionResults.get(0).getSegmentName(), MERGED_SEGMENT_NAME); + File mergedSegment = conversionResults.get(0).getFile(); + try (PinotSegmentRecordReader pinotSegmentRecordReader = new PinotSegmentRecordReader(mergedSegment)) { + int numRecords = 0; + GenericRow row = new GenericRow(); + while (pinotSegmentRecordReader.hasNext()) { + row = pinotSegmentRecordReader.next(row); + numRecords++; + } + Assert.assertEquals(numRecords, NUM_SEGMENTS * NUM_ROWS); + } + } + + @AfterClass + public void tearDown() + throws Exception { + FileUtils.deleteDirectory(TEMP_DIR); + } +} diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java index 76c691d..ff37a4f 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java @@ -194,9 +194,9 @@ public class SegmentMergeCommand extends AbstractBaseAdminCommand implements Com // TODO: add support for rollup String tableName = 
TableNameBuilder.extractRawTableName(tableConfig.getTableName()); MergeRollupSegmentConverter mergeRollupSegmentConverter = - new MergeRollupSegmentConverter.Builder().setMergeType(_mergeType).setSegmentName(_outputSegmentName) - .setInputIndexDirs(inputIndexDirs).setWorkingDir(workingDir).setTableName(tableName) - .setTableConfig(tableConfig).build(); + new MergeRollupSegmentConverter.Builder().setMergeType(MergeType.fromString(_mergeType)) + .setSegmentName(_outputSegmentName).setInputIndexDirs(inputIndexDirs).setWorkingDir(workingDir) + .setTableName(tableName).setTableConfig(tableConfig).build(); List<File> outputSegments = mergeRollupSegmentConverter.convert(); Preconditions.checkState(outputSegments.size() == 1); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org