Repository: incubator-carbondata Updated Branches: refs/heads/master 7bd734767 -> fdc9454d9
Added Encoder processor for data loading. Fixed review comments. Fixed review comments. Rebased and fixed comments. Fixed comments. Fixed compilation issue. Fixed checkstyle issue. Fixed compile error after merging of PR 247. Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/d96f09a2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/d96f09a2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/d96f09a2 Branch: refs/heads/master Commit: d96f09a2bec50d8c78e815420444defedab9c039 Parents: 7bd7347 Author: ravipesala <ravi.pes...@gmail.com> Authored: Mon Oct 17 10:33:35 2016 +0530 Committer: jackylk <jacky.li...@huawei.com> Committed: Tue Oct 25 23:39:52 2016 +0800 ---------------------------------------------------------------------- .../carbondata/core/devapi/BiDictionary.java | 2 +- .../devapi/DictionaryGenerationException.java | 69 ++++++++++++++ .../core/devapi/DictionaryGenerator.java | 2 +- .../core/devapi/GeneratingBiDictionary.java | 2 +- .../newflow/converter/FieldConverter.java | 36 ++++++++ .../newflow/converter/RowConverter.java | 32 +++++++ .../AbstractDictionaryFieldConverterImpl.java | 27 ++++++ .../impl/ComplexFieldConverterImpl.java | 30 ++++++ .../impl/DictionaryFieldConverterImpl.java | 74 +++++++++++++++ .../DirectDictionaryFieldConverterImpl.java | 50 ++++++++++ .../converter/impl/FieldEncoderFactory.java | 69 ++++++++++++++ .../impl/NonDictionaryFieldConverterImpl.java | 52 +++++++++++ .../converter/impl/RowConverterImpl.java | 96 ++++++++++++++++++++ .../newflow/dictionary/InMemBiDictionary.java | 6 +- .../dictionary/PreCreatedDictionary.java | 3 +- .../exception/CarbonDataLoadingException.java | 2 +- .../steps/DataConverterProcessorStepImpl.java | 66 ++++++++++++++ .../dictionary/InMemBiDictionaryTest.java | 3 +- 18 files changed, 613 insertions(+), 8 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/core/src/main/java/org/apache/carbondata/core/devapi/BiDictionary.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/devapi/BiDictionary.java b/core/src/main/java/org/apache/carbondata/core/devapi/BiDictionary.java index ffaba52..051fb9e 100644 --- a/core/src/main/java/org/apache/carbondata/core/devapi/BiDictionary.java +++ b/core/src/main/java/org/apache/carbondata/core/devapi/BiDictionary.java @@ -27,7 +27,7 @@ public interface BiDictionary<K, V> { * @param value dictionary value * @return dictionary key */ - K getOrGenerateKey(V value) throws Exception; + K getOrGenerateKey(V value) throws DictionaryGenerationException; /** * Get the dictionary key corresponding to the input value, return null if value is not exist in http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/core/src/main/java/org/apache/carbondata/core/devapi/DictionaryGenerationException.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/devapi/DictionaryGenerationException.java b/core/src/main/java/org/apache/carbondata/core/devapi/DictionaryGenerationException.java new file mode 100644 index 0000000..47f43ee --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/devapi/DictionaryGenerationException.java @@ -0,0 +1,69 @@ +package org.apache.carbondata.core.devapi; + +import java.util.Locale; + +public class DictionaryGenerationException extends Exception { + /** + * default serial version ID. + */ + private static final long serialVersionUID = 1L; + + /** + * The Error message. + */ + private String msg = ""; + + /** + * Constructor + * + * @param msg The error message for this exception. 
+ */ + public DictionaryGenerationException(String msg) { + super(msg); + this.msg = msg; + } + + /** + * Constructor + * + * @param msg The error message for this exception. + */ + public DictionaryGenerationException(String msg, Throwable t) { + super(msg, t); + this.msg = msg; + } + + /** + * Constructor + * + * @param t + */ + public DictionaryGenerationException(Throwable t) { + super(t); + } + + /** + * This method is used to get the localized message. + * + * @param locale - A Locale object represents a specific geographical, + * political, or cultural region. + * @return - Localized error message. + */ + public String getLocalizedMessage(Locale locale) { + return ""; + } + + /** + * getLocalizedMessage + */ + @Override public String getLocalizedMessage() { + return super.getLocalizedMessage(); + } + + /** + * getMessage + */ + public String getMessage() { + return this.msg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/core/src/main/java/org/apache/carbondata/core/devapi/DictionaryGenerator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/devapi/DictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/devapi/DictionaryGenerator.java index 99f430e..a6dd763 100644 --- a/core/src/main/java/org/apache/carbondata/core/devapi/DictionaryGenerator.java +++ b/core/src/main/java/org/apache/carbondata/core/devapi/DictionaryGenerator.java @@ -27,5 +27,5 @@ public interface DictionaryGenerator<K, V> { * @return dictionary key * @throws Exception any exception */ - K generateKey(V value) throws Exception; + K generateKey(V value) throws DictionaryGenerationException; } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/core/src/main/java/org/apache/carbondata/core/devapi/GeneratingBiDictionary.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/devapi/GeneratingBiDictionary.java b/core/src/main/java/org/apache/carbondata/core/devapi/GeneratingBiDictionary.java index 47424b4..9194c84 100644 --- a/core/src/main/java/org/apache/carbondata/core/devapi/GeneratingBiDictionary.java +++ b/core/src/main/java/org/apache/carbondata/core/devapi/GeneratingBiDictionary.java @@ -28,7 +28,7 @@ public abstract class GeneratingBiDictionary<K, V> implements BiDictionary<K, V> } @Override - public K getOrGenerateKey(V value) throws Exception { + public K getOrGenerateKey(V value) throws DictionaryGenerationException { K key = getKey(value); if (key != null) { return key; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java new file mode 100644 index 0000000..e304fbc --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.carbondata.processing.newflow.converter; + +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; +import org.apache.carbondata.processing.newflow.row.CarbonRow; + +/** + * This interface converts/transforms the column field. + */ +public interface FieldConverter { + + /** + * It converts the column field and updates the data in same location/index in row. + * @param row + * @throws CarbonDataLoadingException + */ + void convert(CarbonRow row) throws CarbonDataLoadingException; +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java new file mode 100644 index 0000000..44f1116 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.processing.newflow.converter; + +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; +import org.apache.carbondata.processing.newflow.row.CarbonRow; + +/** + * convert the row + */ +public interface RowConverter { + + CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException; + + void finish(); +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java new file mode 100644 index 0000000..70a900c --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.processing.newflow.converter.impl; + +import org.apache.carbondata.processing.newflow.converter.FieldConverter; + +public abstract class AbstractDictionaryFieldConverterImpl implements FieldConverter { + + public abstract int getColumnCardinality(); + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java new file mode 100644 index 0000000..4c18aa7 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.carbondata.processing.newflow.converter.impl; + +import org.apache.carbondata.processing.newflow.converter.FieldConverter; +import org.apache.carbondata.processing.newflow.row.CarbonRow; + +public class ComplexFieldConverterImpl implements FieldConverter { + + @Override + public void convert(CarbonRow row) { + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java new file mode 100644 index 0000000..8ca4ff2 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.carbondata.processing.newflow.converter.impl; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; +import org.apache.carbondata.core.carbon.CarbonTableIdentifier; +import org.apache.carbondata.core.devapi.BiDictionary; +import org.apache.carbondata.core.devapi.DictionaryGenerationException; +import org.apache.carbondata.core.util.CarbonUtilException; +import org.apache.carbondata.processing.newflow.DataField; +import org.apache.carbondata.processing.newflow.dictionary.PreCreatedDictionary; +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; +import org.apache.carbondata.processing.newflow.row.CarbonRow; + +public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConverterImpl { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(DictionaryFieldConverterImpl.class.getName()); + + private BiDictionary<Integer, String> dictionaryGenerator; + + private int index; + + public DictionaryFieldConverterImpl(DataField dataField, + Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache, + CarbonTableIdentifier carbonTableIdentifier, int index) { + this.index = index; + DictionaryColumnUniqueIdentifier identifier = + new DictionaryColumnUniqueIdentifier(carbonTableIdentifier, + dataField.getColumn().getColumnIdentifier(), dataField.getColumn().getDataType()); + try { + Dictionary dictionary = cache.get(identifier); + dictionaryGenerator = new PreCreatedDictionary(dictionary); + } catch (CarbonUtilException e) { + LOGGER.error(e); + throw new RuntimeException(e); + } + } + + @Override + public void convert(CarbonRow row) throws CarbonDataLoadingException { + try { + 
row.update(dictionaryGenerator.getOrGenerateKey(row.getString(index)), index); + } catch (DictionaryGenerationException e) { + throw new CarbonDataLoadingException(e); + } + } + + @Override + public int getColumnCardinality() { + return dictionaryGenerator.size(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java new file mode 100644 index 0000000..8ff110a --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.carbondata.processing.newflow.converter.impl; + +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; +import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; +import org.apache.carbondata.processing.newflow.DataField; +import org.apache.carbondata.processing.newflow.row.CarbonRow; + +public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldConverterImpl { + + private DirectDictionaryGenerator directDictionaryGenerator; + + private int index; + + public DirectDictionaryFieldConverterImpl(DataField dataField, int index) { + DirectDictionaryGenerator directDictionaryGenerator = + DirectDictionaryKeyGeneratorFactory + .getDirectDictionaryGenerator(dataField.getColumn().getDataType()); + this.directDictionaryGenerator = directDictionaryGenerator; + this.index = index; + } + + @Override + public void convert(CarbonRow row) { + row.update(directDictionaryGenerator.generateDirectSurrogateKey(row.getString(index)), index); + } + + @Override + public int getColumnCardinality() { + return Integer.MAX_VALUE; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java new file mode 100644 index 0000000..a10ad20 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.processing.newflow.converter.impl; + +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; +import org.apache.carbondata.core.carbon.CarbonTableIdentifier; +import org.apache.carbondata.core.carbon.metadata.encoder.Encoding; +import org.apache.carbondata.processing.newflow.DataField; +import org.apache.carbondata.processing.newflow.converter.FieldConverter; + +public class FieldEncoderFactory { + + private static FieldEncoderFactory instance; + + private FieldEncoderFactory() { + + } + + public static FieldEncoderFactory getInstance() { + if (instance == null) { + instance = new FieldEncoderFactory(); + } + return instance; + } + + /** + * Creates the FieldConverter for all dimensions, for measures return null. + * @param dataField column schema + * @param cache dicionary cache. + * @param carbonTableIdentifier table identifier + * @param index index of column in the row. 
+ * @return + */ + public FieldConverter createFieldEncoder(DataField dataField, + Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache, + CarbonTableIdentifier carbonTableIdentifier, int index) { + // Converters are only needed for dimensions and measures it return null. + if (dataField.getColumn().isDimesion()) { + if (dataField.getColumn().hasEncoding(Encoding.DICTIONARY)) { + return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier, index); + } else if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY)) { + return new DirectDictionaryFieldConverterImpl(dataField, index); + } else if (dataField.getColumn().isComplex()) { + return new ComplexFieldConverterImpl(); + } else { + return new NonDictionaryFieldConverterImpl(dataField, index); + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java new file mode 100644 index 0000000..9540907 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.processing.newflow.converter.impl; + +import java.nio.charset.Charset; + +import org.apache.carbondata.core.carbon.metadata.datatype.DataType; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.DataTypeUtil; +import org.apache.carbondata.processing.newflow.DataField; +import org.apache.carbondata.processing.newflow.converter.FieldConverter; +import org.apache.carbondata.processing.newflow.row.CarbonRow; + +public class NonDictionaryFieldConverterImpl implements FieldConverter { + + private DataType dataType; + + private int index; + + public NonDictionaryFieldConverterImpl(DataField dataField, int index) { + this.dataType = dataField.getColumn().getDataType(); + this.index = index; + } + + @Override + public void convert(CarbonRow row) { + String dimensionValue = row.getString(index); + if (dataType != DataType.STRING) { + if (null == DataTypeUtil.normalizeIntAndLongValues(dimensionValue, dataType)) { + dimensionValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL; + } + } + row.update(dimensionValue.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)), + index); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java new file mode 100644 index 0000000..78b9290 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.processing.newflow.converter.impl; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.core.cache.Cache; +import org.apache.carbondata.core.cache.CacheProvider; +import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.cache.dictionary.Dictionary; +import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration; +import org.apache.carbondata.processing.newflow.DataField; +import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants; +import org.apache.carbondata.processing.newflow.converter.FieldConverter; +import org.apache.carbondata.processing.newflow.converter.RowConverter; +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; +import org.apache.carbondata.processing.newflow.row.CarbonRow; + +/** + * It converts the complete row if necessary, dictionary columns are encoded with dictionary values + * and nondictionary values are converted to binary. 
+ */ +public class RowConverterImpl implements RowConverter { + + private CarbonDataLoadConfiguration configuration; + + private FieldConverter[] fieldConverters; + + public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration configuration) { + this.configuration = configuration; + CacheProvider cacheProvider = CacheProvider.getInstance(); + Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache = + cacheProvider.createCache(CacheType.REVERSE_DICTIONARY, + configuration.getTableIdentifier().getStorePath()); + List<FieldConverter> fieldConverterList = new ArrayList<>(); + + long lruCacheStartTime = System.currentTimeMillis(); + + for (int i = 0; i < fields.length; i++) { + FieldConverter fieldConverter = FieldEncoderFactory.getInstance() + .createFieldEncoder(fields[i], cache, + configuration.getTableIdentifier().getCarbonTableIdentifier(), i); + if (fieldConverter != null) { + fieldConverterList.add(fieldConverter); + } + } + CarbonTimeStatisticsFactory.getLoadStatisticsInstance() + .recordLruCacheLoadTime((System.currentTimeMillis() - lruCacheStartTime) / 1000.0); + fieldConverters = fieldConverterList.toArray(new FieldConverter[fieldConverterList.size()]); + } + + @Override + public CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException { + for (int i = 0; i < fieldConverters.length; i++) { + fieldConverters[i].convert(row); + } + return row; + } + + @Override + public void finish() { + List<Integer> dimCardinality = new ArrayList<>(); + for (int i = 0; i < fieldConverters.length; i++) { + if (fieldConverters[i] instanceof AbstractDictionaryFieldConverterImpl) { + dimCardinality.add( + ((AbstractDictionaryFieldConverterImpl) fieldConverters[i]).getColumnCardinality()); + } + } + int[] cardinality = new int[dimCardinality.size()]; + for (int i = 0; i < dimCardinality.size(); i++) { + cardinality[i] = dimCardinality.get(i); + } + // Set the cardinality to configuration, it will be used by further step for mdk key. 
+ configuration.setDataLoadProperty(DataLoadProcessorConstants.DIMENSION_LENGTHS, cardinality); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionary.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionary.java index dcf5d5e..d2900b6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionary.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionary.java @@ -21,6 +21,7 @@ package org.apache.carbondata.processing.newflow.dictionary; import java.util.Map; +import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.devapi.DictionaryGenerator; import org.apache.carbondata.core.devapi.GeneratingBiDictionary; @@ -47,9 +48,10 @@ public class InMemBiDictionary<K, V> extends GeneratingBiDictionary<K, V> { public InMemBiDictionary(Map<K, V> preCreatedDictionary) { super(new DictionaryGenerator<K, V>() { @Override - public K generateKey(V value) throws Exception { + public K generateKey(V value) throws DictionaryGenerationException { // Since dictionary is provided by preCreated, normally it should not come here - throw new Exception("encounter new dictionary value in pre-created dictionary:" + value); + throw new DictionaryGenerationException( + "encounter new dictionary value in pre-created dictionary:" + value); } }); biMap = HashBiMap.create(preCreatedDictionary); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java 
---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java index 900d3e5..f807a81 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java @@ -21,6 +21,7 @@ package org.apache.carbondata.processing.newflow.dictionary; import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.devapi.BiDictionary; +import org.apache.carbondata.core.devapi.DictionaryGenerationException; public class PreCreatedDictionary implements BiDictionary<Integer, String> { @@ -31,7 +32,7 @@ public class PreCreatedDictionary implements BiDictionary<Integer, String> { } @Override - public Integer getOrGenerateKey(String value) throws Exception { + public Integer getOrGenerateKey(String value) throws DictionaryGenerationException { Integer key = getKey(value); if (key == null) { throw new UnsupportedOperationException("trying to add new entry in PreCreatedDictionary"); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java index c26e2de..242185d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java +++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java @@ -21,7 +21,7 @@ package org.apache.carbondata.processing.newflow.exception; import java.util.Locale; -public class CarbonDataLoadingException extends Exception { +public class CarbonDataLoadingException extends RuntimeException { /** * default serial version ID. */ http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java new file mode 100644 index 0000000..582b8c1 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.carbondata.processing.newflow.steps;
+
+import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.converter.RowConverter;
+import org.apache.carbondata.processing.newflow.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+
+/**
+ * Load step that converts each row coming from its child step: fields of dictionary-encoded
+ * columns are replaced with their dictionary values, while non-dictionary and complex columns
+ * are converted to {@code byte[]}.
+ */
+public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorStep {
+
+  /** Row converter built from the child's output fields in {@link #initialize()}. */
+  private RowConverter encoder;
+
+  public DataConverterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
+      AbstractDataLoadProcessorStep child) {
+    super(configuration, child);
+  }
+
+  /** This step reports exactly the same fields as its child. */
+  @Override
+  public DataField[] getOutput() {
+    return child.getOutput();
+  }
+
+  /**
+   * Initializes the child step first and only then builds the row converter from the child's
+   * output fields. This ordering keeps the converter from reading the schema of a child that
+   * has not been initialized yet. It also skips the converter setup (cache and per-field
+   * converter creation) entirely if the child fails to initialize.
+   *
+   * @throws CarbonDataLoadingException if the child step cannot be initialized
+   */
+  @Override
+  public void initialize() throws CarbonDataLoadingException {
+    child.initialize();
+    encoder = new RowConverterImpl(child.getOutput(), configuration);
+  }
+
+  /** Converts {@code row} with the converter created in {@link #initialize()}. */
+  @Override
+  protected CarbonRow processRow(CarbonRow row) {
+    return encoder.convert(row);
+  }
+
+  /**
+   * Closes this step through the parent implementation, then lets the converter finish.
+   * For {@link RowConverterImpl}, finishing publishes the dimension cardinalities to the
+   * configuration. The converter is {@code null} if {@link #initialize()} never created it.
+   */
+  @Override
+  public void close() {
+    super.close();
+    if (encoder != null) {
+      encoder.finish();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/test/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionaryTest.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionaryTest.java b/processing/src/test/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionaryTest.java
index 6ceea7e..253c97e 100644 --- a/processing/src/test/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionaryTest.java +++ b/processing/src/test/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionaryTest.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.carbondata.core.devapi.BiDictionary; +import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.devapi.DictionaryGenerator; import org.junit.Assert; import org.junit.Test; @@ -58,7 +59,7 @@ public class InMemBiDictionaryTest { new DictionaryGenerator<Integer, String>() { int sequence = 1; @Override - public Integer generateKey(String value) throws Exception { + public Integer generateKey(String value) throws DictionaryGenerationException { return sequence++; } });