[GitHub] incubator-carbondata pull request #333: [CARBONDATA-471]Optimized no kettle ...

2016-12-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-carbondata/pull/333


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-carbondata pull request #333: [CARBONDATA-471]Optimized no kettle ...

2016-12-01 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/333#discussion_r90588455
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java ---
@@ -122,24 +139,52 @@ private boolean internalHasNext() {
   if (!hasNext) {
 // Check next iterator is available in the list.
 if (counter < inputIterators.size()) {
+  // close the old iterator
+  currentIterator.close();
   // Get the next iterator from the list.
   currentIterator = inputIterators.get(counter++);
+  // Initialize the new iterator
+  currentIterator.initialize();
   hasNext = internalHasNext();
 }
   }
   return hasNext;
 }
 
-@Override
-public CarbonRowBatch next() {
-  // Create batch and fill it.
-  CarbonRowBatch carbonRowBatch = new CarbonRowBatch();
-  int count = 0;
-  while (internalHasNext() && count < batchSize) {
-carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next())));
-count++;
+@Override public CarbonRowBatch next() {
+  CarbonRowBatch result = null;
+  try {
+if (future == null) {
+  future = getCarbonRowBatch();
+}
+result = future.get();
+nextBatch = false;
+if (hasNext()) {
+  nextBatch = true;
+  future = getCarbonRowBatch();
+} else {
+  currentIterator.close();
+}
+  } catch (Exception e) {
--- End diff --

Catch only InterruptedException and ExecutionException here.
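
One way to read this suggestion, as a minimal sketch: `Future.get()` declares only `InterruptedException` and `ExecutionException` as checked exceptions, so those are the two worth catching. `LoadingException` below is a hypothetical stand-in for the project's own exception type, and `await` is an illustrative helper, not code from the pull request.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class NarrowCatchSketch {

  /** Hypothetical stand-in for the project's loading exception. */
  static class LoadingException extends RuntimeException {
    LoadingException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /** Waits for the result, handling only the checked exceptions Future.get() declares. */
  static <T> T await(Future<T> future) {
    try {
      return future.get();
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new LoadingException("Interrupted while waiting for batch", e);
    } catch (ExecutionException e) {
      // The task itself failed; surface its cause rather than the wrapper.
      throw new LoadingException("Batch task failed", e.getCause());
    }
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Callable<String> working = () -> "batch-1";
      System.out.println(await(executor.submit(working)));

      Callable<String> failing = () -> {
        throw new IllegalStateException("parse error");
      };
      try {
        await(executor.submit(failing));
      } catch (LoadingException e) {
        System.out.println(e.getCause().getMessage());
      }
    } finally {
      executor.shutdown();
    }
  }
}
```

With a blanket `catch (Exception e)`, an unrelated runtime failure elsewhere in the block would be reported the same way as a failed batch task, which is presumably what the comment wants to avoid.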




[GitHub] incubator-carbondata pull request #333: [CARBONDATA-471]Optimized no kettle ...

2016-12-01 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/333#discussion_r90588425
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java ---
@@ -122,24 +139,52 @@ private boolean internalHasNext() {
   if (!hasNext) {
 // Check next iterator is available in the list.
 if (counter < inputIterators.size()) {
+  // close the old iterator
+  currentIterator.close();
   // Get the next iterator from the list.
   currentIterator = inputIterators.get(counter++);
+  // Initialize the new iterator
+  currentIterator.initialize();
   hasNext = internalHasNext();
 }
   }
   return hasNext;
 }
 
-@Override
-public CarbonRowBatch next() {
-  // Create batch and fill it.
-  CarbonRowBatch carbonRowBatch = new CarbonRowBatch();
-  int count = 0;
-  while (internalHasNext() && count < batchSize) {
-carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next())));
-count++;
+@Override public CarbonRowBatch next() {
+  CarbonRowBatch result = null;
+  try {
--- End diff --

Limit the try block to the `future.get()` call only.
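
A minimal sketch of what a narrower try block could look like in a prefetching iterator. The class, its fields, and the use of `String` rows are hypothetical simplifications for illustration; the pull request's `InputProcessorIterator` works on `CarbonRowBatch` and tracks more state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PrefetchSketch implements Iterator<List<String>> {
  private final Iterator<String> rows;
  private final int batchSize;
  private final ExecutorService executor;
  // Pending batch; null once the input is exhausted.
  private Future<List<String>> future;

  PrefetchSketch(Iterator<String> rows, int batchSize, ExecutorService executor) {
    this.rows = rows;
    this.batchSize = batchSize;
    this.executor = executor;
    this.future = rows.hasNext() ? submitBatch() : null;
  }

  /** Fills one batch on the executor thread. */
  private Future<List<String>> submitBatch() {
    return executor.submit(() -> {
      List<String> batch = new ArrayList<>();
      while (rows.hasNext() && batch.size() < batchSize) {
        batch.add(rows.next());
      }
      return batch;
    });
  }

  @Override
  public boolean hasNext() {
    return future != null;
  }

  @Override
  public List<String> next() {
    if (future == null) {
      throw new NoSuchElementException();
    }
    List<String> result;
    // Only the blocking call sits inside the try block.
    try {
      result = future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for batch", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Batch task failed", e.getCause());
    }
    // The previous task has finished, so it is safe to read `rows` here
    // and schedule the next batch while the caller processes this one.
    future = rows.hasNext() ? submitBatch() : null;
    return result;
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Iterator<String> input = Arrays.asList("a", "b", "c", "d", "e").iterator();
      PrefetchSketch batches = new PrefetchSketch(input, 2, executor);
      while (batches.hasNext()) {
        System.out.println(batches.next());
      }
    } finally {
      executor.shutdown();
    }
  }
}
```

Keeping the scheduling logic outside the try block means an exception thrown there is not mistaken for a failure of the awaited batch.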




[GitHub] incubator-carbondata pull request #333: [CARBONDATA-471]Optimized no kettle ...

2016-12-01 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/333#discussion_r90588408
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java ---
@@ -122,24 +139,52 @@ private boolean internalHasNext() {
   if (!hasNext) {
 // Check next iterator is available in the list.
 if (counter < inputIterators.size()) {
+  // close the old iterator
+  currentIterator.close();
   // Get the next iterator from the list.
   currentIterator = inputIterators.get(counter++);
+  // Initialize the new iterator
+  currentIterator.initialize();
   hasNext = internalHasNext();
 }
   }
   return hasNext;
 }
 
-@Override
-public CarbonRowBatch next() {
-  // Create batch and fill it.
-  CarbonRowBatch carbonRowBatch = new CarbonRowBatch();
-  int count = 0;
-  while (internalHasNext() && count < batchSize) {
-carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next())));
-count++;
+@Override public CarbonRowBatch next() {
--- End diff --

Put `@Override` on its own line, above the method signature.




[GitHub] incubator-carbondata pull request #333: [CARBONDATA-471]Optimized no kettle ...

2016-12-01 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/333#discussion_r90588348
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java ---
@@ -122,24 +139,52 @@ private boolean internalHasNext() {
   if (!hasNext) {
 // Check next iterator is available in the list.
 if (counter < inputIterators.size()) {
+  // close the old iterator
+  currentIterator.close();
   // Get the next iterator from the list.
   currentIterator = inputIterators.get(counter++);
+  // Initialize the new iterator
+  currentIterator.initialize();
   hasNext = internalHasNext();
 }
   }
   return hasNext;
 }
 
-@Override
-public CarbonRowBatch next() {
-  // Create batch and fill it.
-  CarbonRowBatch carbonRowBatch = new CarbonRowBatch();
-  int count = 0;
-  while (internalHasNext() && count < batchSize) {
-carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next())));
-count++;
+@Override public CarbonRowBatch next() {
+  CarbonRowBatch result = null;
+  try {
--- End diff --

Put `@Override` on its own line, above the method signature.




[GitHub] incubator-carbondata pull request #333: [CARBONDATA-471]Optimized no kettle ...

2016-12-01 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/333#discussion_r90588344
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java ---
@@ -80,40 +87,50 @@ public void initialize() throws CarbonDataLoadingException {
 return iterators;
   }
 
-  @Override
-  protected CarbonRow processRow(CarbonRow row) {
+  @Override protected CarbonRow processRow(CarbonRow row) {
 return null;
   }
 
+  @Override public void close() {
+executorService.shutdown();
+  }
+
   /**
* This iterator wraps the list of iterators and it starts iterating the each
* iterator of the list one by one. It also parse the data while iterating it.
*/
   private static class InputProcessorIterator extends CarbonIterator {
 
-private List> inputIterators;
+private List> inputIterators;
 
-private Iterator currentIterator;
+private InputIterator currentIterator;
 
 private int counter;
 
 private int batchSize;
 
 private RowParser rowParser;
 
-public InputProcessorIterator(List> inputIterators,
-RowParser rowParser, int batchSize) {
+private Future future;
+
+private ExecutorService executorService;
+
+private boolean nextBatch = false;
+
+public InputProcessorIterator(List> inputIterators,
+RowParser rowParser, int batchSize, ExecutorService executorService) {
   this.inputIterators = inputIterators;
   this.batchSize = batchSize;
   this.rowParser = rowParser;
   this.counter = 0;
   // Get the first iterator from the list.
   currentIterator = inputIterators.get(counter++);
+  currentIterator.initialize();
+  this.executorService = executorService;
 }
 
-@Override
-public boolean hasNext() {
-  return internalHasNext();
+@Override public boolean hasNext() {
--- End diff --

Put `@Override` on its own line, above the method signature.




[GitHub] incubator-carbondata pull request #333: [CARBONDATA-471]Optimized no kettle ...

2016-12-01 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/333#discussion_r90588312
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java ---
@@ -80,40 +87,50 @@ public void initialize() throws CarbonDataLoadingException {
 return iterators;
   }
 
-  @Override
-  protected CarbonRow processRow(CarbonRow row) {
+  @Override protected CarbonRow processRow(CarbonRow row) {
 return null;
   }
 
+  @Override public void close() {
+executorService.shutdown();
+  }
+
   /**
* This iterator wraps the list of iterators and it starts iterating the each
* iterator of the list one by one. It also parse the data while iterating it.
*/
   private static class InputProcessorIterator extends CarbonIterator {
 
-private List> inputIterators;
+private List> inputIterators;
 
-private Iterator currentIterator;
+private InputIterator currentIterator;
 
 private int counter;
 
 private int batchSize;
 
 private RowParser rowParser;
 
-public InputProcessorIterator(List> inputIterators,
-RowParser rowParser, int batchSize) {
+private Future future;
+
+private ExecutorService executorService;
+
+private boolean nextBatch = false;
--- End diff --

Initialize `nextBatch` in the constructor, as is done for `counter`.




[GitHub] incubator-carbondata pull request #333: [CARBONDATA-471]Optimized no kettle ...

2016-12-01 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/333#discussion_r90587816
  
--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java ---
@@ -138,6 +140,17 @@ public static void setQuoteCharacter(String quoteCharacter, Configuration config
   }
 
   /**
+   * Sets the read buffer size to configuration.
+   * @param bufferSize
+   * @param configuration
+   */
+  public static void setReadBufferSize(String bufferSize, Configuration configuration) {
--- End diff --

Why is `bufferSize` a String rather than an int?
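
For illustration, a sketch of an int-typed setter. Hadoop's `Configuration` provides `setInt` and `getInt`; to stay self-contained, the sketch uses a small map-backed `Config` stand-in with the same two methods. The property key and default value are hypothetical, not the ones used by `CSVInputFormat`.

```java
import java.util.HashMap;
import java.util.Map;

class BufferSizeSketch {

  /** Minimal stand-in for a Hadoop-style configuration; not the real class. */
  static class Config {
    private final Map<String, String> values = new HashMap<>();

    void setInt(String key, int value) {
      values.put(key, Integer.toString(value));
    }

    int getInt(String key, int defaultValue) {
      String raw = values.get(key);
      return raw == null ? defaultValue : Integer.parseInt(raw);
    }
  }

  // Hypothetical key and default, chosen only for this example.
  static final String READ_BUFFER_SIZE = "example.csv.read.buffer.size";
  static final int DEFAULT_READ_BUFFER_SIZE = 64 * 1024;

  /** Stores the read buffer size, rejecting values that cannot be a buffer length. */
  static void setReadBufferSize(Config config, int bufferSize) {
    if (bufferSize <= 0) {
      throw new IllegalArgumentException("bufferSize must be positive: " + bufferSize);
    }
    config.setInt(READ_BUFFER_SIZE, bufferSize);
  }

  /** Reads the buffer size back, falling back to the default when unset. */
  static int getReadBufferSize(Config config) {
    return config.getInt(READ_BUFFER_SIZE, DEFAULT_READ_BUFFER_SIZE);
  }

  public static void main(String[] args) {
    Config config = new Config();
    System.out.println(getReadBufferSize(config));
    setReadBufferSize(config, 4096);
    System.out.println(getReadBufferSize(config));
  }
}
```

Taking an int moves parsing and validation to the caller's boundary, so a malformed value fails when it is set rather than later when the reader allocates its buffer.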

