no sort datawriter

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/dbc5262a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/dbc5262a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/dbc5262a

Branch: refs/heads/12-dev
Commit: dbc5262a0186a0da60fc1fb289162b1f34313aba
Parents: a39e777
Author: QiangCai <qiang...@qq.com>
Authored: Thu Apr 6 00:09:06 2017 +0530
Committer: jackylk <jacky.li...@huawei.com>
Committed: Sun Apr 9 11:25:38 2017 +0800

----------------------------------------------------------------------
 .../processing/merger/CarbonDataMergerUtil.java |   2 +-
 .../processing/model/CarbonLoadModel.java       |   6 +-
 .../newflow/DataLoadProcessBuilder.java         |  38 ++-
 .../CarbonRowDataWriterProcessorStepImpl.java   | 321 +++++++++++++++++++
 .../newflow/steps/NoSortProcessorStepImpl.java  | 153 ---------
 .../store/CarbonDataFileAttributes.java         |  10 +-
 .../store/CarbonFactDataHandlerModel.java       |   2 +-
 .../store/writer/AbstractFactDataWriter.java    |   4 +-
 8 files changed, 361 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/dbc5262a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 2414993..cbd5a25 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -170,7 +170,7 @@ public final class CarbonDataMergerUtil {
     boolean updateLockStatus = false;
     boolean tableLockStatus = false;
 
-    String timestamp = carbonLoadModel.getFactTimeStamp();
+    String timestamp = "" + carbonLoadModel.getFactTimeStamp();
 
     List<String> updatedDeltaFilesList =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/dbc5262a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
 
b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index 525874f..963a51b 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -82,7 +82,7 @@ public class CarbonLoadModel implements Serializable {
   /**
    * new load start time
    */
-  private String factTimeStamp;
+  private long factTimeStamp;
   /**
    * load Id
    */
@@ -587,7 +587,7 @@ public class CarbonLoadModel implements Serializable {
   /**
    * @return
    */
-  public String getFactTimeStamp() {
+  public long getFactTimeStamp() {
     return factTimeStamp;
   }
 
@@ -595,7 +595,7 @@ public class CarbonLoadModel implements Serializable {
    * @param factTimeStamp
    */
   public void setFactTimeStamp(long factTimeStamp) {
-    this.factTimeStamp = factTimeStamp + "";
+    this.factTimeStamp = factTimeStamp;
   }
 
   public String[] getDelimiters() {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/dbc5262a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index 7dbffaf..b8b098b 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -35,12 +35,12 @@ import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.model.CarbonLoadModel;
 import 
org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
+import 
org.apache.carbondata.processing.newflow.steps.CarbonRowDataWriterProcessorStepImpl;
 import 
org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl;
 import 
org.apache.carbondata.processing.newflow.steps.DataConverterProcessorWithBucketingStepImpl;
 import 
org.apache.carbondata.processing.newflow.steps.DataWriterBatchProcessorStepImpl;
 import 
org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl;
 import org.apache.carbondata.processing.newflow.steps.InputProcessorStepImpl;
-import org.apache.carbondata.processing.newflow.steps.NoSortProcessorStepImpl;
 import org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
@@ -59,7 +59,9 @@ public final class DataLoadProcessBuilder {
             CarbonCommonConstants.LOAD_USE_BATCH_SORT_DEFAULT));
     CarbonDataLoadConfiguration configuration =
         createConfiguration(loadModel, storeLocation);
-    if (configuration.getBucketingInfo() != null) {
+    if (!configuration.isSortTable()) {
+      return buildInternalForNoSort(inputIterators, configuration);
+    } else if (configuration.getBucketingInfo() != null) {
       return buildInternalForBucketing(inputIterators, configuration);
     } else if (batchSort) {
       return buildInternalForBatchSort(inputIterators, configuration);
@@ -77,16 +79,30 @@ public final class DataLoadProcessBuilder {
     // data types and configurations.
     AbstractDataLoadProcessorStep converterProcessorStep =
         new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
-    // 3. Sorts the data by SortColumn or not
-    AbstractDataLoadProcessorStep sortProcessorStep = 
configuration.isSortTable() ?
-        new SortProcessorStepImpl(configuration, converterProcessorStep) :
-        new NoSortProcessorStepImpl(configuration, converterProcessorStep);
+    // 3. Sorts the data by SortColumn
+    AbstractDataLoadProcessorStep sortProcessorStep =
+        new SortProcessorStepImpl(configuration, converterProcessorStep);
     // 4. Writes the sorted data in carbondata format.
     AbstractDataLoadProcessorStep writerProcessorStep =
         new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
     return writerProcessorStep;
   }
 
+  private AbstractDataLoadProcessorStep 
buildInternalForNoSort(CarbonIterator[] inputIterators,
+      CarbonDataLoadConfiguration configuration) {
+    // 1. Reads the data input iterators and parses the data.
+    AbstractDataLoadProcessorStep inputProcessorStep =
+        new InputProcessorStepImpl(configuration, inputIterators);
+    // 2. Converts the data like dictionary or non dictionary or complex 
objects depends on
+    // data types and configurations.
+    AbstractDataLoadProcessorStep converterProcessorStep =
+        new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
+    // 3. Writes the unsorted data in carbondata format.
+    AbstractDataLoadProcessorStep writerProcessorStep =
+        new CarbonRowDataWriterProcessorStepImpl(configuration, 
converterProcessorStep);
+    return writerProcessorStep;
+  }
+
   private AbstractDataLoadProcessorStep 
buildInternalForBatchSort(CarbonIterator[] inputIterators,
       CarbonDataLoadConfiguration configuration) {
     // 1. Reads the data input iterators and parses the data.
@@ -97,9 +113,8 @@ public final class DataLoadProcessBuilder {
     AbstractDataLoadProcessorStep converterProcessorStep =
         new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
     // 3. Sorts the data by SortColumn or not
-    AbstractDataLoadProcessorStep sortProcessorStep = 
configuration.isSortTable() ?
-        new SortProcessorStepImpl(configuration, converterProcessorStep) :
-        new NoSortProcessorStepImpl(configuration, converterProcessorStep);
+    AbstractDataLoadProcessorStep sortProcessorStep =
+        new SortProcessorStepImpl(configuration, converterProcessorStep);
     // 4. Writes the sorted data in carbondata format.
     AbstractDataLoadProcessorStep writerProcessorStep =
         new DataWriterBatchProcessorStepImpl(configuration, sortProcessorStep);
@@ -116,9 +131,8 @@ public final class DataLoadProcessBuilder {
     AbstractDataLoadProcessorStep converterProcessorStep =
         new DataConverterProcessorWithBucketingStepImpl(configuration, 
inputProcessorStep);
     // 3. Sorts the data by SortColumn or not
-    AbstractDataLoadProcessorStep sortProcessorStep = 
configuration.isSortTable() ?
-        new SortProcessorStepImpl(configuration, converterProcessorStep) :
-        new NoSortProcessorStepImpl(configuration, converterProcessorStep);
+    AbstractDataLoadProcessorStep sortProcessorStep =
+        new SortProcessorStepImpl(configuration, converterProcessorStep);
     // 4. Writes the sorted data in carbondata format.
     AbstractDataLoadProcessorStep writerProcessorStep =
         new DataWriterProcessorStepImpl(configuration, sortProcessorStep);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/dbc5262a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
new file mode 100644
index 0000000..929a09e
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.newflow.steps;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Iterator;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.newflow.DataField;
+import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+import org.apache.carbondata.processing.store.CarbonFactHandler;
+import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
+import 
org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It reads the converted rows directly from the child step (no sort step is involved).
+ * It generates the mdkey for each row and writes the rows to the carbondata file.
+ */
+public class CarbonRowDataWriterProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
+
+  private static final LogService LOGGER =
+      
LogServiceFactory.getLogService(CarbonRowDataWriterProcessorStepImpl.class.getName());
+
+  private SegmentProperties segmentProperties;
+
+  private KeyGenerator keyGenerator;
+
+  private int dimensionWithComplexCount;
+
+  private int noDictWithComplextCount;
+
+  private boolean[] isNoDictionaryDimensionColumn;
+
+  private char[] aggType;
+
+  private int dimensionCount;
+
+  private int measureCount;
+
+  private long[] readCounter;
+
+  private long[] writeCounter;
+
+  private int outputLength;
+
+  private CarbonTableIdentifier tableIdentifier;
+
+  private String tableName;
+
+  public CarbonRowDataWriterProcessorStepImpl(CarbonDataLoadConfiguration 
configuration,
+      AbstractDataLoadProcessorStep child) {
+    super(configuration, child);
+  }
+
+  @Override public DataField[] getOutput() {
+    return child.getOutput();
+  }
+
+  @Override public void initialize() throws IOException {
+    child.initialize();
+  }
+
+  private String getStoreLocation(CarbonTableIdentifier tableIdentifier, 
String partitionId) {
+    String storeLocation = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
+            tableIdentifier.getTableName(), 
String.valueOf(configuration.getTaskNo()), partitionId,
+            configuration.getSegmentId() + "", false);
+    new File(storeLocation).mkdirs();
+    return storeLocation;
+  }
+
+  @Override public Iterator<CarbonRowBatch>[] execute() throws 
CarbonDataLoadingException {
+    final Iterator<CarbonRowBatch>[] iterators = child.execute();
+    tableIdentifier = 
configuration.getTableIdentifier().getCarbonTableIdentifier();
+    tableName = tableIdentifier.getTableName();
+    try {
+      readCounter = new long[iterators.length];
+      writeCounter = new long[iterators.length];
+      dimensionWithComplexCount = configuration.getDimensionCount();
+      noDictWithComplextCount =
+          configuration.getNoDictionaryCount() + 
configuration.getComplexDimensionCount();
+      dimensionCount = configuration.getDimensionCount() - 
noDictWithComplextCount;
+      isNoDictionaryDimensionColumn =
+          
CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
+      aggType = CarbonDataProcessorUtil
+          .getAggType(configuration.getMeasureCount(), 
configuration.getMeasureFields());
+
+      CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
+          .createCarbonFactDataHandlerModel(configuration,
+              getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0);
+      measureCount = dataHandlerModel.getMeasureCount();
+      outputLength = measureCount + (this.noDictWithComplextCount > 0 ? 1 : 0) 
+ 1;
+      segmentProperties = dataHandlerModel.getSegmentProperties();
+      keyGenerator = segmentProperties.getDimensionKeyGenerator();
+
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          
.recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+              System.currentTimeMillis());
+
+      if (iterators.length == 1) {
+        doExecute(iterators[0], 0, 0);
+      } else {
+        ExecutorService executorService = 
Executors.newFixedThreadPool(iterators.length);
+        Future[] futures = new Future[iterators.length];
+        for (int i = 0; i < iterators.length; i++) {
+          futures[i] = executorService.submit(new 
DataWriterRunnable(iterators[i], i));
+        }
+        for (Future future : futures) {
+          future.get();
+        }
+      }
+    } catch (CarbonDataWriterException e) {
+      LOGGER.error(e, "Failed for table: " + tableName + " in 
DataWriterProcessorStepImpl");
+      throw new CarbonDataLoadingException(
+          "Error while initializing data handler : " + e.getMessage());
+    } catch (Exception e) {
+      LOGGER.error(e, "Failed for table: " + tableName + " in 
DataWriterProcessorStepImpl");
+      throw new CarbonDataLoadingException("There is an unexpected error: " + 
e.getMessage(), e);
+    }
+    return null;
+  }
+
+  private void doExecute(Iterator<CarbonRowBatch> iterator, int partitionId, 
int iteratorIndex) {
+    String storeLocation = getStoreLocation(tableIdentifier, 
String.valueOf(partitionId));
+    CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
+        .createCarbonFactDataHandlerModel(configuration, storeLocation, 
partitionId, 0);
+    model.getCarbonDataFileAttributes()
+        
.setFactTimeStamp(model.getCarbonDataFileAttributes().getFactTimeStamp() + 
iteratorIndex);
+    CarbonFactHandler dataHandler = null;
+    boolean rowsNotExist = true;
+    while (iterator.hasNext()) {
+      if (rowsNotExist) {
+        rowsNotExist = false;
+        dataHandler = CarbonFactHandlerFactory
+            .createCarbonFactHandler(model, 
CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+        dataHandler.initialise();
+      }
+      processBatch(iterator.next(), dataHandler, iteratorIndex);
+    }
+    if (!rowsNotExist) {
+      finish(dataHandler, iteratorIndex);
+    }
+  }
+
+  @Override protected String getStepName() {
+    return "Data Writer";
+  }
+
+  private void finish(CarbonFactHandler dataHandler, int iteratorIndex) {
+    try {
+      dataHandler.finish();
+    } catch (Exception e) {
+      LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data 
handler");
+    }
+    LOGGER.info("Record Processed For table: " + tableName);
+    String logMessage =
+        "Finished Carbon DataWriterProcessorStepImpl: Read: " + 
readCounter[iteratorIndex]
+            + ": Write: " + readCounter[iteratorIndex];
+    LOGGER.info(logMessage);
+    
CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
+    processingComplete(dataHandler);
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+            System.currentTimeMillis());
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+        .recordMdkGenerateTotalTime(configuration.getPartitionId(), 
System.currentTimeMillis());
+  }
+
+  private void processingComplete(CarbonFactHandler dataHandler) throws 
CarbonDataLoadingException {
+    if (null != dataHandler) {
+      try {
+        dataHandler.closeHandler();
+      } catch (CarbonDataWriterException e) {
+        LOGGER.error(e, e.getMessage());
+        throw new CarbonDataLoadingException(e.getMessage());
+      } catch (Exception e) {
+        LOGGER.error(e, e.getMessage());
+        throw new CarbonDataLoadingException("There is an unexpected error: " 
+ e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * convert input CarbonRow to the output Object[] that is passed to the data handler
+   * e.g. There is a table as following,
+   * the number of dictionary dimensions is a,
+   * the number of no-dictionary dimensions is b,
+   * the number of complex dimensions is c,
+   * the number of measures is d.
+   * input CarbonRow format:  the length of Object[] data is a+b+c+d, the 
number of all columns.
+   * 
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+   * | Part                     | Object item                    | describe    
             |
+   * 
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+   * | Object[0 ~ a+b-1]        | Integer, byte[], Integer, ...  | dict + no 
dict dimensions|
+   * 
----------------------------------------------------------------------------------------
+   * | Object[a+b ~ a+b+c-1]    | byte[], byte[], ...            | complex 
dimensions       |
+   * 
----------------------------------------------------------------------------------------
+   * | Object[a+b+c ~ a+b+c+d-1]| int, byte[], ...               | measures    
             |
+   * 
----------------------------------------------------------------------------------------
+   * output Object[] format: the length of Object[] data is d + (b+c>0?1:0) + 1.
+   * 
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+   * | Part                     | Object item                    | describe    
             |
+   * 
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+   * | Object[0 ~ d-1]          | int, byte[], ...               | measures    
             |
+   * 
----------------------------------------------------------------------------------------
+   * | Object[d]                | byte[b+c][]                    | no dict + 
complex dim    |
+   * 
----------------------------------------------------------------------------------------
+   * | Object[d+1]              | byte[]                         | mdkey       
             |
+   * 
----------------------------------------------------------------------------------------
+   *
+   * @param row
+   * @return
+   */
+  private Object[] convertRow(CarbonRow row) throws KeyGenException {
+    int dictIndex = 0;
+    int nonDicIndex = 0;
+    int[] dim = new int[this.dimensionCount];
+    byte[][] nonDicArray = new byte[this.noDictWithComplextCount][];
+    // read dimension values
+    int dimCount = 0;
+    for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
+      if (isNoDictionaryDimensionColumn[dimCount]) {
+        nonDicArray[nonDicIndex++] = (byte[]) row.getObject(dimCount);
+      } else {
+        dim[dictIndex++] = (int) row.getObject(dimCount);
+      }
+    }
+
+    for (; dimCount < this.dimensionWithComplexCount; dimCount++) {
+      nonDicArray[nonDicIndex++] = (byte[]) row.getObject(dimCount);
+    }
+
+    int l = 0;
+    Object[] outputRow = new Object[outputLength];
+    for (; l < this.measureCount; l++) {
+      Object value = row.getObject(l + this.dimensionWithComplexCount);
+      if (null != value) {
+        if (aggType[l] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+          BigDecimal val = (BigDecimal) value;
+          outputRow[l] = DataTypeUtil.bigDecimalToByte(val);
+        } else {
+          outputRow[l] = value;
+        }
+      } else {
+        outputRow[l] = null;
+      }
+    }
+
+    if (this.noDictWithComplextCount > 0) {
+      outputRow[l++] = nonDicArray;
+    }
+    outputRow[l] = keyGenerator.generateKey(dim);
+    return outputRow;
+  }
+
+  private void processBatch(CarbonRowBatch batch, CarbonFactHandler 
dataHandler, int iteratorIndex)
+      throws CarbonDataLoadingException {
+    try {
+      while (batch.hasNext()) {
+        dataHandler.addDataToStore(convertRow(batch.next()));
+        readCounter[iteratorIndex]++;
+      }
+      writeCounter[iteratorIndex] += batch.getSize();
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException("unable to generate the mdkey", e);
+    }
+    rowCounter.getAndAdd(batch.getSize());
+  }
+
+  @Override protected CarbonRow processRow(CarbonRow row) {
+    return null;
+  }
+
+  class DataWriterRunnable implements Runnable {
+
+    private Iterator<CarbonRowBatch> iterator;
+    private int iteratorIndex = 0;
+
+    DataWriterRunnable(Iterator<CarbonRowBatch> iterator, int iteratorIndex) {
+      this.iterator = iterator;
+      this.iteratorIndex = iteratorIndex;
+    }
+
+    @Override public void run() {
+      doExecute(this.iterator, 0, iteratorIndex);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/dbc5262a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/NoSortProcessorStepImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/NoSortProcessorStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/NoSortProcessorStepImpl.java
deleted file mode 100644
index bde89ed..0000000
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/NoSortProcessorStepImpl.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.steps;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.row.CarbonRow;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
-
-/**
- * if the table doesn't have sort_columns, just convert row format.
- */
-public class NoSortProcessorStepImpl extends AbstractDataLoadProcessorStep {
-
-  private int dimensionCount;
-
-  private int dimensionWithComplexCount;
-
-  private int noDictCount;
-
-  private int measureCount;
-
-  private boolean[] isNoDictionaryDimensionColumn;
-
-  private char[] aggType;
-
-  public NoSortProcessorStepImpl(CarbonDataLoadConfiguration configuration,
-      AbstractDataLoadProcessorStep child) {
-    super(configuration, child);
-    this.dimensionWithComplexCount = configuration.getDimensionCount();
-    this.noDictCount =
-        configuration.getNoDictionaryCount() + 
configuration.getComplexDimensionCount();
-    this.dimensionCount = configuration.getDimensionCount() - this.noDictCount;
-    this.measureCount = configuration.getMeasureCount();
-    this.isNoDictionaryDimensionColumn =
-        
CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
-    this.aggType = CarbonDataProcessorUtil
-        .getAggType(configuration.getMeasureCount(), 
configuration.getMeasureFields());
-  }
-
-  @Override public DataField[] getOutput() {
-    return child.getOutput();
-  }
-
-  @Override public void initialize() throws IOException {
-    child.initialize();
-  }
-
-  /**
-   * convert input CarbonRow to output CarbonRow
-   * e.g. There is a table as following,
-   * the number of dictionary dimensions is a,
-   * the number of no-dictionary dimensions is b,
-   * the number of complex dimensions is c,
-   * the number of measures is d.
-   * input CarbonRow format:  the length of Object[] data is a+b+c+d, the 
number of all columns.
-   * 
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
-   * | Part                     | Object item                    | describe    
             |
-   * 
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
-   * | Object[0 ~ a+b-1]        | Integer, byte[], Integer, ...  | dict + no 
dict dimensions|
-   * 
----------------------------------------------------------------------------------------
-   * | Object[a+b ~ a+b+c-1]    | byte[], byte[], ...            | complex 
dimensions       |
-   * 
----------------------------------------------------------------------------------------
-   * | Object[a+b+c ~ a+b+c+d-1]| int, byte[], ...               | measures    
             |
-   * 
----------------------------------------------------------------------------------------
-   * output CarbonRow format: the length of object[] data is 3.
-   * 
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
-   * | Part                     | Object item                    | describe    
             |
-   * 
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
-   * | Object[0]                | int[a]                         | dict 
dimension array     |
-   * 
----------------------------------------------------------------------------------------
-   * | Object[1]                | byte[b+c][]                    | no dict + 
complex dim    |
-   * 
----------------------------------------------------------------------------------------
-   * | Object[2]                | Object[d]                      | measures    
             |
-   * 
----------------------------------------------------------------------------------------
-   * @param row
-   * @return
-   */
-  @Override protected CarbonRow processRow(CarbonRow row) {
-    int dictIndex = 0;
-    int nonDicIndex = 0;
-    int[] dim = new int[this.dimensionCount];
-    byte[][] nonDicArray = new byte[this.noDictCount][];
-    Object[] measures = new Object[this.measureCount];
-    // read dimension values
-    int dimCount = 0;
-    for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
-      if (isNoDictionaryDimensionColumn[dimCount]) {
-        nonDicArray[nonDicIndex++] = (byte[]) row.getObject(dimCount);
-      } else {
-        dim[dictIndex++] = (int) row.getObject(dimCount);
-      }
-    }
-
-    for (; dimCount < this.dimensionWithComplexCount; dimCount++) {
-      nonDicArray[nonDicIndex++] = (byte[]) row.getObject(dimCount);
-    }
-
-    // measure values
-    for (int mesCount = 0; mesCount < this.measureCount; mesCount++) {
-      Object value = row.getObject(mesCount + this.dimensionWithComplexCount);
-      if (null != value) {
-        if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
-          measures[mesCount] = value;
-        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) 
{
-          measures[mesCount] = value;
-        } else if (aggType[mesCount] == 
CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-          BigDecimal val = (BigDecimal) value;
-          measures[mesCount] = DataTypeUtil.bigDecimalToByte(val);
-        }
-      } else {
-        measures[mesCount] = null;
-      }
-    }
-    // create new row of size 3 (1 for dims , 1 for high card , 1 for measures)
-    Object[] holder = new Object[3];
-    NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);
-    //return out row
-    return new CarbonRow(holder);
-  }
-
-  @Override
-  public void close() {
-    if (!closed) {
-      super.close();
-    }
-  }
-
-  @Override protected String getStepName() {
-    return "No Sort Processor";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/dbc5262a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
index 188c468..0b606b0 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java
@@ -39,13 +39,13 @@ public class CarbonDataFileAttributes {
   /**
    * load start time
    */
-  private String factTimeStamp;
+  private long factTimeStamp;
 
   /**
    * @param taskId
    * @param factTimeStamp
    */
-  public CarbonDataFileAttributes(int taskId, String factTimeStamp) {
+  public CarbonDataFileAttributes(int taskId, long factTimeStamp) {
     this.taskId = taskId;
     this.factTimeStamp = factTimeStamp;
   }
@@ -57,10 +57,14 @@ public class CarbonDataFileAttributes {
     return taskId;
   }
 
+  public void setFactTimeStamp(long factTimeStamp) {
+    this.factTimeStamp = factTimeStamp;
+  }
+
   /**
    * @return fact time stamp which is load start time
    */
-  public String getFactTimeStamp() {
+  public long getFactTimeStamp() {
     return factTimeStamp;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/dbc5262a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 1c94bf1..75efe56 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -265,7 +265,7 @@ public class CarbonFactDataHandlerModel {
 
     CarbonDataFileAttributes carbonDataFileAttributes =
         new CarbonDataFileAttributes(Integer.parseInt(configuration.getTaskNo()),
-            (String) configuration.getDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP));
+            (Long) configuration.getDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP));
     String carbonDataDirectoryPath = getCarbonDataFolderLocation(configuration);
 
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/dbc5262a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index cda907c..770d24c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -296,7 +296,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
     this.carbonDataFileName = carbonTablePath
         .getCarbonDataFileName(fileCount, dataWriterVo.getCarbonDataFileAttributes().getTaskId(),
             dataWriterVo.getBucketNumber(), dataWriterVo.getTaskExtension(),
-            dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp());
+            "" + dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp());
     String actualFileNameVal = carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
     FileData fileData = new FileData(actualFileNameVal, dataWriterVo.getStoreLocation());
     dataWriterVo.getFileManager().add(fileData);
@@ -447,7 +447,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
     String fileName = dataWriterVo.getStoreLocation() + File.separator + carbonTablePath
         .getCarbonIndexFileName(dataWriterVo.getCarbonDataFileAttributes().getTaskId(),
             dataWriterVo.getBucketNumber(), dataWriterVo.getTaskExtension(),
-            dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp());
+            "" + dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp());
     CarbonIndexFileWriter writer = new CarbonIndexFileWriter();
     // open file
     writer.openThriftWriter(fileName);

Reply via email to