[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user xuchuanyin closed the pull request at: https://github.com/apache/carbondata/pull/2374 ---
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r197016816

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CsvRecordReader.java ---
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.intf.RowImpl;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.statusmanager.FileFormatProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+
+import com.univocity.parsers.csv.CsvParser;
+import com.univocity.parsers.csv.CsvParserSettings;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * scan csv file and filter on it
+ */
+@InterfaceStability.Evolving
+@InterfaceAudience.Internal
+public class CsvRecordReader<T> extends AbstractRecordReader<T> {
+  private static final LogService LOGGER = LogServiceFactory.getLogService(
+      CsvRecordReader.class.getName());
+  private static final int MAX_BATCH_SIZE =
+      CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
+  // vector reader
+  private boolean isVectorReader;
+  private T columnarBatch;
+
+  // metadata
+  private CarbonTable carbonTable;
+  private CarbonColumn[] carbonColumns;
+  // input
+  private QueryModel queryModel;
+  private CarbonReadSupport<T> readSupport;
+  private FileSplit fileSplit;
+  private Configuration hadoopConf;
+  // the index is schema ordinal, the value is the csv ordinal
+  private int[] schema2csvIdx;
+
+  // filter
+  private FilterExecuter filter;

--- End diff --

And also there should be an option whether we should push down filters or execution

---
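The reviewer's point about making filter pushdown optional can be sketched as a small standalone example. Everything here is hypothetical illustration, not a CarbonData API: `FilterPushDownSketch`, the `pushDownEnabled` flag, and the row representation are all made up to show the shape of the option being asked for.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class FilterPushDownSketch {
    /**
     * Scans rows, applying the filter inside the reader only when pushdown is
     * enabled; otherwise every row is returned and filtering is left to the
     * query engine. "pushDownEnabled" stands in for the (hypothetical) option
     * the reviewer is asking for.
     */
    static List<String[]> scan(List<String[]> rows, Predicate<String[]> filter,
                               boolean pushDownEnabled) {
        if (!pushDownEnabled) {
            return rows;  // engine will filter later
        }
        List<String[]> out = new ArrayList<>();
        for (String[] row : rows) {
            if (filter.test(row)) {
                out.add(row);  // row survives the pushed-down filter
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
            new String[] {"a", "1"}, new String[] {"b", "2"});
        Predicate<String[]> filter = r -> r[1].equals("2");
        System.out.println(scan(rows, filter, true).size());   // 1
        System.out.println(scan(rows, filter, false).size());  // 2
    }
}
```

With such a toggle, a table or query property can decide per workload whether the reader spends CPU on filtering or simply streams rows out.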
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r197016418

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CsvRecordReader.java ---
@@ -0,0 +1,506 @@
[same hunk as quoted above, ending at:]
+  // filter
+  private FilterExecuter filter;

--- End diff --

Filtering logic should be out of readers. Because in future if we add more readers,

---
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r197010299

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---
@@ -515,12 +573,72 @@ private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) th
     return split;
   }

+  private List<InputSplit> convertToInputSplit4ExternalFormat(JobContext jobContext,

--- End diff --

Why not use CSVInputFormat.getSplits? I can see the code is almost the same as `FileInputFormat.getSplits`.

---
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r197008527

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CsvRecordReader.java ---
@@ -0,0 +1,506 @@
[same hunk as quoted above, ending at:]
+public class CsvRecordReader<T> extends AbstractRecordReader<T> {

--- End diff --

Why can't you use our existing `CSVInputFormat` and `CSVRecordReader`? Why duplicate the code?

---
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user xuchuanyin commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r196627999

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
@@ -403,6 +403,17 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
       partition = partitionSpec)
   }

+  /**
+   * The syntax of
+   * ALTER TABLE [dbName.]tableName ADD SEGMENT LOCATION 'path/to/data'
+   */
+  protected lazy val addSegment: Parser[LogicalPlan] =
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~
+    ADD ~ SEGMENT ~ LOCATION ~ stringLit <~ opt(";") ^^ {
+      case dbName ~ tableName ~ add ~ segment ~ location ~ filePath =>

--- End diff --

OK

---
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user xuchuanyin commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r196627213

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddSegmentCommand.scala ---
@@ -0,0 +1,135 @@
+[Apache license header, same as quoted earlier]
+
+package org.apache.spark.sql.execution.command.management
+
+import java.util.UUID
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.execution.command.AtomicRunnableCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.util.FileUtils
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+
+/**
+ * support `alter table tableName add segment location 'path'` command.
+ * It will create a segment and map the path of datafile to segment's storage
+ */
+case class CarbonAddSegmentCommand(
+    dbNameOp: Option[String],
+    tableName: String,
+    filePathFromUser: String,
+    var operationContext: OperationContext = new OperationContext) extends AtomicRunnableCommand {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+  var carbonTable: CarbonTable = _
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    val dbName = CarbonEnv.getDatabaseName(dbNameOp)(sparkSession)
+    carbonTable = {
+      val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+        .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
+      if (relation == null) {
+        LOGGER.error(s"Add segment failed due to table $dbName.$tableName not found")
+        throw new NoSuchTableException(dbName, tableName)
+      }
+      relation.carbonTable
+    }
+
+    if (carbonTable.isHivePartitionTable) {
+      LOGGER.error("Ignore hive partition table for now")
+    }
+
+    operationContext.setProperty("isOverwrite", false)
+    if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
+      val loadMetadataEvent = new LoadMetadataEvent(carbonTable, false)
+      OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext)
+    }
+    Seq.empty
+  }
+
+  // will just mapping external files to segment metadata
+  override def processData(sparkSession: SparkSession): Seq[Row] = {

--- End diff --

In my opinion, creating the segment and updating the tablestatus both belong to `processData`. And in other commands such as LoadData, these operations are in `processData` too.

---
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user xuchuanyin commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r196626592

--- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala ---
@@ -700,6 +700,13 @@ class TableNewProcessor(cm: TableModel) {
       cm.tableName))
     tableInfo.setLastUpdatedTime(System.currentTimeMillis())
     tableInfo.setFactTable(tableSchema)
+    val format = cm.tableProperties.get(CarbonCommonConstants.FORMAT)

--- End diff --

OK

---
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user xuchuanyin commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r196626156

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
@@ -426,6 +439,22 @@ class CarbonScanRDD[T: ClassTag](
             CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId))
           streamReader.setQueryModel(model)
           streamReader
+        case FileFormat.EXTERNAL =>
+          assert(storageFormat.equals("csv"),

--- End diff --

OK~

---
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user xuchuanyin commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r196625677

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---
@@ -515,12 +574,73 @@ private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) th
     return split;
   }

+  private List<InputSplit> convertToInputSplit4ExternalFormat(JobContext jobContext,
+      ExtendedBlocklet extendedBlocklet) throws IOException {
+    List<InputSplit> splits = new ArrayList<>();
+    String factFilePath = extendedBlocklet.getFilePath();
+    Path path = new Path(factFilePath);
+    FileSystem fs = FileFactory.getFileSystem(path);
+    FileStatus fileStatus = fs.getFileStatus(path);
+    long length = fileStatus.getLen();
+    if (length != 0) {
+      BlockLocation[] blkLocations = fs.getFileBlockLocations(path, 0, length);
+      long blkSize = fileStatus.getBlockSize();
+      long minSplitSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(jobContext));
+      long maxSplitSize = getMaxSplitSize(jobContext);
+      long splitSize = computeSplitSize(blkSize, minSplitSize, maxSplitSize);
+      long bytesRemaining = fileStatus.getLen();
+      while (((double) bytesRemaining) / splitSize > 1.1) {
+        int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+        splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path,
+            length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(),
+            blkLocations[blkIndex].getCachedHosts(), FileFormat.EXTERNAL));
+        bytesRemaining -= splitSize;
+      }
+      if (bytesRemaining != 0) {
+        int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+        splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path,
+            length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(),
+            blkLocations[blkIndex].getCachedHosts(), FileFormat.EXTERNAL));
+      }
+    } else {
+      splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path, 0, length,
+          new String[0], FileFormat.EXTERNAL));
+    }
+    return splits;
+  }
+
   @Override
   public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
       TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
     Configuration configuration = taskAttemptContext.getConfiguration();
     QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
     CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
-    return new CarbonRecordReader<T>(queryModel, readSupport);
+    if (inputSplit instanceof CarbonMultiBlockSplit
+        && ((CarbonMultiBlockSplit) inputSplit).getFileFormat() == FileFormat.EXTERNAL) {
+      return createRecordReaderForExternalFormat(queryModel, readSupport,
+          configuration.get(CarbonCommonConstants.CARBON_EXTERNAL_FORMAT_CONF_KEY));
+    } else if (inputSplit instanceof CarbonInputSplit
+        && ((CarbonInputSplit) inputSplit).getFileFormat() == FileFormat.EXTERNAL) {
+      return createRecordReaderForExternalFormat(queryModel, readSupport,
+          configuration.get(CarbonCommonConstants.CARBON_EXTERNAL_FORMAT_CONF_KEY));
+    } else {
+      return new CarbonRecordReader<T>(queryModel, readSupport);
+    }
+  }
+
+  @Since("1.4.1")

--- End diff --

OK

---
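The split-planning loop quoted above follows Hadoop's `FileInputFormat` convention: clamp the block size between the min and max split sizes, then cut the file into split-sized chunks, folding a tail shorter than 10% of a split (the 1.1 factor, Hadoop's usual SPLIT_SLOP) into one final short split. A minimal sketch of just that arithmetic, with hypothetical names (`SplitPlanSketch`, `planSplits`):

```java
import java.util.ArrayList;
import java.util.List;

public class SplitPlanSketch {
    static final double SPLIT_SLOP = 1.1;  // same 1.1 factor as the loop above

    // Hadoop's usual rule: clamp the block size between min and max split size.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    /**
     * Plans {offset, length} splits over a file of fileLen bytes. A remainder
     * shorter than 10% of a split becomes one final short split rather than
     * being left as a tiny extra split.
     */
    static List<long[]> planSplits(long fileLen, long splitSize) {
        List<long[]> splits = new ArrayList<>();
        long bytesRemaining = fileLen;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits.add(new long[] {fileLen - bytesRemaining, splitSize});
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            splits.add(new long[] {fileLen - bytesRemaining, bytesRemaining});
        }
        return splits;
    }
}
```

For example, a 250-byte file with a 100-byte split size yields splits at offsets 0, 100, and 200, the last one 50 bytes long; a 105-byte file yields a single 105-byte split because 1.05 is under the slop threshold.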
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user xuchuanyin commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r196625604

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java ---
@@ -174,9 +174,15 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
     List<InputSplit> result = new LinkedList<InputSplit>();
     // for each segment fetch blocks matching filter in Driver BTree
-    List<CarbonInputSplit> dataBlocksOfSegment =
-        getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions,
-            validSegments, partitionInfo, oldPartitionIdList);
+    List<CarbonInputSplit> dataBlocksOfSegment;
+    if (carbonTable.getTableInfo().getFormat().equals("")

--- End diff --

The default value of format is 'carbondata', so there is no need to handle empty. Will remove it.

---
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user xuchuanyin commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r196625258

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CsvRecordReader.java ---
@@ -0,0 +1,510 @@
[same license header and imports as quoted above]
+/**
+ * scan csv file and filter on it
+ */
+@InterfaceStability.Evolving
+@InterfaceAudience.Internal
+public class CsvRecordReader<T> extends AbstractRecordReader<T> {

--- End diff --

The procedure is alike, but the implementation is quite different. The most important parts are converting origin data to an internal row and converting origin data to an output row. In StreamRecordReader the origin source is the ROW_V1 format, while in CsvRecordReader the origin source is the CSV format. Besides, StreamRecordReader has more details, such as 'syncMark' and 'rawRow', which we do not need for CSV. Maybe we can extract the common code into utils or create a new abstraction for ReadSupport or RecordReader.

---
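The abstraction suggested in the reply above can be sketched as a template-method base reader: the pipeline (read raw record, convert to internal row, convert to output row) is fixed in the base class, while each format supplies its own steps. All names here (`RowPipelineReader`, `CsvLineReader`) are illustrative inventions, not CarbonData classes, and the toy CSV parsing stands in for the real univocity-based reader.

```java
import java.util.Arrays;
import java.util.Iterator;

// Template-method base class fixing the read pipeline; format-specific
// subclasses fill in the three steps.
abstract class RowPipelineReader<T> {
    /** Format-specific: pull the next raw record, or null at end of input. */
    protected abstract String[] readRawRecord();
    /** Format-specific: convert a raw record into an internal row. */
    protected abstract Object[] toInternalRow(String[] raw);
    /** Format-specific: convert an internal row into the output type. */
    protected abstract T toOutputRow(Object[] internal);

    /** Shared pipeline: raw record -> internal row -> output row. */
    public T nextRow() {
        String[] raw = readRawRecord();
        return raw == null ? null : toOutputRow(toInternalRow(raw));
    }
}

// Toy CSV-backed subclass: parses "a,1" style lines into "name:int" strings.
class CsvLineReader extends RowPipelineReader<String> {
    private final Iterator<String> lines;

    CsvLineReader(Iterable<String> input) {
        this.lines = input.iterator();
    }

    @Override protected String[] readRawRecord() {
        return lines.hasNext() ? lines.next().split(",") : null;
    }

    @Override protected Object[] toInternalRow(String[] raw) {
        return new Object[] {raw[0], Integer.parseInt(raw[1])};
    }

    @Override protected String toOutputRow(Object[] internal) {
        return internal[0] + ":" + internal[1];
    }
}
```

Under this shape, a stream reader and a CSV reader would share `nextRow` and only differ in the three overridden steps, which is roughly the extraction the comment proposes.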
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user xuchuanyin commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r196624366

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CsvRecordReader.java ---
@@ -0,0 +1,510 @@
[same license header and imports as quoted above]
+/**
+ * scan csv file and filter on it
+ */
+@InterfaceStability.Evolving
+@InterfaceAudience.Internal
+public class CsvRecordReader<T> extends AbstractRecordReader<T> {
+  private static final LogService LOGGER = LogServiceFactory.getLogService(
+      CsvRecordReader.class.getName());
+  private static final int MAX_BATCH_SIZE =
+      CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT;
+  // vector reader
+  private boolean isVectorReader;
+  private T columnarBatch;
+
+  // metadata
+  private CarbonTable carbonTable;
+  private CarbonColumn[] carbonColumns;
+  // input
+  private QueryModel queryModel;
+  private CarbonReadSupport<T> readSupport;
+  private FileSplit fileSplit;
+  private Configuration hadoopConf;
+  // the index is schema ordinal, the value is the csv ordinal
+  private int[] schema2csvIdx;
+
+  // filter
+  private FilterExecuter filter;
+  // the index is the dimension ordinal, the value is the schema ordinal
+  private int[]
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r196512608

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
@@ -403,6 +403,17 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     partition = partitionSpec)
   }

+  /**
+   * The syntax of
+   * ALTER TABLE [dbName.]tableName ADD SEGMENT LOCATION 'path/to/data'
+   */
+  protected lazy val addSegment: Parser[LogicalPlan] =
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~
+    ADD ~ SEGMENT ~ LOCATION ~ stringLit <~ opt(";") ^^ {
+      case dbName ~ tableName ~ add ~ segment ~ location ~ filePath =>
--- End diff --

I think it should be `case dbName ~ tableName ~ filePath =>`

---
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r196512126 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddSegmentCommand.scala --- @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.management + +import java.util.UUID + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.execution.command.AtomicRunnableCommand +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.util.FileUtils + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datamap.status.DataMapStatusManager +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.mutate.CarbonUpdateUtil +import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.events.{OperationContext, OperationListenerBus} +import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} +import org.apache.carbondata.processing.util.CarbonLoaderUtil + +/** + * support `alter table tableName add segment location 'path'` command. 
+ * It will create a segment and map the path of datafile to segment's storage + */ +case class CarbonAddSegmentCommand( +dbNameOp: Option[String], +tableName: String, +filePathFromUser: String, +var operationContext: OperationContext = new OperationContext) extends AtomicRunnableCommand { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + var carbonTable: CarbonTable = _ + + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { +val dbName = CarbonEnv.getDatabaseName(dbNameOp)(sparkSession) +carbonTable = { + val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore +.lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] + if (relation == null) { +LOGGER.error(s"Add segment failed due to table $dbName.$tableName not found") +throw new NoSuchTableException(dbName, tableName) + } + relation.carbonTable +} + +if (carbonTable.isHivePartitionTable) { + LOGGER.error("Ignore hive partition table for now") +} + +operationContext.setProperty("isOverwrite", false) +if (CarbonUtil.hasAggregationDataMap(carbonTable)) { + val loadMetadataEvent = new LoadMetadataEvent(carbonTable, false) + OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext) +} +Seq.empty + } + + // will just mapping external files to segment metadata + override def processData(sparkSession: SparkSession): Seq[Row] = { --- End diff -- All these operations are metadata only, so I think this class should extend `MetadataProcessOpeation` instead ---
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r196511544

--- Diff: integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala ---
@@ -700,6 +700,13 @@ class TableNewProcessor(cm: TableModel) {
       cm.tableName))
     tableInfo.setLastUpdatedTime(System.currentTimeMillis())
     tableInfo.setFactTable(tableSchema)
+    val format = cm.tableProperties.get(CarbonCommonConstants.FORMAT)
--- End diff --

The `format` table property should also be checked; currently only csv is supported.

---
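To illustrate the validation the reviewer asks for, here is a minimal, self-contained Java sketch (class and method names are hypothetical, not from the PR): reject any `format` table property outside the supported set and fall back to the native format when the property is absent.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical sketch: validate the 'format' table property, assuming only
// "carbondata" and "csv" are supported at this point.
public class FormatPropertyValidator {
    private static final List<String> SUPPORTED_FORMATS = Arrays.asList("carbondata", "csv");

    public static String validate(String format) {
        // An absent or empty property falls back to the native columnar format.
        if (format == null || format.trim().isEmpty()) {
            return "carbondata";
        }
        String normalized = format.trim().toLowerCase(Locale.ROOT);
        if (!SUPPORTED_FORMATS.contains(normalized)) {
            throw new IllegalArgumentException(
                "Invalid format: " + format + ". Supported formats: " + SUPPORTED_FORMATS);
        }
        return normalized;
    }
}
```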
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r196510839

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---
@@ -426,6 +439,22 @@ class CarbonScanRDD[T: ClassTag](
         CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId))
       streamReader.setQueryModel(model)
       streamReader
+    case FileFormat.EXTERNAL =>
+      assert(storageFormat.equals("csv"),
--- End diff --

should use an if check instead of an assert

---
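The point behind this comment: JVM assertions are disabled unless the process runs with `-ea`, so an assert-based check is silently skipped in a normal production run. A minimal sketch of the suggested explicit check (class and method names hypothetical):

```java
// Hypothetical sketch of the suggested change: assert(...) is skipped entirely
// unless the JVM runs with -ea, so this check uses an explicit if + exception
// that always executes, in production as well as in tests.
public class StorageFormatCheck {
    public static void checkIsCsv(String storageFormat) {
        if (!"csv".equals(storageFormat)) {
            throw new UnsupportedOperationException(
                "Unsupported external format: " + storageFormat + ", only csv is supported");
        }
    }
}
```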
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r196510278 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java --- @@ -515,12 +574,73 @@ private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) th return split; } + private List convertToInputSplit4ExternalFormat(JobContext jobContext, + ExtendedBlocklet extendedBlocklet) throws IOException { +List splits = new ArrayList(); +String factFilePath = extendedBlocklet.getFilePath(); +Path path = new Path(factFilePath); +FileSystem fs = FileFactory.getFileSystem(path); +FileStatus fileStatus = fs.getFileStatus(path); +long length = fileStatus.getLen(); +if (length != 0) { + BlockLocation[] blkLocations = fs.getFileBlockLocations(path, 0, length); + long blkSize = fileStatus.getBlockSize(); + long minSplitSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(jobContext)); + long maxSplitSize = getMaxSplitSize(jobContext); + long splitSize = computeSplitSize(blkSize, minSplitSize, maxSplitSize); + long bytesRemaining = fileStatus.getLen(); + while (((double) bytesRemaining) / splitSize > 1.1) { +int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); +splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path, +length - bytesRemaining, +splitSize, blkLocations[blkIndex].getHosts(), +blkLocations[blkIndex].getCachedHosts(), FileFormat.EXTERNAL)); +bytesRemaining -= splitSize; + } + if (bytesRemaining != 0) { +int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); +splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path, +length - bytesRemaining, +bytesRemaining, blkLocations[blkIndex].getHosts(), +blkLocations[blkIndex].getCachedHosts(), FileFormat.EXTERNAL)); + } +} else { + splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path, 0, length, + new String[0], FileFormat.EXTERNAL)); +} +return splits; + } + @Override public 
RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { Configuration configuration = taskAttemptContext.getConfiguration(); QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext); CarbonReadSupport readSupport = getReadSupportClass(configuration); -return new CarbonRecordReader(queryModel, readSupport); +if (inputSplit instanceof CarbonMultiBlockSplit +&& ((CarbonMultiBlockSplit) inputSplit).getFileFormat() == FileFormat.EXTERNAL) { + return createRecordReaderForExternalFormat(queryModel, readSupport, + configuration.get(CarbonCommonConstants.CARBON_EXTERNAL_FORMAT_CONF_KEY)); +} else if (inputSplit instanceof CarbonInputSplit +&& ((CarbonInputSplit) inputSplit).getFileFormat() == FileFormat.EXTERNAL) { + return createRecordReaderForExternalFormat(queryModel, readSupport, + configuration.get(CarbonCommonConstants.CARBON_EXTERNAL_FORMAT_CONF_KEY)); +} else { + return new CarbonRecordReader(queryModel, readSupport); +} + } + + @Since("1.4.1") --- End diff -- I think for private method, this annotation is not required ---
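The split loop quoted in the message above follows Hadoop's `FileInputFormat` pattern: carve the file into `splitSize` chunks, but let the final split grow up to 1.1x the split size (the "slop") so no tiny trailing split is produced. A standalone sketch of just that arithmetic, with `long[]{offset, length}` standing in for `CarbonInputSplit` (class and method names are illustrative, not from the PR):

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the quoted split computation, following Hadoop's
// FileInputFormat pattern. planSplits returns {offset, length} pairs; the
// while-loop condition lets the last split absorb up to 10% extra bytes.
public class SplitPlanner {
    private static final double SPLIT_SLOP = 1.1;

    // Hadoop's formula: clamp the block size between min and max split size.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    static List<long[]> planSplits(long fileLength, long splitSize) {
        List<long[]> splits = new ArrayList<>();
        long bytesRemaining = fileLength;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits.add(new long[] {fileLength - bytesRemaining, splitSize});
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            // trailing split, at most SPLIT_SLOP * splitSize bytes long
            splits.add(new long[] {fileLength - bytesRemaining, bytesRemaining});
        }
        return splits;
    }
}
```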
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r196509935

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java ---
@@ -174,9 +174,15 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
     List result = new LinkedList();
     // for each segment fetch blocks matching filter in Driver BTree
-    List dataBlocksOfSegment =
-        getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions,
-            validSegments, partitionInfo, oldPartitionIdList);
+    List dataBlocksOfSegment;
+    if (carbonTable.getTableInfo().getFormat().equals("")
--- End diff --

why support empty string?

---
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r196509716 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CsvRecordReader.java --- @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.hadoop; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.UnsupportedEncodingException; +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.intf.RowImpl; +import org.apache.carbondata.core.scan.filter.intf.RowIntf; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.statusmanager.FileFormatProperties; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; 
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; + +import com.univocity.parsers.csv.CsvParser; +import com.univocity.parsers.csv.CsvParserSettings; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +/** + * scan csv file and filter on it + */ +@InterfaceStability.Evolving +@InterfaceAudience.Internal +public class CsvRecordReader extends AbstractRecordReader { --- End diff -- This class is much like StreamRecordReader, and it implements filter execution on internal row, can you extract common code to a parent class? ---
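One way to realize the refactoring suggested above is the template-method pattern: the parent owns the shared scan-and-filter loop while subclasses such as a csv reader or a stream reader only supply the format-specific row decoding. A hypothetical sketch (none of these names are from the PR; a real reader would delegate `accept` to `FilterExecuter`):

```java
// Hypothetical sketch of extracting the common "read a row, apply the
// row-level filter" loop into an abstract parent class.
public abstract class FilteringRowReader {
    /** Format-specific: decode the next raw row, or return null at end of input. */
    protected abstract Object[] readRawRow();

    /** Filter hook; a real implementation would call FilterExecuter on the row. */
    protected abstract boolean accept(Object[] row);

    /** Shared scan loop: skip filtered-out rows and return the next match. */
    public final Object[] nextMatchingRow() {
        Object[] row;
        while ((row = readRawRow()) != null) {
            if (accept(row)) {
                return row;
            }
        }
        return null;
    }
}
```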
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r196508522 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CsvRecordReader.java --- @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.hadoop; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.UnsupportedEncodingException; +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.intf.RowImpl; +import org.apache.carbondata.core.scan.filter.intf.RowIntf; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.statusmanager.FileFormatProperties; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; 
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; + +import com.univocity.parsers.csv.CsvParser; +import com.univocity.parsers.csv.CsvParserSettings; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; + +/** + * scan csv file and filter on it + */ +@InterfaceStability.Evolving +@InterfaceAudience.Internal +public class CsvRecordReader extends AbstractRecordReader { + private static final LogService LOGGER = LogServiceFactory.getLogService( + CsvRecordReader.class.getName()); + private static final int MAX_BATCH_SIZE = + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT; + // vector reader + private boolean isVectorReader; + private T columnarBatch; + + // metadata + private CarbonTable carbonTable; + private CarbonColumn[] carbonColumns; + // input + private QueryModel queryModel; + private CarbonReadSupport readSupport; + private FileSplit fileSplit; + private Configuration hadoopConf; + // the index is schema ordinal, the value is the csv ordinal + private int[] schema2csvIdx; + + // filter + private FilterExecuter filter; + // the index is the dimension ordinal, the value is the schema ordinal + private int[]
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user xuchuanyin commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r196398250

--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java ---
@@ -291,6 +318,17 @@ public void write(DataOutput out) throws IOException {
       }
     }
     out.writeBoolean(isSchemaModified);
+
+    out.writeUTF(format);
+    boolean isFormatPropertiesExists = null != formatProperties && formatProperties.size() > 0;
+    out.writeBoolean(isFormatPropertiesExists);
+    if (isFormatPropertiesExists) {
+      out.writeShort(formatProperties.size());
--- End diff --

The formatProperties is a Java map converted from a Scala map. When I tried to write this map directly as binary, a serialization problem occurred.

---
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r196307354 --- Diff: common/src/main/java/org/apache/carbondata/common/annotations/Since.java --- @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.common.annotations; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * The annotation indicates that the version number since a member or a type has been present. + */ +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.FIELD, ElementType.TYPE}) --- End diff -- Add `ElementType.METHOD` also ---
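For context, an annotation is only readable via reflection when it carries `RetentionPolicy.RUNTIME`; below is a compact sketch of a `Since`-style version marker including the `ElementType.METHOD` target the reviewer requests (the demo class and annotated method are hypothetical):

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Sketch of a version-marker annotation like the @Since above. RUNTIME
// retention is what makes the value visible to reflection; the Target list
// includes METHOD as the reviewer suggests.
public class SinceDemo {
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.FIELD, ElementType.TYPE, ElementType.METHOD})
    public @interface Since {
        String value();
    }

    @Since("1.4.1")
    public static String format() {
        return "csv";
    }

    public static void main(String[] args) throws Exception {
        Since s = SinceDemo.class.getMethod("format").getAnnotation(Since.class);
        System.out.println(s.value());  // prints "1.4.1"
    }
}
```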
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r195687377 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonCsvRecordReader.java --- @@ -0,0 +1,663 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.hadoop; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.math.BigInteger; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.intf.RowImpl; +import org.apache.carbondata.core.scan.filter.intf.RowIntf; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.statusmanager.FileFormatProperties; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import 
org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; + +import com.univocity.parsers.csv.CsvParser; +import com.univocity.parsers.csv.CsvParserSettings; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVector; +import org.apache.spark.sql.execution.vectorized.ColumnarBatch; +import org.apache.spark.sql.types.CalendarIntervalType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * scan csv file and filter on it + */ +public class CarbonCsvRecordReader extends AbstractRecordReader { + private static final LogService LOGGER = LogServiceFactory.getLogService( + CarbonCsvRecordReader.class.getName()); + private static final int MAX_BATCH_SIZE = 32000; + + // vector reader + private boolean isVectorReader; + private ColumnarBatch columnarBatch; +
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r195687057 --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonCsvRecordReader.java --- @@ -0,0 +1,663 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.hadoop; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.Reader; +import java.math.BigInteger; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.GenericQueryType; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.intf.RowImpl; +import org.apache.carbondata.core.scan.filter.intf.RowIntf; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.scan.model.QueryModel; +import org.apache.carbondata.core.statusmanager.FileFormatProperties; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import 
org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; + +import com.univocity.parsers.csv.CsvParser; +import com.univocity.parsers.csv.CsvParserSettings; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVector; +import org.apache.spark.sql.execution.vectorized.ColumnarBatch; +import org.apache.spark.sql.types.CalendarIntervalType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +/** + * scan csv file and filter on it + */ +public class CarbonCsvRecordReader extends AbstractRecordReader { --- End diff -- I think you can name it `CsvRecordReader` ---
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r195686824

--- Diff: hadoop/pom.xml ---
@@ -39,6 +39,11 @@
       carbondata-processing
       ${project.version}
+
+      org.apache.spark
+      spark-sql_${scala.binary.version}
--- End diff --

why is this required?

---
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r195686320

--- Diff: core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java ---
@@ -26,7 +26,10 @@
   COLUMNAR_V3,

   // carbondata row file format, optimized for write
-  ROW_V1;
+  ROW_V1,
+
+  // external file format, such as parquet/csv
--- End diff --

please describe where the format string is stored (table property in TableInfo)

---
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r195684966

--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java ---
@@ -291,6 +318,17 @@ public void write(DataOutput out) throws IOException {
       }
     }
     out.writeBoolean(isSchemaModified);
+
+    out.writeUTF(format);
+    boolean isFormatPropertiesExists = null != formatProperties && formatProperties.size() > 0;
+    out.writeBoolean(isFormatPropertiesExists);
+    if (isFormatPropertiesExists) {
+      out.writeShort(formatProperties.size());
--- End diff --

It is better to write `formatProperties` in one shot; performance is much better than writing each entry.

---
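The entry-by-entry scheme under discussion, a size prefix followed by `writeUTF` key/value pairs, can be sketched in isolation like this (class name hypothetical); it sidesteps Java serialization of Scala-converted maps at the cost of one `writeUTF` call per entry:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of Writable-style map serialization: a short size prefix followed by
// writeUTF key/value pairs, mirroring the write(DataOutput) code quoted above.
public class MapSerde {
    static byte[] write(Map<String, String> props) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeShort(props.size());
        for (Map.Entry<String, String> entry : props.entrySet()) {
            out.writeUTF(entry.getKey());
            out.writeUTF(entry.getValue());
        }
        return bytes.toByteArray();
    }

    static Map<String, String> read(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        Map<String, String> props = new LinkedHashMap<>();
        int size = in.readShort();
        for (int i = 0; i < size; i++) {
            props.put(in.readUTF(), in.readUTF());
        }
        return props;
    }
}
```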
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2374#discussion_r195680457 --- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java ---
```
@@ -89,6 +90,15 @@
    */
   private boolean isTransactionalTable = true;

+  /**
+   * The format of the fact table.
+   * By default it is carbondata, and we also support other formats like CSV
+   */
+  private String format = "carbondata";
```
--- End diff -- This class should be backward compatible; please make sure you handle the case where it is null. And please add the @Since("1.4.1") annotation. ---
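The backward-compatibility handling the reviewer asks for can be sketched like this: a TableInfo serialized by an older version carries no format field, so after deserialization it may be null, and the getter defaults it to "carbondata". The class and method names below are illustrative, not Carbon's actual code.

```java
// Illustrative null-safe handling of a field added in a later release.
class TableFormatCompat {

  // annotated @Since("1.4.1") in the real class, marking the release
  // in which the field first appeared
  private String format = "carbondata";

  // tolerate null coming from a pre-1.4.1 serialized object
  String getFormat() {
    return (format == null) ? "carbondata" : format;
  }

  void setFormat(String format) {
    this.format = format;
  }
}
```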
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
Github user xuchuanyin closed the pull request at: https://github.com/apache/carbondata/pull/2374 ---
[GitHub] carbondata pull request #2374: [CARBONDATA-2613] Support csv based carbon ta...
GitHub user xuchuanyin reopened a pull request: https://github.com/apache/carbondata/pull/2374 [CARBONDATA-2613] Support csv based carbon table

This PR is only a basic implementation and has some restrictions. It currently supports create/load/query/drop on a csv based carbon table.

1. Create a csv based carbon table using
```SQL
CREATE TABLE fact_table (
  col1 bigint, col2 string, ..., col100 string)
STORED BY 'CarbonData'
TBLPROPERTIES(
  'format'='csv',
  'csv.delimiter'=',',
  'csv.header'='col1,col2,col100');
```
2. Load data into this table using
```SQL
ALTER TABLE fact_table ADD SEGMENT LOCATION 'path/to/data1'
```
**Note**: *To reduce data movement, we simply map the original csv files to a CarbonData segment using the statement above.*
3. Querying this table is no different from querying an ordinary carbon table.

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:
- [ ] Any interfaces changed?
- [x] Any backward compatibility impacted? `NO`
- [x] Document update required? `NO, will do it once the feature is released`
- [x] Testing done. Please provide details:
  - Whether new unit test cases have been added or why no new tests are required? `basic tests added`
  - How is it tested? Please attach the test report. `Tested on a local machine`
  - Is it a performance related change? Please attach the performance test report. `NA`
  - Any additional information to help reviewers in testing this change.
- [x] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xuchuanyin/carbondata 0613_support_csv_table

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/carbondata/pull/2374.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2374

commit e9fa7e6402c7584146a52542534e719ca64143c1
Author: xuchuanyin
Date: 2018-06-13T01:03:28Z
    support create csv based carbontable

commit c71e9a5bda1ac23fa991a71e0e091f7814bd2117
Author: xuchuanyin
Date: 2018-06-13T01:45:10Z
    support add segment for csv carbon table

commit 01a8f00b5a50c50c6dd7854bd1d5500ac484b6e6
Author: xuchuanyin
Date: 2018-06-14T09:37:24Z
    Add csv record reader for csv carbon table

commit 506a072e8c7df64e89da8b53b3b5195fa4b01a31
Author: xuchuanyin
Date: 2018-06-14T15:00:35Z
    fix checkstyle

commit a9ff13027cf817c281de7030e36132f464abb3aa
Author: xuchuanyin
Date: 2018-06-15T00:23:43Z
    support specifying csv properties

---