Repository: incubator-carbondata Updated Branches: refs/heads/master c95e565e4 -> 997af85dc
fixLatedecoderIssueForSpark2 Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/9961f537 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/9961f537 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/9961f537 Branch: refs/heads/master Commit: 9961f537f2229985b09d22204c827638b4e4a397 Parents: c95e565 Author: QiangCai <qiang...@qq.com> Authored: Thu Dec 1 19:32:04 2016 +0800 Committer: jackylk <jacky.li...@huawei.com> Committed: Thu Dec 1 20:26:40 2016 +0800 ---------------------------------------------------------------------- .../readsupport/SparkRowReadSupportImpl.java | 76 -------------------- .../readsupport/SparkRowReadSupportImpl.java | 70 ++++++++++++++++++ .../readsupport/SparkRowReadSupportImpl.java | 57 +++++++++++++++ 3 files changed, 127 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9961f537/integration/spark-common/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java deleted file mode 100644 index 4b1958d..0000000 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.carbondata.spark.readsupport; - -import java.sql.Timestamp; - -import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; -import org.apache.carbondata.core.carbon.metadata.datatype.DataType; -import org.apache.carbondata.core.carbon.metadata.encoder.Encoding; -import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn; -import org.apache.carbondata.core.util.DataTypeUtil; -import org.apache.carbondata.hadoop.readsupport.impl.AbstractDictionaryDecodedReadSupport; - -import org.apache.spark.sql.Row; -import org.apache.spark.sql.catalyst.expressions.GenericRow; -import org.apache.spark.unsafe.types.UTF8String; - -public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSupport<Row> { - - @Override public void initialize(CarbonColumn[] carbonColumns, - AbsoluteTableIdentifier absoluteTableIdentifier) { - super.initialize(carbonColumns, absoluteTableIdentifier); - //can initialize and generate schema here. - } - - @Override public Row readRow(Object[] data) { - for (int i = 0; i < dictionaries.length; i++) { - if (dictionaries[i] != null) { - data[i] = DataTypeUtil - .getDataBasedOnDataType(dictionaries[i].getDictionaryValueForKey((int) data[i]), - dataTypes[i]); - switch (dataTypes[i]) { - case STRING: - data[i] = UTF8String.fromString(data[i].toString()); - break; - case TIMESTAMP: - data[i] = new Timestamp((long) data[i] / 1000); - break; - case LONG: - data[i] = data[i]; - break; - default: - } - } - else if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) { - //convert the long to timestamp in case of direct dictionary column - if (DataType.TIMESTAMP == carbonColumns[i].getDataType()) { - data[i] = new Timestamp((long) data[i] / 1000); - } - } -// else if(dataTypes[i].equals(DataType.INT)) { -// data[i] = ((Long)(data[i])).intValue(); -// } -// else if(dataTypes[i].equals(DataType.SHORT)) { -// data[i] = ((Double)(data[i])).shortValue(); -// } - } - return new GenericRow(data); - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9961f537/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java new file mode 100644 index 0000000..42c67b9 --- /dev/null +++ b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.spark.readsupport; + +import java.sql.Timestamp; + +import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; +import org.apache.carbondata.core.carbon.metadata.datatype.DataType; +import org.apache.carbondata.core.carbon.metadata.encoder.Encoding; +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.hadoop.readsupport.impl.AbstractDictionaryDecodedReadSupport; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.GenericRow; +import org.apache.spark.unsafe.types.UTF8String; + +public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSupport<Row> { + + @Override public void initialize(CarbonColumn[] carbonColumns, + AbsoluteTableIdentifier absoluteTableIdentifier) { + super.initialize(carbonColumns, absoluteTableIdentifier); + //can initialize and generate schema here. + } + + @Override public Row readRow(Object[] data) { + for (int i = 0; i < dictionaries.length; i++) { + if (dictionaries[i] != null) { + data[i] = DataTypeUtil + .getDataBasedOnDataType(dictionaries[i].getDictionaryValueForKey((int) data[i]), + dataTypes[i]); + switch (dataTypes[i]) { + case STRING: + data[i] = UTF8String.fromString(data[i].toString()); + break; + case TIMESTAMP: + data[i] = new Timestamp((long) data[i] / 1000); + break; + case LONG: + data[i] = data[i]; + break; + default: + } + } + else if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) { + //convert the long to timestamp in case of direct dictionary column + if (DataType.TIMESTAMP == carbonColumns[i].getDataType()) { + data[i] = new Timestamp((long) data[i] / 1000); + } + } + } + return new GenericRow(data); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/9961f537/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java new file mode 100644 index 0000000..c2f5d5d --- /dev/null +++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.spark.readsupport; + +import java.sql.Timestamp; + +import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; +import org.apache.carbondata.core.carbon.metadata.datatype.DataType; +import org.apache.carbondata.core.carbon.metadata.encoder.Encoding; +import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.hadoop.readsupport.impl.AbstractDictionaryDecodedReadSupport; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.GenericRow; + +public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSupport<Row> { + + @Override public void initialize(CarbonColumn[] carbonColumns, + AbsoluteTableIdentifier absoluteTableIdentifier) { + super.initialize(carbonColumns, absoluteTableIdentifier); + //can initialize and generate schema here. + } + + @Override public Row readRow(Object[] data) { + for (int i = 0; i < dictionaries.length; i++) { + if (dictionaries[i] == null) { + if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) { + //convert the long to timestamp in case of direct dictionary column + if (DataType.TIMESTAMP == carbonColumns[i].getDataType()) { + data[i] = new Timestamp((long) data[i] / 1000); + } + } else if(dataTypes[i].equals(DataType.INT)) { + data[i] = ((Long)(data[i])).intValue(); + } else if(dataTypes[i].equals(DataType.SHORT)) { + data[i] = ((Double)(data[i])).shortValue(); + } + } + } + return new GenericRow(data); + } +}