Add a data writer step for no-sort tables
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/dbc5262a Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/dbc5262a Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/dbc5262a Branch: refs/heads/12-dev Commit: dbc5262a0186a0da60fc1fb289162b1f34313aba Parents: a39e777 Author: QiangCai <qiang...@qq.com> Authored: Thu Apr 6 00:09:06 2017 +0530 Committer: jackylk <jacky.li...@huawei.com> Committed: Sun Apr 9 11:25:38 2017 +0800 ---------------------------------------------------------------------- .../processing/merger/CarbonDataMergerUtil.java | 2 +- .../processing/model/CarbonLoadModel.java | 6 +- .../newflow/DataLoadProcessBuilder.java | 38 ++- .../CarbonRowDataWriterProcessorStepImpl.java | 321 +++++++++++++++++++ .../newflow/steps/NoSortProcessorStepImpl.java | 153 --------- .../store/CarbonDataFileAttributes.java | 10 +- .../store/CarbonFactDataHandlerModel.java | 2 +- .../store/writer/AbstractFactDataWriter.java | 4 +- 8 files changed, 361 insertions(+), 175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/dbc5262a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index 2414993..cbd5a25 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -170,7 +170,7 @@ public final class CarbonDataMergerUtil { boolean updateLockStatus = false; boolean tableLockStatus = false; - String 
timestamp = carbonLoadModel.getFactTimeStamp(); + String timestamp = "" + carbonLoadModel.getFactTimeStamp(); List<String> updatedDeltaFilesList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/dbc5262a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java index 525874f..963a51b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java @@ -82,7 +82,7 @@ public class CarbonLoadModel implements Serializable { /** * new load start time */ - private String factTimeStamp; + private long factTimeStamp; /** * load Id */ @@ -587,7 +587,7 @@ public class CarbonLoadModel implements Serializable { /** * @return */ - public String getFactTimeStamp() { + public long getFactTimeStamp() { return factTimeStamp; } @@ -595,7 +595,7 @@ public class CarbonLoadModel implements Serializable { * @param factTimeStamp */ public void setFactTimeStamp(long factTimeStamp) { - this.factTimeStamp = factTimeStamp + ""; + this.factTimeStamp = factTimeStamp; } public String[] getDelimiters() { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/dbc5262a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java index 7dbffaf..b8b098b 100644 --- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java @@ -35,12 +35,12 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.processing.model.CarbonLoadModel; import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants; +import org.apache.carbondata.processing.newflow.steps.CarbonRowDataWriterProcessorStepImpl; import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl; import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorWithBucketingStepImpl; import org.apache.carbondata.processing.newflow.steps.DataWriterBatchProcessorStepImpl; import org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl; import org.apache.carbondata.processing.newflow.steps.InputProcessorStepImpl; -import org.apache.carbondata.processing.newflow.steps.NoSortProcessorStepImpl; import org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; @@ -59,7 +59,9 @@ public final class DataLoadProcessBuilder { CarbonCommonConstants.LOAD_USE_BATCH_SORT_DEFAULT)); CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation); - if (configuration.getBucketingInfo() != null) { + if (!configuration.isSortTable()) { + return buildInternalForNoSort(inputIterators, configuration); + } else if (configuration.getBucketingInfo() != null) { return buildInternalForBucketing(inputIterators, configuration); } else if (batchSort) { return buildInternalForBatchSort(inputIterators, configuration); @@ -77,16 +79,30 @@ public final class DataLoadProcessBuilder { // data types and configurations. 
AbstractDataLoadProcessorStep converterProcessorStep = new DataConverterProcessorStepImpl(configuration, inputProcessorStep); - // 3. Sorts the data by SortColumn or not - AbstractDataLoadProcessorStep sortProcessorStep = configuration.isSortTable() ? - new SortProcessorStepImpl(configuration, converterProcessorStep) : - new NoSortProcessorStepImpl(configuration, converterProcessorStep); + // 3. Sorts the data by SortColumn + AbstractDataLoadProcessorStep sortProcessorStep = + new SortProcessorStepImpl(configuration, converterProcessorStep); // 4. Writes the sorted data in carbondata format. AbstractDataLoadProcessorStep writerProcessorStep = new DataWriterProcessorStepImpl(configuration, sortProcessorStep); return writerProcessorStep; } + private AbstractDataLoadProcessorStep buildInternalForNoSort(CarbonIterator[] inputIterators, + CarbonDataLoadConfiguration configuration) { + // 1. Reads the data input iterators and parses the data. + AbstractDataLoadProcessorStep inputProcessorStep = + new InputProcessorStepImpl(configuration, inputIterators); + // 2. Converts the data like dictionary or non dictionary or complex objects depends on + // data types and configurations. + AbstractDataLoadProcessorStep converterProcessorStep = + new DataConverterProcessorStepImpl(configuration, inputProcessorStep); + // 3. Writes the sorted data in carbondata format. + AbstractDataLoadProcessorStep writerProcessorStep = + new CarbonRowDataWriterProcessorStepImpl(configuration, converterProcessorStep); + return writerProcessorStep; + } + private AbstractDataLoadProcessorStep buildInternalForBatchSort(CarbonIterator[] inputIterators, CarbonDataLoadConfiguration configuration) { // 1. Reads the data input iterators and parses the data. @@ -97,9 +113,8 @@ public final class DataLoadProcessBuilder { AbstractDataLoadProcessorStep converterProcessorStep = new DataConverterProcessorStepImpl(configuration, inputProcessorStep); // 3. 
Sorts the data by SortColumn or not - AbstractDataLoadProcessorStep sortProcessorStep = configuration.isSortTable() ? - new SortProcessorStepImpl(configuration, converterProcessorStep) : - new NoSortProcessorStepImpl(configuration, converterProcessorStep); + AbstractDataLoadProcessorStep sortProcessorStep = + new SortProcessorStepImpl(configuration, converterProcessorStep); // 4. Writes the sorted data in carbondata format. AbstractDataLoadProcessorStep writerProcessorStep = new DataWriterBatchProcessorStepImpl(configuration, sortProcessorStep); @@ -116,9 +131,8 @@ public final class DataLoadProcessBuilder { AbstractDataLoadProcessorStep converterProcessorStep = new DataConverterProcessorWithBucketingStepImpl(configuration, inputProcessorStep); // 3. Sorts the data by SortColumn or not - AbstractDataLoadProcessorStep sortProcessorStep = configuration.isSortTable() ? - new SortProcessorStepImpl(configuration, converterProcessorStep) : - new NoSortProcessorStepImpl(configuration, converterProcessorStep); + AbstractDataLoadProcessorStep sortProcessorStep = + new SortProcessorStepImpl(configuration, converterProcessorStep); // 4. Writes the sorted data in carbondata format. 
AbstractDataLoadProcessorStep writerProcessorStep = new DataWriterProcessorStepImpl(configuration, sortProcessorStep); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/dbc5262a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java new file mode 100644 index 0000000..929a09e --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.processing.newflow.steps; + +import java.io.File; +import java.io.IOException; +import java.math.BigDecimal; +import java.util.Iterator; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.keygenerator.KeyGenException; +import org.apache.carbondata.core.keygenerator.KeyGenerator; +import org.apache.carbondata.core.metadata.CarbonTableIdentifier; +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep; +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration; +import org.apache.carbondata.processing.newflow.DataField; +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; +import org.apache.carbondata.processing.newflow.row.CarbonRow; +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel; +import org.apache.carbondata.processing.store.CarbonFactHandler; +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory; +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; + +/** + * It reads data from sorted files which are generated in previous sort step. + * And it writes data to carbondata file. 
It also generates mdk key while writing to carbondata file + */ +public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(CarbonRowDataWriterProcessorStepImpl.class.getName()); + + private SegmentProperties segmentProperties; + + private KeyGenerator keyGenerator; + + private int dimensionWithComplexCount; + + private int noDictWithComplextCount; + + private boolean[] isNoDictionaryDimensionColumn; + + private char[] aggType; + + private int dimensionCount; + + private int measureCount; + + private long[] readCounter; + + private long[] writeCounter; + + private int outputLength; + + private CarbonTableIdentifier tableIdentifier; + + private String tableName; + + public CarbonRowDataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration, + AbstractDataLoadProcessorStep child) { + super(configuration, child); + } + + @Override public DataField[] getOutput() { + return child.getOutput(); + } + + @Override public void initialize() throws IOException { + child.initialize(); + } + + private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) { + String storeLocation = CarbonDataProcessorUtil + .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(), + tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId, + configuration.getSegmentId() + "", false); + new File(storeLocation).mkdirs(); + return storeLocation; + } + + @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException { + final Iterator<CarbonRowBatch>[] iterators = child.execute(); + tableIdentifier = configuration.getTableIdentifier().getCarbonTableIdentifier(); + tableName = tableIdentifier.getTableName(); + try { + readCounter = new long[iterators.length]; + writeCounter = new long[iterators.length]; + dimensionWithComplexCount = configuration.getDimensionCount(); + noDictWithComplextCount = 
+ configuration.getNoDictionaryCount() + configuration.getComplexDimensionCount(); + dimensionCount = configuration.getDimensionCount() - noDictWithComplextCount; + isNoDictionaryDimensionColumn = + CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()); + aggType = CarbonDataProcessorUtil + .getAggType(configuration.getMeasureCount(), configuration.getMeasureFields()); + + CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel + .createCarbonFactDataHandlerModel(configuration, + getStoreLocation(tableIdentifier, String.valueOf(0)), 0, 0); + measureCount = dataHandlerModel.getMeasureCount(); + outputLength = measureCount + (this.noDictWithComplextCount > 0 ? 1 : 0) + 1; + segmentProperties = dataHandlerModel.getSegmentProperties(); + keyGenerator = segmentProperties.getDimensionKeyGenerator(); + + CarbonTimeStatisticsFactory.getLoadStatisticsInstance() + .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(), + System.currentTimeMillis()); + + if (iterators.length == 1) { + doExecute(iterators[0], 0, 0); + } else { + ExecutorService executorService = Executors.newFixedThreadPool(iterators.length); + Future[] futures = new Future[iterators.length]; + for (int i = 0; i < iterators.length; i++) { + futures[i] = executorService.submit(new DataWriterRunnable(iterators[i], i)); + } + for (Future future : futures) { + future.get(); + } + } + } catch (CarbonDataWriterException e) { + LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl"); + throw new CarbonDataLoadingException( + "Error while initializing data handler : " + e.getMessage()); + } catch (Exception e) { + LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl"); + throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e); + } + return null; + } + + private void doExecute(Iterator<CarbonRowBatch> iterator, int partitionId, int iteratorIndex) { + String 
storeLocation = getStoreLocation(tableIdentifier, String.valueOf(partitionId)); + CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel + .createCarbonFactDataHandlerModel(configuration, storeLocation, partitionId, 0); + model.getCarbonDataFileAttributes() + .setFactTimeStamp(model.getCarbonDataFileAttributes().getFactTimeStamp() + iteratorIndex); + CarbonFactHandler dataHandler = null; + boolean rowsNotExist = true; + while (iterator.hasNext()) { + if (rowsNotExist) { + rowsNotExist = false; + dataHandler = CarbonFactHandlerFactory + .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR); + dataHandler.initialise(); + } + processBatch(iterator.next(), dataHandler, iteratorIndex); + } + if (!rowsNotExist) { + finish(dataHandler, iteratorIndex); + } + } + + @Override protected String getStepName() { + return "Data Writer"; + } + + private void finish(CarbonFactHandler dataHandler, int iteratorIndex) { + try { + dataHandler.finish(); + } catch (Exception e) { + LOGGER.error(e, "Failed for table: " + tableName + " in finishing data handler"); + } + LOGGER.info("Record Processed For table: " + tableName); + String logMessage = + "Finished Carbon DataWriterProcessorStepImpl: Read: " + readCounter[iteratorIndex] + + ": Write: " + readCounter[iteratorIndex]; + LOGGER.info(logMessage); + CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get()); + processingComplete(dataHandler); + CarbonTimeStatisticsFactory.getLoadStatisticsInstance() + .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(), + System.currentTimeMillis()); + CarbonTimeStatisticsFactory.getLoadStatisticsInstance() + .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis()); + } + + private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException { + if (null != dataHandler) { + try { + dataHandler.closeHandler(); + } catch (CarbonDataWriterException e) { + 
LOGGER.error(e, e.getMessage()); + throw new CarbonDataLoadingException(e.getMessage()); + } catch (Exception e) { + LOGGER.error(e, e.getMessage()); + throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage()); + } + } + } + + /** + * convert input CarbonRow to output CarbonRow + * e.g. There is a table as following, + * the number of dictionary dimensions is a, + * the number of no-dictionary dimensions is b, + * the number of complex dimensions is c, + * the number of measures is d. + * input CarbonRow format: the length of Object[] data is a+b+c+d, the number of all columns. + * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * | Part | Object item | describe | + * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * | Object[0 ~ a+b-1] | Integer, byte[], Integer, ... | dict + no dict dimensions| + * ---------------------------------------------------------------------------------------- + * | Object[a+b ~ a+b+c-1] | byte[], byte[], ... | complex dimensions | + * ---------------------------------------------------------------------------------------- + * | Object[a+b+c ~ a+b+c+d-1]| int, byte[], ... | measures | + * ---------------------------------------------------------------------------------------- + * output CarbonRow format: the length of object[] data is d + (b+c>0?1:0) + 1. + * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * | Part | Object item | describe | + * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * | Object[0 ~ d-1] | int, byte[], ... 
| measures | + * ---------------------------------------------------------------------------------------- + * | Object[d] | byte[b+c][] | no dict + complex dim | + * ---------------------------------------------------------------------------------------- + * | Object[d+1] | byte[] | mdkey | + * ---------------------------------------------------------------------------------------- + * + * @param row + * @return + */ + private Object[] convertRow(CarbonRow row) throws KeyGenException { + int dictIndex = 0; + int nonDicIndex = 0; + int[] dim = new int[this.dimensionCount]; + byte[][] nonDicArray = new byte[this.noDictWithComplextCount][]; + // read dimension values + int dimCount = 0; + for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) { + if (isNoDictionaryDimensionColumn[dimCount]) { + nonDicArray[nonDicIndex++] = (byte[]) row.getObject(dimCount); + } else { + dim[dictIndex++] = (int) row.getObject(dimCount); + } + } + + for (; dimCount < this.dimensionWithComplexCount; dimCount++) { + nonDicArray[nonDicIndex++] = (byte[]) row.getObject(dimCount); + } + + int l = 0; + Object[] outputRow = new Object[outputLength]; + for (; l < this.measureCount; l++) { + Object value = row.getObject(l + this.dimensionWithComplexCount); + if (null != value) { + if (aggType[l] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) { + BigDecimal val = (BigDecimal) value; + outputRow[l] = DataTypeUtil.bigDecimalToByte(val); + } else { + outputRow[l] = value; + } + } else { + outputRow[l] = null; + } + } + + if (this.noDictWithComplextCount > 0) { + outputRow[l++] = nonDicArray; + } + outputRow[l] = keyGenerator.generateKey(dim); + return outputRow; + } + + private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler, int iteratorIndex) + throws CarbonDataLoadingException { + try { + while (batch.hasNext()) { + dataHandler.addDataToStore(convertRow(batch.next())); + readCounter[iteratorIndex]++; + } + writeCounter[iteratorIndex] += batch.getSize(); + } catch 
(Exception e) { + throw new CarbonDataLoadingException("unable to generate the mdkey", e); + } + rowCounter.getAndAdd(batch.getSize()); + } + + @Override protected CarbonRow processRow(CarbonRow row) { + return null; + } + + class DataWriterRunnable implements Runnable { + + private Iterator<CarbonRowBatch> iterator; + private int iteratorIndex = 0; + + DataWriterRunnable(Iterator<CarbonRowBatch> iterator, int iteratorIndex) { + this.iterator = iterator; + this.iteratorIndex = iteratorIndex; + } + + @Override public void run() { + doExecute(this.iterator, 0, iteratorIndex); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/dbc5262a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/NoSortProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/NoSortProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/NoSortProcessorStepImpl.java deleted file mode 100644 index bde89ed..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/NoSortProcessorStepImpl.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.processing.newflow.steps; - -import java.io.IOException; -import java.math.BigDecimal; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.util.DataTypeUtil; -import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep; -import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration; -import org.apache.carbondata.processing.newflow.DataField; -import org.apache.carbondata.processing.newflow.row.CarbonRow; -import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; -import org.apache.carbondata.processing.util.NonDictionaryUtil; - -/** - * if the table doesn't have sort_columns, just convert row format. - */ -public class NoSortProcessorStepImpl extends AbstractDataLoadProcessorStep { - - private int dimensionCount; - - private int dimensionWithComplexCount; - - private int noDictCount; - - private int measureCount; - - private boolean[] isNoDictionaryDimensionColumn; - - private char[] aggType; - - public NoSortProcessorStepImpl(CarbonDataLoadConfiguration configuration, - AbstractDataLoadProcessorStep child) { - super(configuration, child); - this.dimensionWithComplexCount = configuration.getDimensionCount(); - this.noDictCount = - configuration.getNoDictionaryCount() + configuration.getComplexDimensionCount(); - this.dimensionCount = configuration.getDimensionCount() - this.noDictCount; - this.measureCount = configuration.getMeasureCount(); - this.isNoDictionaryDimensionColumn = - CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()); - this.aggType = CarbonDataProcessorUtil - .getAggType(configuration.getMeasureCount(), configuration.getMeasureFields()); - } - - @Override public DataField[] getOutput() { - return child.getOutput(); - } - - @Override public void initialize() throws IOException { - 
child.initialize(); - } - - /** - * convert input CarbonRow to output CarbonRow - * e.g. There is a table as following, - * the number of dictionary dimensions is a, - * the number of no-dictionary dimensions is b, - * the number of complex dimensions is c, - * the number of measures is d. - * input CarbonRow format: the length of Object[] data is a+b+c+d, the number of all columns. - * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - * | Part | Object item | describe | - * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - * | Object[0 ~ a+b-1] | Integer, byte[], Integer, ... | dict + no dict dimensions| - * ---------------------------------------------------------------------------------------- - * | Object[a+b ~ a+b+c-1] | byte[], byte[], ... | complex dimensions | - * ---------------------------------------------------------------------------------------- - * | Object[a+b+c ~ a+b+c+d-1]| int, byte[], ... | measures | - * ---------------------------------------------------------------------------------------- - * output CarbonRow format: the length of object[] data is 3. 
- * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - * | Part | Object item | describe | - * ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - * | Object[0] | int[a] | dict dimension array | - * ---------------------------------------------------------------------------------------- - * | Object[1] | byte[b+c][] | no dict + complex dim | - * ---------------------------------------------------------------------------------------- - * | Object[2] | Object[d] | measures | - * ---------------------------------------------------------------------------------------- - * @param row - * @return - */ - @Override protected CarbonRow processRow(CarbonRow row) { - int dictIndex = 0; - int nonDicIndex = 0; - int[] dim = new int[this.dimensionCount]; - byte[][] nonDicArray = new byte[this.noDictCount][]; - Object[] measures = new Object[this.measureCount]; - // read dimension values - int dimCount = 0; - for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) { - if (isNoDictionaryDimensionColumn[dimCount]) { - nonDicArray[nonDicIndex++] = (byte[]) row.getObject(dimCount); - } else { - dim[dictIndex++] = (int) row.getObject(dimCount); - } - } - - for (; dimCount < this.dimensionWithComplexCount; dimCount++) { - nonDicArray[nonDicIndex++] = (byte[]) row.getObject(dimCount); - } - - // measure values - for (int mesCount = 0; mesCount < this.measureCount; mesCount++) { - Object value = row.getObject(mesCount + this.dimensionWithComplexCount); - if (null != value) { - if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) { - measures[mesCount] = value; - } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) { - measures[mesCount] = value; - } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) { - BigDecimal val = (BigDecimal) value; - measures[mesCount] = DataTypeUtil.bigDecimalToByte(val); - } - } else { - measures[mesCount] = null; - } - } 
- // create new row of size 3 (1 for dims , 1 for high card , 1 for measures) - Object[] holder = new Object[3]; - NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures); - //return out row - return new CarbonRow(holder); - } - - @Override - public void close() { - if (!closed) { - super.close(); - } - } - - @Override protected String getStepName() { - return "No Sort Processor"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/dbc5262a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java index 188c468..0b606b0 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java @@ -39,13 +39,13 @@ public class CarbonDataFileAttributes { /** * load start time */ - private String factTimeStamp; + private long factTimeStamp; /** * @param taskId * @param factTimeStamp */ - public CarbonDataFileAttributes(int taskId, String factTimeStamp) { + public CarbonDataFileAttributes(int taskId, long factTimeStamp) { this.taskId = taskId; this.factTimeStamp = factTimeStamp; } @@ -57,10 +57,14 @@ public class CarbonDataFileAttributes { return taskId; } + public void setFactTimeStamp(long factTimeStamp) { + this.factTimeStamp = factTimeStamp; + } + /** * @return fact time stamp which is load start time */ - public String getFactTimeStamp() { + public long getFactTimeStamp() { return factTimeStamp; } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/dbc5262a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java 
---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index 1c94bf1..75efe56 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -265,7 +265,7 @@ public class CarbonFactDataHandlerModel { CarbonDataFileAttributes carbonDataFileAttributes = new CarbonDataFileAttributes(Integer.parseInt(configuration.getTaskNo()), - (String) configuration.getDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP)); + (Long) configuration.getDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP)); String carbonDataDirectoryPath = getCarbonDataFolderLocation(configuration); CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel(); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/dbc5262a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java index cda907c..770d24c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java @@ -296,7 +296,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< this.carbonDataFileName = carbonTablePath .getCarbonDataFileName(fileCount, dataWriterVo.getCarbonDataFileAttributes().getTaskId(), 
dataWriterVo.getBucketNumber(), dataWriterVo.getTaskExtension(), - dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp()); + "" + dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp()); String actualFileNameVal = carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS; FileData fileData = new FileData(actualFileNameVal, dataWriterVo.getStoreLocation()); dataWriterVo.getFileManager().add(fileData); @@ -447,7 +447,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< String fileName = dataWriterVo.getStoreLocation() + File.separator + carbonTablePath .getCarbonIndexFileName(dataWriterVo.getCarbonDataFileAttributes().getTaskId(), dataWriterVo.getBucketNumber(), dataWriterVo.getTaskExtension(), - dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp()); + "" + dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp()); CarbonIndexFileWriter writer = new CarbonIndexFileWriter(); // open file writer.openThriftWriter(fileName);