Repository: incubator-carbondata
Updated Branches:
  refs/heads/master cb480e0dd -> eac728d11


Added interface for carbon data loading

Updated as per review comments

Updated interface as per review comments

Added carbon row.

Added carbon row batch.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/779fd08e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/779fd08e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/779fd08e

Branch: refs/heads/master
Commit: 779fd08e0577287d577f316d1214de1f1b7687be
Parents: cb480e0
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Tue Oct 11 21:33:48 2016 +0530
Committer: jackylk <jacky.li...@huawei.com>
Committed: Fri Oct 14 17:22:04 2016 +0800

----------------------------------------------------------------------
 .../newflow/AbstractDataLoadProcessorStep.java  | 124 ++++++++++++++++
 .../newflow/CarbonDataLoadConfiguration.java    | 144 +++++++++++++++++++
 .../processing/newflow/DataField.java           |  57 ++++++++
 .../constants/DataLoadProcessorConstants.java   |  36 +++++
 .../exception/CarbonDataLoadingException.java   |  88 ++++++++++++
 .../processing/newflow/row/CarbonRow.java       |  71 +++++++++
 .../processing/newflow/row/CarbonRowBatch.java  |  42 ++++++
 7 files changed, 562 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/779fd08e/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
new file mode 100644
index 0000000..69fe511
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow;
+
+import java.util.Iterator;
+
+import org.apache.carbondata.common.CarbonIterator;
+import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+
+/**
+ * Base class for the steps of the data loading flow.
+ * A step pulls row batches from its child step and hands every row to the
+ * transformation supplied by the concrete subclass.
+ */
+public abstract class AbstractDataLoadProcessorStep {
+
+  protected CarbonDataLoadConfiguration configuration;
+
+  protected AbstractDataLoadProcessorStep child;
+
+  /**
+   * Stores the load configuration and the child step for later use.
+   *
+   * @param configuration configuration object kept by this step
+   * @param child         step whose output iterators are consumed by {@link #execute()}
+   */
+  public AbstractDataLoadProcessorStep(CarbonDataLoadConfiguration configuration,
+      AbstractDataLoadProcessorStep child) {
+    this.child = child;
+    this.configuration = configuration;
+  }
+
+  /**
+   * Describes the output of this step: the data returned by the step follows this meta.
+   */
+  public abstract DataField[] getOutput();
+
+  /**
+   * Initialization hook for this step.
+   *
+   * @throws CarbonDataLoadingException if the step cannot be initialized
+   */
+  public abstract void intialize() throws CarbonDataLoadingException;
+
+  /**
+   * Transforms the data produced by the child step.
+   * The child is executed first, then each iterator it returned is wrapped by
+   * {@link #getIterator(Iterator)}, so the result has as many entries as the child's output.
+   *
+   * @return array of iterators over the transformed batches; the entries can be consumed in
+   *         parallel if the implementation class wants
+   * @throws CarbonDataLoadingException propagated from the child step
+   */
+  public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
+    Iterator<CarbonRowBatch>[] childOutputs = child.execute();
+    Iterator<CarbonRowBatch>[] wrapped = new Iterator[childOutputs.length];
+    int index = 0;
+    for (Iterator<CarbonRowBatch> childOutput : childOutputs) {
+      wrapped[index++] = getIterator(childOutput);
+    }
+    return wrapped;
+  }
+
+  /**
+   * Wraps a child iterator so that every batch it yields goes through
+   * {@link #processRowBatch(CarbonRowBatch)} before being returned.
+   *
+   * @param childIter iterator obtained from the child step
+   * @return new iterator with step specific processing.
+   */
+  protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
+    return new CarbonIterator<CarbonRowBatch>() {
+      @Override public boolean hasNext() {
+        return childIter.hasNext();
+      }
+
+      @Override public CarbonRowBatch next() {
+        CarbonRowBatch childBatch = childIter.next();
+        return processRowBatch(childBatch);
+      }
+    };
+  }
+
+  /**
+   * Builds a new batch by applying {@link #processRow(CarbonRow)} to every row of the given
+   * batch, in iteration order.
+   *
+   * @param rowBatch batch to transform
+   * @return new batch holding the processed rows.
+   */
+  protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch) {
+    CarbonRowBatch processed = new CarbonRowBatch();
+    for (Iterator<CarbonRow> rows = rowBatch.getBatchIterator(); rows.hasNext();) {
+      processed.addRow(processRow(rows.next()));
+    }
+    return processed;
+  }
+
+  /**
+   * Processes a single row as per the step logic.
+   *
+   * @param row row to transform
+   * @return processed row.
+   */
+  protected abstract CarbonRow processRow(CarbonRow row);
+
+  /**
+   * Hook intended to be called when the task has completed successfully.
+   */
+  public abstract void finish();
+
+  /**
+   * Closing of resources after step execution can be done here.
+   */
+  public abstract void close();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/779fd08e/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
new file mode 100644
index 0000000..20013ce
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+
+/**
+ * Holder for the settings of one data load: the column fields, table identifier, header,
+ * partition/segment/task ids and a free-form map of additional load properties.
+ */
+public class CarbonDataLoadConfiguration {
+
+  private DataField[] dataFields;
+
+  private AbsoluteTableIdentifier tableIdentifier;
+
+  private String[] header;
+
+  private String partitionId;
+
+  private String segmentId;
+
+  private String taskNo;
+
+  private final Map<String, Object> dataLoadProperties = new HashMap<>();
+
+  /**
+   * @return number of data fields whose column is a dimension
+   */
+  public int getDimensionCount() {
+    int count = 0;
+    for (DataField field : dataFields) {
+      if (field.getColumn().isDimesion()) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  /**
+   * @return number of data fields whose column is a dimension without dictionary encoding
+   */
+  public int getNoDictionaryCount() {
+    int count = 0;
+    for (DataField field : dataFields) {
+      if (field.getColumn().isDimesion() && !field.hasDictionaryEncoding()) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  /**
+   * @return number of data fields whose column is complex
+   */
+  public int getComplexDimensionCount() {
+    int count = 0;
+    for (DataField field : dataFields) {
+      if (field.getColumn().isComplex()) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  /**
+   * @return number of data fields whose column is not a dimension
+   */
+  public int getMeasureCount() {
+    int count = 0;
+    for (DataField field : dataFields) {
+      if (!field.getColumn().isDimesion()) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  public DataField[] getDataFields() {
+    return this.dataFields;
+  }
+
+  public void setDataFields(DataField[] dataFields) {
+    this.dataFields = dataFields;
+  }
+
+  public String[] getHeader() {
+    return this.header;
+  }
+
+  public void setHeader(String[] header) {
+    this.header = header;
+  }
+
+  public AbsoluteTableIdentifier getTableIdentifier() {
+    return this.tableIdentifier;
+  }
+
+  public void setTableIdentifier(AbsoluteTableIdentifier tableIdentifier) {
+    this.tableIdentifier = tableIdentifier;
+  }
+
+  public String getPartitionId() {
+    return this.partitionId;
+  }
+
+  public void setPartitionId(String partitionId) {
+    this.partitionId = partitionId;
+  }
+
+  public String getSegmentId() {
+    return this.segmentId;
+  }
+
+  public void setSegmentId(String segmentId) {
+    this.segmentId = segmentId;
+  }
+
+  public String getTaskNo() {
+    return this.taskNo;
+  }
+
+  public void setTaskNo(String taskNo) {
+    this.taskNo = taskNo;
+  }
+
+  /**
+   * Stores a load property, replacing any value previously stored under the same key.
+   *
+   * @param key   property name
+   * @param value property value
+   */
+  public void setDataLoadProperty(String key, Object value) {
+    this.dataLoadProperties.put(key, value);
+  }
+
+  /**
+   * Looks up a load property, falling back to the supplied default.
+   *
+   * @param key          property name
+   * @param defaultValue value returned when the lookup yields null
+   * @return the stored value, or defaultValue if nothing (or null) is stored for the key
+   */
+  public Object getDataLoadProperty(String key, Object defaultValue) {
+    Object stored = this.dataLoadProperties.get(key);
+    return stored == null ? defaultValue : stored;
+  }
+
+  /**
+   * Looks up a load property.
+   *
+   * @param key property name
+   * @return the stored value, or null if the key is absent
+   */
+  public Object getDataLoadProperty(String key) {
+    return this.dataLoadProperties.get(key);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/779fd08e/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
new file mode 100644
index 0000000..3e6d63e
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow;
+
+import java.io.Serializable;
+
+import 
org.apache.carbondata.core.carbon.metadata.blocklet.compressor.CompressionCodec;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import 
org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
+
+/**
+ * Metadata class for each column of table.
+ * It pairs the column definition with the compression codec set for it.
+ */
+public class DataField implements Serializable {
+
+  /**
+   * Explicit serial version ID, so the serialized form does not depend on the
+   * compiler-generated default.
+   */
+  private static final long serialVersionUID = 1L;
+
+  private CarbonColumn column;
+
+  private CompressionCodec compressionCodec;
+
+  /**
+   * @return true if the column reports {@link Encoding#DICTIONARY} among its encodings
+   */
+  public boolean hasDictionaryEncoding() {
+    return column.hasEncoding(Encoding.DICTIONARY);
+  }
+
+  /**
+   * @return the column definition held by this field
+   */
+  public CarbonColumn getColumn() {
+    return column;
+  }
+
+  /**
+   * @param column the column definition to hold
+   */
+  public void setColumn(CarbonColumn column) {
+    this.column = column;
+  }
+
+  /**
+   * @return the compression codec held by this field
+   */
+  public CompressionCodec getCompressionCodec() {
+    return compressionCodec;
+  }
+
+  /**
+   * @param compressionCodec the compression codec to hold
+   */
+  public void setCompressionCodec(CompressionCodec compressionCodec) {
+    this.compressionCodec = compressionCodec;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/779fd08e/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
new file mode 100644
index 0000000..9d35ccb
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.constants;
+
+/**
+ * Constants used in data loading.
+ * Each constant is a plain string whose value equals its own name.
+ */
+public final class DataLoadProcessorConstants {
+
+  public static final String TEMP_STORE_LOCATION = "TEMP_STORE_LOCATION";
+
+  public static final String BLOCKLET_SIZE = "BLOCKLET_SIZE";
+
+  public static final String SORT_SIZE = "SORT_SIZE";
+
+  public static final String FACT_TIME_STAMP = "FACT_TIME_STAMP";
+
+  public static final String COMPLEX_DELIMITERS = "COMPLEX_DELIMITERS";
+
+  public static final String DIMENSION_LENGTHS = "DIMENSION_LENGTHS";
+
+  /**
+   * Constants holder, not meant to be instantiated.
+   */
+  private DataLoadProcessorConstants() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/779fd08e/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
new file mode 100644
index 0000000..c26e2de
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.exception;
+
+import java.util.Locale;
+
+/**
+ * Checked exception declared by the data loading steps.
+ */
+public class CarbonDataLoadingException extends Exception {
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The error message returned by {@link #getMessage()}.
+   */
+  private final String msg;
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public CarbonDataLoadingException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   * @param t   The cause of this exception.
+   */
+  public CarbonDataLoadingException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param t The cause of this exception. Its description becomes the message of this
+   *          exception; a null cause yields an empty message.
+   */
+  public CarbonDataLoadingException(Throwable t) {
+    super(t);
+    // The superclass derives its detail message from the cause (null when there is no
+    // cause). Copy it so that getMessage() no longer hides the cause behind "".
+    String derived = super.getMessage();
+    this.msg = derived == null ? "" : derived;
+  }
+
+  /**
+   * Placeholder for a locale specific message lookup. No localization is implemented:
+   * the method ignores the locale and always returns an empty string.
+   *
+   * @param locale - A Locale object represents a specific geographical,
+   *               political, or cultural region.
+   * @return - always the empty string.
+   */
+  public String getLocalizedMessage(Locale locale) {
+    return "";
+  }
+
+  /**
+   * Delegates to the superclass implementation.
+   */
+  @Override public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /**
+   * @return the message captured by the constructor that created this exception
+   */
+  @Override public String getMessage() {
+    return this.msg;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/779fd08e/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
new file mode 100644
index 0000000..e1aa601
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.row;
+
+import java.math.BigDecimal;
+
+/**
+ * Row container used to hand the data of one row from one step to the next.
+ * The values live in an object array and are addressed by ordinal (array index).
+ */
+public class CarbonRow {
+
+  private Object[] data;
+
+  /**
+   * Wraps the given array; the array is used as-is, not copied.
+   *
+   * @param data values of the row
+   */
+  public CarbonRow(Object[] data) {
+    this.data = data;
+  }
+
+  /**
+   * @return the backing array itself
+   */
+  public Object[] getData() {
+    return this.data;
+  }
+
+  /**
+   * @param ordinal index of the value
+   * @return the value at the ordinal, which must hold an Integer
+   */
+  public int getInt(int ordinal) {
+    Integer value = (Integer) this.data[ordinal];
+    return value;
+  }
+
+  /**
+   * @param ordinal index of the value
+   * @return the value at the ordinal, which must hold a Long
+   */
+  public long getLong(int ordinal) {
+    Long value = (Long) this.data[ordinal];
+    return value;
+  }
+
+  /**
+   * @param ordinal index of the value
+   * @return the value at the ordinal, which must hold a Float
+   */
+  public float getFloat(int ordinal) {
+    Float value = (Float) this.data[ordinal];
+    return value;
+  }
+
+  /**
+   * @param ordinal index of the value
+   * @return the value at the ordinal, which must hold a Double
+   */
+  public double getDouble(int ordinal) {
+    Double value = (Double) this.data[ordinal];
+    return value;
+  }
+
+  /**
+   * @param ordinal index of the value
+   * @return the value at the ordinal cast to BigDecimal
+   */
+  public BigDecimal getDecimal(int ordinal) {
+    Object value = this.data[ordinal];
+    return (BigDecimal) value;
+  }
+
+  /**
+   * @param ordinal index of the value
+   * @return the value at the ordinal cast to String
+   */
+  public String getString(int ordinal) {
+    Object value = this.data[ordinal];
+    return (String) value;
+  }
+
+  /**
+   * @param ordinal index of the value
+   * @return the value at the ordinal cast to a byte array
+   */
+  public byte[] getBinary(int ordinal) {
+    Object value = this.data[ordinal];
+    return (byte[]) value;
+  }
+
+  /**
+   * Overwrites the value stored at the ordinal.
+   *
+   * @param value   new value
+   * @param ordinal index to overwrite
+   */
+  public void update(Object value, int ordinal) {
+    this.data[ordinal] = value;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/779fd08e/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
new file mode 100644
index 0000000..b26a60e
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.row;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Batch of rows.
+ * Rows are kept in a list, so they are iterated in the order they were added.
+ */
+public class CarbonRowBatch {
+
+  private final List<CarbonRow> rows = new ArrayList<>();
+
+  /**
+   * Appends a row to the end of this batch.
+   *
+   * @param carbonRow row to add
+   */
+  public void addRow(CarbonRow carbonRow) {
+    this.rows.add(carbonRow);
+  }
+
+  /**
+   * @return iterator over the rows of this batch
+   */
+  public Iterator<CarbonRow> getBatchIterator() {
+    Iterator<CarbonRow> iterator = this.rows.iterator();
+    return iterator;
+  }
+
+}

Reply via email to