This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f49328b  Add segment merge-rollup task executor (#5587)
f49328b is described below

commit f49328bf1d6fcc04eb73ed8a6c4555c231853fe9
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Tue Jun 30 00:08:44 2020 -0700

    Add segment merge-rollup task executor (#5587)
    
    * Add segment merge-rollup task executor
    
    1. Add segment merge-rollup task executor
    2. Add a unit test for the task executor
    
    * Updated the code format based on Pinot style
---
 .../apache/pinot/core/common/MinionConstants.java  |   6 +
 .../apache/pinot/core/minion/SegmentConverter.java |  27 ++---
 .../minion/rollup/MergeRollupSegmentConverter.java |  22 ++--
 .../impl/SegmentIndexCreationDriverImpl.java       |   4 +-
 .../minion/MergeRollupSegmentConverterTest.java    |  18 ++-
 .../minion/executor/MergeRollupTaskExecutor.java   |  80 ++++++++++++
 .../executor/MergeRollupTaskExecutorFactory.java   |  26 ++++
 .../executor/TaskExecutorFactoryRegistry.java      |   1 +
 .../executor/MergeRollupTaskExecutorTest.java      | 134 +++++++++++++++++++++
 .../segment/converter/SegmentMergeCommand.java     |   6 +-
 10 files changed, 281 insertions(+), 43 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 4b99b56..b8ff4df 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -55,4 +55,10 @@ public class MinionConstants {
   public static class PurgeTask {
     public static final String TASK_TYPE = "PurgeTask";
   }
+
+  public static class MergeRollupTask {
+    public static final String TASK_TYPE = "mergeRollupTask";
+    public static final String MERGE_TYPE_KEY = "mergeTypeKey";
+    public static final String MERGED_SEGMENT_NAME_KEY = "mergedSegmentNameKey";
+  }
 }
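
For reference, a minimal sketch of how the two new keys above are expected to be populated in a task configuration and handed to the MergeRollupTask task type, mirroring the unit test added later in this commit; the table and segment names are placeholders, not values defined by the commit:

import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.core.minion.rollup.MergeType;

public class MergeRollupTaskConfigSketch {
  public static PinotTaskConfig buildTaskConfig() {
    // Keys come from MinionConstants.MergeRollupTask added in this commit;
    // "myTable_OFFLINE" and "mergedSegment_0" are hypothetical values.
    Map<String, String> configs = new HashMap<>();
    configs.put(MinionConstants.TABLE_NAME_KEY, "myTable_OFFLINE");
    configs.put(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY, MergeType.CONCATENATE.toString());
    configs.put(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY, "mergedSegment_0");
    return new PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, configs);
  }
}
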
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentConverter.java
index c399505..77b5b49 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentConverter.java
@@ -71,7 +71,6 @@ public class SegmentConverter {
   private RecordAggregator _recordAggregator;
   private List<String> _groupByColumns;
   private boolean _skipTimeValueCheck;
-  private IndexingConfig _indexingConfig;
 
   public SegmentConverter(List<File> inputIndexDirs, File workingDir, String tableName, String segmentName,
       int totalNumPartition, RecordTransformer recordTransformer, @Nullable RecordPartitioner recordPartitioner,
@@ -90,7 +89,6 @@ public class SegmentConverter {
     _recordAggregator = recordAggregator;
     _groupByColumns = groupByColumns;
     _skipTimeValueCheck = skipTimeValueCheck;
-    _indexingConfig = tableConfig.getIndexingConfig();
   }
 
   public List<File> convertSegment()
@@ -121,20 +119,19 @@ public class SegmentConverter {
       }
 
       // Sorting on sorted column and creating indices
-      if (_indexingConfig != null) {
-        List<String> sortedColumn = _indexingConfig.getSortedColumn();
-        List<String> invertedIndexColumns = _indexingConfig.getInvertedIndexColumns();
-
-        // Check if the table config has any index configured
-        if (CollectionUtils.isNotEmpty(sortedColumn) || CollectionUtils.isNotEmpty(invertedIndexColumns)) {
-          String indexGenerationOutputPath = _workingDir.getPath() + File.separator + INDEX_PREFIX + currentPartition;
-          try (PinotSegmentRecordReader pinotSegmentRecordReader = new PinotSegmentRecordReader(outputSegment, null,
-              sortedColumn)) {
-            buildSegment(indexGenerationOutputPath, outputSegmentName, pinotSegmentRecordReader,
-                pinotSegmentRecordReader.getSchema(), _tableConfig);
-          }
-          outputSegment = new File(indexGenerationOutputPath + File.separator + outputSegmentName);
+      IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
+      List<String> sortedColumn = indexingConfig.getSortedColumn();
+      List<String> invertedIndexColumns = indexingConfig.getInvertedIndexColumns();
+
+      // Check if the table config has any index configured
+      if (CollectionUtils.isNotEmpty(sortedColumn) || CollectionUtils.isNotEmpty(invertedIndexColumns)) {
+        String indexGenerationOutputPath = _workingDir.getPath() + File.separator + INDEX_PREFIX + currentPartition;
+        try (PinotSegmentRecordReader pinotSegmentRecordReader = new PinotSegmentRecordReader(outputSegment, null,
+            sortedColumn)) {
+          buildSegment(indexGenerationOutputPath, outputSegmentName, pinotSegmentRecordReader,
+              pinotSegmentRecordReader.getSchema(), _tableConfig);
         }
+        outputSegment = new File(indexGenerationOutputPath + File.separator + outputSegmentName);
       }
 
       resultFiles.add(outputSegment);
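
With the cached _indexingConfig field removed, the converter now reads the indexing config straight from the table config on every conversion, so any table config carrying a sorted column or inverted-index columns triggers the index-generation pass shown above. A rough sketch of such a table config, assuming TableConfigBuilder exposes setSortedColumn and setInvertedIndexColumns setters (the table and column names are placeholders):

import java.util.Collections;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;

public class IndexedTableConfigSketch {
  public static TableConfig build() {
    // A non-empty indexing config makes SegmentConverter run the extra
    // sorted-column / inverted-index build step shown in the hunk above.
    return new TableConfigBuilder(TableType.OFFLINE)
        .setTableName("myTable")                                   // hypothetical table name
        .setSortedColumn("d1")                                     // hypothetical sorted column
        .setInvertedIndexColumns(Collections.singletonList("d2"))  // hypothetical inverted-index column
        .build();
  }
}
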
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java
index 9e79070..ac3230e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java
@@ -22,18 +22,15 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.core.minion.SegmentConverter;
 import org.apache.pinot.core.minion.segment.RecordAggregator;
 import org.apache.pinot.core.minion.segment.RecordTransformer;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 
 
@@ -49,18 +46,17 @@ public class MergeRollupSegmentConverter {
   private String _tableName;
   private String _segmentName;
   private MergeType _mergeType;
-  private Map<String, String> _rolllupPreAggregateType;
+  private Map<String, String> _rollupPreAggregateType;
 
-  private MergeRollupSegmentConverter(@Nonnull List<File> inputIndexDirs, @Nonnull File workingDir,
-      @Nonnull String tableName, @Nonnull String segmentName, @Nonnull String mergeType,
-      @Nullable Map<String, String> rollupPreAggregateType, TableConfig tableConfig) {
+  private MergeRollupSegmentConverter(List<File> inputIndexDirs, File workingDir, String tableName, String segmentName,
+      MergeType mergeType, TableConfig tableConfig, @Nullable Map<String, String> rollupPreAggregateType) {
     _inputIndexDirs = inputIndexDirs;
     _workingDir = workingDir;
     _tableName = tableName;
     _segmentName = segmentName;
-    _mergeType = MergeType.fromString(mergeType);
-    _rolllupPreAggregateType = rollupPreAggregateType;
+    _mergeType = mergeType;
     _tableConfig = tableConfig;
+    _rollupPreAggregateType = rollupPreAggregateType;
   }
 
   public List<File> convert()
@@ -122,7 +118,7 @@ public class MergeRollupSegmentConverter {
     RecordTransformer rollupRecordTransformer = (row) -> row;
 
     // Initialize roll-up record aggregator
-    RecordAggregator rollupRecordAggregator = new RollupRecordAggregator(schema, _rolllupPreAggregateType);
+    RecordAggregator rollupRecordAggregator = new RollupRecordAggregator(schema, _rollupPreAggregateType);
 
     SegmentConverter rollupSegmentConverter =
         new SegmentConverter.Builder().setTableName(_tableName).setSegmentName(_segmentName)
@@ -137,7 +133,7 @@ public class MergeRollupSegmentConverter {
     // Required
     private List<File> _inputIndexDirs;
     private File _workingDir;
-    private String _mergeType;
+    private MergeType _mergeType;
     private String _tableName;
     private String _segmentName;
     private TableConfig _tableConfig;
@@ -155,7 +151,7 @@ public class MergeRollupSegmentConverter {
       return this;
     }
 
-    public Builder setMergeType(String mergeType) {
+    public Builder setMergeType(MergeType mergeType) {
       _mergeType = mergeType;
       return this;
     }
@@ -182,7 +178,7 @@ public class MergeRollupSegmentConverter {
 
     public MergeRollupSegmentConverter build() {
       return new MergeRollupSegmentConverter(_inputIndexDirs, _workingDir, _tableName, _segmentName, _mergeType,
-          _rollupPreAggregateType, _tableConfig);
+          _tableConfig, _rollupPreAggregateType);
     }
   }
 }
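
Since the builder now takes the MergeType enum (and the constructor orders tableConfig ahead of the pre-aggregate map), a caller would wire the converter roughly as below, following the updated tests in this commit; the names passed to the builder are placeholders:

import java.io.File;
import java.util.List;
import org.apache.pinot.core.minion.rollup.MergeRollupSegmentConverter;
import org.apache.pinot.core.minion.rollup.MergeType;
import org.apache.pinot.spi.config.table.TableConfig;

public class MergeRollupConverterSketch {
  // Concatenates the given input segments into a single output segment.
  public static List<File> concatenate(List<File> inputIndexDirs, File workingDir, TableConfig tableConfig)
      throws Exception {
    MergeRollupSegmentConverter converter = new MergeRollupSegmentConverter.Builder()
        .setInputIndexDirs(inputIndexDirs)
        .setWorkingDir(workingDir)
        .setTableName("myTable")              // hypothetical table name
        .setSegmentName("mergedSegment")      // hypothetical output segment name
        .setMergeType(MergeType.CONCATENATE)  // enum value replaces the old "CONCATENATE" string
        .setTableConfig(tableConfig)
        .build();
    return converter.convert();
  }
}
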
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index ebb8c72..10fa29a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -257,7 +257,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
     FileUtils.deleteQuietly(tempIndexDir);
 
     // Convert segment format if necessary
-    convertFormatIfNeeded(segmentOutputDir);
+    convertFormatIfNecessary(segmentOutputDir);
 
     // Build star-tree V2 if necessary
     buildStarTreeV2IfNecessary(segmentOutputDir);
@@ -316,7 +316,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
   // Using converter is similar to option (2), plus it's battle-tested code. We will roll out with
   // this change to keep changes limited. Once we've migrated we can implement approach (1) with option to
   // copy for indexes for which we don't know sizes upfront.
-  private void convertFormatIfNeeded(File segmentDirectory)
+  private void convertFormatIfNecessary(File segmentDirectory)
       throws Exception {
     SegmentVersion versionToGenerate = config.getSegmentVersion();
     if (versionToGenerate.equals(SegmentVersion.v1)) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/minion/MergeRollupSegmentConverterTest.java b/pinot-core/src/test/java/org/apache/pinot/core/minion/MergeRollupSegmentConverterTest.java
index 412f1c1..81430d1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/minion/MergeRollupSegmentConverterTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/minion/MergeRollupSegmentConverterTest.java
@@ -25,13 +25,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.minion.rollup.MergeType;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.apache.pinot.spi.data.TimeGranularitySpec;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.core.data.readers.GenericRowRecordReader;
@@ -85,11 +83,11 @@ public class MergeRollupSegmentConverterTest {
     for (int i = 0; i < NUM_ROWS; i++) {
       int dimensionValue = i % (NUM_ROWS / REPEAT_ROWS);
       GenericRow row = new GenericRow();
-      row.putField(D1, dimensionValue);
-      row.putField(D2, Integer.toString(dimensionValue));
-      row.putField(M1, (long) dimensionValue);
-      row.putField(M2, (double) dimensionValue);
-      row.putField(T, timestamp++);
+      row.putValue(D1, dimensionValue);
+      row.putValue(D2, Integer.toString(dimensionValue));
+      row.putValue(M1, (long) dimensionValue);
+      row.putValue(M2, (double) dimensionValue);
+      row.putValue(T, timestamp++);
       rows.add(row);
     }
 
@@ -113,7 +111,7 @@ public class MergeRollupSegmentConverterTest {
     // Run roll-up segment converter with "CONCATENATE" merge type
     MergeRollupSegmentConverter rollupSegmentConverter =
         new MergeRollupSegmentConverter.Builder().setInputIndexDirs(_segmentIndexDirList).setWorkingDir(WORKING_DIR)
-            .setTableName(TABLE_NAME).setSegmentName("TestConcatenate").setMergeType("CONCATENATE")
+            .setTableName(TABLE_NAME).setSegmentName("TestConcatenate").setMergeType(MergeType.CONCATENATE)
             .setTableConfig(_tableConfig).build();
     List<File> result = rollupSegmentConverter.convert();
     Assert.assertEquals(result.size(), 1);
@@ -153,7 +151,7 @@ public class MergeRollupSegmentConverterTest {
     // Run roll-up segment converter with "ROLLUP" merge type
     MergeRollupSegmentConverter rollupSegmentConverter =
         new MergeRollupSegmentConverter.Builder().setInputIndexDirs(_segmentIndexDirList).setWorkingDir(WORKING_DIR)
-            .setTableName(TABLE_NAME).setSegmentName("TestSimpleRollup").setMergeType("ROLLUP")
+            .setTableName(TABLE_NAME).setSegmentName("TestSimpleRollup").setMergeType(MergeType.ROLLUP)
             .setRollupPreAggregateType(preAggregateType).setTableConfig(_tableConfig).build();
     List<File> result = rollupSegmentConverter.convert();
     Assert.assertEquals(result.size(), 1);
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutor.java
new file mode 100644
index 0000000..b94cf6a
--- /dev/null
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutor.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.executor;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.core.minion.rollup.MergeRollupSegmentConverter;
+import org.apache.pinot.core.minion.rollup.MergeType;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Task executor that provides merge and rollup service
+ *
+ * TODO:
+ *   1. Add the support for roll-up
+ *   2. Add the support for time split to provide backfill support for merged segments
+ *   3. Change the way to decide the number of output segments (explicit numPartition config -> maxNumRowsPerSegment)
+ */
+public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+
+  @Override
+  protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
+      File workingDir)
+      throws Exception {
+    Map<String, String> configs = pinotTaskConfig.getConfigs();
+    String mergeTypeString = configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY);
+    // TODO: add the support for rollup
+    Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null");
+
+    MergeType mergeType = MergeType.fromString(mergeTypeString);
+    Preconditions.checkState(mergeType == MergeType.CONCATENATE, "Only 'CONCATENATE' mode is currently supported.");
+
+    String mergedSegmentName = configs.get(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY);
+    String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
+
+    TableConfig tableConfig =
+        ZKMetadataProvider.getTableConfig(MINION_CONTEXT.getHelixPropertyStore(), tableNameWithType);
+
+    MergeRollupSegmentConverter rollupSegmentConverter =
+        new MergeRollupSegmentConverter.Builder().setMergeType(mergeType).setTableName(tableNameWithType)
+            .setSegmentName(mergedSegmentName).setInputIndexDirs(originalIndexDirs).setWorkingDir(workingDir)
+            .setTableConfig(tableConfig).build();
+
+    List<File> resultFiles = rollupSegmentConverter.convert();
+    List<SegmentConversionResult> results = new ArrayList<>();
+    for (File file : resultFiles) {
+      String outputSegmentName = file.getName();
+      results.add(new SegmentConversionResult.Builder().setFile(file).setSegmentName(outputSegmentName)
+          .setTableNameWithType(tableNameWithType).build());
+    }
+    return results;
+  }
+}
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorFactory.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorFactory.java
new file mode 100644
index 0000000..66d2c86
--- /dev/null
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorFactory.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.executor;
+
+public class MergeRollupTaskExecutorFactory implements PinotTaskExecutorFactory {
+  @Override
+  public PinotTaskExecutor create() {
+    return new MergeRollupTaskExecutor();
+  }
+}
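
The factory itself is intentionally trivial: it exists so the registry (next diff) can map the MergeRollupTask task type to a fresh executor per task run. A small sketch of that pairing, using only calls shown in this commit; the class name is hypothetical:

import org.apache.pinot.minion.executor.MergeRollupTaskExecutorFactory;
import org.apache.pinot.minion.executor.PinotTaskExecutor;

public class MergeRollupFactorySketch {
  public static void main(String[] args) {
    // The registry calls create() to obtain a new executor instance for each task.
    PinotTaskExecutor executor = new MergeRollupTaskExecutorFactory().create();
    System.out.println(executor.getClass().getSimpleName()); // prints MergeRollupTaskExecutor
  }
}
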
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
index 71a35d8..ba3646c 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
@@ -35,6 +35,7 @@ public class TaskExecutorFactoryRegistry {
     registerTaskExecutorFactory(MinionConstants.ConvertToRawIndexTask.TASK_TYPE,
         new ConvertToRawIndexTaskExecutorFactory());
     registerTaskExecutorFactory(MinionConstants.PurgeTask.TASK_TYPE, new PurgeTaskExecutorFactory());
+    registerTaskExecutorFactory(MinionConstants.MergeRollupTask.TASK_TYPE, new MergeRollupTaskExecutorFactory());
   }
 
   /**
diff --git a/pinot-minion/src/test/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorTest.java b/pinot-minion/src/test/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorTest.java
new file mode 100644
index 0000000..245a085
--- /dev/null
+++ b/pinot-minion/src/test/java/org/apache/pinot/minion/executor/MergeRollupTaskExecutorTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.executor;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.utils.config.TableConfigUtils;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.core.data.readers.GenericRowRecordReader;
+import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.core.minion.rollup.MergeType;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.minion.MinionContext;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+
+public class MergeRollupTaskExecutorTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "MergeRollupTaskExecutorTest");
+  private static final File ORIGINAL_SEGMENT_DIR = new File(TEMP_DIR, "originalSegment");
+  private static final File WORKING_DIR = new File(TEMP_DIR, "workingDir");
+  private static final int NUM_SEGMENTS = 10;
+  private static final int NUM_ROWS = 5;
+  private static final String MERGED_SEGMENT_NAME = "testMergedSegment";
+  private static final String TABLE_NAME = "testTable";
+  private static final String D1 = "d1";
+
+  private List<File> _segmentIndexDirList;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(TEMP_DIR);
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(D1, FieldSpec.DataType.INT).build();
+
+    List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
+    for (int i = 0; i < NUM_ROWS; i++) {
+      GenericRow row = new GenericRow();
+      row.putValue(D1, i);
+      rows.add(row);
+    }
+
+    _segmentIndexDirList = new ArrayList<>();
+    for (int i = 0; i < NUM_SEGMENTS; i++) {
+      String segmentName = MERGED_SEGMENT_NAME + i;
+      RecordReader recordReader = new GenericRowRecordReader(rows);
+      SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+      config.setOutDir(ORIGINAL_SEGMENT_DIR.getPath());
+      config.setTableName(TABLE_NAME);
+      config.setSegmentName(segmentName);
+      SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+      driver.init(config, recordReader);
+      driver.build();
+      _segmentIndexDirList.add(new File(ORIGINAL_SEGMENT_DIR, segmentName));
+    }
+
+    MinionContext minionContext = MinionContext.getInstance();
+    @SuppressWarnings("unchecked")
+    ZkHelixPropertyStore<ZNRecord> helixPropertyStore = mock(ZkHelixPropertyStore.class);
+    when(helixPropertyStore.get("/CONFIGS/TABLE/testTable_OFFLINE", null, AccessOption.PERSISTENT))
+        .thenReturn(TableConfigUtils.toZNRecord(tableConfig));
+    minionContext.setHelixPropertyStore(helixPropertyStore);
+  }
+
+  @Test
+  public void testConvert()
+      throws Exception {
+    MergeRollupTaskExecutor mergeRollupTaskExecutor = new MergeRollupTaskExecutor();
+    Map<String, String> configs = new HashMap<>();
+    configs.put(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY, MergeType.CONCATENATE.toString());
+    configs.put(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY, MERGED_SEGMENT_NAME);
+    configs.put(MinionConstants.TABLE_NAME_KEY, "testTable_OFFLINE");
+
+    PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, configs);
+    List<SegmentConversionResult> conversionResults =
+        mergeRollupTaskExecutor.convert(pinotTaskConfig, _segmentIndexDirList, WORKING_DIR);
+
+    Assert.assertEquals(conversionResults.size(), 1);
+    Assert.assertEquals(conversionResults.get(0).getSegmentName(), MERGED_SEGMENT_NAME);
+    File mergedSegment = conversionResults.get(0).getFile();
+    try (PinotSegmentRecordReader pinotSegmentRecordReader = new PinotSegmentRecordReader(mergedSegment)) {
+      int numRecords = 0;
+      GenericRow row = new GenericRow();
+      while (pinotSegmentRecordReader.hasNext()) {
+        row = pinotSegmentRecordReader.next(row);
+        numRecords++;
+      }
+      Assert.assertEquals(numRecords, NUM_SEGMENTS * NUM_ROWS);
+    }
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    FileUtils.deleteDirectory(TEMP_DIR);
+  }
+}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java
index 76c691d..ff37a4f 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java
@@ -194,9 +194,9 @@ public class SegmentMergeCommand extends AbstractBaseAdminCommand implements Com
       // TODO: add support for rollup
       String tableName = TableNameBuilder.extractRawTableName(tableConfig.getTableName());
       MergeRollupSegmentConverter mergeRollupSegmentConverter =
-          new MergeRollupSegmentConverter.Builder().setMergeType(_mergeType).setSegmentName(_outputSegmentName)
-              .setInputIndexDirs(inputIndexDirs).setWorkingDir(workingDir).setTableName(tableName)
-              .setTableConfig(tableConfig).build();
+          new MergeRollupSegmentConverter.Builder().setMergeType(MergeType.fromString(_mergeType))
+              .setSegmentName(_outputSegmentName).setInputIndexDirs(inputIndexDirs).setWorkingDir(workingDir)
+              .setTableName(tableName).setTableConfig(tableConfig).build();
 
       List<File> outputSegments = mergeRollupSegmentConverter.convert();
       Preconditions.checkState(outputSegments.size() == 1);
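
The command-line tool keeps its string-valued merge-type option and now converts it at the call site via MergeType.fromString, as the hunk above shows. A minimal sketch of that conversion (the literal stands in for the user-supplied option value):

import org.apache.pinot.core.minion.rollup.MergeType;

public class MergeTypeParseSketch {
  public static void main(String[] args) {
    // The CLI's _mergeType field is a raw string; MergeType.fromString maps it to the enum.
    MergeType mergeType = MergeType.fromString("CONCATENATE");
    System.out.println(mergeType); // CONCATENATE
  }
}
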


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to