Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 7bd734767 -> fdc9454d9


Added Encoder processor for data loading.

Fixed review comments

Fixed review comments

Rebased and fixed comments.

Fixed comments.

Fixed compilation issue

Fixed checkstyle issue

Fixed compile error after merging of PR 247


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/d96f09a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/d96f09a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/d96f09a2

Branch: refs/heads/master
Commit: d96f09a2bec50d8c78e815420444defedab9c039
Parents: 7bd7347
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Mon Oct 17 10:33:35 2016 +0530
Committer: jackylk <jacky.li...@huawei.com>
Committed: Tue Oct 25 23:39:52 2016 +0800

----------------------------------------------------------------------
 .../carbondata/core/devapi/BiDictionary.java    |  2 +-
 .../devapi/DictionaryGenerationException.java   | 69 ++++++++++++++
 .../core/devapi/DictionaryGenerator.java        |  2 +-
 .../core/devapi/GeneratingBiDictionary.java     |  2 +-
 .../newflow/converter/FieldConverter.java       | 36 ++++++++
 .../newflow/converter/RowConverter.java         | 32 +++++++
 .../AbstractDictionaryFieldConverterImpl.java   | 27 ++++++
 .../impl/ComplexFieldConverterImpl.java         | 30 ++++++
 .../impl/DictionaryFieldConverterImpl.java      | 74 +++++++++++++++
 .../DirectDictionaryFieldConverterImpl.java     | 50 ++++++++++
 .../converter/impl/FieldEncoderFactory.java     | 69 ++++++++++++++
 .../impl/NonDictionaryFieldConverterImpl.java   | 52 +++++++++++
 .../converter/impl/RowConverterImpl.java        | 96 ++++++++++++++++++++
 .../newflow/dictionary/InMemBiDictionary.java   |  6 +-
 .../dictionary/PreCreatedDictionary.java        |  3 +-
 .../exception/CarbonDataLoadingException.java   |  2 +-
 .../steps/DataConverterProcessorStepImpl.java   | 66 ++++++++++++++
 .../dictionary/InMemBiDictionaryTest.java       |  3 +-
 18 files changed, 613 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/core/src/main/java/org/apache/carbondata/core/devapi/BiDictionary.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/devapi/BiDictionary.java 
b/core/src/main/java/org/apache/carbondata/core/devapi/BiDictionary.java
index ffaba52..051fb9e 100644
--- a/core/src/main/java/org/apache/carbondata/core/devapi/BiDictionary.java
+++ b/core/src/main/java/org/apache/carbondata/core/devapi/BiDictionary.java
@@ -27,7 +27,7 @@ public interface BiDictionary<K, V> {
    * @param value dictionary value
    * @return dictionary key
    */
-  K getOrGenerateKey(V value) throws Exception;
+  K getOrGenerateKey(V value) throws DictionaryGenerationException;
 
   /**
    * Get the dictionary key corresponding to the input value, return null if 
value is not exist in

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/core/src/main/java/org/apache/carbondata/core/devapi/DictionaryGenerationException.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/devapi/DictionaryGenerationException.java
 
b/core/src/main/java/org/apache/carbondata/core/devapi/DictionaryGenerationException.java
new file mode 100644
index 0000000..47f43ee
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/devapi/DictionaryGenerationException.java
@@ -0,0 +1,69 @@
+package org.apache.carbondata.core.devapi;
+
+import java.util.Locale;
+
+public class DictionaryGenerationException extends Exception {
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The Error message.
+   */
+  private String msg = "";
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public DictionaryGenerationException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public DictionaryGenerationException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param t The underlying cause of this exception.
+   */
+  public DictionaryGenerationException(Throwable t) {
+    super(t);
+  }
+
+  /**
+   * Placeholder for a locale-specific message; localization is not
+   * implemented, so this always returns an empty string.
+   * @param locale - A Locale object represents a specific geographical,
+   *               political, or cultural region (currently ignored).
+   * @return - An empty string.
+   */
+  public String getLocalizedMessage(Locale locale) {
+    return "";
+  }
+
+  /**
+   * Returns the localized message from the superclass implementation.
+   */
+  @Override public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /**
+   * Returns the message given at construction ("" if only a cause was given).
+   */
+  public String getMessage() {
+    return this.msg;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/core/src/main/java/org/apache/carbondata/core/devapi/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/devapi/DictionaryGenerator.java 
b/core/src/main/java/org/apache/carbondata/core/devapi/DictionaryGenerator.java
index 99f430e..a6dd763 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/devapi/DictionaryGenerator.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/devapi/DictionaryGenerator.java
@@ -27,5 +27,5 @@ public interface DictionaryGenerator<K, V> {
    * @return dictionary key
    * @throws Exception any exception
    */
-  K generateKey(V value) throws Exception;
+  K generateKey(V value) throws DictionaryGenerationException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/core/src/main/java/org/apache/carbondata/core/devapi/GeneratingBiDictionary.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/devapi/GeneratingBiDictionary.java
 
b/core/src/main/java/org/apache/carbondata/core/devapi/GeneratingBiDictionary.java
index 47424b4..9194c84 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/devapi/GeneratingBiDictionary.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/devapi/GeneratingBiDictionary.java
@@ -28,7 +28,7 @@ public abstract class GeneratingBiDictionary<K, V> implements 
BiDictionary<K, V>
   }
 
   @Override
-  public K getOrGenerateKey(V value) throws Exception {
+  public K getOrGenerateKey(V value) throws DictionaryGenerationException {
     K key = getKey(value);
     if (key != null) {
       return key;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java
new file mode 100644
index 0000000..e304fbc
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.converter;
+
+import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+
+/**
+ * This interface converts/transforms the column field.
+ */
+public interface FieldConverter {
+
+  /**
+   * Converts the column field and updates the data at the same index in row.
+   * @param row the row whose field is converted in place
+   * @throws CarbonDataLoadingException if the field cannot be converted
+   */
+  void convert(CarbonRow row) throws CarbonDataLoadingException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
new file mode 100644
index 0000000..44f1116
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.converter;
+
+import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+
+/**
+ * Converts a complete row.
+ */
+public interface RowConverter {
+
+  CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException;
+
+  void finish();
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java
new file mode 100644
index 0000000..70a900c
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.converter.impl;
+
+import org.apache.carbondata.processing.newflow.converter.FieldConverter;
+
+public abstract class AbstractDictionaryFieldConverterImpl implements 
FieldConverter {
+
+  public abstract int getColumnCardinality();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
new file mode 100644
index 0000000..4c18aa7
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.converter.impl;
+
+import org.apache.carbondata.processing.newflow.converter.FieldConverter;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+
+public class ComplexFieldConverterImpl implements FieldConverter {
+
+  @Override
+  public void convert(CarbonRow row) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
new file mode 100644
index 0000000..8ca4ff2
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.converter.impl;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import 
org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.devapi.BiDictionary;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.util.CarbonUtilException;
+import org.apache.carbondata.processing.newflow.DataField;
+import 
org.apache.carbondata.processing.newflow.dictionary.PreCreatedDictionary;
+import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+
+public class DictionaryFieldConverterImpl extends 
AbstractDictionaryFieldConverterImpl {
+
+  private static final LogService LOGGER =
+      
LogServiceFactory.getLogService(DictionaryFieldConverterImpl.class.getName());
+
+  private BiDictionary<Integer, String> dictionaryGenerator;
+
+  private int index;
+
+  public DictionaryFieldConverterImpl(DataField dataField,
+      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
+      CarbonTableIdentifier carbonTableIdentifier, int index) {
+    this.index = index;
+    DictionaryColumnUniqueIdentifier identifier =
+        new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
+            dataField.getColumn().getColumnIdentifier(), 
dataField.getColumn().getDataType());
+    try {
+      Dictionary dictionary = cache.get(identifier);
+      dictionaryGenerator = new PreCreatedDictionary(dictionary);
+    } catch (CarbonUtilException e) {
+      LOGGER.error(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void convert(CarbonRow row) throws CarbonDataLoadingException {
+    try {
+      row.update(dictionaryGenerator.getOrGenerateKey(row.getString(index)), 
index);
+    } catch (DictionaryGenerationException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  @Override
+  public int getColumnCardinality() {
+    return dictionaryGenerator.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
new file mode 100644
index 0000000..8ff110a
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.converter.impl;
+
+import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+
+public class DirectDictionaryFieldConverterImpl extends 
AbstractDictionaryFieldConverterImpl {
+
+  private DirectDictionaryGenerator directDictionaryGenerator;
+
+  private int index;
+
+  public DirectDictionaryFieldConverterImpl(DataField dataField, int index) {
+    DirectDictionaryGenerator directDictionaryGenerator =
+        DirectDictionaryKeyGeneratorFactory
+            .getDirectDictionaryGenerator(dataField.getColumn().getDataType());
+    this.directDictionaryGenerator = directDictionaryGenerator;
+    this.index = index;
+  }
+
+  @Override
+  public void convert(CarbonRow row) {
+    
row.update(directDictionaryGenerator.generateDirectSurrogateKey(row.getString(index)),
 index);
+  }
+
+  @Override
+  public int getColumnCardinality() {
+    return Integer.MAX_VALUE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
new file mode 100644
index 0000000..a10ad20
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.converter.impl;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import 
org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.converter.FieldConverter;
+
+public class FieldEncoderFactory {
+
+  private static FieldEncoderFactory instance;
+
+  private FieldEncoderFactory() {
+
+  }
+
+  public static FieldEncoderFactory getInstance() {
+    if (instance == null) {
+      instance = new FieldEncoderFactory();
+    }
+    return instance;
+  }
+
+  /**
+   * Creates the FieldConverter for a dimension; returns null for a measure.
+   * @param dataField column schema
+   * @param cache dictionary cache.
+   * @param carbonTableIdentifier table identifier
+   * @param index index of column in the row.
+   * @return the converter for a dimension column, or null for a measure
+   */
+  public FieldConverter createFieldEncoder(DataField dataField,
+      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
+      CarbonTableIdentifier carbonTableIdentifier, int index) {
+    // Converters are only needed for dimensions; for measures it returns null.
+    if (dataField.getColumn().isDimesion()) {
+      if (dataField.getColumn().hasEncoding(Encoding.DICTIONARY)) {
+        return new DictionaryFieldConverterImpl(dataField, cache, 
carbonTableIdentifier, index);
+      } else if 
(dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        return new DirectDictionaryFieldConverterImpl(dataField, index);
+      } else if (dataField.getColumn().isComplex()) {
+        return new ComplexFieldConverterImpl();
+      } else {
+        return new NonDictionaryFieldConverterImpl(dataField, index);
+      }
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
new file mode 100644
index 0000000..9540907
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.converter.impl;
+
+import java.nio.charset.Charset;
+
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.converter.FieldConverter;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+
+public class NonDictionaryFieldConverterImpl implements FieldConverter {
+
+  private DataType dataType;
+
+  private int index;
+
+  public NonDictionaryFieldConverterImpl(DataField dataField, int index) {
+    this.dataType = dataField.getColumn().getDataType();
+    this.index = index;
+  }
+
+  @Override
+  public void convert(CarbonRow row) {
+    String dimensionValue = row.getString(index);
+    if (dataType != DataType.STRING) {
+      if (null == DataTypeUtil.normalizeIntAndLongValues(dimensionValue, 
dataType)) {
+        dimensionValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
+      }
+    }
+    
row.update(dimensionValue.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)),
+        index);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
new file mode 100644
index 0000000..78b9290
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.converter.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import 
org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.newflow.DataField;
+import 
org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.newflow.converter.FieldConverter;
+import org.apache.carbondata.processing.newflow.converter.RowConverter;
+import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+
+/**
+ * Converts the complete row where necessary: dictionary columns are encoded
+ * with dictionary values and non-dictionary values are converted to binary.
+ */
+public class RowConverterImpl implements RowConverter {
+
+  private CarbonDataLoadConfiguration configuration;
+
+  private FieldConverter[] fieldConverters;
+
+  public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration 
configuration) {
+    this.configuration = configuration;
+    CacheProvider cacheProvider = CacheProvider.getInstance();
+    Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache =
+        cacheProvider.createCache(CacheType.REVERSE_DICTIONARY,
+            configuration.getTableIdentifier().getStorePath());
+    List<FieldConverter> fieldConverterList = new ArrayList<>();
+
+    long lruCacheStartTime = System.currentTimeMillis();
+
+    for (int i = 0; i < fields.length; i++) {
+      FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
+          .createFieldEncoder(fields[i], cache,
+              configuration.getTableIdentifier().getCarbonTableIdentifier(), 
i);
+      if (fieldConverter != null) {
+        fieldConverterList.add(fieldConverter);
+      }
+    }
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+        .recordLruCacheLoadTime((System.currentTimeMillis() - 
lruCacheStartTime) / 1000.0);
+    fieldConverters = fieldConverterList.toArray(new 
FieldConverter[fieldConverterList.size()]);
+  }
+
+  @Override
+  public CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException {
+    for (int i = 0; i < fieldConverters.length; i++) {
+      fieldConverters[i].convert(row);
+    }
+    return row;
+  }
+
+  @Override
+  public void finish() {
+    List<Integer> dimCardinality = new ArrayList<>();
+    for (int i = 0; i < fieldConverters.length; i++) {
+      if (fieldConverters[i] instanceof AbstractDictionaryFieldConverterImpl) {
+        dimCardinality.add(
+            ((AbstractDictionaryFieldConverterImpl) 
fieldConverters[i]).getColumnCardinality());
+      }
+    }
+    int[] cardinality = new int[dimCardinality.size()];
+    for (int i = 0; i < dimCardinality.size(); i++) {
+      cardinality[i] = dimCardinality.get(i);
+    }
+    // Store the cardinality in the configuration; later steps use it for the mdk key.
+    
configuration.setDataLoadProperty(DataLoadProcessorConstants.DIMENSION_LENGTHS, 
cardinality);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionary.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionary.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionary.java
index dcf5d5e..d2900b6 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionary.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionary.java
@@ -21,6 +21,7 @@ package org.apache.carbondata.processing.newflow.dictionary;
 
 import java.util.Map;
 
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.devapi.DictionaryGenerator;
 import org.apache.carbondata.core.devapi.GeneratingBiDictionary;
 
@@ -47,9 +48,10 @@ public class InMemBiDictionary<K, V> extends 
GeneratingBiDictionary<K, V> {
   public InMemBiDictionary(Map<K, V> preCreatedDictionary) {
     super(new DictionaryGenerator<K, V>() {
       @Override
-      public K generateKey(V value) throws Exception {
+      public K generateKey(V value) throws DictionaryGenerationException {
         // Since dictionary is provided by preCreated, normally it should not 
come here
-        throw new Exception("encounter new dictionary value in pre-created 
dictionary:" + value);
+        throw new DictionaryGenerationException(
+            "encounter new dictionary value in pre-created dictionary:" + 
value);
       }
     });
     biMap = HashBiMap.create(preCreatedDictionary);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java
index 900d3e5..f807a81 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java
@@ -21,6 +21,7 @@ package org.apache.carbondata.processing.newflow.dictionary;
 
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.devapi.BiDictionary;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 
 public class PreCreatedDictionary implements BiDictionary<Integer, String> {
 
@@ -31,7 +32,7 @@ public class PreCreatedDictionary implements 
BiDictionary<Integer, String> {
   }
 
   @Override
-  public Integer getOrGenerateKey(String value) throws Exception {
+  public Integer getOrGenerateKey(String value) throws 
DictionaryGenerationException {
     Integer key = getKey(value);
     if (key == null) {
       throw new UnsupportedOperationException("trying to add new entry in 
PreCreatedDictionary");

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
index c26e2de..242185d 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
@@ -21,7 +21,7 @@ package org.apache.carbondata.processing.newflow.exception;
 
 import java.util.Locale;
 
-public class CarbonDataLoadingException extends Exception {
+public class CarbonDataLoadingException extends RuntimeException {
   /**
    * default serial version ID.
    */

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
new file mode 100644
index 0000000..582b8c1
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.steps;
+
+import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.converter.RowConverter;
+import 
org.apache.carbondata.processing.newflow.converter.impl.RowConverterImpl;
+import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+
+/**
+ * Replace row data fields with dictionary values if column is configured 
dictionary encoded.
+ * And nondictionary columns as well as complex columns will be converted to 
byte[].
+ */
+public class DataConverterProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
+
+  private RowConverter encoder;
+
+  public DataConverterProcessorStepImpl(CarbonDataLoadConfiguration 
configuration,
+      AbstractDataLoadProcessorStep child) {
+    super(configuration, child);
+  }
+
+  @Override
+  public DataField[] getOutput() {
+    return child.getOutput();
+  }
+
+  @Override
+  public void initialize() throws CarbonDataLoadingException {
+    encoder = new RowConverterImpl(child.getOutput(), configuration);
+    child.initialize();
+  }
+
+  @Override
+  protected CarbonRow processRow(CarbonRow row) {
+    return encoder.convert(row);
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    if (encoder != null) {
+      encoder.finish();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d96f09a2/processing/src/test/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionaryTest.java
----------------------------------------------------------------------
diff --git 
a/processing/src/test/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionaryTest.java
 
b/processing/src/test/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionaryTest.java
index 6ceea7e..253c97e 100644
--- 
a/processing/src/test/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionaryTest.java
+++ 
b/processing/src/test/java/org/apache/carbondata/processing/newflow/dictionary/InMemBiDictionaryTest.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.carbondata.core.devapi.BiDictionary;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.devapi.DictionaryGenerator;
 import org.junit.Assert;
 import org.junit.Test;
@@ -58,7 +59,7 @@ public class InMemBiDictionaryTest {
         new DictionaryGenerator<Integer, String>() {
           int sequence = 1;
           @Override
-          public Integer generateKey(String value) throws Exception {
+          public Integer generateKey(String value) throws 
DictionaryGenerationException {
             return sequence++;
           }
         });


Reply via email to