[GitHub] carbondata pull request #2133: [CARBONDATA-2304][Compaction] Prefetch rowbat...

2018-04-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/carbondata/pull/2133


---


[GitHub] carbondata pull request #2133: [CARBONDATA-2304][Compaction] Prefetch rowbat...

2018-04-16 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2133#discussion_r181136505
  
--- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java ---
@@ -39,106 +53,131 @@
*/
   private CarbonIterator detailRawQueryResultIterator;
 
-  /**
-   * Counter to maintain the row counter.
-   */
-  private int counter = 0;
-
-  private Object[] currentConveretedRawRow = null;
-
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-  LogServiceFactory.getLogService(RawResultIterator.class.getName());
-
-  /**
-   * batch of the result.
-   */
-  private RowBatch batch;
+  private boolean prefetchEnabled;
+  private List currentBuffer;
+  private List backupBuffer;
+  private int currentIdxInBuffer;
+  private ExecutorService executorService;
+  private Future fetchFuture;
+  private Object[] currentRawRow = null;
+  private boolean isBackupFilled = false;
 
  public RawResultIterator(CarbonIterator detailRawQueryResultIterator,
-  SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties) {
+  SegmentProperties sourceSegProperties, SegmentProperties destinationSegProperties,
+  boolean isStreamingHandOff) {
 this.detailRawQueryResultIterator = detailRawQueryResultIterator;
 this.sourceSegProperties = sourceSegProperties;
 this.destinationSegProperties = destinationSegProperties;
+this.executorService = Executors.newFixedThreadPool(1);
+
+if (!isStreamingHandOff) {
+  init();
+}
   }
 
-  @Override public boolean hasNext() {
+  private void init() {
+this.prefetchEnabled = CarbonProperties.getInstance().getProperty(
+CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE,
+CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE_DEFAULT).equalsIgnoreCase("true");
+try {
+  new RowsFetcher(false).call();
+  if (prefetchEnabled) {
+this.fetchFuture = executorService.submit(new RowsFetcher(true));
+  }
+} catch (Exception e) {
+  LOGGER.error(e, "Error occurs while fetching records");
+  throw new RuntimeException(e);
+}
+  }
+
+  /**
+   * fetch rows
+   */
+  private final class RowsFetcher implements Callable {
+private boolean isBackupFilling;
 
-if (null == batch || checkIfBatchIsProcessedCompletely(batch)) {
-  if (detailRawQueryResultIterator.hasNext()) {
-batch = null;
-batch = detailRawQueryResultIterator.next();
-counter = 0; // batch changed so reset the counter.
+private RowsFetcher(boolean isBackupFilling) {
+  this.isBackupFilling = isBackupFilling;
+}
+
+@Override
+public Void call() throws Exception {
+  if (isBackupFilling) {
+backupBuffer = fetchRows();
+isBackupFilled = true;
   } else {
-return false;
+currentBuffer = fetchRows();
   }
+  return null;
 }
+  }
 
-if (!checkIfBatchIsProcessedCompletely(batch)) {
-  return true;
+  private List fetchRows() {
+if (detailRawQueryResultIterator.hasNext()) {
+  return detailRawQueryResultIterator.next().getRows();
 } else {
-  return false;
+  return new ArrayList<>();
 }
   }
 
-  @Override public Object[] next() {
-if (null == batch) { // for 1st time
-  batch = detailRawQueryResultIterator.next();
-}
-if (!checkIfBatchIsProcessedCompletely(batch)) {
-  try {
-if (null != currentConveretedRawRow) {
-  counter++;
-  Object[] currentConveretedRawRowTemp = this.currentConveretedRawRow;
-  currentConveretedRawRow = null;
-  return currentConveretedRawRowTemp;
+  private void fillDataFromPrefetch() {
+try {
+  if (currentIdxInBuffer >= currentBuffer.size() && 0 != currentIdxInBuffer) {
+if (prefetchEnabled) {
+  if (!isBackupFilled) {
+fetchFuture.get();
+  }
+  // copy backup buffer to current buffer and fill backup buffer asynchronously
+  currentIdxInBuffer = 0;
+  currentBuffer = backupBuffer;
+  isBackupFilled = false;
+  fetchFuture = executorService.submit(new RowsFetcher(true));
+} else {
+  currentIdxInBuffer = 0;
+  new RowsFetcher(false).call();
 }
-return convertRow(batch.getRawRow(counter++));
-  } catch (KeyGenException e) {
-
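The hunk above replaces the single RowBatch/counter bookkeeping with two row buffers served by a one-thread executor: the caller drains currentBuffer while a RowsFetcher task fills backupBuffer in the background, and fillDataFromPrefetch swaps the two when the current buffer is exhausted. A minimal, self-contained sketch of that double-buffering pattern (an illustration with invented names and types, not CarbonData code):

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical batch supplier; returns an empty list once exhausted. */
interface BatchSource<T> {
  List<T> nextBatch();
}

// While the caller drains the current batch, a single worker thread fetches the next one.
final class PrefetchingReader<T> {
  private final ExecutorService executor = Executors.newFixedThreadPool(1);
  private final Callable<List<T>> fetchTask;
  private List<T> current;             // batch being consumed ("currentBuffer")
  private Future<List<T>> pending;     // in-flight fetch ("backupBuffer")
  private int idx;                     // position inside the current batch

  PrefetchingReader(BatchSource<T> source) {
    this.fetchTask = source::nextBatch;
    this.current = source.nextBatch();          // synchronous first fill
    this.pending = executor.submit(fetchTask);  // prefetch the next batch
  }

  /** Returns the next row, or null when the source is exhausted. */
  T next() throws Exception {
    if (idx >= current.size()) {                // current batch drained
      current = pending.get();                  // wait for the prefetched batch
      idx = 0;
      pending = executor.submit(fetchTask);     // keep the pipeline full
      if (current.isEmpty()) {
        return null;                            // no more data
      }
    }
    return current.get(idx++);
  }

  void close() {
    executor.shutdownNow();
  }
}
```

The diff additionally gates the prefetch behind a property check and a streaming-handoff flag, which this sketch omits.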

[GitHub] carbondata pull request #2133: [CARBONDATA-2304][Compaction] Prefetch rowbat...

2018-04-16 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2133#discussion_r181135852
  
--- Diff: core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---
@@ -1642,6 +1642,14 @@
 
   public static final String CARBON_SEARCH_MODE_THREAD_DEFAULT = "3";
 
+  /*
+   * whether to enable prefetch during compaction
--- End diff ---

Can you describe more on what is prefetched


---
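What gets prefetched in this PR is the next RowBatch produced by the raw detail query iterator that compaction reads from (see the RawResultIterator hunk elsewhere in this thread); while the merge consumes the current batch, a background thread fetches the following one. Enabling it is a single property switch. A minimal sketch, assuming the usual CarbonProperties.addProperty setter (the literal key string and the default value are not shown in this excerpt):

```java
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;

// Sketch: switch on row-batch prefetching for compaction at runtime.
// CARBON_COMPACTION_PREFETCH_ENABLE is the constant added by this PR; the
// addProperty call is the standard CarbonProperties usage, assumed here
// rather than taken from this excerpt.
public final class EnableCompactionPrefetch {
  public static void main(String[] args) {
    CarbonProperties.getInstance().addProperty(
        CarbonCommonConstants.CARBON_COMPACTION_PREFETCH_ENABLE, "true");
    // The same switch can be set in carbon.properties before starting the job.
  }
}
```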


[GitHub] carbondata pull request #2133: [CARBONDATA-2304][Compaction] Prefetch rowbat...

2018-04-16 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2133#discussion_r181136264
  
--- Diff: core/src/main/java/org/apache/carbondata/core/scan/result/iterator/RawResultIterator.java ---

[GitHub] carbondata pull request #2133: [CARBONDATA-2304][Compaction] Prefetch rowbat...

2018-04-02 Thread xuchuanyin
GitHub user xuchuanyin opened a pull request:

https://github.com/apache/carbondata/pull/2133

[CARBONDATA-2304][Compaction] Prefetch rowbatch during compaction

Add a configuration to enable prefetch during compaction.

During compaction, CarbonData runs a query over the segments being compacted and retrieves the rows, then sorts them and writes the final carbondata file.

We observed that retrieving these rows is a performance bottleneck, so prefetching the row batches improves compaction performance.

In my local tests, compacting 4 segments of 100 thousand rows each took 30s with prefetch and 50s without.

In my tests on a larger cluster, compacting 6 segments of 18GB raw data each took 45min with prefetch and 57min without.
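The speed-up comes from overlapping the fetch of the next row batch with the sorting/writing of the current one, so the per-batch cost approaches max(fetch, process) instead of fetch + process. A small self-contained toy model of that effect (simulated sleeps, not a CarbonData benchmark; all constants are made up):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Toy model: each batch costs FETCH_MS to read and PROCESS_MS to sort/write.
public final class PrefetchTimingDemo {
  static final int BATCHES = 5;
  static final long FETCH_MS = 100;    // simulated I/O + row conversion
  static final long PROCESS_MS = 150;  // simulated sort + write

  static void sleep(long ms) {
    try {
      Thread.sleep(ms);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) throws Exception {
    // Serial: fetch then process, batch after batch  ->  ~BATCHES * (FETCH_MS + PROCESS_MS)
    long t0 = System.currentTimeMillis();
    for (int i = 0; i < BATCHES; i++) {
      sleep(FETCH_MS);
      sleep(PROCESS_MS);
    }
    System.out.println("serial:   " + (System.currentTimeMillis() - t0) + " ms");

    // Prefetch: next fetch overlaps current processing  ->  ~FETCH_MS + BATCHES * max(FETCH_MS, PROCESS_MS)
    ExecutorService pool = Executors.newFixedThreadPool(1);
    long t1 = System.currentTimeMillis();
    Future<?> pending = pool.submit(() -> sleep(FETCH_MS));   // fetch the first batch
    for (int i = 0; i < BATCHES; i++) {
      pending.get();                                          // wait for the prefetched batch
      pending = pool.submit(() -> sleep(FETCH_MS));           // start fetching the next one
      sleep(PROCESS_MS);                                      // process the current batch
    }
    System.out.println("prefetch: " + (System.currentTimeMillis() - t1) + " ms");
    pool.shutdown();  // one surplus prefetch may still be running; it is simply discarded
  }
}
```

With these made-up numbers the serial loop takes about 1.25s and the prefetching loop about 0.85s, which matches the shape of the 50s-vs-30s and 57min-vs-45min results reported above.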

Be sure to do all of the following checklist to help us incorporate your contribution quickly and easily:

 - [x] Any interfaces changed?
 `NO`
 - [x] Any backward compatibility impacted?
 `NO`
 - [x] Document update required?
 `Add a configuration, will update it later`
 - [x] Testing done
 Please provide details on
 - Whether new unit test cases have been added or why no new tests are required?
 `Yes`
 - How it is tested? Please attach test report.
 `Tested in local and a 3-node cluster`
 - Is it a performance related change? Please attach the performance test report.
 `Compaction performance has been enhanced by 25+%`
 - Any additional information to help reviewers in testing this change.

 - [x] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
 `Not related`


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuchuanyin/carbondata 0402_compaction_prefetch

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/carbondata/pull/2133.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2133


commit 41869effa326052b46088f68dd1d6ccc5f7525e5
Author: xuchuanyin 
Date:   2018-04-02T12:38:17Z

Prefetch rowbatch during compaction

Add a configuration to enable prefetch during compaction.




---