http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java index 768dedb..caf121f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java @@ -42,7 +42,6 @@ import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.datamap.DataMapWriterListener; import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep; import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; -import org.apache.carbondata.processing.loading.DataField; import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException; import org.apache.carbondata.processing.loading.row.CarbonRowBatch; import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel; @@ -78,10 +77,6 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable()); } - @Override public DataField[] getOutput() { - return child.getOutput(); - } - @Override public void initialize() throws IOException { super.initialize(); child.initialize(); @@ -187,7 +182,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { if (rowsNotExist) { rowsNotExist = false; dataHandler = CarbonFactHandlerFactory - .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR); + .createCarbonFactHandler(model); dataHandler.initialise(); } processBatch(insideRangeIterator.next(), dataHandler); @@ -260,10 +255,6 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { rowCounter.getAndAdd(1); } - @Override protected CarbonRow processRow(CarbonRow row) { - return null; - } - @Override public void close() { if (!closed) { super.close();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java index 9521db4..c9f5fcc 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java @@ -85,10 +85,6 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep { return outIterators; } - @Override protected CarbonRow processRow(CarbonRow row) { - return null; - } - @Override public void close() { if (!closed) { super.close(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java index 7c4f161..b6858e1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java @@ -169,10 +169,6 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce return iterators; } - @Override protected CarbonRow processRow(CarbonRow row) { - return null; - } - @Override public void close() { if (!closed) { super.close(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/steps/JsonInputProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/JsonInputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/JsonInputProcessorStepImpl.java index 892be93..13877f9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/JsonInputProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/JsonInputProcessorStepImpl.java @@ -21,7 +21,6 @@ import java.util.Iterator; import java.util.List; import org.apache.carbondata.common.CarbonIterator; -import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep; import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; @@ -72,10 +71,6 @@ public class JsonInputProcessorStepImpl extends AbstractDataLoadProcessorStep { return outIterators; } - @Override protected CarbonRow processRow(CarbonRow row) { - return null; - } - @Override public void close() { if (!closed) { super.close(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/loading/steps/SortProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/SortProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/SortProcessorStepImpl.java index 856d68c..60ae8d9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/SortProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/SortProcessorStepImpl.java @@ -19,10 +19,8 @@ package org.apache.carbondata.processing.loading.steps; import java.io.IOException; import java.util.Iterator; -import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep; import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; -import org.apache.carbondata.processing.loading.DataField; import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException; import org.apache.carbondata.processing.loading.row.CarbonRowBatch; import org.apache.carbondata.processing.loading.sort.Sorter; @@ -43,11 +41,6 @@ public class SortProcessorStepImpl extends AbstractDataLoadProcessorStep { } @Override - public DataField[] getOutput() { - return child.getOutput(); - } - - @Override public void initialize() throws IOException { super.initialize(); child.initialize(); @@ -63,11 +56,6 @@ public class SortProcessorStepImpl extends AbstractDataLoadProcessorStep { } @Override - protected CarbonRow processRow(CarbonRow row) { - return null; - } - - @Override public void close() { if (!closed) { super.close(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java index a4d3d2b..75a231e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java @@ -18,7 +18,6 @@ package org.apache.carbondata.processing.merger; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -388,35 +387,4 @@ public class CarbonCompactionUtil { } return restructuredBlockExists; } - - /** - * This method will check for any restructured block in the blocks selected for compaction - * - * @param segmentMapping - * @param tableLastUpdatedTime - * @return - */ - public static boolean checkIfAnyRestructuredBlockExists(Map<String, TaskBlockInfo> segmentMapping, - long tableLastUpdatedTime) { - boolean restructuredBlockExists = false; - for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) { - String segmentId = taskMap.getKey(); - TaskBlockInfo taskBlockInfo = taskMap.getValue(); - Collection<List<TableBlockInfo>> infoList = taskBlockInfo.getAllTableBlockInfoList(); - for (List<TableBlockInfo> listMetadata : infoList) { - for (TableBlockInfo blockInfo : listMetadata) { - // if schema modified timestamp is greater than footer stored schema timestamp, - // it indicates it is a restructured block - if (tableLastUpdatedTime > blockInfo.getDetailInfo().getSchemaUpdatedTimeStamp()) { - restructuredBlockExists = true; - break; - } - } - } - if (restructuredBlockExists) { - break; - } - } - return restructuredBlockExists; - } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index 81031de..78af751 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -1185,11 +1185,10 @@ public final class CarbonDataMergerUtil { * @throws IOException */ public static List<CarbonDataMergerUtilResult> compactBlockDeleteDeltaFiles(String seg, - String blockName, CarbonTable table, - SegmentUpdateDetails[] segmentUpdateDetails, Long timestamp) throws IOException { + String blockName, CarbonTable table, SegmentUpdateDetails[] segmentUpdateDetails, + Long timestamp) throws IOException { - SegmentUpdateStatusManager segmentUpdateStatusManager = - new SegmentUpdateStatusManager(table); + SegmentUpdateStatusManager segmentUpdateStatusManager = new SegmentUpdateStatusManager(table); List<CarbonDataMergerUtilResult> resultList = new ArrayList<CarbonDataMergerUtilResult>(1); @@ -1218,11 +1217,8 @@ public final class CarbonDataMergerUtil { blockDetails.setDeleteDeltaEndTimestamp(timestamp.toString()); try { - if (startCompactionDeleteDeltaFiles(deleteFilePathList, blockName, fullBlockFilePath)) { - blockDetails.setCompactionStatus(true); - } else { - blockDetails.setCompactionStatus(false); - } + startCompactionDeleteDeltaFiles(deleteFilePathList, blockName, fullBlockFilePath); + blockDetails.setCompactionStatus(true); resultList.add(blockDetails); } catch (IOException e) { LOGGER.error("Compaction of Delete Delta Files failed. The complete file path is " @@ -1240,7 +1236,7 @@ public final class CarbonDataMergerUtil { * @param fullBlockFilePath * @return */ - public static Boolean startCompactionDeleteDeltaFiles(List<String> deleteDeltaFiles, + public static void startCompactionDeleteDeltaFiles(List<String> deleteDeltaFiles, String blockName, String fullBlockFilePath) throws IOException { DeleteDeltaBlockDetails deleteDeltaBlockDetails = null; @@ -1263,7 +1259,6 @@ public final class CarbonDataMergerUtil { LOGGER.error("Error while writing compacted delete delta file " + fullBlockFilePath); throw new IOException(); } - return true; } public static Boolean updateStatusFile( http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index ff4e47b..0fc229a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -438,8 +438,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { tempStoreLocation, carbonStoreLocation); carbonFactDataHandlerModel.setSegmentId(carbonLoadModel.getSegmentId()); setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonFactDataHandlerModel); - dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel, - CarbonFactHandlerFactory.FactHandlerType.COLUMNAR); + dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel); try { dataHandler.initialise(); } catch (CarbonDataWriterException e) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/partition/impl/DefaultLoadBalancer.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/DefaultLoadBalancer.java b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/DefaultLoadBalancer.java deleted file mode 100644 index 93c5260..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/DefaultLoadBalancer.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.partition.impl; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.processing.partition.Partition; - -/** - * A sample load balancer to distribute the partitions to the available nodes in a round robin mode. - */ -public class DefaultLoadBalancer { - private Map<String, List<Partition>> nodeToPartitionMap = - new HashMap<String, List<Partition>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - - private Map<Partition, String> partitionToNodeMap = - new HashMap<Partition, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - - public DefaultLoadBalancer(List<String> nodes, List<Partition> partitions) { - //Per form a round robin allocation - int nodeCount = nodes.size(); - - int partitioner = 0; - for (Partition partition : partitions) { - int nodeindex = partitioner % nodeCount; - String node = nodes.get(nodeindex); - - List<Partition> oldList = nodeToPartitionMap.get(node); - if (oldList == null) { - oldList = new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN); - nodeToPartitionMap.put(node, oldList); - } - oldList.add(partition); - - partitionToNodeMap.put(partition, node); - - partitioner++; - } - } - - public String getNodeForPartitions(Partition partition) { - return partitionToNodeMap.get(partition); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/partition/impl/PartitionMultiFileImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/PartitionMultiFileImpl.java b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/PartitionMultiFileImpl.java deleted file mode 100644 index c303efa..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/PartitionMultiFileImpl.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.partition.impl; - -import java.util.List; - -import org.apache.carbondata.processing.partition.Partition; - -public class PartitionMultiFileImpl implements Partition { - private static final long serialVersionUID = -4363447826181193976L; - private String uniqueID; - private List<String> folderPath; - - public PartitionMultiFileImpl(String uniqueID, List<String> folderPath) { - this.uniqueID = uniqueID; - this.folderPath = folderPath; - } - - @Override public String getUniqueID() { - // TODO Auto-generated method stub - return uniqueID; - } - - @Override public List<String> getFilesPath() { - // TODO Auto-generated method stub - return folderPath; - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/partition/impl/SampleDataPartitionerImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/SampleDataPartitionerImpl.java b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/SampleDataPartitionerImpl.java deleted file mode 100644 index 92bd6ff..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/SampleDataPartitionerImpl.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.partition.impl; - -import java.util.List; - -import org.apache.carbondata.processing.partition.DataPartitioner; -import org.apache.carbondata.processing.partition.Partition; - -/** - * Sample partition. - */ -public class SampleDataPartitionerImpl implements DataPartitioner { - - @Override - public List<Partition> getAllPartitions() { - return null; - } - - @Override - public List<Partition> getPartitions() { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java index 9b09269..2dc79a3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java @@ -26,7 +26,6 @@ import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.datastore.row.WriteStepRowUtil; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; -import org.apache.carbondata.processing.partition.spliter.exception.AlterPartitionSliceException; import org.apache.carbondata.processing.store.CarbonDataFileAttributes; import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar; import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel; @@ -80,7 +79,7 @@ public class RowResultProcessor { this.dataHandler.finish(); } processStatus = true; - } catch (AlterPartitionSliceException e) { + } catch (CarbonDataWriterException e) { LOGGER.error(e, e.getMessage()); LOGGER.error("Exception in executing RowResultProcessor" + e.getMessage()); processStatus = false; @@ -97,12 +96,12 @@ public class RowResultProcessor { return processStatus; } - private void addRow(Object[] carbonTuple) throws AlterPartitionSliceException { + private void addRow(Object[] carbonTuple) throws CarbonDataWriterException { CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties); try { this.dataHandler.addDataToStore(row); } catch (CarbonDataWriterException e) { - throw new AlterPartitionSliceException("Exception in adding rows in RowResultProcessor", e); + throw new CarbonDataWriterException("Exception in adding rows in RowResultProcessor", e); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/exception/AlterPartitionSliceException.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/exception/AlterPartitionSliceException.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/exception/AlterPartitionSliceException.java deleted file mode 100644 index 21b53cf..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/exception/AlterPartitionSliceException.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.partition.spliter.exception; - -import java.util.Locale; - -public class AlterPartitionSliceException extends Exception { - - /** - * default serial version ID. - */ - private static final long serialVersionUID = 1L; - - /** - * The Error message. - */ - private String msg = ""; - - /** - * Constructor - * - * @param msg The error message for this exception. - */ - public AlterPartitionSliceException(String msg) { - super(msg); - this.msg = msg; - } - - /** - * Constructor - * - * @param msg The error message for this exception. - */ - public AlterPartitionSliceException(String msg, Throwable t) { - super(msg, t); - this.msg = msg; - } - - /** - * This method is used to get the localized message. - * - * @param locale - A Locale object represents a specific geographical, - * political, or cultural region. - * @return - Localized error message. - */ - public String getLocalizedMessage(Locale locale) { - return ""; - } - - /** - * getLocalizedMessage - */ - @Override public String getLocalizedMessage() { - return super.getLocalizedMessage(); - } - - /** - * getMessage - */ - public String getMessage() { - return this.msg; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/splits/TableSplit.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/splits/TableSplit.java b/processing/src/main/java/org/apache/carbondata/processing/splits/TableSplit.java deleted file mode 100644 index c7d5dd8..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/splits/TableSplit.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.splits; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.List; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.processing.partition.Partition; - -import org.apache.hadoop.io.Writable; - - -/** - * It represents one region server as one split. - */ -public class TableSplit implements Serializable, Writable { - private static final long serialVersionUID = -8058151330863145575L; - - private static final LogService LOGGER = - LogServiceFactory.getLogService(TableSplit.class.getName()); - private List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN); - - private Partition partition; - - /** - * @return the locations - */ - public List<String> getLocations() { - return locations; - } - - /** - * @param locations the locations to set - */ - public void setLocations(List<String> locations) { - this.locations = locations; - } - - /** - * @return Returns the partitions. - */ - public Partition getPartition() { - return partition; - } - - /** - * @param partition The partitions to set. - */ - public void setPartition(Partition partition) { - this.partition = partition; - } - - @Override public void readFields(DataInput in) throws IOException { - - int sizeLoc = in.readInt(); - for (int i = 0; i < sizeLoc; i++) { - byte[] b = new byte[in.readInt()]; - in.readFully(b); - locations.add(new String(b, Charset.defaultCharset())); - } - - byte[] buf = new byte[in.readInt()]; - in.readFully(buf); - ByteArrayInputStream bis = new ByteArrayInputStream(buf); - ObjectInputStream ois = new ObjectInputStream(bis); - try { - partition = (Partition) ois.readObject(); - } catch (ClassNotFoundException e) { - LOGGER.error(e, e.getMessage()); - } - ois.close(); - } - - @Override public void write(DataOutput out) throws IOException { - - int sizeLoc = locations.size(); - out.writeInt(sizeLoc); - for (int i = 0; i < sizeLoc; i++) { - byte[] bytes = locations.get(i).getBytes(Charset.defaultCharset()); - out.writeInt(bytes.length); - out.write(bytes); - } - - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - - ObjectOutputStream obs = new ObjectOutputStream(bos); - obs.writeObject(partition); - obs.close(); - byte[] byteArray = bos.toByteArray(); - out.writeInt(byteArray.length); - out.write(byteArray); - } - - public String toString() { - return partition.getUniqueID() + ' ' + locations; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java index 8bedd80..7861b52 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataFileAttributes.java @@ -48,10 +48,6 @@ public class CarbonDataFileAttributes { return taskId; } - public void setFactTimeStamp(long factTimeStamp) { - this.factTimeStamp = factTimeStamp; - } - /** * @return fact time stamp which is load start time */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index f3cb9c3..97737d0 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -34,11 +34,9 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; -import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.keygenerator.KeyGenException; -import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter; import org.apache.carbondata.core.keygenerator.columnar.impl.MultiDimKeyVarLengthEquiSplitGenerator; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; @@ -86,7 +84,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { private ExecutorService consumerExecutorService; private List<Future<Void>> consumerExecutorServiceTaskList; private List<CarbonRow> dataRows; - private ColumnGroupModel colGrpModel; /** * semaphore which will used for managing node holder objects */ @@ -136,7 +133,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { } private void initParameters(CarbonFactDataHandlerModel model) { - this.colGrpModel = model.getSegmentProperties().getColumnGroupModel(); this.numberOfCores = model.getNumberOfCores(); blockletProcessingCount = new AtomicInteger(0); producerExecutorService = Executors.newFixedThreadPool(model.getNumberOfCores(), @@ -358,19 +354,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { int dimSet = Integer.parseInt(CarbonCommonConstants.DIMENSION_SPLIT_VALUE_IN_COLUMNAR_DEFAULTVALUE); // if at least one dimension is present then initialize column splitter otherwise null - int noOfColStore = colGrpModel.getNoOfColumnStore(); - int[] keyBlockSize = new int[noOfColStore + getExpandedComplexColsCount()]; - - if (model.getDimLens().length > 0) { - //Using Variable length variable split generator - //This will help in splitting mdkey to columns. variable split is required because all - // columns which are part of - //row store will be in single column store - //e.g if {0,1,2,3,4,5} is dimension and {0,1,2) is row store dimension - //than below splitter will return column as {0,1,2}{3}{4}{5} - ColumnarSplitter columnarSplitter = model.getSegmentProperties().getFixedLengthKeySplitter(); - System.arraycopy(columnarSplitter.getBlockKeySize(), 0, keyBlockSize, 0, noOfColStore); - } + int[] keyBlockSize = new int[getExpandedComplexColsCount()]; // agg type List<Integer> otherMeasureIndexList = @@ -398,8 +382,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { int[] blockKeySize = getBlockKeySizeWithComplexTypes(new MultiDimKeyVarLengthEquiSplitGenerator( CarbonUtil.getIncrementedCardinalityFullyFilled(model.getDimLens().clone()), (byte) dimSet) .getBlockKeySize()); - System.arraycopy(blockKeySize, noOfColStore, keyBlockSize, noOfColStore, - blockKeySize.length - noOfColStore); + System.arraycopy(blockKeySize, 0, keyBlockSize, 0, blockKeySize.length); this.dataWriter = getFactDataWriter(); // initialize the channel; this.dataWriter.initializeWriter(); @@ -413,8 +396,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { */ private int[] getBlockKeySizeWithComplexTypes(int[] primitiveBlockKeySize) { int allColsCount = getExpandedComplexColsCount(); - int[] blockKeySizeWithComplexTypes = - new int[this.colGrpModel.getNoOfColumnStore() + allColsCount]; + int[] blockKeySizeWithComplexTypes = new int[allColsCount]; List<Integer> blockKeySizeWithComplex = new ArrayList<Integer>(blockKeySizeWithComplexTypes.length); http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index 266c75d..63e47f0 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -35,7 +35,6 @@ import org.apache.carbondata.core.metadata.CarbonMetadata; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.util.CarbonProperties; @@ -88,10 +87,6 @@ public class CarbonFactDataHandlerModel { * local store location */ private String[] storeLocation; - /** - * flag to check whether use inverted index - */ - private boolean[] isUseInvertedIndex; /** * length of each dimension, including dictionary, nodictioncy, complex dimension @@ -185,8 +180,6 @@ public class CarbonFactDataHandlerModel { int taskExtension, DataMapWriterListener listener) { CarbonTableIdentifier identifier = configuration.getTableIdentifier().getCarbonTableIdentifier(); - boolean[] isUseInvertedIndex = - CarbonDataProcessorUtil.getIsUseInvertedIndex(configuration.getDataFields()); int[] dimLensWithComplex = configuration.getCardinalityFinder().getCardinality(); if (!configuration.isSortTable()) { @@ -262,7 +255,6 @@ public class CarbonFactDataHandlerModel { carbonFactDataHandlerModel.setPrimitiveDimLens(simpleDimsLen); carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes); carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath); - carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndex); carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB()); carbonFactDataHandlerModel.setComplexDimensionKeyGenerator( configuration.createKeyGeneratorForComplexDimension()); @@ -333,13 +325,6 @@ public class CarbonFactDataHandlerModel { carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes); CarbonUtil.checkAndCreateFolderWithPermission(carbonDataDirectoryPath); carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath); - List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName); - boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()]; - int index = 0; - for (CarbonDimension dimension : dimensionByTableName) { - isUseInvertedIndexes[index++] = dimension.isUseInvertedIndex(); - } - carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndexes); carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality()); carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB()); @@ -528,13 +513,6 @@ public class CarbonFactDataHandlerModel { isCompactionFlow = compactionFlow; } - public boolean[] getIsUseInvertedIndex() { - return isUseInvertedIndex; - } - - public void setIsUseInvertedIndex(boolean[] isUseInvertedIndex) { - this.isUseInvertedIndex = isUseInvertedIndex; - } /** * * @return segmentProperties @@ -621,10 +599,6 @@ public class CarbonFactDataHandlerModel { return count; } - public boolean isSortColumn(int columnIndex) { - return columnIndex < segmentProperties.getNumberOfSortColumns(); - } - public TableSpec getTableSpec() { return tableSpec; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandlerFactory.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandlerFactory.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandlerFactory.java index b4d18f4..79a8a86 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandlerFactory.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandlerFactory.java @@ -28,19 +28,8 @@ public final class CarbonFactHandlerFactory { * @param handlerType * @return */ - public static CarbonFactHandler createCarbonFactHandler(CarbonFactDataHandlerModel model, - FactHandlerType handlerType) { - switch (handlerType) { - case COLUMNAR: - return new CarbonFactDataHandlerColumnar(model); - default: - return new CarbonFactDataHandlerColumnar(model); - } + public static CarbonFactHandler createCarbonFactHandler(CarbonFactDataHandlerModel model) { + return new CarbonFactDataHandlerColumnar(model); } - - public enum FactHandlerType { - COLUMNAR - } - } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9114036/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index 10888f6..dfe0e24 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -33,7 +33,6 @@ import org.apache.carbondata.common.constants.LoggerAction; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.ColumnType; import org.apache.carbondata.core.metadata.CarbonMetadata; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; @@ -397,7 +396,7 @@ public final class CarbonDataProcessorUtil { public static boolean isHeaderValid(String tableName, String[] csvHeader, CarbonDataLoadSchema schema, List<String> ignoreColumns) { Iterator<String> columnIterator = - CarbonDataProcessorUtil.getSchemaColumnNames(schema, tableName).iterator(); + CarbonDataProcessorUtil.getSchemaColumnNames(schema).iterator(); Set<String> csvColumns = new HashSet<String>(csvHeader.length); Collections.addAll(csvColumns, csvHeader); @@ -418,28 +417,17 @@ public final class CarbonDataProcessorUtil { * @param schema * @param tableName */ - public static Set<String> getSchemaColumnNames(CarbonDataLoadSchema schema, String tableName) { + public static Set<String> getSchemaColumnNames(CarbonDataLoadSchema schema) { Set<String> columnNames = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); String factTableName = schema.getCarbonTable().getTableName(); - if (tableName.equals(factTableName)) { - List<CarbonDimension> dimensions = - schema.getCarbonTable().getDimensionByTableName(factTableName); - for (CarbonDimension dimension : dimensions) { - columnNames.add(dimension.getColName()); - } - List<CarbonMeasure> measures = schema.getCarbonTable().getMeasureByTableName(factTableName); - for (CarbonMeasure msr : measures) { - columnNames.add(msr.getColName()); - } - } else { - List<CarbonDimension> dimensions = schema.getCarbonTable().getDimensionByTableName(tableName); - for (CarbonDimension dimension : dimensions) { - columnNames.add(dimension.getColName()); - } - List<CarbonMeasure> measures = schema.getCarbonTable().getMeasureByTableName(tableName); - for (CarbonMeasure msr : measures) { - columnNames.add(msr.getColName()); - } + List<CarbonDimension> dimensions = + schema.getCarbonTable().getDimensionByTableName(factTableName); + for (CarbonDimension dimension : dimensions) { + columnNames.add(dimension.getColName()); + } + List<CarbonMeasure> measures = schema.getCarbonTable().getMeasureByTableName(factTableName); + for (CarbonMeasure msr : measures) { + columnNames.add(msr.getColName()); } return columnNames; } @@ -604,19 +592,6 @@ public final class CarbonDataProcessorUtil { } /** - * This method will return a flag based on whether a column is applicable for RLE encoding - * - * @param dimensionType - * @return - */ - public static boolean isRleApplicableForColumn(ColumnType dimensionType) { - if (dimensionType == ColumnType.GLOBAL_DICTIONARY) { - return true; - } - return false; - } - - /** * This method will return an array whose element with be appended with the `append` strings * @param inputArr inputArr * @param append strings to append