[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2020-01-02 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006916#comment-17006916
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

asfgit commented on pull request #1929: DRILL-6832: Remove the old "unmanaged" 
external sort
URL: https://github.com/apache/drill/pull/1929
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> --
>
> Key: DRILL-6832
> URL: https://issues.apache.org/jira/browse/DRILL-6832
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.14.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>  Labels: ready-to-commit
> Fix For: 1.18.0
>
>
> Several releases back, Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, the new version 
> was, at the time, controlled by an option that allowed reverting to the old 
> version.
> The new version has proven to be stable, so the time has come to remove the 
> old version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
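
For readers skimming the digest, the last two bullets in the description above are
the heart of the change: the operator factory (the "batch creator") currently
branches on a session option to pick between the legacy and the managed sort, and
both the branch and the option go away. A minimal, generic sketch of that pattern
is below; the class and option names are illustrative assumptions, not Drill's
exact identifiers.

```java
import java.util.Map;

// Hypothetical stand-ins for Drill's sort operators; illustration only.
interface RecordBatchSketch {}
class LegacyExternalSortSketch implements RecordBatchSketch {}
class ManagedExternalSortSketch implements RecordBatchSketch {}

/**
 * Mirrors the conditional batch-creator logic the issue wants removed: a factory
 * that consults an option to decide between the legacy and managed sort. Once the
 * legacy sort is deleted, the branch and the option both disappear.
 */
public class SortBatchCreatorSketch {
  // Assumed option key for illustration; the real Drill option name may differ.
  static final String DISABLE_MANAGED = "exec.sort.disable_managed";

  static RecordBatchSketch create(Map<String, Boolean> options) {
    if (Boolean.TRUE.equals(options.get(DISABLE_MANAGED))) {
      return new LegacyExternalSortSketch();  // path being removed
    }
    return new ManagedExternalSortSketch();   // the only path going forward
  }

  public static void main(String[] args) {
    System.out.println(create(Map.of(DISABLE_MANAGED, false)).getClass().getSimpleName());
  }
}
```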


[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-31 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006254#comment-17006254
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

paul-rogers commented on issue #1929: DRILL-6832: Remove the old "unmanaged" 
external sort
URL: https://github.com/apache/drill/pull/1929#issuecomment-570004303
 
 
   @ihuzenko, thanks much for your review. Squashed commits. 
 



[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-31 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17006009#comment-17006009
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on issue #1929: DRILL-6832: Remove the old "unmanaged" 
external sort
URL: https://github.com/apache/drill/pull/1929#issuecomment-569895159
 
 
   @paul-rogers, please squash the commits into one.
 



[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-31 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17005996#comment-17005996
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on issue #1929: DRILL-6832: Remove the old "unmanaged" 
external sort
URL: https://github.com/apache/drill/pull/1929#issuecomment-569889329
 
 
   LGTM, +1. Thanks, @paul-rogers, for making the changes.
 



[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-30 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17005882#comment-17005882
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

paul-rogers commented on issue #1929: DRILL-6832: Remove the old "unmanaged" 
external sort
URL: https://github.com/apache/drill/pull/1929#issuecomment-569844669
 
 
   Rebased on the latest master and reran all unit tests.
 



[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-30 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17005860#comment-17005860
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

paul-rogers commented on issue #1929: DRILL-6832: Remove the old "unmanaged" 
external sort
URL: https://github.com/apache/drill/pull/1929#issuecomment-569836457
 
 
   @ihuzenko, I should have spotted that one. Also, I should have done my own 
test run, but I thought, hey, it is one simple change, what could go wrong? My 
fault for not actually thinking. Anyway, we should be good now. 
 



[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-30 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17005236#comment-17005236
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on issue #1929: DRILL-6832: Remove the old "unmanaged" 
external sort
URL: https://github.com/apache/drill/pull/1929#issuecomment-569633946
 
 
   @paul-rogers, omg, sometimes I'm so blind :( That last comment from my side 
was a big mistake and caused a StackOverflowError in our tests with this 
```AutoCloseable.close(varargs)```. I'm so sorry about that; please revert the 
last commit.
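
For context on the failure mode described above: one common way an
`AutoCloseable.close(varargs)`-style simplification can blow the stack is when the
varargs overload ends up delegating to itself instead of to the collection
overload, because an array argument binds to the varargs method (arrays are not
Iterable). A self-contained sketch of that pitfall follows; it uses a hypothetical
helper class, not Drill's actual AutoCloseables.

```java
import java.util.Arrays;

// Hypothetical illustration of the varargs self-recursion pitfall; not Drill's AutoCloseables.
public class CloserSketch {

  // Collection overload: does the real per-resource work.
  static void close(Iterable<? extends AutoCloseable> resources) throws Exception {
    for (AutoCloseable c : resources) {
      if (c != null) {
        c.close();
      }
    }
  }

  // Varargs overload: must wrap the array before delegating. If this body were
  // simply "close(resources);", the AutoCloseable[] argument would bind to this
  // same varargs method again, and the call would recurse until StackOverflowError.
  static void close(AutoCloseable... resources) throws Exception {
    close(Arrays.asList(resources));
  }

  public static void main(String[] args) throws Exception {
    close(() -> System.out.println("closed"), null);
  }
}
```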
 



[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17004162#comment-17004162
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r361651705
 
 

 ##
 File path: common/src/main/java/org/apache/drill/common/AutoCloseables.java
 ##
 @@ -75,6 +76,14 @@ public static void close(AutoCloseable... autoCloseables) 
throws Exception {
 close(Arrays.asList(autoCloseables));
   }
 
+  public static void closeWithUserException(AutoCloseable... autoCloseables) {
+try {
+  close(Arrays.asList(autoCloseables));
 
 Review comment:
   ```suggestion
 close(autoCloseables);
   ```
 



[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17004150#comment-17004150
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r361651139
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##
 @@ -286,540 +291,419 @@ public void buildSchema() throws SchemaChangeException 
{
 state = BatchState.DONE;
 break;
   default:
-break;
+throw new IllegalStateException("Unexpected iter outcome: " + outcome);
 }
   }
 
+  /**
+   * Process each request for a batch. The first request retrieves
+   * all the incoming batches and sorts them, optionally spilling to
+   * disk as needed. Subsequent calls retrieve the sorted results in
+   * fixed-size batches.
+   */
+
   @Override
   public IterOutcome innerNext() {
-if (schema != null) {
-  if (spillCount == 0) {
-return (getSelectionVector4().next()) ? IterOutcome.OK : 
IterOutcome.NONE;
-  } else {
-Stopwatch w = Stopwatch.createStarted();
-int count = copier.next(targetRecordCount);
-if (count > 0) {
-  long t = w.elapsed(TimeUnit.MICROSECONDS);
-  logger.debug("Took {} us to merge {} records", t, count);
-  container.setRecordCount(count);
-  return IterOutcome.OK;
-} else {
-  logger.debug("copier returned 0 records");
-  return IterOutcome.NONE;
-}
+switch (sortState) {
+case DONE:
+  return NONE;
+case START:
+  return load();
+case LOAD:
+  if (!this.retainInMemoryBatchesOnNone) {
+resetSortState();
   }
+  return (sortState == SortState.DONE) ? NONE : load();
+case DELIVER:
+  return nextOutputBatch();
+default:
+  throw new IllegalStateException("Unexpected sort state: " + sortState);
 }
+  }
 
-int totalCount = 0;
-int totalBatches = 0; // total number of batches received so far
+  private IterOutcome nextOutputBatch() {
+// Call next on outputSV4 for it's state to progress in parallel to 
resultsIterator state
+outputSV4.next();
 
-try{
-  container.clear();
-  outer: while (true) {
-IterOutcome upstream;
-if (first) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-} else {
-  upstream = next(incoming);
-}
-if (upstream == IterOutcome.OK && sorter == null) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-}
-switch (upstream) {
-case NONE:
-  if (first) {
-return upstream;
-  }
-  break outer;
-case NOT_YET:
-  throw new UnsupportedOperationException();
-case STOP:
-  return upstream;
-case OK_NEW_SCHEMA:
-case OK:
-  VectorContainer convertedBatch;
-  // only change in the case that the schema truly changes.  
Artificial schema changes are ignored.
-  if (upstream == IterOutcome.OK_NEW_SCHEMA && 
!incoming.getSchema().equals(schema)) {
-if (schema != null) {
-  if (unionTypeEnabled) {
-this.schema = SchemaUtil.mergeSchemas(schema, 
incoming.getSchema());
-  } else {
-throw new SchemaChangeException("Schema changes not supported 
in External Sort. Please enable Union type");
-  }
-} else {
-  schema = incoming.getSchema();
-}
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-for (BatchGroup b : batchGroups) {
-  b.setSchema(schema);
-}
-for (BatchGroup b : spilledBatchGroups) {
-  b.setSchema(schema);
-}
-this.sorter = createNewSorter(context, convertedBatch);
-  } else {
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-  }
-  if (first) {
-first = false;
-  }
-  if (convertedBatch.getRecordCount() == 0) {
-for (VectorWrapper w : convertedBatch) {
-  w.clear();
-}
-break;
-  }
-  SelectionVector2 sv2;
-  if (incoming.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE) {
-sv2 = incoming.getSelectionVector2().clone();
-  } else {
-try {
-  sv2 = newSV2();
-} catch(InterruptedException e) {
-  return IterOutcome.STOP;
-} catch (OutOfMemoryException e) {
-  throw new OutOfMemoryException(e);
-}
-  }
+// But if results iterator next returns true that means it has 

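The new Javadoc in the hunk above describes the rewritten operator as a small
state machine: the first call to innerNext() gathers and sorts all input
(spilling to disk if needed), and subsequent calls hand back the sorted results
in fixed-size batches until the data is exhausted. As a reading aid, here is a
self-contained, heavily simplified sketch of that load-then-deliver control flow
(no spilling, hypothetical names, not the real Drill operator):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified sketch of a load-then-deliver sort state machine; not ExternalSortBatch.
public class SortStateMachineSketch {

  enum SortState { START, DELIVER, DONE }
  enum Outcome { OK, NONE }

  private SortState state = SortState.START;
  private final List<Integer> input;
  private final int batchSize;
  private int nextIndex;

  SortStateMachineSketch(List<Integer> input, int batchSize) {
    this.input = new ArrayList<>(input);
    this.batchSize = batchSize;
  }

  /** First call sorts everything ("load"); later calls emit fixed-size slices ("deliver"). */
  Outcome next(List<Integer> outputBatch) {
    switch (state) {
      case START:
        Collections.sort(input);            // load: gather and sort all input
        state = SortState.DELIVER;
        // fall through to deliver the first batch
      case DELIVER:
        outputBatch.clear();
        int end = Math.min(nextIndex + batchSize, input.size());
        outputBatch.addAll(input.subList(nextIndex, end));
        nextIndex = end;
        if (nextIndex >= input.size()) {
          state = SortState.DONE;
        }
        return outputBatch.isEmpty() ? Outcome.NONE : Outcome.OK;
      case DONE:
      default:
        return Outcome.NONE;
    }
  }

  public static void main(String[] args) {
    SortStateMachineSketch sort = new SortStateMachineSketch(List.of(5, 3, 9, 1, 7), 2);
    List<Integer> batch = new ArrayList<>();
    while (sort.next(batch) == Outcome.OK) {
      System.out.println(batch);            // [1, 3], [5, 7], [9]
    }
  }
}
```
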
[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17004141#comment-17004141
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r361648663
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17004133#comment-17004133
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r361647577
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17004100#comment-17004100
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r361644230
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##
 @@ -18,140 +18,300 @@
 package org.apache.drill.exec.physical.impl.xsort;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 
-public class BatchGroup implements VectorAccessible, AutoCloseable {
+/**
+ * Represents a group of batches spilled to disk.
+ * 
+ * The batches are defined by a schema which can change over time. When the 
schema changes,
+ * all existing and new batches are coerced into the new schema. Provides a
+ * uniform way to iterate over records for one or more batches whether
+ * the batches are in memory or on disk.
+ * 
+ * The BatchGroup operates in two modes as given by the two
+ * subclasses:
+ * 
+ * Input mode (@link InputBatchGroup): Used to buffer in-memory batches
+ * prior to spilling.
+ * Spill mode (@link SpilledBatchGroup): Holds a "memento" to a set
+ * of batches written to disk. Acts as both a reader and writer for
+ * those batches.
+ */
+
+public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
 
-  private VectorContainer currentContainer;
-  private SelectionVector2 sv2;
-  private int pointer = 0;
-  private FSDataInputStream inputStream;
-  private FSDataOutputStream outputStream;
-  private Path path;
-  private FileSystem fs;
-  private BufferAllocator allocator;
-  private int spilledBatches = 0;
-  private OperatorContext context;
-  private BatchSchema schema;
-
-  public BatchGroup(VectorContainer container, SelectionVector2 sv2, 
OperatorContext context) {
-this.sv2 = sv2;
-this.currentContainer = container;
-this.context = context;
-  }
+  /**
+   * The input batch group gathers batches buffered in memory before
+   * spilling. The structure of the data is:
+   * 
+   * Contains a single batch received from the upstream (input)
+   * operator.
+   * Associated selection vector that provides a sorted
+   * indirection to the values in the batch.
+   * 
+   */
 
-  public BatchGroup(VectorContainer container, FileSystem fs, String path, 
OperatorContext context) {
-currentContainer = container;
-this.fs = fs;
-this.path = new Path(path);
-this.allocator = context.getAllocator();
-this.context = context;
-  }
+  public static class InputBatch extends BatchGroup {
+private final SelectionVector2 sv2;
+private final long dataSize;
+
+public InputBatch(VectorContainer container, SelectionVector2 sv2, 
BufferAllocator allocator, long dataSize) {
+  super(container, allocator);
+  this.sv2 = sv2;
+  this.dataSize = dataSize;
+}
 
-  public SelectionVector2 getSv2() {
-return sv2;
+public SelectionVector2 getSv2() { return sv2; }
+
+public long getDataSize() { return dataSize; }
+
+@Override
+public int getRecordCount() {
+  if (sv2 != null) {
+return sv2.getCount();
+  } else {
+return super.getRecordCount();
+  }
+}
+
+@Override
+public int getNextIndex() {
+  int val = super.getNextIndex();
+  if (val == -1) {
+return val;
+  }
+  return 

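The InputBatch Javadoc in the hunk above relies on the idea of a selection vector:
rather than physically reordering a buffered batch, the sort keeps a small vector
of row indices in sorted order and reads the original values through it. A
minimal, generic illustration of that sorted indirection follows (plain arrays,
not Drill's SelectionVector2):

```java
import java.util.Arrays;
import java.util.Comparator;

// Generic illustration of sorted indirection via a selection vector;
// not Drill's SelectionVector2 implementation.
public class SelectionVectorSketch {
  public static void main(String[] args) {
    int[] values = {42, 7, 19, 3, 28};                 // the "batch": never reordered

    // Build a selection vector: indices into `values`, sorted by the value they point to.
    Integer[] sv = {0, 1, 2, 3, 4};
    Arrays.sort(sv, Comparator.comparingInt(i -> values[i]));

    // Readers walk the selection vector instead of the data itself.
    for (int index : sv) {
      System.out.print(values[index] + " ");           // 3 7 19 28 42
    }
    System.out.println();
    System.out.println(Arrays.toString(values));       // original order preserved
  }
}
```
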
[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003967#comment-17003967
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

paul-rogers commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r361593465
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003968#comment-17003968
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

paul-rogers commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r361590802
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003969#comment-17003969
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

paul-rogers commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r361592300
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003959#comment-17003959
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

paul-rogers commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r361570848
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003960#comment-17003960
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

paul-rogers commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r361574585
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##
 @@ -221,4 +363,19 @@ public SelectionVector4 getSelectionVector4() {
 throw new UnsupportedOperationException();
   }
 
+  public static void closeAll(Collection groups) {
 
 Review comment:
   This is called from several places, so I left the method in place. I did 
replace the body with the `AutoCloseables` call, and kept the exception 
translation in this method since there seemed little value in cluttering 
`AutoCloseables` with the exception-translation logic, or in adding a lambda to 
do the work.
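
To make that design choice concrete, here is a rough, self-contained sketch of a
closeAll() that delegates the iteration to a shared close helper and performs the
checked-to-unchecked translation locally, which is my reading of the approach
described above. The names are assumptions, and in Drill the wrapper would be a
UserException rather than a plain RuntimeException; this is not the committed code.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Rough sketch of "delegate to a shared closer, translate exceptions here";
// not the committed Drill code.
public class CloseAllSketch {

  // Stand-in for a shared AutoCloseables-style helper: closes everything,
  // remembers the first failure, and rethrows it after attempting all closes.
  static void closeAllChecked(Collection<? extends AutoCloseable> resources) throws Exception {
    Exception first = null;
    for (AutoCloseable c : resources) {
      try {
        if (c != null) {
          c.close();
        }
      } catch (Exception e) {
        if (first == null) {
          first = e;
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }

  // The closeAll kept in BatchGroup: callers get an unchecked exception, and the
  // translation lives here rather than cluttering the shared helper.
  static void closeAll(Collection<? extends AutoCloseable> groups) {
    try {
      closeAllChecked(groups);
    } catch (Exception e) {
      throw new RuntimeException("Failure while closing spilled batch groups", e);
    }
  }

  public static void main(String[] args) {
    List<AutoCloseable> groups = Arrays.<AutoCloseable>asList(
        () -> System.out.println("closed group 1"),
        () -> System.out.println("closed group 2"));
    closeAll(groups);
  }
}
```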
 



[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003965#comment-17003965
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

paul-rogers commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r361585050
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003970#comment-17003970
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

paul-rogers commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r361593157
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##
 @@ -286,540 +291,419 @@ public void buildSchema() throws SchemaChangeException 
{
 state = BatchState.DONE;
 break;
   default:
-break;
+throw new IllegalStateException("Unexpected iter outcome: " + outcome);
 }
   }
 
+  /**
+   * Process each request for a batch. The first request retrieves
+   * all the incoming batches and sorts them, optionally spilling to
+   * disk as needed. Subsequent calls retrieve the sorted results in
+   * fixed-size batches.
+   */
+
   @Override
   public IterOutcome innerNext() {
-if (schema != null) {
-  if (spillCount == 0) {
-return (getSelectionVector4().next()) ? IterOutcome.OK : 
IterOutcome.NONE;
-  } else {
-Stopwatch w = Stopwatch.createStarted();
-int count = copier.next(targetRecordCount);
-if (count > 0) {
-  long t = w.elapsed(TimeUnit.MICROSECONDS);
-  logger.debug("Took {} us to merge {} records", t, count);
-  container.setRecordCount(count);
-  return IterOutcome.OK;
-} else {
-  logger.debug("copier returned 0 records");
-  return IterOutcome.NONE;
-}
+switch (sortState) {
+case DONE:
+  return NONE;
+case START:
+  return load();
+case LOAD:
+  if (!this.retainInMemoryBatchesOnNone) {
+resetSortState();
   }
+  return (sortState == SortState.DONE) ? NONE : load();
+case DELIVER:
+  return nextOutputBatch();
+default:
+  throw new IllegalStateException("Unexpected sort state: " + sortState);
 }
+  }
 
-int totalCount = 0;
-int totalBatches = 0; // total number of batches received so far
+  private IterOutcome nextOutputBatch() {
+// Call next on outputSV4 for its state to progress in parallel to 
resultsIterator state
+outputSV4.next();
 
-try{
-  container.clear();
-  outer: while (true) {
-IterOutcome upstream;
-if (first) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-} else {
-  upstream = next(incoming);
-}
-if (upstream == IterOutcome.OK && sorter == null) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-}
-switch (upstream) {
-case NONE:
-  if (first) {
-return upstream;
-  }
-  break outer;
-case NOT_YET:
-  throw new UnsupportedOperationException();
-case STOP:
-  return upstream;
-case OK_NEW_SCHEMA:
-case OK:
-  VectorContainer convertedBatch;
-  // only change in the case that the schema truly changes.  
Artificial schema changes are ignored.
-  if (upstream == IterOutcome.OK_NEW_SCHEMA && 
!incoming.getSchema().equals(schema)) {
-if (schema != null) {
-  if (unionTypeEnabled) {
-this.schema = SchemaUtil.mergeSchemas(schema, 
incoming.getSchema());
-  } else {
-throw new SchemaChangeException("Schema changes not supported 
in External Sort. Please enable Union type");
-  }
-} else {
-  schema = incoming.getSchema();
-}
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-for (BatchGroup b : batchGroups) {
-  b.setSchema(schema);
-}
-for (BatchGroup b : spilledBatchGroups) {
-  b.setSchema(schema);
-}
-this.sorter = createNewSorter(context, convertedBatch);
-  } else {
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-  }
-  if (first) {
-first = false;
-  }
-  if (convertedBatch.getRecordCount() == 0) {
-for (VectorWrapper w : convertedBatch) {
-  w.clear();
-}
-break;
-  }
-  SelectionVector2 sv2;
-  if (incoming.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE) {
-sv2 = incoming.getSelectionVector2().clone();
-  } else {
-try {
-  sv2 = newSV2();
-} catch(InterruptedException e) {
-  return IterOutcome.STOP;
-} catch (OutOfMemoryException e) {
-  throw new OutOfMemoryException(e);
-}
-  }
+// But if results iterator next returns true that means it has 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003961#comment-17003961
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

paul-rogers commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r361572415
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##
 @@ -18,140 +18,300 @@
 package org.apache.drill.exec.physical.impl.xsort;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 
-public class BatchGroup implements VectorAccessible, AutoCloseable {
+/**
+ * Represents a group of batches spilled to disk.
+ * 
+ * The batches are defined by a schema which can change over time. When the 
schema changes,
+ * all existing and new batches are coerced into the new schema. Provides a
+ * uniform way to iterate over records for one or more batches whether
+ * the batches are in memory or on disk.
+ * 
+ * The BatchGroup operates in two modes as given by the two
+ * subclasses:
+ * 
+ * Input mode (@link InputBatchGroup): Used to buffer in-memory batches
+ * prior to spilling.
+ * Spill mode (@link SpilledBatchGroup): Holds a "memento" to a set
+ * of batches written to disk. Acts as both a reader and writer for
+ * those batches.
+ */
+
+public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
 
-  private VectorContainer currentContainer;
-  private SelectionVector2 sv2;
-  private int pointer = 0;
-  private FSDataInputStream inputStream;
-  private FSDataOutputStream outputStream;
-  private Path path;
-  private FileSystem fs;
-  private BufferAllocator allocator;
-  private int spilledBatches = 0;
-  private OperatorContext context;
-  private BatchSchema schema;
-
-  public BatchGroup(VectorContainer container, SelectionVector2 sv2, 
OperatorContext context) {
-this.sv2 = sv2;
-this.currentContainer = container;
-this.context = context;
-  }
+  /**
+   * The input batch group gathers batches buffered in memory before
+   * spilling. The structure of the data is:
+   * 
+   * Contains a single batch received from the upstream (input)
+   * operator.
+   * Associated selection vector that provides a sorted
+   * indirection to the values in the batch.
+   * 
+   */
 
-  public BatchGroup(VectorContainer container, FileSystem fs, String path, 
OperatorContext context) {
-currentContainer = container;
-this.fs = fs;
-this.path = new Path(path);
-this.allocator = context.getAllocator();
-this.context = context;
-  }
+  public static class InputBatch extends BatchGroup {
+private final SelectionVector2 sv2;
+private final long dataSize;
+
+public InputBatch(VectorContainer container, SelectionVector2 sv2, 
BufferAllocator allocator, long dataSize) {
+  super(container, allocator);
+  this.sv2 = sv2;
+  this.dataSize = dataSize;
+}
 
-  public SelectionVector2 getSv2() {
-return sv2;
+public SelectionVector2 getSv2() { return sv2; }
+
+public long getDataSize() { return dataSize; }
+
+@Override
+public int getRecordCount() {
+  if (sv2 != null) {
+return sv2.getCount();
+  } else {
+return super.getRecordCount();
+  }
+}
+
+@Override
+public int getNextIndex() {
+  int val = super.getNextIndex();
+  if (val == -1) {
+return val;
+  }
+  return 
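
The InputBatch shown above pairs a buffered batch with a selection vector (SV2) that supplies a sorted read order without moving the underlying data; sorting the small index array instead of the vectors themselves is the point of the SV2. A rough stand-alone illustration of that indirection, using ordinary arrays in place of value vectors (all names here are made up):

```java
// Sketch of "sort the indexes, not the data": the values stay where they are and
// a selection vector supplies the sorted read order, as InputBatch does with its SV2.
import java.util.Arrays;
import java.util.Comparator;
import java.util.stream.IntStream;

public class SelectionVectorSketch {
  public static void main(String[] args) {
    int[] data = {42, 7, 19, 3, 25};   // stands in for a value vector
    Integer[] sv = IntStream.range(0, data.length).boxed().toArray(Integer[]::new);

    // Sort only the selection vector, comparing the values it points at.
    Arrays.sort(sv, Comparator.comparingInt(i -> data[i]));

    for (int i : sv) {
      System.out.println(data[i]);     // 3, 7, 19, 25, 42
    }
  }
}
```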

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003957#comment-17003957
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

paul-rogers commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r361574025
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##
 @@ -18,140 +18,300 @@
 package org.apache.drill.exec.physical.impl.xsort;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 
-public class BatchGroup implements VectorAccessible, AutoCloseable {
+/**
+ * Represents a group of batches spilled to disk.
+ * 
+ * The batches are defined by a schema which can change over time. When the 
schema changes,
+ * all existing and new batches are coerced into the new schema. Provides a
+ * uniform way to iterate over records for one or more batches whether
+ * the batches are in memory or on disk.
+ * 
+ * The BatchGroup operates in two modes as given by the two
+ * subclasses:
+ * 
+ * Input mode (@link InputBatchGroup): Used to buffer in-memory batches
+ * prior to spilling.
+ * Spill mode (@link SpilledBatchGroup): Holds a "memento" to a set
+ * of batches written to disk. Acts as both a reader and writer for
+ * those batches.
+ */
+
+public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
 
-  private VectorContainer currentContainer;
-  private SelectionVector2 sv2;
-  private int pointer = 0;
-  private FSDataInputStream inputStream;
-  private FSDataOutputStream outputStream;
-  private Path path;
-  private FileSystem fs;
-  private BufferAllocator allocator;
-  private int spilledBatches = 0;
-  private OperatorContext context;
-  private BatchSchema schema;
-
-  public BatchGroup(VectorContainer container, SelectionVector2 sv2, 
OperatorContext context) {
-this.sv2 = sv2;
-this.currentContainer = container;
-this.context = context;
-  }
+  /**
+   * The input batch group gathers batches buffered in memory before
+   * spilling. The structure of the data is:
+   * 
+   * Contains a single batch received from the upstream (input)
+   * operator.
+   * Associated selection vector that provides a sorted
+   * indirection to the values in the batch.
+   * 
+   */
 
-  public BatchGroup(VectorContainer container, FileSystem fs, String path, 
OperatorContext context) {
-currentContainer = container;
-this.fs = fs;
-this.path = new Path(path);
-this.allocator = context.getAllocator();
-this.context = context;
-  }
+  public static class InputBatch extends BatchGroup {
+private final SelectionVector2 sv2;
+private final long dataSize;
+
+public InputBatch(VectorContainer container, SelectionVector2 sv2, 
BufferAllocator allocator, long dataSize) {
+  super(container, allocator);
+  this.sv2 = sv2;
+  this.dataSize = dataSize;
+}
 
-  public SelectionVector2 getSv2() {
-return sv2;
+public SelectionVector2 getSv2() { return sv2; }
+
+public long getDataSize() { return dataSize; }
+
+@Override
+public int getRecordCount() {
+  if (sv2 != null) {
+return sv2.getCount();
+  } else {
+return super.getRecordCount();
+  }
+}
+
+@Override
+public int getNextIndex() {
+  int val = super.getNextIndex();
+  if (val == -1) {
+return val;
+  }
+  return 
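
The javadoc above describes the spill-mode batch group as a "memento" for batches written to disk. The toy sketch below captures that idea with plain java.io (a temp file plus a record count); it deliberately avoids Drill's SpillSet/VectorSerializer API, so nothing here should be read as the real serialization format.

```java
// Toy "memento" for a spilled run: remember where the run went and how many
// values it holds, then stream it back for merging. Plain java.io on purpose;
// this is not Drill's SpillSet/VectorSerializer format.
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class SpilledRunSketch {
  private final Path file;   // the memento: location of the spilled run
  private final int count;   // and how many values were written

  private SpilledRunSketch(Path file, int count) {
    this.file = file;
    this.count = count;
  }

  static SpilledRunSketch spill(List<Long> sortedRun) throws IOException {
    Path file = Files.createTempFile("spill-run", ".bin");
    try (DataOutputStream out = new DataOutputStream(
        new BufferedOutputStream(Files.newOutputStream(file)))) {
      for (long v : sortedRun) {
        out.writeLong(v);
      }
    }
    return new SpilledRunSketch(file, sortedRun.size());
  }

  long[] readBack() throws IOException {
    long[] values = new long[count];
    try (DataInputStream in = new DataInputStream(
        new BufferedInputStream(Files.newInputStream(file)))) {
      for (int i = 0; i < count; i++) {
        values[i] = in.readLong();
      }
    }
    Files.delete(file);
    return values;
  }

  public static void main(String[] args) throws IOException {
    SpilledRunSketch run = spill(List.of(3L, 8L, 21L));
    System.out.println(java.util.Arrays.toString(run.readBack()));   // [3, 8, 21]
  }
}
```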

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003966#comment-17003966
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

paul-rogers commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r361588150
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##
 @@ -286,540 +291,419 @@ public void buildSchema() throws SchemaChangeException 
{
 state = BatchState.DONE;
 break;
   default:
-break;
+throw new IllegalStateException("Unexpected iter outcome: " + outcome);
 }
   }
 
+  /**
+   * Process each request for a batch. The first request retrieves
+   * all the incoming batches and sorts them, optionally spilling to
+   * disk as needed. Subsequent calls retrieve the sorted results in
+   * fixed-size batches.
+   */
+
   @Override
   public IterOutcome innerNext() {
-if (schema != null) {
-  if (spillCount == 0) {
-return (getSelectionVector4().next()) ? IterOutcome.OK : 
IterOutcome.NONE;
-  } else {
-Stopwatch w = Stopwatch.createStarted();
-int count = copier.next(targetRecordCount);
-if (count > 0) {
-  long t = w.elapsed(TimeUnit.MICROSECONDS);
-  logger.debug("Took {} us to merge {} records", t, count);
-  container.setRecordCount(count);
-  return IterOutcome.OK;
-} else {
-  logger.debug("copier returned 0 records");
-  return IterOutcome.NONE;
-}
+switch (sortState) {
+case DONE:
+  return NONE;
+case START:
+  return load();
+case LOAD:
+  if (!this.retainInMemoryBatchesOnNone) {
+resetSortState();
   }
+  return (sortState == SortState.DONE) ? NONE : load();
+case DELIVER:
+  return nextOutputBatch();
+default:
+  throw new IllegalStateException("Unexpected sort state: " + sortState);
 }
+  }
 
-int totalCount = 0;
-int totalBatches = 0; // total number of batches received so far
+  private IterOutcome nextOutputBatch() {
+// Call next on outputSV4 for its state to progress in parallel to 
resultsIterator state
+outputSV4.next();
 
-try{
-  container.clear();
-  outer: while (true) {
-IterOutcome upstream;
-if (first) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-} else {
-  upstream = next(incoming);
-}
-if (upstream == IterOutcome.OK && sorter == null) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-}
-switch (upstream) {
-case NONE:
-  if (first) {
-return upstream;
-  }
-  break outer;
-case NOT_YET:
-  throw new UnsupportedOperationException();
-case STOP:
-  return upstream;
-case OK_NEW_SCHEMA:
-case OK:
-  VectorContainer convertedBatch;
-  // only change in the case that the schema truly changes.  
Artificial schema changes are ignored.
-  if (upstream == IterOutcome.OK_NEW_SCHEMA && 
!incoming.getSchema().equals(schema)) {
-if (schema != null) {
-  if (unionTypeEnabled) {
-this.schema = SchemaUtil.mergeSchemas(schema, 
incoming.getSchema());
-  } else {
-throw new SchemaChangeException("Schema changes not supported 
in External Sort. Please enable Union type");
-  }
-} else {
-  schema = incoming.getSchema();
-}
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-for (BatchGroup b : batchGroups) {
-  b.setSchema(schema);
-}
-for (BatchGroup b : spilledBatchGroups) {
-  b.setSchema(schema);
-}
-this.sorter = createNewSorter(context, convertedBatch);
-  } else {
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-  }
-  if (first) {
-first = false;
-  }
-  if (convertedBatch.getRecordCount() == 0) {
-for (VectorWrapper w : convertedBatch) {
-  w.clear();
-}
-break;
-  }
-  SelectionVector2 sv2;
-  if (incoming.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE) {
-sv2 = incoming.getSelectionVector2().clone();
-  } else {
-try {
-  sv2 = newSV2();
-} catch(InterruptedException e) {
-  return IterOutcome.STOP;
-} catch (OutOfMemoryException e) {
-  throw new OutOfMemoryException(e);
-}
-  }
+// But if results iterator next returns true that means it has 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003958#comment-17003958
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

paul-rogers commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r361570691
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##
 @@ -18,140 +18,300 @@
 package org.apache.drill.exec.physical.impl.xsort;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 
-public class BatchGroup implements VectorAccessible, AutoCloseable {
+/**
+ * Represents a group of batches spilled to disk.
+ * 
+ * The batches are defined by a schema which can change over time. When the 
schema changes,
+ * all existing and new batches are coerced into the new schema. Provides a
+ * uniform way to iterate over records for one or more batches whether
+ * the batches are in memory or on disk.
+ * 
+ * The BatchGroup operates in two modes as given by the two
+ * subclasses:
+ * 
+ * Input mode (@link InputBatchGroup): Used to buffer in-memory batches
+ * prior to spilling.
+ * Spill mode (@link SpilledBatchGroup): Holds a "memento" to a set
+ * of batches written to disk. Acts as both a reader and writer for
+ * those batches.
+ */
+
+public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
 
-  private VectorContainer currentContainer;
-  private SelectionVector2 sv2;
-  private int pointer = 0;
-  private FSDataInputStream inputStream;
-  private FSDataOutputStream outputStream;
-  private Path path;
-  private FileSystem fs;
-  private BufferAllocator allocator;
-  private int spilledBatches = 0;
-  private OperatorContext context;
-  private BatchSchema schema;
-
-  public BatchGroup(VectorContainer container, SelectionVector2 sv2, 
OperatorContext context) {
-this.sv2 = sv2;
-this.currentContainer = container;
-this.context = context;
-  }
+  /**
+   * The input batch group gathers batches buffered in memory before
+   * spilling. The structure of the data is:
+   * 
+   * Contains a single batch received from the upstream (input)
+   * operator.
+   * Associated selection vector that provides a sorted
+   * indirection to the values in the batch.
+   * 
+   */
 
-  public BatchGroup(VectorContainer container, FileSystem fs, String path, 
OperatorContext context) {
-currentContainer = container;
-this.fs = fs;
-this.path = new Path(path);
-this.allocator = context.getAllocator();
-this.context = context;
-  }
+  public static class InputBatch extends BatchGroup {
 
 Review comment:
   Actually, the code changes themselves were done a couple of years back. This PR 
simply shifts files from the "managed" directory to the "xsort" directory; no 
other "substantial" changes were made in this PR. The PR was hard enough to 
review without doing anything "fancy."
   
   Still, since you are now familiar with the changes, I went ahead and moved the 
classes as suggested.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003962#comment-17003962
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

paul-rogers commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r361584399
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##
 @@ -286,540 +291,419 @@ public void buildSchema() throws SchemaChangeException 
{
 state = BatchState.DONE;
 break;
   default:
-break;
+throw new IllegalStateException("Unexpected iter outcome: " + outcome);
 }
   }
 
+  /**
+   * Process each request for a batch. The first request retrieves
+   * all the incoming batches and sorts them, optionally spilling to
+   * disk as needed. Subsequent calls retrieve the sorted results in
+   * fixed-size batches.
+   */
+
   @Override
   public IterOutcome innerNext() {
-if (schema != null) {
-  if (spillCount == 0) {
-return (getSelectionVector4().next()) ? IterOutcome.OK : 
IterOutcome.NONE;
-  } else {
-Stopwatch w = Stopwatch.createStarted();
-int count = copier.next(targetRecordCount);
-if (count > 0) {
-  long t = w.elapsed(TimeUnit.MICROSECONDS);
-  logger.debug("Took {} us to merge {} records", t, count);
-  container.setRecordCount(count);
-  return IterOutcome.OK;
-} else {
-  logger.debug("copier returned 0 records");
-  return IterOutcome.NONE;
-}
+switch (sortState) {
+case DONE:
+  return NONE;
+case START:
+  return load();
+case LOAD:
+  if (!this.retainInMemoryBatchesOnNone) {
+resetSortState();
   }
+  return (sortState == SortState.DONE) ? NONE : load();
+case DELIVER:
+  return nextOutputBatch();
+default:
+  throw new IllegalStateException("Unexpected sort state: " + sortState);
 }
+  }
 
-int totalCount = 0;
-int totalBatches = 0; // total number of batches received so far
+  private IterOutcome nextOutputBatch() {
+// Call next on outputSV4 for it's state to progress in parallel to 
resultsIterator state
+outputSV4.next();
 
-try{
-  container.clear();
-  outer: while (true) {
-IterOutcome upstream;
-if (first) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-} else {
-  upstream = next(incoming);
-}
-if (upstream == IterOutcome.OK && sorter == null) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-}
-switch (upstream) {
-case NONE:
-  if (first) {
-return upstream;
-  }
-  break outer;
-case NOT_YET:
-  throw new UnsupportedOperationException();
-case STOP:
-  return upstream;
-case OK_NEW_SCHEMA:
-case OK:
-  VectorContainer convertedBatch;
-  // only change in the case that the schema truly changes.  
Artificial schema changes are ignored.
-  if (upstream == IterOutcome.OK_NEW_SCHEMA && 
!incoming.getSchema().equals(schema)) {
-if (schema != null) {
-  if (unionTypeEnabled) {
-this.schema = SchemaUtil.mergeSchemas(schema, 
incoming.getSchema());
-  } else {
-throw new SchemaChangeException("Schema changes not supported 
in External Sort. Please enable Union type");
-  }
-} else {
-  schema = incoming.getSchema();
-}
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-for (BatchGroup b : batchGroups) {
-  b.setSchema(schema);
-}
-for (BatchGroup b : spilledBatchGroups) {
-  b.setSchema(schema);
-}
-this.sorter = createNewSorter(context, convertedBatch);
-  } else {
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-  }
-  if (first) {
-first = false;
-  }
-  if (convertedBatch.getRecordCount() == 0) {
-for (VectorWrapper w : convertedBatch) {
-  w.clear();
-}
-break;
-  }
-  SelectionVector2 sv2;
-  if (incoming.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE) {
-sv2 = incoming.getSelectionVector2().clone();
-  } else {
-try {
-  sv2 = newSV2();
-} catch(InterruptedException e) {
-  return IterOutcome.STOP;
-} catch (OutOfMemoryException e) {
-  throw new OutOfMemoryException(e);
-}
-  }
+// But if results iterator next returns true that means it has 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003963#comment-17003963
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

paul-rogers commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r361595214
 
 

 ##
 File path: 
exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
 ##
 @@ -71,38 +61,22 @@ public void mergeSortWithSv2Legacy() throws Exception {
* @throws Exception
*/
 
-  private void mergeSortWithSv2(boolean testLegacy) throws Exception {
-ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
-.configProperty(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED, false);
+  @Test
 
 Review comment:
   Actually, what is happening here is the removal of the previous code that ran the 
tests twice: once for the "unmanaged" sort and once for the "managed" one. The 
original "driver" function is gone; the "implementation" is now the test.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> --
>
> Key: DRILL-6832
> URL: https://issues.apache.org/jira/browse/DRILL-6832
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.14.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
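
To make the before/after shape of that test cleanup concrete, here is a small JUnit 4 sketch with invented names: the old arrangement called one driver from two tests (legacy and managed), while the new arrangement keeps only a single @Test, which is what the hunk above shows for mergeSortWithSv2.

```java
// Before/after shape of the test cleanup, with invented names. Before: two @Test
// methods delegated to one driver taking a "legacy" flag. After: the driver body
// is the single remaining test, because only the managed sort still exists.
import static org.junit.Assert.assertTrue;

import org.junit.Test;

public class DriverVsDirectTestSketch {

  // Before (for contrast):
  //   @Test public void legacy()  { mergeSortDriver(true);  }
  //   @Test public void managed() { mergeSortDriver(false); }
  //   private void mergeSortDriver(boolean useLegacySort) { ... }

  // After: the former driver body simply becomes the test.
  @Test
  public void mergeSortWithSv2() {
    assertTrue(runQueryAndVerify());
  }

  // Stand-in for the real query-and-verify logic of the actual test.
  private boolean runQueryAndVerify() {
    return true;
  }
}
```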


[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17003964#comment-17003964
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

paul-rogers commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r361596368
 
 

 ##
 File path: 
exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortEmitOutcome.java
 ##
 @@ -15,9 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.physical.impl.xsort.managed;
+package org.apache.drill.exec.physical.impl.xsort;
+
+import static junit.framework.TestCase.assertTrue;
 
 Review comment:
   We need to agree on an import order. (See DRILL-7352.) I have tried working 
on Drill with auto import management turned off in Eclipse. It is VERY painful.
   
   Our developer documentation provides an Eclipse style template that 
(should) include import ordering. Somewhere along the line, the IntelliJ folks 
started using a different ordering.
   
   We need to decide on an import ordering. It does not matter what it is as long 
as both IDEs can enforce it.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> --
>
> Key: DRILL-6832
> URL: https://issues.apache.org/jira/browse/DRILL-6832
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.14.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001127#comment-17001127
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360518128
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
 ##
 @@ -17,94 +17,95 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
-import io.netty.buffer.DrillBuf;
-
 import java.io.IOException;
 import java.util.List;
 
 import javax.inject.Named;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
+import io.netty.buffer.DrillBuf;
 
 public abstract class PriorityQueueCopierTemplate implements 
PriorityQueueCopier {
+//  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(PriorityQueueCopierTemplate.class);
 
 Review comment:
   ```suggestion
   ```
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> --
>
> Key: DRILL-6832
> URL: https://issues.apache.org/jira/browse/DRILL-6832
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.14.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
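
For context, PriorityQueueCopierTemplate (the file touched above) is the copier that merges sorted runs, and the general technique is a k-way merge driven by a priority queue keyed on each run's current head. The sketch below shows that technique generically over int arrays; it is not Drill's generated code and the names are invented.

```java
// Generic k-way merge of sorted runs using a priority queue keyed by each run's
// current head element; the same technique a priority-queue copier applies to
// record batches. Not Drill's generated code.
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeSketch {
  static List<Integer> merge(List<int[]> runs) {
    // Heap entries: {value, runIndex, offsetWithinRun}
    PriorityQueue<int[]> heap = new PriorityQueue<>(Comparator.comparingInt(e -> e[0]));
    for (int r = 0; r < runs.size(); r++) {
      if (runs.get(r).length > 0) {
        heap.add(new int[] {runs.get(r)[0], r, 0});
      }
    }
    List<Integer> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      int[] head = heap.poll();
      out.add(head[0]);                       // emit the smallest head
      int run = head[1];
      int next = head[2] + 1;
      if (next < runs.get(run).length) {      // refill from the same run
        heap.add(new int[] {runs.get(run)[next], run, next});
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<int[]> runs = List.of(new int[] {1, 4, 9}, new int[] {2, 3, 8}, new int[] {5});
    System.out.println(merge(runs));          // [1, 2, 3, 4, 5, 8, 9]
  }
}
```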


[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001106#comment-17001106
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360450237
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##
 @@ -286,540 +291,419 @@ public void buildSchema() throws SchemaChangeException 
{
 state = BatchState.DONE;
 break;
   default:
-break;
+throw new IllegalStateException("Unexpected iter outcome: " + outcome);
 }
   }
 
+  /**
+   * Process each request for a batch. The first request retrieves
+   * all the incoming batches and sorts them, optionally spilling to
+   * disk as needed. Subsequent calls retrieve the sorted results in
+   * fixed-size batches.
+   */
+
   @Override
   public IterOutcome innerNext() {
-if (schema != null) {
-  if (spillCount == 0) {
-return (getSelectionVector4().next()) ? IterOutcome.OK : 
IterOutcome.NONE;
-  } else {
-Stopwatch w = Stopwatch.createStarted();
-int count = copier.next(targetRecordCount);
-if (count > 0) {
-  long t = w.elapsed(TimeUnit.MICROSECONDS);
-  logger.debug("Took {} us to merge {} records", t, count);
-  container.setRecordCount(count);
-  return IterOutcome.OK;
-} else {
-  logger.debug("copier returned 0 records");
-  return IterOutcome.NONE;
-}
+switch (sortState) {
+case DONE:
+  return NONE;
+case START:
+  return load();
+case LOAD:
+  if (!this.retainInMemoryBatchesOnNone) {
+resetSortState();
   }
+  return (sortState == SortState.DONE) ? NONE : load();
+case DELIVER:
+  return nextOutputBatch();
+default:
+  throw new IllegalStateException("Unexpected sort state: " + sortState);
 }
+  }
 
-int totalCount = 0;
-int totalBatches = 0; // total number of batches received so far
+  private IterOutcome nextOutputBatch() {
+// Call next on outputSV4 for it's state to progress in parallel to 
resultsIterator state
+outputSV4.next();
 
-try{
-  container.clear();
-  outer: while (true) {
-IterOutcome upstream;
-if (first) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-} else {
-  upstream = next(incoming);
-}
-if (upstream == IterOutcome.OK && sorter == null) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-}
-switch (upstream) {
-case NONE:
-  if (first) {
-return upstream;
-  }
-  break outer;
-case NOT_YET:
-  throw new UnsupportedOperationException();
-case STOP:
-  return upstream;
-case OK_NEW_SCHEMA:
-case OK:
-  VectorContainer convertedBatch;
-  // only change in the case that the schema truly changes.  
Artificial schema changes are ignored.
-  if (upstream == IterOutcome.OK_NEW_SCHEMA && 
!incoming.getSchema().equals(schema)) {
-if (schema != null) {
-  if (unionTypeEnabled) {
-this.schema = SchemaUtil.mergeSchemas(schema, 
incoming.getSchema());
-  } else {
-throw new SchemaChangeException("Schema changes not supported 
in External Sort. Please enable Union type");
-  }
-} else {
-  schema = incoming.getSchema();
-}
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-for (BatchGroup b : batchGroups) {
-  b.setSchema(schema);
-}
-for (BatchGroup b : spilledBatchGroups) {
-  b.setSchema(schema);
-}
-this.sorter = createNewSorter(context, convertedBatch);
-  } else {
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-  }
-  if (first) {
-first = false;
-  }
-  if (convertedBatch.getRecordCount() == 0) {
-for (VectorWrapper w : convertedBatch) {
-  w.clear();
-}
-break;
-  }
-  SelectionVector2 sv2;
-  if (incoming.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE) {
-sv2 = incoming.getSelectionVector2().clone();
-  } else {
-try {
-  sv2 = newSV2();
-} catch(InterruptedException e) {
-  return IterOutcome.STOP;
-} catch (OutOfMemoryException e) {
-  throw new OutOfMemoryException(e);
-}
-  }
+// But if results iterator next returns true that means it has 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001116#comment-17001116
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360511648
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
 ##
 @@ -17,48 +17,59 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
-import com.typesafe.config.ConfigException;
-import io.netty.buffer.DrillBuf;
-
 import java.util.Queue;
 
 import javax.inject.Named;
 
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.util.IndexedSortable;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Queues;
 
+import io.netty.buffer.DrillBuf;
+
 public abstract class MSortTemplate implements MSorter, IndexedSortable {
 //  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(MSortTemplate.class);
 
   private SelectionVector4 vector4;
   private SelectionVector4 aux;
+  @SuppressWarnings("unused")
+  private long compares;
+
+  /**
+   * Holds offsets into the SV4 of the start of each batch
+   * (sorted run.)
+   */
+
   private Queue runStarts = Queues.newLinkedBlockingQueue();
   private FragmentContext context;
 
   /**
-   * This is only useful for debugging and/or unit testing. Controls the 
maximum size of batches exposed to downstream
+   * Controls the maximum size of batches exposed to downstream
*/
   private int desiredRecordBatchCount;
 
   @Override
-  public void setup(final FragmentContext context, final BufferAllocator 
allocator, final SelectionVector4 vector4, final VectorContainer hyperBatch) 
throws SchemaChangeException{
+  public void setup(final FragmentContext context, final BufferAllocator 
allocator, final SelectionVector4 vector4,
+final VectorContainer hyperBatch, int outputBatchSize, int 
desiredBatchSize) throws SchemaChangeException{
 
 Review comment:
   ```suggestion
VectorContainer hyperBatch, int outputBatchSize, int 
desiredBatchSize) throws SchemaChangeException {
   ```
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> --
>
> Key: DRILL-6832
> URL: https://issues.apache.org/jira/browse/DRILL-6832
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.14.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
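
The runStarts queue mentioned in the MSortTemplate hunk above records where each sorted run begins inside the flat SV4, and the aux vector receives the merged order. As a reading aid, here is a small sketch (invented names, plain arrays instead of SV4/DrillBuf) of merging two adjacent runs identified only by their start offsets into an auxiliary index vector:

```java
// Sketch of merging two adjacent sorted runs that live back to back in one flat
// index vector, given only their start offsets. Plain arrays stand in for the
// SV4 and DrillBuf; all names are invented.
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class RunStartsMergeSketch {
  // Merge runs [lo, mid) and [mid, hi) of indexes, ordered by data[index].
  static void mergeRuns(int[] data, int[] sv, int[] aux, int lo, int mid, int hi) {
    int left = lo, right = mid, out = lo;
    while (left < mid && right < hi) {
      aux[out++] = (data[sv[left]] <= data[sv[right]]) ? sv[left++] : sv[right++];
    }
    while (left < mid) { aux[out++] = sv[left++]; }
    while (right < hi) { aux[out++] = sv[right++]; }
  }

  public static void main(String[] args) {
    int[] data = {30, 10, 50, 20, 40};
    // Two sorted runs over the same data: indexes {1, 0, 2} then {3, 4}.
    int[] sv = {1, 0, 2, 3, 4};
    Deque<Integer> runStarts = new ArrayDeque<>(List.of(0, 3));

    int lo = runStarts.poll();
    int mid = runStarts.poll();
    int[] aux = new int[sv.length];
    mergeRuns(data, sv, aux, lo, mid, sv.length);

    for (int index : aux) {
      System.out.println(data[index]);   // 10, 20, 30, 40, 50
    }
  }
}
```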


[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001107#comment-17001107
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360426272
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##
 @@ -17,250 +17,255 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import org.apache.calcite.rel.RelFieldCollation.Direction;
-import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.config.DrillConfig;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
+
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.ErrorCollector;
-import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.data.Order.Ordering;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.compile.sig.GeneratorMapping;
-import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
-import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.ExternalSort;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
-import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import 
org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator;
+import org.apache.drill.exec.physical.impl.xsort.SortImpl.SortResults;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SchemaUtil;
-import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
-import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Iterators;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import com.sun.codemodel.JConditional;
-import com.sun.codemodel.JExpr;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * External sort batch: a sort batch which can spill to disk in
+ * order to operate within a defined memory footprint.
+ * 
+ * Basic Operation
+ * The operator has three key phases:
+ * 
+ * 
+ * The load phase in which batches are read from upstream.
+ * The merge phase in which spilled batches are 
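
The load phase named in the javadoc above is the memory-sensitive part of the operator: buffer incoming data and, when the budget is exceeded, sort the buffer and spill it as a run. Below is a stripped-down sketch of just that phase; the fixed budget is a stand-in for Drill's real memory accounting, the names are invented, and merging the resulting runs then proceeds along the lines of the priority-queue sketch shown earlier.

```java
// Toy load phase: buffer incoming values and, whenever a fixed budget is
// exceeded, sort the buffer and spill it as one run. The budget stands in for
// Drill's real memory accounting; names are invented.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class LoadPhaseSketch {
  static List<Path> load(Iterator<Long> upstream, int budget) throws IOException {
    List<Path> spilledRuns = new ArrayList<>();
    List<Long> inMemory = new ArrayList<>();
    while (upstream.hasNext()) {
      inMemory.add(upstream.next());
      if (inMemory.size() >= budget) {        // over budget: sort and spill this run
        spilledRuns.add(spillRun(inMemory));
        inMemory.clear();
      }
    }
    if (!inMemory.isEmpty()) {
      spilledRuns.add(spillRun(inMemory));    // final partial run
    }
    return spilledRuns;
  }

  private static Path spillRun(List<Long> run) throws IOException {
    Collections.sort(run);
    List<String> lines = new ArrayList<>();
    for (long v : run) {
      lines.add(Long.toString(v));
    }
    return Files.write(Files.createTempFile("sort-run", ".txt"), lines);
  }

  public static void main(String[] args) throws IOException {
    List<Path> runs = load(List.of(9L, 2L, 7L, 4L, 1L, 8L).iterator(), 2);
    System.out.println(runs.size() + " spilled runs");   // 3 spilled runs
  }
}
```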

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001128#comment-17001128
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r36065
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##
 @@ -286,540 +291,419 @@ public void buildSchema() throws SchemaChangeException 
{
 state = BatchState.DONE;
 break;
   default:
-break;
+throw new IllegalStateException("Unexpected iter outcome: " + outcome);
 }
   }
 
+  /**
+   * Process each request for a batch. The first request retrieves
+   * all the incoming batches and sorts them, optionally spilling to
+   * disk as needed. Subsequent calls retrieve the sorted results in
+   * fixed-size batches.
+   */
+
   @Override
   public IterOutcome innerNext() {
-if (schema != null) {
-  if (spillCount == 0) {
-return (getSelectionVector4().next()) ? IterOutcome.OK : 
IterOutcome.NONE;
-  } else {
-Stopwatch w = Stopwatch.createStarted();
-int count = copier.next(targetRecordCount);
-if (count > 0) {
-  long t = w.elapsed(TimeUnit.MICROSECONDS);
-  logger.debug("Took {} us to merge {} records", t, count);
-  container.setRecordCount(count);
-  return IterOutcome.OK;
-} else {
-  logger.debug("copier returned 0 records");
-  return IterOutcome.NONE;
-}
+switch (sortState) {
+case DONE:
+  return NONE;
+case START:
+  return load();
+case LOAD:
+  if (!this.retainInMemoryBatchesOnNone) {
+resetSortState();
   }
+  return (sortState == SortState.DONE) ? NONE : load();
+case DELIVER:
+  return nextOutputBatch();
+default:
+  throw new IllegalStateException("Unexpected sort state: " + sortState);
 }
+  }
 
-int totalCount = 0;
-int totalBatches = 0; // total number of batches received so far
+  private IterOutcome nextOutputBatch() {
+// Call next on outputSV4 for it's state to progress in parallel to 
resultsIterator state
+outputSV4.next();
 
-try{
-  container.clear();
-  outer: while (true) {
-IterOutcome upstream;
-if (first) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-} else {
-  upstream = next(incoming);
-}
-if (upstream == IterOutcome.OK && sorter == null) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-}
-switch (upstream) {
-case NONE:
-  if (first) {
-return upstream;
-  }
-  break outer;
-case NOT_YET:
-  throw new UnsupportedOperationException();
-case STOP:
-  return upstream;
-case OK_NEW_SCHEMA:
-case OK:
-  VectorContainer convertedBatch;
-  // only change in the case that the schema truly changes.  
Artificial schema changes are ignored.
-  if (upstream == IterOutcome.OK_NEW_SCHEMA && 
!incoming.getSchema().equals(schema)) {
-if (schema != null) {
-  if (unionTypeEnabled) {
-this.schema = SchemaUtil.mergeSchemas(schema, 
incoming.getSchema());
-  } else {
-throw new SchemaChangeException("Schema changes not supported 
in External Sort. Please enable Union type");
-  }
-} else {
-  schema = incoming.getSchema();
-}
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-for (BatchGroup b : batchGroups) {
-  b.setSchema(schema);
-}
-for (BatchGroup b : spilledBatchGroups) {
-  b.setSchema(schema);
-}
-this.sorter = createNewSorter(context, convertedBatch);
-  } else {
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-  }
-  if (first) {
-first = false;
-  }
-  if (convertedBatch.getRecordCount() == 0) {
-for (VectorWrapper w : convertedBatch) {
-  w.clear();
-}
-break;
-  }
-  SelectionVector2 sv2;
-  if (incoming.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE) {
-sv2 = incoming.getSelectionVector2().clone();
-  } else {
-try {
-  sv2 = newSV2();
-} catch(InterruptedException e) {
-  return IterOutcome.STOP;
-} catch (OutOfMemoryException e) {
-  throw new OutOfMemoryException(e);
-}
-  }
+// But if results iterator next returns true that means it has 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001141#comment-17001141
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360371678
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##
 @@ -18,140 +18,300 @@
 package org.apache.drill.exec.physical.impl.xsort;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 
-public class BatchGroup implements VectorAccessible, AutoCloseable {
+/**
+ * Represents a group of batches spilled to disk.
+ * 
+ * The batches are defined by a schema which can change over time. When the 
schema changes,
+ * all existing and new batches are coerced into the new schema. Provides a
+ * uniform way to iterate over records for one or more batches whether
+ * the batches are in memory or on disk.
+ * 
+ * The BatchGroup operates in two modes as given by the two
+ * subclasses:
+ * 
+ * Input mode (@link InputBatchGroup): Used to buffer in-memory batches
+ * prior to spilling.
+ * Spill mode (@link SpilledBatchGroup): Holds a "memento" to a set
+ * of batches written to disk. Acts as both a reader and writer for
+ * those batches.
+ */
+
+public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
 
-  private VectorContainer currentContainer;
-  private SelectionVector2 sv2;
-  private int pointer = 0;
-  private FSDataInputStream inputStream;
-  private FSDataOutputStream outputStream;
-  private Path path;
-  private FileSystem fs;
-  private BufferAllocator allocator;
-  private int spilledBatches = 0;
-  private OperatorContext context;
-  private BatchSchema schema;
-
-  public BatchGroup(VectorContainer container, SelectionVector2 sv2, 
OperatorContext context) {
-this.sv2 = sv2;
-this.currentContainer = container;
-this.context = context;
-  }
+  /**
+   * The input batch group gathers batches buffered in memory before
+   * spilling. The structure of the data is:
+   * 
+   * Contains a single batch received from the upstream (input)
+   * operator.
+   * Associated selection vector that provides a sorted
+   * indirection to the values in the batch.
+   * 
+   */
 
-  public BatchGroup(VectorContainer container, FileSystem fs, String path, 
OperatorContext context) {
-currentContainer = container;
-this.fs = fs;
-this.path = new Path(path);
-this.allocator = context.getAllocator();
-this.context = context;
-  }
+  public static class InputBatch extends BatchGroup {
+private final SelectionVector2 sv2;
+private final long dataSize;
+
+public InputBatch(VectorContainer container, SelectionVector2 sv2, 
BufferAllocator allocator, long dataSize) {
+  super(container, allocator);
+  this.sv2 = sv2;
+  this.dataSize = dataSize;
+}
 
-  public SelectionVector2 getSv2() {
-return sv2;
+public SelectionVector2 getSv2() { return sv2; }
+
+public long getDataSize() { return dataSize; }
+
+@Override
+public int getRecordCount() {
+  if (sv2 != null) {
+return sv2.getCount();
+  } else {
+return super.getRecordCount();
+  }
+}
+
+@Override
+public int getNextIndex() {
+  int val = super.getNextIndex();
+  if (val == -1) {
+return val;
+  }
+  return 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001100#comment-17001100
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360365118
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##
 @@ -18,140 +18,300 @@
 package org.apache.drill.exec.physical.impl.xsort;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 
-public class BatchGroup implements VectorAccessible, AutoCloseable {
+/**
+ * Represents a group of batches spilled to disk.
+ * 
+ * The batches are defined by a schema which can change over time. When the 
schema changes,
+ * all existing and new batches are coerced into the new schema. Provides a
+ * uniform way to iterate over records for one or more batches whether
+ * the batches are in memory or on disk.
+ * 
+ * The BatchGroup operates in two modes as given by the two
+ * subclasses:
+ * 
+ * Input mode (@link InputBatchGroup): Used to buffer in-memory batches
+ * prior to spilling.
+ * Spill mode (@link SpilledBatchGroup): Holds a "memento" to a set
+ * of batches written to disk. Acts as both a reader and writer for
+ * those batches.
+ */
+
 
 Review comment:
   ```suggestion
   ```
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> --
>
> Key: DRILL-6832
> URL: https://issues.apache.org/jira/browse/DRILL-6832
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.14.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
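
The thread above reviews the new two-mode design of BatchGroup. As a rough, hedged sketch of that split (plain, simplified Java; the class and member names below are illustrative stand-ins, not the actual Drill types):

```java
// Hypothetical sketch of the design described in the BatchGroup Javadoc:
// one subclass buffers a batch in memory behind a sorted indirection, the
// other is a "memento" for a run already written to disk.
abstract class RunSketch implements AutoCloseable {

  /** Number of records visible through this run. */
  abstract int recordCount();

  /** In-memory run: one buffered batch plus a sorted indirection vector. */
  static final class InMemoryRun extends RunSketch {
    private final String[] batch;       // stands in for the VectorContainer
    private final int[] sortedIndexes;  // stands in for the SelectionVector2

    InMemoryRun(String[] batch, int[] sortedIndexes) {
      this.batch = batch;
      this.sortedIndexes = sortedIndexes;
    }

    @Override
    int recordCount() { return sortedIndexes.length; }

    String recordAt(int i) { return batch[sortedIndexes[i]]; }

    @Override
    public void close() { /* release in-memory buffers here */ }
  }

  /** Spilled run: remembers where its batches live on disk. */
  static final class SpilledRun extends RunSketch {
    private final java.nio.file.Path spillFile;
    private final int spilledRecords;

    SpilledRun(java.nio.file.Path spillFile, int spilledRecords) {
      this.spillFile = spillFile;
      this.spilledRecords = spilledRecords;
    }

    @Override
    int recordCount() { return spilledRecords; }

    @Override
    public void close() throws java.io.IOException {
      java.nio.file.Files.deleteIfExists(spillFile); // drop the spill file
    }
  }
}
```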


[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001122#comment-17001122
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360515412
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java
 ##
 @@ -24,11 +24,15 @@
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
-// TODO:  Doc.:  What's an MSorter?  A sorter for merge join?  something else?
-// (What's the "M" part?  Actually, rename interface to clearer.
+/**
+ * In-memory sorter. Takes a list of batches as input, produces a selection
+ * vector 4, with sorted results, as output.
+ */
+
 
 Review comment:
   ```suggestion
   
   ```
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> --
>
> Key: DRILL-6832
> URL: https://issues.apache.org/jira/browse/DRILL-6832
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.14.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
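
The MSorter Javadoc above says the in-memory sorter produces a selection vector rather than moving record data. A minimal, self-contained illustration of sorting through an indirection vector (plain Java, not the Drill MSorter API):

```java
import java.util.Comparator;
import java.util.stream.IntStream;

// Sort by building an index (indirection) array; the data array itself never moves.
final class IndirectionSortSketch {

  /** Returns indexes into the values array, ordered by ascending value. */
  static int[] sortedIndirection(double[] values) {
    return IntStream.range(0, values.length)
        .boxed()
        .sorted(Comparator.comparingDouble(i -> values[i]))
        .mapToInt(Integer::intValue)
        .toArray();
  }

  public static void main(String[] args) {
    double[] data = {3.0, 1.0, 2.0};
    int[] sv = sortedIndirection(data);   // [1, 2, 0]
    for (int i : sv) {
      System.out.println(data[i]);        // prints 1.0, 2.0, 3.0; data[] is untouched
    }
  }
}
```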


[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001120#comment-17001120
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360485101
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##
 @@ -286,540 +291,419 @@ public void buildSchema() throws SchemaChangeException 
{
 state = BatchState.DONE;
 break;
   default:
-break;
+throw new IllegalStateException("Unexpected iter outcome: " + outcome);
 }
   }
 
+  /**
+   * Process each request for a batch. The first request retrieves
+   * all the incoming batches and sorts them, optionally spilling to
+   * disk as needed. Subsequent calls retrieve the sorted results in
+   * fixed-size batches.
+   */
+
   @Override
   public IterOutcome innerNext() {
-if (schema != null) {
-  if (spillCount == 0) {
-return (getSelectionVector4().next()) ? IterOutcome.OK : 
IterOutcome.NONE;
-  } else {
-Stopwatch w = Stopwatch.createStarted();
-int count = copier.next(targetRecordCount);
-if (count > 0) {
-  long t = w.elapsed(TimeUnit.MICROSECONDS);
-  logger.debug("Took {} us to merge {} records", t, count);
-  container.setRecordCount(count);
-  return IterOutcome.OK;
-} else {
-  logger.debug("copier returned 0 records");
-  return IterOutcome.NONE;
-}
+switch (sortState) {
+case DONE:
+  return NONE;
+case START:
+  return load();
+case LOAD:
+  if (!this.retainInMemoryBatchesOnNone) {
+resetSortState();
   }
+  return (sortState == SortState.DONE) ? NONE : load();
+case DELIVER:
+  return nextOutputBatch();
+default:
+  throw new IllegalStateException("Unexpected sort state: " + sortState);
 }
+  }
 
-int totalCount = 0;
-int totalBatches = 0; // total number of batches received so far
+  private IterOutcome nextOutputBatch() {
+// Call next on outputSV4 for its state to progress in parallel to 
resultsIterator state
+outputSV4.next();
 
-try{
-  container.clear();
-  outer: while (true) {
-IterOutcome upstream;
-if (first) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-} else {
-  upstream = next(incoming);
-}
-if (upstream == IterOutcome.OK && sorter == null) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-}
-switch (upstream) {
-case NONE:
-  if (first) {
-return upstream;
-  }
-  break outer;
-case NOT_YET:
-  throw new UnsupportedOperationException();
-case STOP:
-  return upstream;
-case OK_NEW_SCHEMA:
-case OK:
-  VectorContainer convertedBatch;
-  // only change in the case that the schema truly changes.  
Artificial schema changes are ignored.
-  if (upstream == IterOutcome.OK_NEW_SCHEMA && 
!incoming.getSchema().equals(schema)) {
-if (schema != null) {
-  if (unionTypeEnabled) {
-this.schema = SchemaUtil.mergeSchemas(schema, 
incoming.getSchema());
-  } else {
-throw new SchemaChangeException("Schema changes not supported 
in External Sort. Please enable Union type");
-  }
-} else {
-  schema = incoming.getSchema();
-}
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-for (BatchGroup b : batchGroups) {
-  b.setSchema(schema);
-}
-for (BatchGroup b : spilledBatchGroups) {
-  b.setSchema(schema);
-}
-this.sorter = createNewSorter(context, convertedBatch);
-  } else {
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-  }
-  if (first) {
-first = false;
-  }
-  if (convertedBatch.getRecordCount() == 0) {
-for (VectorWrapper w : convertedBatch) {
-  w.clear();
-}
-break;
-  }
-  SelectionVector2 sv2;
-  if (incoming.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE) {
-sv2 = incoming.getSelectionVector2().clone();
-  } else {
-try {
-  sv2 = newSV2();
-} catch(InterruptedException e) {
-  return IterOutcome.STOP;
-} catch (OutOfMemoryException e) {
-  throw new OutOfMemoryException(e);
-}
-  }
+// But if results iterator next returns true that means it has 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001123#comment-17001123
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360464313
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##
 @@ -286,540 +291,419 @@ public void buildSchema() throws SchemaChangeException 
{
 state = BatchState.DONE;
 break;
   default:
-break;
+throw new IllegalStateException("Unexpected iter outcome: " + outcome);
 }
   }
 
+  /**
+   * Process each request for a batch. The first request retrieves
+   * all the incoming batches and sorts them, optionally spilling to
+   * disk as needed. Subsequent calls retrieve the sorted results in
+   * fixed-size batches.
+   */
+
   @Override
   public IterOutcome innerNext() {
-if (schema != null) {
-  if (spillCount == 0) {
-return (getSelectionVector4().next()) ? IterOutcome.OK : 
IterOutcome.NONE;
-  } else {
-Stopwatch w = Stopwatch.createStarted();
-int count = copier.next(targetRecordCount);
-if (count > 0) {
-  long t = w.elapsed(TimeUnit.MICROSECONDS);
-  logger.debug("Took {} us to merge {} records", t, count);
-  container.setRecordCount(count);
-  return IterOutcome.OK;
-} else {
-  logger.debug("copier returned 0 records");
-  return IterOutcome.NONE;
-}
+switch (sortState) {
+case DONE:
+  return NONE;
+case START:
+  return load();
+case LOAD:
+  if (!this.retainInMemoryBatchesOnNone) {
+resetSortState();
   }
+  return (sortState == SortState.DONE) ? NONE : load();
+case DELIVER:
+  return nextOutputBatch();
+default:
+  throw new IllegalStateException("Unexpected sort state: " + sortState);
 }
+  }
 
-int totalCount = 0;
-int totalBatches = 0; // total number of batches received so far
+  private IterOutcome nextOutputBatch() {
+// Call next on outputSV4 for its state to progress in parallel to 
resultsIterator state
+outputSV4.next();
 
-try{
-  container.clear();
-  outer: while (true) {
-IterOutcome upstream;
-if (first) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-} else {
-  upstream = next(incoming);
-}
-if (upstream == IterOutcome.OK && sorter == null) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-}
-switch (upstream) {
-case NONE:
-  if (first) {
-return upstream;
-  }
-  break outer;
-case NOT_YET:
-  throw new UnsupportedOperationException();
-case STOP:
-  return upstream;
-case OK_NEW_SCHEMA:
-case OK:
-  VectorContainer convertedBatch;
-  // only change in the case that the schema truly changes.  
Artificial schema changes are ignored.
-  if (upstream == IterOutcome.OK_NEW_SCHEMA && 
!incoming.getSchema().equals(schema)) {
-if (schema != null) {
-  if (unionTypeEnabled) {
-this.schema = SchemaUtil.mergeSchemas(schema, 
incoming.getSchema());
-  } else {
-throw new SchemaChangeException("Schema changes not supported 
in External Sort. Please enable Union type");
-  }
-} else {
-  schema = incoming.getSchema();
-}
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-for (BatchGroup b : batchGroups) {
-  b.setSchema(schema);
-}
-for (BatchGroup b : spilledBatchGroups) {
-  b.setSchema(schema);
-}
-this.sorter = createNewSorter(context, convertedBatch);
-  } else {
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-  }
-  if (first) {
-first = false;
-  }
-  if (convertedBatch.getRecordCount() == 0) {
-for (VectorWrapper w : convertedBatch) {
-  w.clear();
-}
-break;
-  }
-  SelectionVector2 sv2;
-  if (incoming.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE) {
-sv2 = incoming.getSelectionVector2().clone();
-  } else {
-try {
-  sv2 = newSV2();
-} catch(InterruptedException e) {
-  return IterOutcome.STOP;
-} catch (OutOfMemoryException e) {
-  throw new OutOfMemoryException(e);
-}
-  }
+// But if results iterator next returns true that means it has 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001099#comment-17001099
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360365258
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##
 @@ -18,140 +18,300 @@
 package org.apache.drill.exec.physical.impl.xsort;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 
-public class BatchGroup implements VectorAccessible, AutoCloseable {
+/**
+ * Represents a group of batches spilled to disk.
+ * 
+ * The batches are defined by a schema which can change over time. When the 
schema changes,
+ * all existing and new batches are coerced into the new schema. Provides a
+ * uniform way to iterate over records for one or more batches whether
+ * the batches are in memory or on disk.
+ * 
+ * The BatchGroup operates in two modes as given by the two
+ * subclasses:
+ * 
+ * Input mode (@link InputBatchGroup): Used to buffer in-memory batches
+ * prior to spilling.
+ * Spill mode (@link SpilledBatchGroup): Holds a "memento" to a set
+ * of batches written to disk. Acts as both a reader and writer for
+ * those batches.
+ */
+
+public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
 
-  private VectorContainer currentContainer;
-  private SelectionVector2 sv2;
-  private int pointer = 0;
-  private FSDataInputStream inputStream;
-  private FSDataOutputStream outputStream;
-  private Path path;
-  private FileSystem fs;
-  private BufferAllocator allocator;
-  private int spilledBatches = 0;
-  private OperatorContext context;
-  private BatchSchema schema;
-
-  public BatchGroup(VectorContainer container, SelectionVector2 sv2, 
OperatorContext context) {
-this.sv2 = sv2;
-this.currentContainer = container;
-this.context = context;
-  }
+  /**
+   * The input batch group gathers batches buffered in memory before
+   * spilling. The structure of the data is:
+   * 
+   * Contains a single batch received from the upstream (input)
+   * operator.
+   * Associated selection vector that provides a sorted
+   * indirection to the values in the batch.
+   * 
+   */
 
 
 Review comment:
   ```suggestion
   ```
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> --
>
> Key: DRILL-6832
> URL: https://issues.apache.org/jira/browse/DRILL-6832
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.14.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001130#comment-17001130
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360526022
 
 

 ##
 File path: 
exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java
 ##
 @@ -71,38 +61,22 @@ public void mergeSortWithSv2Legacy() throws Exception {
* @throws Exception
*/
 
-  private void mergeSortWithSv2(boolean testLegacy) throws Exception {
-ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
-.configProperty(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED, false);
+  @Test
 
 Review comment:
   good catch :+1: 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> --
>
> Key: DRILL-6832
> URL: https://issues.apache.org/jira/browse/DRILL-6832
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.14.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
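
For context on the test change being discussed: once the legacy path is gone, the flag-driven helper can become the test itself. A hedged sketch of that refactoring shape in plain JUnit 4 (no Drill fixtures; the helper body below is only a placeholder):

```java
import static org.junit.Assert.assertTrue;

import org.junit.Test;

public class MergeSortRefactorSketch {

  // Before (shape only): two @Test wrappers delegating to one flag-driven helper.
  //   @Test public void mergeSortWithSv2Managed() { mergeSortWithSv2(false); }
  //   @Test public void mergeSortWithSv2Legacy()  { mergeSortWithSv2(true);  }
  //   private void mergeSortWithSv2(boolean testLegacy) { ... }

  // After: the helper is promoted to the only test and the legacy flag disappears.
  @Test
  public void mergeSortWithSv2() {
    assertTrue(runSortAndVerify());
  }

  private boolean runSortAndVerify() {
    return true; // placeholder for the real cluster-fixture based verification
  }
}
```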


[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001136#comment-17001136
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360517824
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
 ##
 @@ -23,14 +23,10 @@
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.VectorAccessible;
 
 public interface PriorityQueueCopier extends AutoCloseable {
-  public static final long INITIAL_ALLOCATION = 1000;
-  public static final long MAX_ALLOCATION = 2000;
-
-  public void setup(FragmentContext context, BufferAllocator allocator, 
VectorAccessible hyperBatch,
+  public void setup(BufferAllocator allocator, VectorAccessible hyperBatch,
 
 Review comment:
   please remove the redundant ```public```, ```final```, and ```abstract``` modifiers 
in the interface and move the constant above the methods. 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> --
>
> Key: DRILL-6832
> URL: https://issues.apache.org/jira/browse/DRILL-6832
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.14.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
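
On the modifier comment above: interface members carry implicit modifiers in Java, so the explicit ones are redundant. A tiny illustration (not the actual Drill interface; names and parameters abbreviated):

```java
// Interface methods are implicitly public abstract and fields are implicitly
// public static final, so those keywords can be dropped; constants
// conventionally sit above the methods.
interface CopierStyleSketch extends AutoCloseable {
  long SOME_LIMIT = 1000;            // implicitly public static final

  void setup(Object allocator);      // implicitly public abstract

  int next(int targetRecordCount);
}
```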


[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001118#comment-17001118
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360443648
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##
 @@ -286,540 +291,419 @@ public void buildSchema() throws SchemaChangeException 
{
 state = BatchState.DONE;
 break;
   default:
-break;
+throw new IllegalStateException("Unexpected iter outcome: " + outcome);
 }
   }
 
+  /**
+   * Process each request for a batch. The first request retrieves
+   * all the incoming batches and sorts them, optionally spilling to
+   * disk as needed. Subsequent calls retrieve the sorted results in
+   * fixed-size batches.
+   */
+
   @Override
   public IterOutcome innerNext() {
-if (schema != null) {
-  if (spillCount == 0) {
-return (getSelectionVector4().next()) ? IterOutcome.OK : 
IterOutcome.NONE;
-  } else {
-Stopwatch w = Stopwatch.createStarted();
-int count = copier.next(targetRecordCount);
-if (count > 0) {
-  long t = w.elapsed(TimeUnit.MICROSECONDS);
-  logger.debug("Took {} us to merge {} records", t, count);
-  container.setRecordCount(count);
-  return IterOutcome.OK;
-} else {
-  logger.debug("copier returned 0 records");
-  return IterOutcome.NONE;
-}
+switch (sortState) {
+case DONE:
+  return NONE;
+case START:
+  return load();
+case LOAD:
+  if (!this.retainInMemoryBatchesOnNone) {
+resetSortState();
   }
+  return (sortState == SortState.DONE) ? NONE : load();
+case DELIVER:
+  return nextOutputBatch();
+default:
+  throw new IllegalStateException("Unexpected sort state: " + sortState);
 }
+  }
 
-int totalCount = 0;
-int totalBatches = 0; // total number of batches received so far
+  private IterOutcome nextOutputBatch() {
+// Call next on outputSV4 for its state to progress in parallel to 
resultsIterator state
+outputSV4.next();
 
-try{
-  container.clear();
-  outer: while (true) {
-IterOutcome upstream;
-if (first) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-} else {
-  upstream = next(incoming);
-}
-if (upstream == IterOutcome.OK && sorter == null) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-}
-switch (upstream) {
-case NONE:
-  if (first) {
-return upstream;
-  }
-  break outer;
-case NOT_YET:
-  throw new UnsupportedOperationException();
-case STOP:
-  return upstream;
-case OK_NEW_SCHEMA:
-case OK:
-  VectorContainer convertedBatch;
-  // only change in the case that the schema truly changes.  
Artificial schema changes are ignored.
-  if (upstream == IterOutcome.OK_NEW_SCHEMA && 
!incoming.getSchema().equals(schema)) {
-if (schema != null) {
-  if (unionTypeEnabled) {
-this.schema = SchemaUtil.mergeSchemas(schema, 
incoming.getSchema());
-  } else {
-throw new SchemaChangeException("Schema changes not supported 
in External Sort. Please enable Union type");
-  }
-} else {
-  schema = incoming.getSchema();
-}
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-for (BatchGroup b : batchGroups) {
-  b.setSchema(schema);
-}
-for (BatchGroup b : spilledBatchGroups) {
-  b.setSchema(schema);
-}
-this.sorter = createNewSorter(context, convertedBatch);
-  } else {
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-  }
-  if (first) {
-first = false;
-  }
-  if (convertedBatch.getRecordCount() == 0) {
-for (VectorWrapper w : convertedBatch) {
-  w.clear();
-}
-break;
-  }
-  SelectionVector2 sv2;
-  if (incoming.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE) {
-sv2 = incoming.getSelectionVector2().clone();
-  } else {
-try {
-  sv2 = newSV2();
-} catch(InterruptedException e) {
-  return IterOutcome.STOP;
-} catch (OutOfMemoryException e) {
-  throw new OutOfMemoryException(e);
-}
-  }
+// But if results iterator next returns true that means it has 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001098#comment-17001098
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360365036
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##
 @@ -18,140 +18,300 @@
 package org.apache.drill.exec.physical.impl.xsort;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 
-public class BatchGroup implements VectorAccessible, AutoCloseable {
+/**
+ * Represents a group of batches spilled to disk.
+ * 
+ * The batches are defined by a schema which can change over time. When the 
schema changes,
+ * all existing and new batches are coerced into the new schema. Provides a
+ * uniform way to iterate over records for one or more batches whether
+ * the batches are in memory or on disk.
+ * 
+ * The BatchGroup operates in two modes as given by the two
+ * subclasses:
+ * 
+ * Input mode (@link InputBatchGroup): Used to buffer in-memory batches
+ * prior to spilling.
+ * Spill mode (@link SpilledBatchGroup): Holds a "memento" to a set
 
 Review comment:
   ```suggestion
* Spill mode {@link SpilledRun}: Holds a "memento" to a set
   ```
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> --
>
> Key: DRILL-6832
> URL: https://issues.apache.org/jira/browse/DRILL-6832
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.14.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
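
A side note on the suggestion above: only the brace form of the tag is recognized by the Javadoc tool, so "(@link Foo)" renders as literal text while "{@link Foo}" becomes a cross-reference. Minimal example (the class name here is just a placeholder):

```java
/**
 * Spill mode {@link SpilledRunSketch}: holds a "memento" to a set of
 * batches written to disk.
 */
class SpilledRunSketch { }
```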


[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001138#comment-17001138
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360508558
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
 ##
 @@ -123,14 +145,24 @@ public SelectionVector4 getSV4() {
 return vector4;
   }
 
+  /**
+   * Merge a set of pre-sorted runs to produce a combined
+   * result set. Merging is done in the selection vector, record data does
+   * not move.
+   * 
+   * Runs are merged pairwise in multiple passes, providing performance
+   * of O(n * m * log(n)), where n = number of runs, m = number of records
+   * per run.
+   */
+
 
 Review comment:
   ```suggestion
   
   ```
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> --
>
> Key: DRILL-6832
> URL: https://issues.apache.org/jira/browse/DRILL-6832
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.14.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
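
The Javadoc under review describes merging pre-sorted runs pairwise over multiple passes at O(n * m * log(n)). A hedged, self-contained sketch of that pass structure using plain int arrays (not the selection-vector based Drill code): each pass halves the number of runs, so n runs of about m records each take roughly log2(n) passes over n*m records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PairwiseMergeSketch {

  /** Repeatedly merges adjacent pairs of sorted runs until one run remains. */
  static int[] mergeAll(List<int[]> runs) {
    List<int[]> current = new ArrayList<>(runs);
    while (current.size() > 1) {                  // one pass per iteration
      List<int[]> next = new ArrayList<>();
      for (int i = 0; i < current.size(); i += 2) {
        if (i + 1 < current.size()) {
          next.add(mergeTwo(current.get(i), current.get(i + 1)));
        } else {
          next.add(current.get(i));               // odd run carries over to the next pass
        }
      }
      current = next;
    }
    return current.isEmpty() ? new int[0] : current.get(0);
  }

  /** Standard two-way merge of sorted arrays. */
  private static int[] mergeTwo(int[] a, int[] b) {
    int[] out = new int[a.length + b.length];
    int i = 0, j = 0, k = 0;
    while (i < a.length && j < b.length) {
      out[k++] = (a[i] <= b[j]) ? a[i++] : b[j++];
    }
    while (i < a.length) { out[k++] = a[i++]; }
    while (j < b.length) { out[k++] = b[j++]; }
    return out;
  }

  public static void main(String[] args) {
    List<int[]> runs = Arrays.asList(new int[] {1, 4, 9}, new int[] {2, 3}, new int[] {0, 5});
    System.out.println(Arrays.toString(mergeAll(runs)));  // [0, 1, 2, 3, 4, 5, 9]
  }
}
```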


[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001104#comment-17001104
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360368665
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##
 @@ -18,140 +18,300 @@
 package org.apache.drill.exec.physical.impl.xsort;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 
-public class BatchGroup implements VectorAccessible, AutoCloseable {
+/**
+ * Represents a group of batches spilled to disk.
+ * 
+ * The batches are defined by a schema which can change over time. When the 
schema changes,
+ * all existing and new batches are coerced into the new schema. Provides a
+ * uniform way to iterate over records for one or more batches whether
+ * the batches are in memory or on disk.
+ * 
+ * The BatchGroup operates in two modes as given by the two
+ * subclasses:
+ * 
+ * Input mode (@link InputBatchGroup): Used to buffer in-memory batches
+ * prior to spilling.
+ * Spill mode (@link SpilledBatchGroup): Holds a "memento" to a set
+ * of batches written to disk. Acts as both a reader and writer for
+ * those batches.
+ */
+
+public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
 
-  private VectorContainer currentContainer;
-  private SelectionVector2 sv2;
-  private int pointer = 0;
-  private FSDataInputStream inputStream;
-  private FSDataOutputStream outputStream;
-  private Path path;
-  private FileSystem fs;
-  private BufferAllocator allocator;
-  private int spilledBatches = 0;
-  private OperatorContext context;
-  private BatchSchema schema;
-
-  public BatchGroup(VectorContainer container, SelectionVector2 sv2, 
OperatorContext context) {
-this.sv2 = sv2;
-this.currentContainer = container;
-this.context = context;
-  }
+  /**
+   * The input batch group gathers batches buffered in memory before
+   * spilling. The structure of the data is:
+   * 
+   * Contains a single batch received from the upstream (input)
+   * operator.
+   * Associated selection vector that provides a sorted
+   * indirection to the values in the batch.
+   * 
+   */
 
-  public BatchGroup(VectorContainer container, FileSystem fs, String path, 
OperatorContext context) {
-currentContainer = container;
-this.fs = fs;
-this.path = new Path(path);
-this.allocator = context.getAllocator();
-this.context = context;
-  }
+  public static class InputBatch extends BatchGroup {
 
 Review comment:
   I really appreciate that you split the responsibilities of the ```BatchGroup``` 
class and created ```InputBatch``` & ```SpilledRun```. Could you please 
extract them into separate source files? 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> --
>
> Key: DRILL-6832
> URL: https://issues.apache.org/jira/browse/DRILL-6832
> Project: Apache Drill
>  Issue Type: Improvement
> 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1700#comment-1700
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360429440
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##
 @@ -286,540 +291,419 @@ public void buildSchema() throws SchemaChangeException 
{
 state = BatchState.DONE;
 break;
   default:
-break;
+throw new IllegalStateException("Unexpected iter outcome: " + outcome);
 }
   }
 
+  /**
+   * Process each request for a batch. The first request retrieves
+   * all the incoming batches and sorts them, optionally spilling to
+   * disk as needed. Subsequent calls retrieve the sorted results in
+   * fixed-size batches.
+   */
+
   @Override
   public IterOutcome innerNext() {
-if (schema != null) {
-  if (spillCount == 0) {
-return (getSelectionVector4().next()) ? IterOutcome.OK : 
IterOutcome.NONE;
-  } else {
-Stopwatch w = Stopwatch.createStarted();
-int count = copier.next(targetRecordCount);
-if (count > 0) {
-  long t = w.elapsed(TimeUnit.MICROSECONDS);
-  logger.debug("Took {} us to merge {} records", t, count);
-  container.setRecordCount(count);
-  return IterOutcome.OK;
-} else {
-  logger.debug("copier returned 0 records");
-  return IterOutcome.NONE;
-}
+switch (sortState) {
+case DONE:
+  return NONE;
+case START:
+  return load();
+case LOAD:
+  if (!this.retainInMemoryBatchesOnNone) {
+resetSortState();
   }
+  return (sortState == SortState.DONE) ? NONE : load();
+case DELIVER:
+  return nextOutputBatch();
+default:
+  throw new IllegalStateException("Unexpected sort state: " + sortState);
 }
+  }
 
-int totalCount = 0;
-int totalBatches = 0; // total number of batches received so far
+  private IterOutcome nextOutputBatch() {
+// Call next on outputSV4 for its state to progress in parallel to 
resultsIterator state
+outputSV4.next();
 
-try{
-  container.clear();
-  outer: while (true) {
-IterOutcome upstream;
-if (first) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-} else {
-  upstream = next(incoming);
-}
-if (upstream == IterOutcome.OK && sorter == null) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-}
-switch (upstream) {
-case NONE:
-  if (first) {
-return upstream;
-  }
-  break outer;
-case NOT_YET:
-  throw new UnsupportedOperationException();
-case STOP:
-  return upstream;
-case OK_NEW_SCHEMA:
-case OK:
-  VectorContainer convertedBatch;
-  // only change in the case that the schema truly changes.  
Artificial schema changes are ignored.
-  if (upstream == IterOutcome.OK_NEW_SCHEMA && 
!incoming.getSchema().equals(schema)) {
-if (schema != null) {
-  if (unionTypeEnabled) {
-this.schema = SchemaUtil.mergeSchemas(schema, 
incoming.getSchema());
-  } else {
-throw new SchemaChangeException("Schema changes not supported 
in External Sort. Please enable Union type");
-  }
-} else {
-  schema = incoming.getSchema();
-}
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-for (BatchGroup b : batchGroups) {
-  b.setSchema(schema);
-}
-for (BatchGroup b : spilledBatchGroups) {
-  b.setSchema(schema);
-}
-this.sorter = createNewSorter(context, convertedBatch);
-  } else {
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-  }
-  if (first) {
-first = false;
-  }
-  if (convertedBatch.getRecordCount() == 0) {
-for (VectorWrapper w : convertedBatch) {
-  w.clear();
-}
-break;
-  }
-  SelectionVector2 sv2;
-  if (incoming.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE) {
-sv2 = incoming.getSelectionVector2().clone();
-  } else {
-try {
-  sv2 = newSV2();
-} catch(InterruptedException e) {
-  return IterOutcome.STOP;
-} catch (OutOfMemoryException e) {
-  throw new OutOfMemoryException(e);
-}
-  }
+// But if results iterator next returns true that means it has 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001117#comment-17001117
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360391704
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##
 @@ -18,140 +18,300 @@
 package org.apache.drill.exec.physical.impl.xsort;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 
-public class BatchGroup implements VectorAccessible, AutoCloseable {
+/**
+ * Represents a group of batches spilled to disk.
+ * 
+ * The batches are defined by a schema which can change over time. When the 
schema changes,
+ * all existing and new batches are coerced into the new schema. Provides a
+ * uniform way to iterate over records for one or more batches whether
+ * the batches are in memory or on disk.
+ * 
+ * The BatchGroup operates in two modes as given by the two
+ * subclasses:
+ * 
+ * Input mode (@link InputBatchGroup): Used to buffer in-memory batches
+ * prior to spilling.
+ * Spill mode (@link SpilledBatchGroup): Holds a "memento" to a set
+ * of batches written to disk. Acts as both a reader and writer for
+ * those batches.
+ */
+
+public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
 
-  private VectorContainer currentContainer;
-  private SelectionVector2 sv2;
-  private int pointer = 0;
-  private FSDataInputStream inputStream;
-  private FSDataOutputStream outputStream;
-  private Path path;
-  private FileSystem fs;
-  private BufferAllocator allocator;
-  private int spilledBatches = 0;
-  private OperatorContext context;
-  private BatchSchema schema;
-
-  public BatchGroup(VectorContainer container, SelectionVector2 sv2, 
OperatorContext context) {
-this.sv2 = sv2;
-this.currentContainer = container;
-this.context = context;
-  }
+  /**
+   * The input batch group gathers batches buffered in memory before
+   * spilling. The structure of the data is:
+   * 
+   * Contains a single batch received from the upstream (input)
+   * operator.
+   * Associated selection vector that provides a sorted
+   * indirection to the values in the batch.
+   * 
+   */
 
-  public BatchGroup(VectorContainer container, FileSystem fs, String path, 
OperatorContext context) {
-currentContainer = container;
-this.fs = fs;
-this.path = new Path(path);
-this.allocator = context.getAllocator();
-this.context = context;
-  }
+  public static class InputBatch extends BatchGroup {
+private final SelectionVector2 sv2;
+private final long dataSize;
+
+public InputBatch(VectorContainer container, SelectionVector2 sv2, 
BufferAllocator allocator, long dataSize) {
+  super(container, allocator);
+  this.sv2 = sv2;
+  this.dataSize = dataSize;
+}
 
-  public SelectionVector2 getSv2() {
-return sv2;
+public SelectionVector2 getSv2() { return sv2; }
+
+public long getDataSize() { return dataSize; }
+
+@Override
+public int getRecordCount() {
+  if (sv2 != null) {
+return sv2.getCount();
+  } else {
+return super.getRecordCount();
+  }
+}
+
+@Override
+public int getNextIndex() {
+  int val = super.getNextIndex();
+  if (val == -1) {
+return val;
+  }
+  return 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001135#comment-17001135
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360472617
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##
 @@ -286,540 +291,419 @@ public void buildSchema() throws SchemaChangeException 
{
 state = BatchState.DONE;
 break;
   default:
-break;
+throw new IllegalStateException("Unexpected iter outcome: " + outcome);
 }
   }
 
+  /**
+   * Process each request for a batch. The first request retrieves
+   * all the incoming batches and sorts them, optionally spilling to
+   * disk as needed. Subsequent calls retrieve the sorted results in
+   * fixed-size batches.
+   */
+
   @Override
   public IterOutcome innerNext() {
-if (schema != null) {
-  if (spillCount == 0) {
-return (getSelectionVector4().next()) ? IterOutcome.OK : 
IterOutcome.NONE;
-  } else {
-Stopwatch w = Stopwatch.createStarted();
-int count = copier.next(targetRecordCount);
-if (count > 0) {
-  long t = w.elapsed(TimeUnit.MICROSECONDS);
-  logger.debug("Took {} us to merge {} records", t, count);
-  container.setRecordCount(count);
-  return IterOutcome.OK;
-} else {
-  logger.debug("copier returned 0 records");
-  return IterOutcome.NONE;
-}
+switch (sortState) {
+case DONE:
+  return NONE;
+case START:
+  return load();
+case LOAD:
+  if (!this.retainInMemoryBatchesOnNone) {
+resetSortState();
   }
+  return (sortState == SortState.DONE) ? NONE : load();
+case DELIVER:
+  return nextOutputBatch();
+default:
+  throw new IllegalStateException("Unexpected sort state: " + sortState);
 }
+  }
 
-int totalCount = 0;
-int totalBatches = 0; // total number of batches received so far
+  private IterOutcome nextOutputBatch() {
+// Call next on outputSV4 for its state to progress in parallel to 
resultsIterator state
+outputSV4.next();
 
-try{
-  container.clear();
-  outer: while (true) {
-IterOutcome upstream;
-if (first) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-} else {
-  upstream = next(incoming);
-}
-if (upstream == IterOutcome.OK && sorter == null) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-}
-switch (upstream) {
-case NONE:
-  if (first) {
-return upstream;
-  }
-  break outer;
-case NOT_YET:
-  throw new UnsupportedOperationException();
-case STOP:
-  return upstream;
-case OK_NEW_SCHEMA:
-case OK:
-  VectorContainer convertedBatch;
-  // only change in the case that the schema truly changes.  
Artificial schema changes are ignored.
-  if (upstream == IterOutcome.OK_NEW_SCHEMA && 
!incoming.getSchema().equals(schema)) {
-if (schema != null) {
-  if (unionTypeEnabled) {
-this.schema = SchemaUtil.mergeSchemas(schema, 
incoming.getSchema());
-  } else {
-throw new SchemaChangeException("Schema changes not supported 
in External Sort. Please enable Union type");
-  }
-} else {
-  schema = incoming.getSchema();
-}
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-for (BatchGroup b : batchGroups) {
-  b.setSchema(schema);
-}
-for (BatchGroup b : spilledBatchGroups) {
-  b.setSchema(schema);
-}
-this.sorter = createNewSorter(context, convertedBatch);
-  } else {
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-  }
-  if (first) {
-first = false;
-  }
-  if (convertedBatch.getRecordCount() == 0) {
-for (VectorWrapper w : convertedBatch) {
-  w.clear();
-}
-break;
-  }
-  SelectionVector2 sv2;
-  if (incoming.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE) {
-sv2 = incoming.getSelectionVector2().clone();
-  } else {
-try {
-  sv2 = newSV2();
-} catch(InterruptedException e) {
-  return IterOutcome.STOP;
-} catch (OutOfMemoryException e) {
-  throw new OutOfMemoryException(e);
-}
-  }
+// But if results iterator next returns true that means it has 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001119#comment-17001119
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360506932
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##
 @@ -286,540 +291,419 @@ public void buildSchema() throws SchemaChangeException 
{
 state = BatchState.DONE;
 break;
   default:
-break;
+throw new IllegalStateException("Unexpected iter outcome: " + outcome);
 }
   }
 
+  /**
+   * Process each request for a batch. The first request retrieves
+   * all the incoming batches and sorts them, optionally spilling to
+   * disk as needed. Subsequent calls retrieve the sorted results in
+   * fixed-size batches.
+   */
+
   @Override
   public IterOutcome innerNext() {
-if (schema != null) {
-  if (spillCount == 0) {
-return (getSelectionVector4().next()) ? IterOutcome.OK : 
IterOutcome.NONE;
-  } else {
-Stopwatch w = Stopwatch.createStarted();
-int count = copier.next(targetRecordCount);
-if (count > 0) {
-  long t = w.elapsed(TimeUnit.MICROSECONDS);
-  logger.debug("Took {} us to merge {} records", t, count);
-  container.setRecordCount(count);
-  return IterOutcome.OK;
-} else {
-  logger.debug("copier returned 0 records");
-  return IterOutcome.NONE;
-}
+switch (sortState) {
+case DONE:
+  return NONE;
+case START:
+  return load();
+case LOAD:
+  if (!this.retainInMemoryBatchesOnNone) {
+resetSortState();
   }
+  return (sortState == SortState.DONE) ? NONE : load();
+case DELIVER:
+  return nextOutputBatch();
+default:
+  throw new IllegalStateException("Unexpected sort state: " + sortState);
 }
+  }
 
-int totalCount = 0;
-int totalBatches = 0; // total number of batches received so far
+  private IterOutcome nextOutputBatch() {
+// Call next on outputSV4 for its state to progress in parallel to 
resultsIterator state
+outputSV4.next();
 
-try{
-  container.clear();
-  outer: while (true) {
-IterOutcome upstream;
-if (first) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-} else {
-  upstream = next(incoming);
-}
-if (upstream == IterOutcome.OK && sorter == null) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-}
-switch (upstream) {
-case NONE:
-  if (first) {
-return upstream;
-  }
-  break outer;
-case NOT_YET:
-  throw new UnsupportedOperationException();
-case STOP:
-  return upstream;
-case OK_NEW_SCHEMA:
-case OK:
-  VectorContainer convertedBatch;
-  // only change in the case that the schema truly changes.  
Artificial schema changes are ignored.
-  if (upstream == IterOutcome.OK_NEW_SCHEMA && 
!incoming.getSchema().equals(schema)) {
-if (schema != null) {
-  if (unionTypeEnabled) {
-this.schema = SchemaUtil.mergeSchemas(schema, 
incoming.getSchema());
-  } else {
-throw new SchemaChangeException("Schema changes not supported 
in External Sort. Please enable Union type");
-  }
-} else {
-  schema = incoming.getSchema();
-}
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-for (BatchGroup b : batchGroups) {
-  b.setSchema(schema);
-}
-for (BatchGroup b : spilledBatchGroups) {
-  b.setSchema(schema);
-}
-this.sorter = createNewSorter(context, convertedBatch);
-  } else {
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-  }
-  if (first) {
-first = false;
-  }
-  if (convertedBatch.getRecordCount() == 0) {
-for (VectorWrapper w : convertedBatch) {
-  w.clear();
-}
-break;
-  }
-  SelectionVector2 sv2;
-  if (incoming.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE) {
-sv2 = incoming.getSelectionVector2().clone();
-  } else {
-try {
-  sv2 = newSV2();
-} catch(InterruptedException e) {
-  return IterOutcome.STOP;
-} catch (OutOfMemoryException e) {
-  throw new OutOfMemoryException(e);
-}
-  }
+// But if results iterator next returns true that means it has 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001126#comment-17001126
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360515173
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
 ##
 @@ -123,14 +145,24 @@ public SelectionVector4 getSV4() {
 return vector4;
   }
 
+  /**
+   * Merge a set of pre-sorted runs to produce a combined
+   * result set. Merging is done in the selection vector, record data does
+   * not move.
+   * 
+   * Runs are merged pairwise in multiple passes, providing performance
+   * of O(n * m * log(n)), where n = number of runs, m = number of records
+   * per run.
+   */
+
   @Override
-  public void sort(final VectorContainer container) {
+  public void sort() {
 while (runStarts.size() > 1) {
+  final int totalCount = this.vector4.getTotalCount();
 
-  // check if we're cancelled/failed frequently
+  // check if we're cancelled/failed recently
   if (!context.getExecutorState().shouldContinue()) {
-return;
-  }
+return; }
 
 Review comment:
   ```suggestion
   return; 
 }
   ```
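
The Javadoc added in this hunk says runs are merged pairwise by rewriting only the selection vector, never the row data. A minimal, index-only merge of two sorted runs conveys that idea; the `mergeRuns` helper and its comparator below are illustrative stand-ins, not Drill code, and the comparator is assumed to compare the rows the indices point at:

```java
import java.util.Comparator;

class RunMergeSketch {
  // Merge two sorted runs of row indices without moving any row data,
  // the same idea the MSortTemplate Javadoc describes for SV4 runs.
  static int[] mergeRuns(int[] left, int[] right, Comparator<Integer> rowOrder) {
    int[] out = new int[left.length + right.length];
    int i = 0, j = 0, k = 0;
    while (i < left.length && j < right.length) {
      // Compare the rows the indices refer to; copy only the winning index.
      out[k++] = rowOrder.compare(left[i], right[j]) <= 0 ? left[i++] : right[j++];
    }
    while (i < left.length)  { out[k++] = left[i++]; }
    while (j < right.length) { out[k++] = right[j++]; }
    return out;
  }
}
```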
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> --
>
> Key: DRILL-6832
> URL: https://issues.apache.org/jira/browse/DRILL-6832
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.14.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001143#comment-17001143
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360397396
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##
 @@ -18,140 +18,300 @@
 package org.apache.drill.exec.physical.impl.xsort;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 
-public class BatchGroup implements VectorAccessible, AutoCloseable {
+/**
+ * Represents a group of batches spilled to disk.
+ * 
+ * The batches are defined by a schema which can change over time. When the 
schema changes,
+ * all existing and new batches are coerced into the new schema. Provides a
+ * uniform way to iterate over records for one or more batches whether
+ * the batches are in memory or on disk.
+ * 
+ * The BatchGroup operates in two modes as given by the two
+ * subclasses:
+ * 
+ * Input mode (@link InputBatchGroup): Used to buffer in-memory batches
+ * prior to spilling.
+ * Spill mode (@link SpilledBatchGroup): Holds a "memento" to a set
+ * of batches written to disk. Acts as both a reader and writer for
+ * those batches.
+ */
+
+public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
 
-  private VectorContainer currentContainer;
-  private SelectionVector2 sv2;
-  private int pointer = 0;
-  private FSDataInputStream inputStream;
-  private FSDataOutputStream outputStream;
-  private Path path;
-  private FileSystem fs;
-  private BufferAllocator allocator;
-  private int spilledBatches = 0;
-  private OperatorContext context;
-  private BatchSchema schema;
-
-  public BatchGroup(VectorContainer container, SelectionVector2 sv2, 
OperatorContext context) {
-this.sv2 = sv2;
-this.currentContainer = container;
-this.context = context;
-  }
+  /**
+   * The input batch group gathers batches buffered in memory before
+   * spilling. The structure of the data is:
+   * 
+   * Contains a single batch received from the upstream (input)
+   * operator.
+   * Associated selection vector that provides a sorted
+   * indirection to the values in the batch.
+   * 
+   */
 
-  public BatchGroup(VectorContainer container, FileSystem fs, String path, 
OperatorContext context) {
-currentContainer = container;
-this.fs = fs;
-this.path = new Path(path);
-this.allocator = context.getAllocator();
-this.context = context;
-  }
+  public static class InputBatch extends BatchGroup {
+private final SelectionVector2 sv2;
+private final long dataSize;
+
+public InputBatch(VectorContainer container, SelectionVector2 sv2, 
BufferAllocator allocator, long dataSize) {
+  super(container, allocator);
+  this.sv2 = sv2;
+  this.dataSize = dataSize;
+}
 
-  public SelectionVector2 getSv2() {
-return sv2;
+public SelectionVector2 getSv2() { return sv2; }
+
+public long getDataSize() { return dataSize; }
+
+@Override
+public int getRecordCount() {
+  if (sv2 != null) {
+return sv2.getCount();
+  } else {
+return super.getRecordCount();
+  }
+}
+
+@Override
+public int getNextIndex() {
+  int val = super.getNextIndex();
+  if (val == -1) {
+return val;
+  }
+  return 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001101#comment-17001101
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360364870
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##
 @@ -18,140 +18,300 @@
 package org.apache.drill.exec.physical.impl.xsort;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 
-public class BatchGroup implements VectorAccessible, AutoCloseable {
+/**
+ * Represents a group of batches spilled to disk.
+ * 
+ * The batches are defined by a schema which can change over time. When the 
schema changes,
+ * all existing and new batches are coerced into the new schema. Provides a
+ * uniform way to iterate over records for one or more batches whether
+ * the batches are in memory or on disk.
+ * 
+ * The BatchGroup operates in two modes as given by the two
+ * subclasses:
+ * 
+ * Input mode (@link InputBatchGroup): Used to buffer in-memory batches
 
 Review comment:
   ```suggestion
* Input mode {@link InputBatch}: Used to buffer in-memory batches
   ```
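
The Javadoc earlier in this hunk describes the two modes as separate subclasses: an input mode that buffers a batch plus a sorted selection vector, and a spill mode that replays batches from disk. A very small sketch of that split (all names below are invented for illustration; these are not the Drill classes):

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Sketch of the two BatchGroup modes: one abstract run of sorted rows, an
// in-memory variant holding a sorted index (the SV2 role), and a spilled
// variant replaying rows from disk. Illustrative names only.
abstract class SortedRun implements AutoCloseable {
  /** Next row index in sorted order, or -1 when the run is exhausted. */
  abstract int nextIndex() throws IOException;
  @Override public void close() throws IOException {}
}

final class InMemoryRun extends SortedRun {
  private final int[] sortedIndices;   // sorted indirection over a buffered batch
  private int pos;
  InMemoryRun(int[] sortedIndices) { this.sortedIndices = sortedIndices; }
  @Override int nextIndex() { return pos < sortedIndices.length ? sortedIndices[pos++] : -1; }
}

final class SpilledRun extends SortedRun {
  private final DataInputStream in;    // rows were already written to disk in sorted order
  SpilledRun(InputStream spillFile) { this.in = new DataInputStream(spillFile); }
  @Override int nextIndex() throws IOException {
    try { return in.readInt(); } catch (EOFException e) { return -1; }
  }
  @Override public void close() throws IOException { in.close(); }
}
```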
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> --
>
> Key: DRILL-6832
> URL: https://issues.apache.org/jira/browse/DRILL-6832
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.14.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001109#comment-17001109
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360415588
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##
 @@ -17,250 +17,255 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import org.apache.calcite.rel.RelFieldCollation.Direction;
-import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.config.DrillConfig;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
+
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.ErrorCollector;
-import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.data.Order.Ordering;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.compile.sig.GeneratorMapping;
-import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
-import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.ExternalSort;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
-import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import 
org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator;
+import org.apache.drill.exec.physical.impl.xsort.SortImpl.SortResults;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SchemaUtil;
-import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
-import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Iterators;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import com.sun.codemodel.JConditional;
-import com.sun.codemodel.JExpr;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * External sort batch: a sort batch which can spill to disk in
+ * order to operate within a defined memory footprint.
+ * 
+ * Basic Operation
+ * The operator has three key phases:
+ * 
+ * 
+ * The load phase in which batches are read from upstream.
+ * The merge phase in which spilled batches are 
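
The class Javadoc above (cut off in this digest) lists the operator's load and merge phases. As a rough, self-contained illustration of the same load / spill / merge flow on plain integers (ordinary temp files and a priority queue stand in for Drill's SpillSet and value vectors; this is a sketch, not the operator's implementation):

```java
import java.io.*;
import java.nio.file.*;
import java.util.*;

class ToyExternalSort {
  // "Load" phase: read fixed-size chunks, sort each in memory, spill to a temp file.
  static List<Path> spillSortedRuns(Iterator<Integer> input, int chunkSize) throws IOException {
    List<Path> runs = new ArrayList<>();
    List<Integer> chunk = new ArrayList<>(chunkSize);
    while (input.hasNext()) {
      chunk.add(input.next());
      if (chunk.size() == chunkSize || !input.hasNext()) {
        Collections.sort(chunk);
        Path run = Files.createTempFile("run", ".tmp");
        try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(run))) {
          for (int v : chunk) { out.writeInt(v); }
        }
        runs.add(run);
        chunk.clear();
      }
    }
    return runs;
  }

  private static Integer readNext(DataInputStream in) throws IOException {
    try { return in.readInt(); } catch (EOFException e) { return null; }
  }

  // "Merge" phase: stream the spilled runs back and merge them with a heap.
  static List<Integer> mergeRuns(List<Path> runs) throws IOException {
    List<DataInputStream> readers = new ArrayList<>();
    PriorityQueue<int[]> heap = new PriorityQueue<>(Comparator.comparingInt(a -> a[0]));
    for (int i = 0; i < runs.size(); i++) {
      DataInputStream in = new DataInputStream(Files.newInputStream(runs.get(i)));
      readers.add(in);
      Integer first = readNext(in);
      if (first != null) { heap.add(new int[] {first, i}); }
    }
    List<Integer> result = new ArrayList<>();
    while (!heap.isEmpty()) {
      int[] top = heap.poll();
      result.add(top[0]);
      Integer next = readNext(readers.get(top[1]));
      if (next != null) { heap.add(new int[] {next, top[1]}); }
    }
    for (DataInputStream in : readers) { in.close(); }
    return result;
  }
}
```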

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001102#comment-17001102
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360371811
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##
 @@ -18,140 +18,300 @@
 package org.apache.drill.exec.physical.impl.xsort;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 
-public class BatchGroup implements VectorAccessible, AutoCloseable {
+/**
+ * Represents a group of batches spilled to disk.
+ * 
+ * The batches are defined by a schema which can change over time. When the 
schema changes,
+ * all existing and new batches are coerced into the new schema. Provides a
+ * uniform way to iterate over records for one or more batches whether
+ * the batches are in memory or on disk.
+ * 
+ * The BatchGroup operates in two modes as given by the two
+ * subclasses:
+ * 
+ * Input mode (@link InputBatchGroup): Used to buffer in-memory batches
+ * prior to spilling.
+ * Spill mode (@link SpilledBatchGroup): Holds a "memento" to a set
+ * of batches written to disk. Acts as both a reader and writer for
+ * those batches.
+ */
+
+public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
 
-  private VectorContainer currentContainer;
-  private SelectionVector2 sv2;
-  private int pointer = 0;
-  private FSDataInputStream inputStream;
-  private FSDataOutputStream outputStream;
-  private Path path;
-  private FileSystem fs;
-  private BufferAllocator allocator;
-  private int spilledBatches = 0;
-  private OperatorContext context;
-  private BatchSchema schema;
-
-  public BatchGroup(VectorContainer container, SelectionVector2 sv2, 
OperatorContext context) {
-this.sv2 = sv2;
-this.currentContainer = container;
-this.context = context;
-  }
+  /**
+   * The input batch group gathers batches buffered in memory before
+   * spilling. The structure of the data is:
+   * 
+   * Contains a single batch received from the upstream (input)
+   * operator.
+   * Associated selection vector that provides a sorted
+   * indirection to the values in the batch.
+   * 
+   */
 
-  public BatchGroup(VectorContainer container, FileSystem fs, String path, 
OperatorContext context) {
-currentContainer = container;
-this.fs = fs;
-this.path = new Path(path);
-this.allocator = context.getAllocator();
-this.context = context;
-  }
+  public static class InputBatch extends BatchGroup {
+private final SelectionVector2 sv2;
+private final long dataSize;
+
+public InputBatch(VectorContainer container, SelectionVector2 sv2, 
BufferAllocator allocator, long dataSize) {
+  super(container, allocator);
+  this.sv2 = sv2;
+  this.dataSize = dataSize;
+}
 
-  public SelectionVector2 getSv2() {
-return sv2;
+public SelectionVector2 getSv2() { return sv2; }
+
+public long getDataSize() { return dataSize; }
+
+@Override
+public int getRecordCount() {
+  if (sv2 != null) {
+return sv2.getCount();
+  } else {
+return super.getRecordCount();
+  }
+}
+
+@Override
+public int getNextIndex() {
+  int val = super.getNextIndex();
+  if (val == -1) {
+return val;
+  }
+  return 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001125#comment-17001125
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360525635
 
 

 ##
 File path: 
exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortEmitOutcome.java
 ##
 @@ -15,9 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.drill.exec.physical.impl.xsort.managed;
+package org.apache.drill.exec.physical.impl.xsort;
+
+import static junit.framework.TestCase.assertTrue;
 
 Review comment:
   Please avoid reordering imports when it is not required; doing so may cause merge conflicts for other contributors.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> --
>
> Key: DRILL-6832
> URL: https://issues.apache.org/jira/browse/DRILL-6832
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.14.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001140#comment-17001140
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360512752
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
 ##
 @@ -89,15 +98,28 @@ public void setup(final FragmentContext context, final 
BufferAllocator allocator
* ExternalSortBatch to make decisions about whether to spill or not.
*
* @param recordCount
-   * @return The amount of memory MSorter needs for a given record count.
+   * @return
 
 Review comment:
   Please convert the
   
   > // We need 4 bytes (SV4) for each record.
   > // The memory allocator will round this to the next
   > // power of 2.
   
   comment into the ```@return``` description.
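
The quoted comment boils down to simple arithmetic: four bytes per record, rounded up to the next power of two by the allocator. A plain-Java rendering of that calculation (illustrative only; this is not the Drill allocator):

```java
// Illustrative: 4 bytes (one SV4 slot) per record, rounded up to the next
// power of two, mirroring the quoted comment. Not the Drill allocator itself.
static long sv4MemoryNeeded(long recordCount) {
  long bytes = 4L * recordCount;
  long pow2 = Long.highestOneBit(bytes);
  return (pow2 == bytes) ? bytes : pow2 << 1;   // e.g. 100 records -> 400 -> 512 bytes
}
```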
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> --
>
> Key: DRILL-6832
> URL: https://issues.apache.org/jira/browse/DRILL-6832
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.14.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001133#comment-17001133
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360501525
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##
 @@ -286,540 +291,419 @@ public void buildSchema() throws SchemaChangeException 
{
 state = BatchState.DONE;
 break;
   default:
-break;
+throw new IllegalStateException("Unexpected iter outcome: " + outcome);
 }
   }
 
+  /**
+   * Process each request for a batch. The first request retrieves
+   * all the incoming batches and sorts them, optionally spilling to
+   * disk as needed. Subsequent calls retrieve the sorted results in
+   * fixed-size batches.
+   */
+
   @Override
   public IterOutcome innerNext() {
-if (schema != null) {
-  if (spillCount == 0) {
-return (getSelectionVector4().next()) ? IterOutcome.OK : 
IterOutcome.NONE;
-  } else {
-Stopwatch w = Stopwatch.createStarted();
-int count = copier.next(targetRecordCount);
-if (count > 0) {
-  long t = w.elapsed(TimeUnit.MICROSECONDS);
-  logger.debug("Took {} us to merge {} records", t, count);
-  container.setRecordCount(count);
-  return IterOutcome.OK;
-} else {
-  logger.debug("copier returned 0 records");
-  return IterOutcome.NONE;
-}
+switch (sortState) {
+case DONE:
+  return NONE;
+case START:
+  return load();
+case LOAD:
+  if (!this.retainInMemoryBatchesOnNone) {
+resetSortState();
   }
+  return (sortState == SortState.DONE) ? NONE : load();
+case DELIVER:
+  return nextOutputBatch();
+default:
+  throw new IllegalStateException("Unexpected sort state: " + sortState);
 }
+  }
 
-int totalCount = 0;
-int totalBatches = 0; // total number of batches received so far
+  private IterOutcome nextOutputBatch() {
+// Call next on outputSV4 for its state to progress in parallel to resultsIterator state
+outputSV4.next();
 
-try{
-  container.clear();
-  outer: while (true) {
-IterOutcome upstream;
-if (first) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-} else {
-  upstream = next(incoming);
-}
-if (upstream == IterOutcome.OK && sorter == null) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-}
-switch (upstream) {
-case NONE:
-  if (first) {
-return upstream;
-  }
-  break outer;
-case NOT_YET:
-  throw new UnsupportedOperationException();
-case STOP:
-  return upstream;
-case OK_NEW_SCHEMA:
-case OK:
-  VectorContainer convertedBatch;
-  // only change in the case that the schema truly changes.  
Artificial schema changes are ignored.
-  if (upstream == IterOutcome.OK_NEW_SCHEMA && 
!incoming.getSchema().equals(schema)) {
-if (schema != null) {
-  if (unionTypeEnabled) {
-this.schema = SchemaUtil.mergeSchemas(schema, 
incoming.getSchema());
-  } else {
-throw new SchemaChangeException("Schema changes not supported 
in External Sort. Please enable Union type");
-  }
-} else {
-  schema = incoming.getSchema();
-}
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-for (BatchGroup b : batchGroups) {
-  b.setSchema(schema);
-}
-for (BatchGroup b : spilledBatchGroups) {
-  b.setSchema(schema);
-}
-this.sorter = createNewSorter(context, convertedBatch);
-  } else {
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-  }
-  if (first) {
-first = false;
-  }
-  if (convertedBatch.getRecordCount() == 0) {
-for (VectorWrapper w : convertedBatch) {
-  w.clear();
-}
-break;
-  }
-  SelectionVector2 sv2;
-  if (incoming.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE) {
-sv2 = incoming.getSelectionVector2().clone();
-  } else {
-try {
-  sv2 = newSV2();
-} catch(InterruptedException e) {
-  return IterOutcome.STOP;
-} catch (OutOfMemoryException e) {
-  throw new OutOfMemoryException(e);
-}
-  }
+// But if results iterator next returns true that means it has 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001144#comment-17001144
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360521056
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
 ##
 @@ -17,94 +17,95 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
-import io.netty.buffer.DrillBuf;
-
 import java.io.IOException;
 import java.util.List;
 
 import javax.inject.Named;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
+import io.netty.buffer.DrillBuf;
 
 public abstract class PriorityQueueCopierTemplate implements 
PriorityQueueCopier {
+//  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(PriorityQueueCopierTemplate.class);
 
   private SelectionVector4 vector4;
   private List batchGroups;
   private VectorAccessible hyperBatch;
   private VectorAccessible outgoing;
   private int size;
-  private int queueSize;
+  private int queueSize = 0;
 
   @Override
-  public void setup(FragmentContext context, BufferAllocator allocator,
-  VectorAccessible hyperBatch, List batchGroups,
-  VectorAccessible outgoing) throws SchemaChangeException {
+  public void setup(BufferAllocator allocator, VectorAccessible hyperBatch, 
List batchGroups,
+VectorAccessible outgoing) throws SchemaChangeException {
 this.hyperBatch = hyperBatch;
 this.batchGroups = batchGroups;
 this.outgoing = outgoing;
 this.size = batchGroups.size();
 
 final DrillBuf drillBuf = allocator.buffer(4 * size);
 vector4 = new SelectionVector4(drillBuf, size, Character.MAX_VALUE);
-doSetup(context, hyperBatch, outgoing);
+doSetup(hyperBatch, outgoing);
 
 queueSize = 0;
 for (int i = 0; i < size; i++) {
-  vector4.set(i, i, batchGroups.get(i).getNextIndex());
-  siftUp();
-  queueSize++;
+  int index = batchGroups.get(i).getNextIndex();
+  vector4.set(i, i, index);
+  if (index > -1) {
+siftUp();
+queueSize++;
+  }
 }
   }
 
   @Override
   public int next(int targetRecordCount) {
-allocateVectors(targetRecordCount);
 for (int outgoingIndex = 0; outgoingIndex < targetRecordCount; 
outgoingIndex++) {
   if (queueSize == 0) {
 return 0;
   }
   int compoundIndex = vector4.get(0);
   int batch = compoundIndex >>> 16;
   assert batch < batchGroups.size() : String.format("batch: %d 
batchGroups: %d", batch, batchGroups.size());
-  doCopy(compoundIndex, outgoingIndex);
+  try {
 
 Review comment:
   It's better to put the whole loop inside a try-catch and remove this one and the one in ```siftDown()```.
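
As a self-contained illustration of the shape being suggested, a single try/catch wraps the whole copy loop instead of one per call (the `copyRow` method and `CopyException` below are invented stand-ins, not the generated `doCopy()`/`siftDown()` methods):

```java
// Minimal sketch of the suggested structure: one try/catch around the entire
// copy loop. copyRow() and CopyException are illustrative stand-ins.
class CopierLoopSketch {
  static class CopyException extends Exception {
    CopyException(String msg) { super(msg); }
  }

  private int queueSize = 3;  // pretend three rows are queued

  private void copyRow(int src, int dst) throws CopyException {
    if (src < 0) { throw new CopyException("bad source index " + src); }
  }

  int next(int targetRecordCount) {
    try {
      for (int outgoingIndex = 0; outgoingIndex < targetRecordCount; outgoingIndex++) {
        if (queueSize == 0) {
          return outgoingIndex;            // ran out of queued rows
        }
        copyRow(outgoingIndex, outgoingIndex);
        queueSize--;                       // the real code would also sift the heap here
      }
    } catch (CopyException e) {
      throw new IllegalStateException("Copy failed mid-batch", e);
    }
    return targetRecordCount;
  }
}
```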
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> --
>
> Key: DRILL-6832
> URL: https://issues.apache.org/jira/browse/DRILL-6832
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.14.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001115#comment-17001115
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360404337
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##
 @@ -18,140 +18,300 @@
 package org.apache.drill.exec.physical.impl.xsort;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 
-public class BatchGroup implements VectorAccessible, AutoCloseable {
+/**
+ * Represents a group of batches spilled to disk.
+ * 
+ * The batches are defined by a schema which can change over time. When the 
schema changes,
+ * all existing and new batches are coerced into the new schema. Provides a
+ * uniform way to iterate over records for one or more batches whether
+ * the batches are in memory or on disk.
+ * 
+ * The BatchGroup operates in two modes as given by the two
+ * subclasses:
+ * 
+ * Input mode (@link InputBatchGroup): Used to buffer in-memory batches
+ * prior to spilling.
+ * Spill mode (@link SpilledBatchGroup): Holds a "memento" to a set
+ * of batches written to disk. Acts as both a reader and writer for
+ * those batches.
+ */
+
+public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
 
-  private VectorContainer currentContainer;
-  private SelectionVector2 sv2;
-  private int pointer = 0;
-  private FSDataInputStream inputStream;
-  private FSDataOutputStream outputStream;
-  private Path path;
-  private FileSystem fs;
-  private BufferAllocator allocator;
-  private int spilledBatches = 0;
-  private OperatorContext context;
-  private BatchSchema schema;
-
-  public BatchGroup(VectorContainer container, SelectionVector2 sv2, 
OperatorContext context) {
-this.sv2 = sv2;
-this.currentContainer = container;
-this.context = context;
-  }
+  /**
+   * The input batch group gathers batches buffered in memory before
+   * spilling. The structure of the data is:
+   * 
+   * Contains a single batch received from the upstream (input)
+   * operator.
+   * Associated selection vector that provides a sorted
+   * indirection to the values in the batch.
+   * 
+   */
 
-  public BatchGroup(VectorContainer container, FileSystem fs, String path, 
OperatorContext context) {
-currentContainer = container;
-this.fs = fs;
-this.path = new Path(path);
-this.allocator = context.getAllocator();
-this.context = context;
-  }
+  public static class InputBatch extends BatchGroup {
+private final SelectionVector2 sv2;
+private final long dataSize;
+
+public InputBatch(VectorContainer container, SelectionVector2 sv2, 
BufferAllocator allocator, long dataSize) {
+  super(container, allocator);
+  this.sv2 = sv2;
+  this.dataSize = dataSize;
+}
 
-  public SelectionVector2 getSv2() {
-return sv2;
+public SelectionVector2 getSv2() { return sv2; }
+
+public long getDataSize() { return dataSize; }
+
+@Override
+public int getRecordCount() {
+  if (sv2 != null) {
+return sv2.getCount();
+  } else {
+return super.getRecordCount();
+  }
+}
+
+@Override
+public int getNextIndex() {
+  int val = super.getNextIndex();
+  if (val == -1) {
+return val;
+  }
+  return 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001108#comment-17001108
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360423445
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##
 @@ -17,250 +17,255 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import org.apache.calcite.rel.RelFieldCollation.Direction;
-import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.config.DrillConfig;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
+
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.ErrorCollector;
-import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.data.Order.Ordering;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.compile.sig.GeneratorMapping;
-import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
-import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.ExternalSort;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
-import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import 
org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator;
+import org.apache.drill.exec.physical.impl.xsort.SortImpl.SortResults;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SchemaUtil;
-import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
-import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Iterators;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import com.sun.codemodel.JConditional;
-import com.sun.codemodel.JExpr;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * External sort batch: a sort batch which can spill to disk in
+ * order to operate within a defined memory footprint.
+ * 
+ * Basic Operation
+ * The operator has three key phases:
+ * 
+ * 
+ * The load phase in which batches are read from upstream.
+ * The merge phase in which spilled batches are 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001146#comment-17001146
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360398804
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##
 @@ -18,140 +18,300 @@
 package org.apache.drill.exec.physical.impl.xsort;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 
-public class BatchGroup implements VectorAccessible, AutoCloseable {
+/**
+ * Represents a group of batches spilled to disk.
+ * 
+ * The batches are defined by a schema which can change over time. When the 
schema changes,
+ * all existing and new batches are coerced into the new schema. Provides a
+ * uniform way to iterate over records for one or more batches whether
+ * the batches are in memory or on disk.
+ * 
+ * The BatchGroup operates in two modes as given by the two
+ * subclasses:
+ * 
+ * Input mode (@link InputBatchGroup): Used to buffer in-memory batches
+ * prior to spilling.
+ * Spill mode (@link SpilledBatchGroup): Holds a "memento" to a set
+ * of batches written to disk. Acts as both a reader and writer for
+ * those batches.
+ */
+
+public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
 
-  private VectorContainer currentContainer;
-  private SelectionVector2 sv2;
-  private int pointer = 0;
-  private FSDataInputStream inputStream;
-  private FSDataOutputStream outputStream;
-  private Path path;
-  private FileSystem fs;
-  private BufferAllocator allocator;
-  private int spilledBatches = 0;
-  private OperatorContext context;
-  private BatchSchema schema;
-
-  public BatchGroup(VectorContainer container, SelectionVector2 sv2, 
OperatorContext context) {
-this.sv2 = sv2;
-this.currentContainer = container;
-this.context = context;
-  }
+  /**
+   * The input batch group gathers batches buffered in memory before
+   * spilling. The structure of the data is:
+   * 
+   * Contains a single batch received from the upstream (input)
+   * operator.
+   * Associated selection vector that provides a sorted
+   * indirection to the values in the batch.
+   * 
+   */
 
-  public BatchGroup(VectorContainer container, FileSystem fs, String path, 
OperatorContext context) {
-currentContainer = container;
-this.fs = fs;
-this.path = new Path(path);
-this.allocator = context.getAllocator();
-this.context = context;
-  }
+  public static class InputBatch extends BatchGroup {
+private final SelectionVector2 sv2;
+private final long dataSize;
+
+public InputBatch(VectorContainer container, SelectionVector2 sv2, 
BufferAllocator allocator, long dataSize) {
+  super(container, allocator);
+  this.sv2 = sv2;
+  this.dataSize = dataSize;
+}
 
-  public SelectionVector2 getSv2() {
-return sv2;
+public SelectionVector2 getSv2() { return sv2; }
+
+public long getDataSize() { return dataSize; }
+
+@Override
+public int getRecordCount() {
+  if (sv2 != null) {
+return sv2.getCount();
+  } else {
+return super.getRecordCount();
+  }
+}
+
+@Override
+public int getNextIndex() {
+  int val = super.getNextIndex();
+  if (val == -1) {
+return val;
+  }
+  return 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001134#comment-17001134
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360511541
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
 ##
 @@ -17,48 +17,59 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
-import com.typesafe.config.ConfigException;
-import io.netty.buffer.DrillBuf;
-
 import java.util.Queue;
 
 import javax.inject.Named;
 
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.util.IndexedSortable;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Queues;
 
+import io.netty.buffer.DrillBuf;
+
 public abstract class MSortTemplate implements MSorter, IndexedSortable {
 //  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(MSortTemplate.class);
 
   private SelectionVector4 vector4;
   private SelectionVector4 aux;
+  @SuppressWarnings("unused")
+  private long compares;
+
+  /**
+   * Holds offsets into the SV4 of the start of each batch
+   * (sorted run.)
+   */
+
   private Queue runStarts = Queues.newLinkedBlockingQueue();
   private FragmentContext context;
 
   /**
-   * This is only useful for debugging and/or unit testing. Controls the 
maximum size of batches exposed to downstream
+   * Controls the maximum size of batches exposed to downstream
*/
   private int desiredRecordBatchCount;
 
   @Override
-  public void setup(final FragmentContext context, final BufferAllocator 
allocator, final SelectionVector4 vector4, final VectorContainer hyperBatch) 
throws SchemaChangeException{
+  public void setup(final FragmentContext context, final BufferAllocator 
allocator, final SelectionVector4 vector4,
 
 Review comment:
   ```suggestion
 public void setup(FragmentContext context, BufferAllocator allocator, 
SelectionVector4 vector4,
   ```
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> --
>
> Key: DRILL-6832
> URL: https://issues.apache.org/jira/browse/DRILL-6832
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.14.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
>Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that 
> enhanced the sort operator's memory management. To be safe, at the time, the 
> new version was controlled by an option, with the ability to revert to the 
> old version.
> The new version has proven to be stable. The time has come to remove the old 
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent 
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001112#comment-17001112
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360391462
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##
 @@ -18,140 +18,300 @@
 package org.apache.drill.exec.physical.impl.xsort;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.SchemaUtil;
 import org.apache.drill.exec.record.TransferPair;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 
-public class BatchGroup implements VectorAccessible, AutoCloseable {
+/**
+ * Represents a group of batches spilled to disk.
+ * 
+ * The batches are defined by a schema which can change over time. When the 
schema changes,
+ * all existing and new batches are coerced into the new schema. Provides a
+ * uniform way to iterate over records for one or more batches whether
+ * the batches are in memory or on disk.
+ * 
+ * The BatchGroup operates in two modes as given by the two
+ * subclasses:
+ * 
+ * Input mode (@link InputBatchGroup): Used to buffer in-memory batches
+ * prior to spilling.
+ * Spill mode (@link SpilledBatchGroup): Holds a "memento" to a set
+ * of batches written to disk. Acts as both a reader and writer for
+ * those batches.
+ */
+
+public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
 
-  private VectorContainer currentContainer;
-  private SelectionVector2 sv2;
-  private int pointer = 0;
-  private FSDataInputStream inputStream;
-  private FSDataOutputStream outputStream;
-  private Path path;
-  private FileSystem fs;
-  private BufferAllocator allocator;
-  private int spilledBatches = 0;
-  private OperatorContext context;
-  private BatchSchema schema;
-
-  public BatchGroup(VectorContainer container, SelectionVector2 sv2, 
OperatorContext context) {
-this.sv2 = sv2;
-this.currentContainer = container;
-this.context = context;
-  }
+  /**
+   * The input batch group gathers batches buffered in memory before
+   * spilling. The structure of the data is:
+   * 
+   * Contains a single batch received from the upstream (input)
+   * operator.
+   * Associated selection vector that provides a sorted
+   * indirection to the values in the batch.
+   * 
+   */
 
-  public BatchGroup(VectorContainer container, FileSystem fs, String path, 
OperatorContext context) {
-currentContainer = container;
-this.fs = fs;
-this.path = new Path(path);
-this.allocator = context.getAllocator();
-this.context = context;
-  }
+  public static class InputBatch extends BatchGroup {
+private final SelectionVector2 sv2;
+private final long dataSize;
+
+public InputBatch(VectorContainer container, SelectionVector2 sv2, 
BufferAllocator allocator, long dataSize) {
+  super(container, allocator);
+  this.sv2 = sv2;
+  this.dataSize = dataSize;
+}
 
-  public SelectionVector2 getSv2() {
-return sv2;
+public SelectionVector2 getSv2() { return sv2; }
+
+public long getDataSize() { return dataSize; }
+
+@Override
+public int getRecordCount() {
+  if (sv2 != null) {
+return sv2.getCount();
+  } else {
+return super.getRecordCount();
+  }
+}
+
+@Override
+public int getNextIndex() {
+  int val = super.getNextIndex();
+  if (val == -1) {
+return val;
+  }
+  return 
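The class comment quoted above describes a two-mode design: an in-memory input batch paired with its sort indirection, and a lightweight handle to batches already written to a spill file. A rough, purely illustrative sketch of that shape in plain Java (the names and fields below are invented for this sketch and are not Drill's actual classes):

```java
// Illustrative only: a toy version of the two-mode design described above.
import java.util.ArrayList;
import java.util.List;

abstract class BatchGroupSketch implements AutoCloseable {
  abstract int recordCount();

  /** In-memory mode: one buffered batch plus a sorted indirection over its rows. */
  static final class InputSketch extends BatchGroupSketch {
    private final List<Integer> batch;
    private final int[] sortedIndices;
    InputSketch(List<Integer> batch, int[] sortedIndices) {
      this.batch = batch;
      this.sortedIndices = sortedIndices;
    }
    @Override int recordCount() { return sortedIndices.length; }
    @Override public void close() { batch.clear(); }
  }

  /** Spill mode: remembers only where the batches were written and how many rows they hold. */
  static final class SpilledSketch extends BatchGroupSketch {
    private final String spillFile;
    private final int rows;
    SpilledSketch(String spillFile, int rows) { this.spillFile = spillFile; this.rows = rows; }
    @Override int recordCount() { return rows; }
    @Override public void close() { /* a real implementation would delete spillFile here */ }
  }

  public static void main(String[] args) {
    List<BatchGroupSketch> groups = new ArrayList<>();
    groups.add(new InputSketch(new ArrayList<>(List.of(5, 3, 9)), new int[] {1, 0, 2}));
    groups.add(new SpilledSketch("/tmp/run-0.spill", 1000));
    groups.forEach(g -> System.out.println(g.recordCount())); // prints 3, then 1000
  }
}
```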

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001129#comment-17001129
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360525569
 
 

 ##
 File path: 
exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortImpl.java
 ##
 @@ -33,32 +33,31 @@
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
-import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
+import org.apache.drill.exec.physical.impl.xsort.SortImpl.SortResults;
 
 Review comment:
   Please avoid reordering imports when it is not required; it may cause merge conflicts for other contributors.
 



[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001131#comment-17001131
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360508494
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
 ##
 @@ -89,15 +98,28 @@ public void setup(final FragmentContext context, final 
BufferAllocator allocator
* ExternalSortBatch to make decisions about whether to spill or not.
*
* @param recordCount
-   * @return The amount of memory MSorter needs for a given record count.
+   * @return
*/
   public static long memoryNeeded(final int recordCount) {
-// We need 4 bytes (SV4) for each record, power of 2 rounded.
+// We need 4 bytes (SV4) for each record.
+// The memory allocator will round this to the next
+// power of 2.
 
 return BaseAllocator.nextPowerOfTwo(recordCount * 4);
   }
 
-  private int merge(final int leftStart, final int rightStart, final int 
rightEnd, final int outStart) {
+  /**
+   * Given two regions within the selection vector 4 (a left and a right), 
merge
+   * the two regions to produce a combined output region in the auxiliary
+   * selection vector.
+   *
+   * @param leftStart
+   * @param rightStart
+   * @param rightEnd
+   * @param outStart
+   * @return
 
 Review comment:
   Please finish the javadoc.
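As an aside on the memoryNeeded() arithmetic quoted in the hunk above (4 bytes of SV4 per record, rounded up by the allocator), here is a standalone sketch of the same calculation; the helper below is an invented stand-in for what the comment says BaseAllocator.nextPowerOfTwo does, not Drill's implementation:

```java
// Standalone illustration only.
public class Sv4MemoryEstimate {

  // Invented stand-in for the allocator's power-of-two rounding.
  static long nextPowerOfTwo(long val) {
    long highestBit = Long.highestOneBit(val);
    return (highestBit == val) ? val : highestBit << 1;
  }

  // 4 bytes of SV4 per record, rounded up to the allocator's power-of-two granularity.
  static long memoryNeeded(int recordCount) {
    return nextPowerOfTwo(recordCount * 4L);
  }

  public static void main(String[] args) {
    // 100,000 records -> 400,000 bytes of SV4 -> rounded up to 524,288 (2^19).
    System.out.println(memoryNeeded(100_000));
  }
}
```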
 



[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001145#comment-17001145
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360427244
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##
 @@ -286,540 +291,419 @@ public void buildSchema() throws SchemaChangeException 
{
 state = BatchState.DONE;
 break;
   default:
-break;
+throw new IllegalStateException("Unexpected iter outcome: " + outcome);
 }
   }
 
+  /**
+   * Process each request for a batch. The first request retrieves
+   * all the incoming batches and sorts them, optionally spilling to
+   * disk as needed. Subsequent calls retrieve the sorted results in
+   * fixed-size batches.
+   */
+
 
 Review comment:
   ```suggestion
   
   ```
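The javadoc in the hunk above describes a request-per-batch pattern: the first call does all of the loading and sorting, and later calls hand out fixed-size slices of the sorted result. A toy, self-contained sketch of that pattern (invented names, no spilling, not Drill's code):

```java
import java.util.Arrays;

public class SortStateSketch {
  enum State { START, DELIVER, DONE }

  private State state = State.START;
  private int[] sorted;
  private int position;
  private static final int BATCH_SIZE = 4;

  /** Returns the next output batch, or null once every row has been delivered. */
  int[] next(int[] input) {
    switch (state) {
      case START:
        // First call: "load" everything and sort it up front.
        sorted = input.clone();
        Arrays.sort(sorted);
        state = State.DELIVER;
        // fall through and deliver the first slice
      case DELIVER:
        if (position >= sorted.length) {
          state = State.DONE;
          return null;
        }
        int end = Math.min(position + BATCH_SIZE, sorted.length);
        int[] batch = Arrays.copyOfRange(sorted, position, end);
        position = end;
        return batch;
      default:
        return null;
    }
  }

  public static void main(String[] args) {
    SortStateSketch sorter = new SortStateSketch();
    int[] input = {9, 1, 8, 2, 7, 3, 6, 4, 5};
    for (int[] batch = sorter.next(input); batch != null; batch = sorter.next(input)) {
      System.out.println(Arrays.toString(batch));   // [1, 2, 3, 4] [5, 6, 7, 8] [9]
    }
  }
}
```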
 



[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001103#comment-17001103
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360397128
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##
 @@ -221,4 +363,19 @@ public SelectionVector4 getSelectionVector4() {
 throw new UnsupportedOperationException();
   }
 
+  public static void closeAll(Collection groups) {
 
 Review comment:
   Please remove this method and use the one from ```AutoCloseables```. Note that the util methods in `AutoCloseables` could be extended so that, instead of throwing a checked exception, they accept a function or consumer that converts it to a runtime exception.
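For context, the kind of helper being suggested might look roughly like the sketch below; the actual org.apache.drill.common.AutoCloseables API may differ, and the method name here is invented:

```java
import java.util.Arrays;
import java.util.Collection;

public class CloseAllExample {

  // Close every resource, converting the checked exception from close() into an
  // unchecked one so callers are not forced to declare "throws Exception".
  static void closeAll(Collection<? extends AutoCloseable> resources) {
    for (AutoCloseable resource : resources) {
      try {
        resource.close();
      } catch (Exception e) {
        throw new RuntimeException("Failed to close resource", e);
      }
    }
  }

  public static void main(String[] args) {
    AutoCloseable noisy = () -> System.out.println("closed");
    closeAll(Arrays.asList(noisy, noisy));
  }
}
```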
 



[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001113#comment-17001113
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360394111
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001142#comment-17001142
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360439104
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##
 @@ -286,540 +291,419 @@ public void buildSchema() throws SchemaChangeException 
{
 state = BatchState.DONE;
 break;
   default:
-break;
+throw new IllegalStateException("Unexpected iter outcome: " + outcome);
 }
   }
 
+  /**
+   * Process each request for a batch. The first request retrieves
+   * all the incoming batches and sorts them, optionally spilling to
+   * disk as needed. Subsequent calls retrieve the sorted results in
+   * fixed-size batches.
+   */
+
   @Override
   public IterOutcome innerNext() {
-if (schema != null) {
-  if (spillCount == 0) {
-return (getSelectionVector4().next()) ? IterOutcome.OK : 
IterOutcome.NONE;
-  } else {
-Stopwatch w = Stopwatch.createStarted();
-int count = copier.next(targetRecordCount);
-if (count > 0) {
-  long t = w.elapsed(TimeUnit.MICROSECONDS);
-  logger.debug("Took {} us to merge {} records", t, count);
-  container.setRecordCount(count);
-  return IterOutcome.OK;
-} else {
-  logger.debug("copier returned 0 records");
-  return IterOutcome.NONE;
-}
+switch (sortState) {
+case DONE:
+  return NONE;
+case START:
+  return load();
+case LOAD:
+  if (!this.retainInMemoryBatchesOnNone) {
+resetSortState();
   }
+  return (sortState == SortState.DONE) ? NONE : load();
+case DELIVER:
+  return nextOutputBatch();
+default:
+  throw new IllegalStateException("Unexpected sort state: " + sortState);
 }
+  }
 
-int totalCount = 0;
-int totalBatches = 0; // total number of batches received so far
+  private IterOutcome nextOutputBatch() {
+// Call next on outputSV4 for it's state to progress in parallel to 
resultsIterator state
+outputSV4.next();
 
-try{
-  container.clear();
-  outer: while (true) {
-IterOutcome upstream;
-if (first) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-} else {
-  upstream = next(incoming);
-}
-if (upstream == IterOutcome.OK && sorter == null) {
-  upstream = IterOutcome.OK_NEW_SCHEMA;
-}
-switch (upstream) {
-case NONE:
-  if (first) {
-return upstream;
-  }
-  break outer;
-case NOT_YET:
-  throw new UnsupportedOperationException();
-case STOP:
-  return upstream;
-case OK_NEW_SCHEMA:
-case OK:
-  VectorContainer convertedBatch;
-  // only change in the case that the schema truly changes.  
Artificial schema changes are ignored.
-  if (upstream == IterOutcome.OK_NEW_SCHEMA && 
!incoming.getSchema().equals(schema)) {
-if (schema != null) {
-  if (unionTypeEnabled) {
-this.schema = SchemaUtil.mergeSchemas(schema, 
incoming.getSchema());
-  } else {
-throw new SchemaChangeException("Schema changes not supported 
in External Sort. Please enable Union type");
-  }
-} else {
-  schema = incoming.getSchema();
-}
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-for (BatchGroup b : batchGroups) {
-  b.setSchema(schema);
-}
-for (BatchGroup b : spilledBatchGroups) {
-  b.setSchema(schema);
-}
-this.sorter = createNewSorter(context, convertedBatch);
-  } else {
-convertedBatch = SchemaUtil.coerceContainer(incoming, schema, 
oContext);
-  }
-  if (first) {
-first = false;
-  }
-  if (convertedBatch.getRecordCount() == 0) {
-for (VectorWrapper w : convertedBatch) {
-  w.clear();
-}
-break;
-  }
-  SelectionVector2 sv2;
-  if (incoming.getSchema().getSelectionVectorMode() == 
BatchSchema.SelectionVectorMode.TWO_BYTE) {
-sv2 = incoming.getSelectionVector2().clone();
-  } else {
-try {
-  sv2 = newSV2();
-} catch(InterruptedException e) {
-  return IterOutcome.STOP;
-} catch (OutOfMemoryException e) {
-  throw new OutOfMemoryException(e);
-}
-  }
+// But if results iterator next returns true that means it has 

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001121#comment-17001121
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360494754
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001139#comment-17001139
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360516350
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java
 ##
 @@ -24,11 +24,15 @@
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
-// TODO:  Doc.:  What's an MSorter?  A sorter for merge join?  something else?
-// (What's the "M" part?  Actually, rename interface to clearer.
+/**
+ * In-memory sorter. Takes a list of batches as input, produces a selection
+ * vector 4, with sorted results, as output.
+ */
+
 public interface MSorter {
-  public void setup(FragmentContext context, BufferAllocator allocator, 
SelectionVector4 vector4, VectorContainer hyperBatch) throws 
SchemaChangeException;
-  public void sort(VectorContainer container);
+  public void setup(FragmentContext context, BufferAllocator allocator, 
SelectionVector4 vector4,
 
 Review comment:
   Please remove the redundant ```public``` and ```static``` modifiers, and move the ```TEMPLATE_DEFINITION``` constant to the top, above the methods.
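On the MSorter javadoc above ("takes a list of batches as input, produces a selection vector 4, with sorted results, as output"), a plain-Java illustration of what such an indirection means; the Entry pair below is an invented stand-in for the packed (batch, offset) coordinates an SV4 holds:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class Sv4Sketch {
  // Toy stand-in for one SV4 entry: which batch, and which record within it.
  record Entry(int batch, int offset) {}

  public static void main(String[] args) {
    int[][] batches = { {42, 7, 99}, {3, 56} };

    // Build the indirection: one entry per record, in arrival order.
    List<Entry> sv4 = new ArrayList<>();
    for (int b = 0; b < batches.length; b++) {
      for (int r = 0; r < batches[b].length; r++) {
        sv4.add(new Entry(b, r));
      }
    }

    // Sorting reorders only the indirection, never the underlying batches.
    sv4.sort(Comparator.comparingInt((Entry e) -> batches[e.batch()][e.offset()]));
    sv4.forEach(e -> System.out.print(batches[e.batch()][e.offset()] + " ")); // 3 7 42 56 99
  }
}
```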
 



[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001105#comment-17001105
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360365959
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
 ##

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001114#comment-17001114
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360443790
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001132#comment-17001132
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360425824
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##
 @@ -17,250 +17,255 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
-import org.apache.calcite.rel.RelFieldCollation.Direction;
-import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.config.DrillConfig;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
+
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.ErrorCollector;
-import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.data.Order.Ordering;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.compile.sig.GeneratorMapping;
-import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.exception.ClassTransformationException;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
-import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.physical.config.ExternalSort;
-import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
-import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import 
org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator;
+import org.apache.drill.exec.physical.impl.xsort.SortImpl.SortResults;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.SchemaUtil;
-import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
-import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Iterators;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import com.sun.codemodel.JConditional;
-import com.sun.codemodel.JExpr;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * External sort batch: a sort batch which can spill to disk in
+ * order to operate within a defined memory footprint.
+ * 
+ * Basic Operation
+ * The operator has three key phases:
+ * 
+ * 
+ * The load phase in which batches are read from upstream.
+ * The merge phase in which spilled batches are 
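The class comment above begins describing the operator's load and merge phases before the quoted hunk breaks off. The general load / spill / merge / deliver pattern it names can be shown with a toy in-memory simulation; everything below is invented for illustration and bears no relation to Drill's actual code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

public class ExternalSortSketch {
  public static void main(String[] args) {
    int memoryBudget = 3;                      // max records held before "spilling"
    int[] input = {9, 4, 7, 1, 8, 2, 6, 3, 5};

    // Load phase: buffer records, "spilling" a sorted run whenever the budget is hit.
    List<List<Integer>> runs = new ArrayList<>();
    List<Integer> buffer = new ArrayList<>();
    for (int value : input) {
      buffer.add(value);
      if (buffer.size() == memoryBudget) {
        Collections.sort(buffer);
        runs.add(buffer);
        buffer = new ArrayList<>();
      }
    }
    if (!buffer.isEmpty()) { Collections.sort(buffer); runs.add(buffer); }

    // Merge + deliver phase: k-way merge of the sorted runs via a priority queue.
    PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) ->
        Integer.compare(runs.get(a[0]).get(a[1]), runs.get(b[0]).get(b[1])));
    for (int r = 0; r < runs.size(); r++) { heap.add(new int[] {r, 0}); }
    while (!heap.isEmpty()) {
      int[] head = heap.poll();
      System.out.print(runs.get(head[0]).get(head[1]) + " ");   // 1 2 3 ... 9
      if (head[1] + 1 < runs.get(head[0]).size()) {
        heap.add(new int[] {head[0], head[1] + 1});
      }
    }
  }
}
```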

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001147#comment-17001147
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360485448
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001110#comment-17001110
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360475113
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
 ##

[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001137#comment-17001137
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360507467
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
 ##
 @@ -17,48 +17,59 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
-import com.typesafe.config.ConfigException;
-import io.netty.buffer.DrillBuf;
-
 import java.util.Queue;
 
 import javax.inject.Named;
 
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BaseAllocator;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.util.IndexedSortable;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Queues;
 
+import io.netty.buffer.DrillBuf;
+
 public abstract class MSortTemplate implements MSorter, IndexedSortable {
 //  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(MSortTemplate.class);
 
   private SelectionVector4 vector4;
   private SelectionVector4 aux;
+  @SuppressWarnings("unused")
+  private long compares;
+
+  /**
+   * Holds offsets into the SV4 of the start of each batch
+   * (sorted run.)
+   */
+
 
 Review comment:
   ```suggestion
   
   ```
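The field documented above holds the offsets where each sorted run starts in the SV4, and merging adjacent runs into the auxiliary vector is the core of the in-memory merge. A minimal standalone sketch of one such merge step (invented arrays and names, not Drill's generated code):

```java
public class RunMergeSketch {
  // Merge two adjacent sorted runs of the indirection vector sv into aux,
  // mirroring the merge(leftStart, rightStart, rightEnd, outStart) shape above.
  static void merge(int[] values, int[] sv, int[] aux,
                    int leftStart, int rightStart, int rightEnd, int outStart) {
    int l = leftStart, r = rightStart, out = outStart;
    while (l < rightStart && r < rightEnd) {
      aux[out++] = (values[sv[l]] <= values[sv[r]]) ? sv[l++] : sv[r++];
    }
    while (l < rightStart) { aux[out++] = sv[l++]; }
    while (r < rightEnd)   { aux[out++] = sv[r++]; }
  }

  public static void main(String[] args) {
    int[] values = {5, 1, 9, 2, 8};
    int[] sv  = {1, 0, 2, 3, 4};   // two sorted runs: [1, 0, 2] and [3, 4]
    int[] aux = new int[sv.length];
    merge(values, sv, aux, 0, 3, 5, 0);
    for (int idx : aux) { System.out.print(values[idx] + " "); }  // 1 2 5 8 9
  }
}
```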
 



[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17001124#comment-17001124
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

ihuzenko commented on pull request #1929: DRILL-6832: Remove the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929#discussion_r360518409
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
 ##
 @@ -17,94 +17,95 @@
  */
 package org.apache.drill.exec.physical.impl.xsort;
 
-import io.netty.buffer.DrillBuf;
-
 import java.io.IOException;
 import java.util.List;
 
 import javax.inject.Named;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorAccessibleUtilities;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 
+import io.netty.buffer.DrillBuf;
 
 public abstract class PriorityQueueCopierTemplate implements 
PriorityQueueCopier {
+//  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(PriorityQueueCopierTemplate.class);
 
   private SelectionVector4 vector4;
   private List batchGroups;
   private VectorAccessible hyperBatch;
   private VectorAccessible outgoing;
   private int size;
-  private int queueSize;
+  private int queueSize = 0;
 
 Review comment:
   ```suggestion
 private int queueSize;
   ```
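The suggestion drops the explicit initializer because Java already zero-initializes instance fields; a tiny demonstration:

```java
public class DefaultInitExample {
  private int queueSize;   // no initializer needed: int fields start at 0

  public static void main(String[] args) {
    System.out.println(new DefaultInitExample().queueSize);  // prints 0
  }
}
```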
 



[jira] [Commented] (DRILL-6832) Remove old "unmanaged" sort implementation

2019-12-12 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16995293#comment-16995293
 ] 

ASF GitHub Bot commented on DRILL-6832:
---

paul-rogers commented on pull request #1929: DRILL-6832: Removes the old 
"unmanaged" external sort
URL: https://github.com/apache/drill/pull/1929
 
 
   When the "managed" external sort was implemented a couple of years back, we 
retained the original "unmanaged" version out of an abundance of caution. The 
new version is now battle tested and it is time to retire the original one.
   
   Tests: reran all tests. Adjusted one test where the metrics names for the 
two sort versions got out of sync.
 
