[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-21 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84485160
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java ---
@@ -33,4 +33,8 @@
   public static final String COMPLEX_DELIMITERS = "COMPLEX_DELIMITERS";
 
   public static final String DIMENSION_LENGTHS = "DIMENSION_LENGTHS";
+
--- End diff --

OK, we should separate the options that are exposed to the user. Let's do it in a future PR.




[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-21 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84476341
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
@@ -0,0 +1,171 @@
+package org.apache.carbondata.processing.newflow.steps.input;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
+import org.apache.carbondata.processing.newflow.parser.GenericParser;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+
+/**
+ * It reads data from record reader and sends data to next step.
+ */
+public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName());
+
+  private GenericParser[] genericParsers;
+
+  private List<CarbonIterator<Object[]>> inputIterators;
+
+  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
+      AbstractDataLoadProcessorStep child, List<CarbonIterator<Object[]>> inputIterators) {
+    super(configuration, child);
+    this.inputIterators = inputIterators;
+  }
+
+  @Override public DataField[] getOutput() {
+    DataField[] fields = configuration.getDataFields();
+    String[] header = configuration.getHeader();
+    DataField[] output = new DataField[fields.length];
+    int k = 0;
+    for (int i = 0; i < header.length; i++) {
+      for (int j = 0; j < fields.length; j++) {
+        if (header[j].equalsIgnoreCase(fields[j].getColumn().getColName())) {
+          output[k++] = fields[j];
+          break;
+        }
+      }
+    }
+    return output;
+  }
+
+  @Override public void initialize() throws CarbonDataLoadingException {
+    DataField[] output = getOutput();
+    genericParsers = new GenericParser[output.length];
+    for (int i = 0; i < genericParsers.length; i++) {
+      genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(),
+          (String[]) configuration
+              .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS));
+    }
+  }
+
+  private int getNumberOfCores() {
+    int numberOfCores;
+    try {
+      numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.NUM_CORES_LOADING,
+              CarbonCommonConstants.NUM_CORES_DEFAULT_VAL));
+    } catch (NumberFormatException exc) {
+      numberOfCores = Integer.parseInt(CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
+    }
+    return numberOfCores;
+  }
+
+  private int getBatchSize() {
+    int batchSize;
+    try {
+      batchSize = Integer.parseInt(configuration
+          .getDataLoadProperty(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE,
+              DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT).toString());
+    } catch (NumberFormatException exc) {
+      batchSize = Integer.parseInt(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT);
+    }
+    return batchSize;
+  }
+
+  @Override public Iterator<CarbonRowBatch>[] execute() {
+    int batchSize = getBatchSize();
+    List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
+    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
+    for (int i = 0; i < outIterators.length; i++) {
+      outIterators[i] = new InputProcessorIterator(readerIterators[i], genericParsers, batchSize);
+    }
+    return outIterators;
+  }
+
+  private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() {
+    int numberOfCores = getNumberOfCores();
+    if (inputIterators.size() < numberOfCores) {
+      numberOfCores = inputIterators.size();
+    }
+

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-21 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84475851
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java ---
@@ -33,4 +33,8 @@
   public static final String COMPLEX_DELIMITERS = "COMPLEX_DELIMITERS";
 
   public static final String DIMENSION_LENGTHS = "DIMENSION_LENGTHS";
+
--- End diff --

Maybe we can refactor this later.




[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-21 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84475566
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java ---
@@ -0,0 +1,22 @@
+package org.apache.carbondata.processing.newflow.parser;
+
+/**
+ * Parse the data according to implementation, The implementation classes can be struct, array or
+ * map datatypes.
+ */
+public interface GenericParser<E> {
+
+  /**
+   * Parse the data as per the delimiter
+   * @param data
+   * @return
+   */
+  E parse(String data);
+
+  /**
+   * Children of the parser.
+   * @param parser
+   */
+  void addChildren(GenericParser parser);
--- End diff --

Yes, added a new interface `ComplexParser` that extends `GenericParser`.
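
A minimal sketch of what that sub-interface could look like, assuming `GenericParser` is generic in `E`; the exact shape in the PR may differ:

```java
package org.apache.carbondata.processing.newflow.parser;

/**
 * Sketch only: child handling applies to complex (array/struct) columns,
 * so it lives on a sub-interface instead of on GenericParser itself.
 */
public interface ComplexParser<E> extends GenericParser<E> {

  /**
   * Adds a child parser while the parser tree for a nested column is built.
   */
  void addChildren(GenericParser child);
}
```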




[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-21 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84475299
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java ---
@@ -114,11 +114,15 @@ protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch) {
   /**
    * It is called when task is called successfully.
    */
-  public abstract void finish();
+  public void finish() {
+    // implementation classes can override to update the status.
+  }
 
   /**
    * Closing of resources after step execution can be done here.
    */
-  public abstract void close();
+  public void close() {
+    // implementation classes can override to close the resources if any available.
--- End diff --

Removed the `finish` method and kept only `close`, so `close` needs to be called in all cases.
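
So the base class keeps a single lifecycle hook, roughly like this (a sketch of the resulting contract, not the exact diff):

```java
public abstract class AbstractDataLoadProcessorStep {

  /**
   * Closing of resources after step execution can be done here.
   * Called on both the success and the failure path, so overrides
   * must tolerate being invoked after a failed load.
   */
  public void close() {
    // default: nothing to release
  }
}
```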




[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-20 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84226906
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java ---
@@ -114,11 +114,15 @@ protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch) {
   /**
    * It is called when task is called successfully.
    */
-  public abstract void finish();
+  public void finish() {
+    // implementation classes can override to update the status.
+  }
 
   /**
    * Closing of resources after step execution can be done here.
    */
-  public abstract void close();
+  public void close() {
+    // implementation classes can override to close the resources if any available.
--- End diff --

The comment is not very clear. Is this called in case of failure?
In `finish()`, resources also need to be released, right?




[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-20 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84227253
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/ArrayObject.java ---
@@ -0,0 +1,18 @@
+package org.apache.carbondata.processing.newflow.complexobjects;
--- End diff --

Please add the license header to all files.
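
For reference, this is the standard ASF license header placed above the package declaration of every source file:

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.carbondata.processing.newflow.complexobjects;
```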




[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-20 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84234651
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
@@ -0,0 +1,171 @@

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-20 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84230036
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
+  private int getBatchSize() {
+    int batchSize;
+    try {
+      batchSize = Integer.parseInt(configuration
--- End diff --

Move `configuration` to the next line.
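
I.e. something like the following, so the wrapped call does not leave `configuration` dangling at the end of the first line (a formatting sketch only):

```java
batchSize = Integer.parseInt(
    configuration.getDataLoadProperty(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE,
        DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT).toString());
```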




[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-20 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84233387
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
@@ -0,0 +1,171 @@

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-20 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84227656
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java ---
@@ -0,0 +1,40 @@
+package org.apache.carbondata.processing.newflow.parser;
+
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.processing.newflow.parser.impl.ArrayParserImpl;
+import org.apache.carbondata.processing.newflow.parser.impl.PrimitiveParserImpl;
+import org.apache.carbondata.processing.newflow.parser.impl.StructParserImpl;
+
+public class CarbonParserFactory {
+
+  public static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters) {
+    return createParser(carbonColumn, complexDelimiters, 0);
+  }
+
+  private static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
--- End diff --

Please add a function description to these two `createParser` functions; it is not clear what `counter` is.
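
A possible description, inferring from the call sites that `counter` is the nesting depth used to pick the complex delimiter; this should be verified against the implementation, and the private method body is only a placeholder here because the quoted diff is cut off at the signature:

```java
public class CarbonParserFactory {

  /**
   * Creates a parser tree for the given column, starting at nesting depth 0
   * (the outermost array/struct level would use complexDelimiters[0]).
   */
  public static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters) {
    return createParser(carbonColumn, complexDelimiters, 0);
  }

  /**
   * @param counter current nesting depth of the column; each complex level
   *                would use complexDelimiters[counter] to split its values
   *                and pass counter + 1 on to its children.
   */
  private static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
      int counter) {
    // dispatch on the column's data type (primitive/array/struct) as in the PR;
    // the actual body is not reproduced here.
    throw new UnsupportedOperationException("see the PR for the dispatch logic");
  }
}
```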




[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-20 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84229694
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java ---
@@ -33,4 +33,8 @@
   public static final String COMPLEX_DELIMITERS = "COMPLEX_DELIMITERS";
 
   public static final String DIMENSION_LENGTHS = "DIMENSION_LENGTHS";
+
--- End diff --

Can you rename this class to a more meaningful name like `DataLoadOptions`? It will be exposed to the user as data load options, right?
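
If it is renamed, the user-facing holder might look roughly like this (illustrative only, not part of this PR):

```java
// Illustrative only: option names a user can pass with LOAD DATA,
// kept separate from the internal processor-step constants.
public final class DataLoadOptions {

  public static final String COMPLEX_DELIMITERS = "COMPLEX_DELIMITERS";

  public static final String DIMENSION_LENGTHS = "DIMENSION_LENGTHS";

  private DataLoadOptions() { }
}
```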




[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-20 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84229327
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/StructObject.java ---
@@ -0,0 +1,19 @@
+package org.apache.carbondata.processing.newflow.complexobjects;
+
+public class StructObject {
+
+  private Object[] data;
+
+  public StructObject(Object[] data) {
+    this.data = data;
+  }
+
+  public Object[] getData() {
+    return data;
+  }
+
+  public void setData(Object[] data) {
--- End diff --

Instead of just setting the data, I think it is better to add a function like `addMember(Object member)` so that `StructParserImpl` can make use of `addMember`.
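
For example (a sketch; the growth strategy is illustrative and would follow whatever `StructParserImpl` actually needs):

```java
public class StructObject {

  private Object[] data;

  public StructObject(Object[] data) {
    this.data = data;
  }

  public Object[] getData() {
    return data;
  }

  /** Appends one parsed member, so StructParserImpl can build the struct incrementally. */
  public void addMember(Object member) {
    if (data == null) {
      data = new Object[] { member };
    } else {
      Object[] grown = new Object[data.length + 1];
      System.arraycopy(data, 0, grown, 0, data.length);
      grown[data.length] = member;
      data = grown;
    }
  }
}
```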




[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-20 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84236268
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
@@ -0,0 +1,171 @@

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-20 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84234179
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
+  @Override public Iterator<CarbonRowBatch>[] execute() {
+    int batchSize = getBatchSize();
+    List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
+    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
+    for (int i = 0; i < outIterators.length; i++) {
+      outIterators[i] = new InputProcessorIterator(readerIterators[i], genericParsers, batchSize);
--- End diff --

In this case, the `genericParsers` need to be thread-safe; please add a comment to the `GenericParser` interface and ensure it.
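
The note could simply make the contract explicit on the interface, along these lines (the wording is a sketch, not the PR's):

```java
/**
 * Parse the data according to implementation (primitive, array or struct).
 *
 * Implementations must be stateless and therefore thread-safe: one parser
 * instance per column is shared by all InputProcessorIterator instances
 * created in execute(), so parse() can be called concurrently.
 */
public interface GenericParser<E> {

  E parse(String data);
}
```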



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-20 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84232050
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
+  private int getBatchSize() {
--- End diff --

Suggest changing this to `CarbonProperties.loadProcessBatchSize()`.
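
I.e. a small helper on `CarbonProperties`; no such method exists at this point, so the following is purely a sketch of the suggestion, and the property key and default are placeholders:

```java
// Hypothetical helper inside CarbonProperties: centralizes the
// parse-with-fallback logic instead of repeating it in each step.
public int loadProcessBatchSize() {
  String value = getProperty("carbon.load.batch.size", "1000"); // placeholder key/default
  try {
    return Integer.parseInt(value);
  } catch (NumberFormatException e) {
    return 1000; // placeholder default
  }
}
```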




[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-20 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84233633
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
@@ -0,0 +1,171 @@

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-20 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84228406
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java ---
@@ -0,0 +1,22 @@
+package org.apache.carbondata.processing.newflow.parser;
+
+/**
+ * Parse the data according to implementation, The implementation classes can be struct, array or
+ * map datatypes.
+ */
+public interface GenericParser<E> {
+
+  /**
+   * Parse the data as per the delimiter
+   * @param data
+   * @return
+   */
+  E parse(String data);
+
+  /**
+   * Children of the parser.
+   * @param parser
+   */
+  void addChildren(GenericParser parser);
--- End diff --

This behavior applies only to parsers for complex types, so I think you can create a sub-interface `ComplexTypeParser` extending `GenericParser`. Or, even better, you can remove it and add it to `ArrayParserImpl` and `StructParserImpl` only.




[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-20 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84233734
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
+  private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() {
+    int numberOfCores = getNumberOfCores();
--- End diff --

Please add a comment in this function to describe the parallelism.
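
For example, a method comment plus the split itself, roughly as below; this is a sketch of what the method appears to do, since the quoted diff is cut off, and should be verified against the PR:

```java
/**
 * Partitions the input record-reader iterators across the configured number
 * of loading cores: at most NUM_CORES_LOADING groups are created and the
 * readers are assigned to them round-robin, so the effective parallelism is
 * min(number of readers, NUM_CORES_LOADING), one InputProcessorIterator per group.
 */
private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() {
  int numberOfCores = getNumberOfCores();
  if (inputIterators.size() < numberOfCores) {
    numberOfCores = inputIterators.size();
  }
  List<CarbonIterator<Object[]>>[] iterators = new List[numberOfCores];
  for (int i = 0; i < numberOfCores; i++) {
    iterators[i] = new ArrayList<>();
  }
  // round-robin assignment of readers to groups
  for (int i = 0; i < inputIterators.size(); i++) {
    iterators[i % numberOfCores].add(inputIterators.get(i));
  }
  return iterators;
}
```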



[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-20 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84236755
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
@@ -0,0 +1,171 @@

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-20 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84237159
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java ---
@@ -0,0 +1,171 @@

[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-20 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r84231719
  
--- Diff: 
processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java
 ---
+
+  private int getNumberOfCores() {
--- End diff --

I think these functions can be shared across the project, so consider moving them into CarbonProperties directly, e.g. `CarbonProperties.numberOfCores()` and `CarbonProperties.batchSize()`.
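
A minimal sketch of what such shared helpers could look like, assuming they lived in a small utility next to CarbonProperties; the `CarbonPropertiesHelpers` class is hypothetical, neither method exists in this PR, and `batchSize()` here reads from CarbonProperties rather than from the per-load configuration that the current `getBatchSize()` uses:

```java
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;

// Hypothetical shared helpers for integer-valued properties (illustration only).
public final class CarbonPropertiesHelpers {

  private CarbonPropertiesHelpers() { }

  // Read an integer property, falling back to the default on malformed input.
  private static int intProperty(String key, String defaultValue) {
    String value = CarbonProperties.getInstance().getProperty(key, defaultValue);
    try {
      return Integer.parseInt(value);
    } catch (NumberFormatException e) {
      return Integer.parseInt(defaultValue);
    }
  }

  public static int numberOfCores() {
    return intProperty(CarbonCommonConstants.NUM_CORES_LOADING,
        CarbonCommonConstants.NUM_CORES_DEFAULT_VAL);
  }

  public static int batchSize() {
    return intProperty(DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE,
        DataLoadProcessorConstants.DATA_LOAD_BATCH_SIZE_DEFAULT);
  }
}
```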




[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-14 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/240#discussion_r83506371
  
--- Diff: 
processing/src/main/java/org/apache/carbondata/processing/newflow/steps/input/InputProcessorStepImpl.java
 ---
+
+  @Override public void intialize() throws CarbonDataLoadingException {
--- End diff --

Typo: `intialize` should be `initialize`.




[GitHub] incubator-carbondata pull request #240: [CARBONDATA-298]Added InputProcessor...

2016-10-14 Thread ravipesala
GitHub user ravipesala opened a pull request:

https://github.com/apache/incubator-carbondata/pull/240

[CARBONDATA-298]Added InputProcessorStep to read data from csv reader 
iterator.

Add InputProcessorStep, which iterates over the record reader of the CSV input and parses the data according to each column's data type.
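
For readers new to the data-loading flow, the sketch below shows the general read-then-parse-per-column batching pattern the step implements; `ParseAndBatchDemo`, `RowParser`, and the plain `String[]`/`Object[]` row shapes are stand-ins for illustration, not the PR's `GenericParser`, `CarbonRow`, or `CarbonRowBatch` types:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustrative only: read raw rows, parse each column, collect into a batch.
public class ParseAndBatchDemo {

  interface RowParser {
    Object parse(String rawValue);   // e.g. String -> Integer for an INT column
  }

  // Pull up to batchSize raw rows from the reader and parse each column.
  static List<Object[]> nextBatch(Iterator<String[]> reader, RowParser[] parsers,
      int batchSize) {
    List<Object[]> batch = new ArrayList<>(batchSize);
    while (reader.hasNext() && batch.size() < batchSize) {
      String[] raw = reader.next();
      Object[] parsed = new Object[parsers.length];
      for (int i = 0; i < parsers.length; i++) {
        parsed[i] = parsers[i].parse(raw[i]);
      }
      batch.add(parsed);
    }
    return batch;
  }

  public static void main(String[] args) {
    Iterator<String[]> reader =
        Arrays.asList(new String[] {"1", "a"}, new String[] {"2", "b"}).iterator();
    RowParser[] parsers = {Integer::valueOf, v -> v};
    // One batch containing two parsed rows: [1, a] and [2, b].
    System.out.println(nextBatch(reader, parsers, 32).size());
  }
}
```

In the actual step, `initialize()` builds one parser per output column from the configured complex delimiters, and `execute()` hands each partition of reader iterators to an `InputProcessorIterator` that produces row batches of the configured size.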

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ravipesala/incubator-carbondata input-processor-step

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-carbondata/pull/240.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #240


commit 96c46d2d31c2f80b89ff755c3683c08b24eca042
Author: ravipesala 
Date:   2016-10-14T17:09:58Z

Added InputProcessorStep to read data from csv reader iterator.



