[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16375188#comment-16375188
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/1110


> SingleMergeExchange is not scaling up when many minor fragments are allocated 
> for a query.
> --
>
> Key: DRILL-6115
> URL: https://issues.apache.org/jira/browse/DRILL-6115
> Project: Apache Drill
>  Issue Type: Improvement
>  Components: Execution - Relational Operators
>Affects Versions: 1.12.0
>Reporter: Hanumath Rao Maduri
>Assignee: Hanumath Rao Maduri
>Priority: Major
>  Labels: ready-to-commit
> Fix For: 1.13.0
>
> Attachments: Enhancing Drill to multiplex ordered merge exchanges.docx
>
>
> SingleMergeExchange is created when a global order is required in the output. 
> The following query produces the SingleMergeExchange.
> {code:java}
> 0: jdbc:drill:zk=local> explain plan for select L_LINENUMBER from 
> dfs.`/drill/tables/lineitem` order by L_LINENUMBER;
> +--+--+
> | text | json |
> +--+--+
> | 00-00 Screen
> 00-01 Project(L_LINENUMBER=[$0])
> 00-02 SingleMergeExchange(sort0=[0])
> 01-01 SelectionVectorRemover
> 01-02 Sort(sort0=[$0], dir0=[ASC])
> 01-03 HashToRandomExchange(dist0=[[$0]])
> 02-01 Scan(table=[[dfs, /drill/tables/lineitem]], 
> groupscan=[JsonTableGroupScan [ScanSpec=JsonScanSpec 
> [tableName=maprfs:///drill/tables/lineitem, condition=null], 
> columns=[`L_LINENUMBER`], maxwidth=15]])
> {code}
> On a 10-node cluster, if the table is huge, Drill can spawn many minor 
> fragments, all of which are merged on a single node by one merge receiver. 
> This creates a lot of memory pressure on the receiver node and also an 
> execution bottleneck. To address this issue, the merge receiver should be a 
> multiphase merge receiver. 
> Ideally, for a large cluster one could introduce tree merges so that merging 
> is done in parallel. But as a first step I think it is better to use the 
> existing infrastructure for multiplexing operators to generate an OrderedMux, 
> so that all the minor fragments belonging to one drillbit are merged locally 
> and the merged data is sent across to the receiver operator.
> For example, on a 10-node cluster where each node processes 14 minor fragments:
> the current version of the code merges 140 minor fragments at the receiver;
> the proposed version has a two-level merge: first a 14-way merge in each 
> drillbit, running in parallel across drillbits, 
> and then the 10 resulting minor fragments are merged at the receiver node.
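
The fan-in arithmetic in the description can be illustrated with a small stand-alone sketch. This is not Drill code: the class and method names are hypothetical, and plain integer lists stand in for sorted record batches. It only shows that merging per node first and then merging the per-node results yields the same global order while the final merge sees one input per node instead of one per minor fragment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class TwoLevelMergeSketch {

    /** K-way merge of already-sorted lists using a min-heap of cursors. */
    static List<Integer> mergeSorted(List<List<Integer>> sortedInputs) {
        // Each heap entry is {value, inputIndex, offsetInInput}.
        PriorityQueue<int[]> heap =
            new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int i = 0; i < sortedInputs.size(); i++) {
            if (!sortedInputs.get(i).isEmpty()) {
                heap.add(new int[] {sortedInputs.get(i).get(0), i, 0});
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] head = heap.poll();
            out.add(head[0]);
            List<Integer> source = sortedInputs.get(head[1]);
            int next = head[2] + 1;
            if (next < source.size()) {
                heap.add(new int[] {source.get(next), head[1], next});
            }
        }
        return out;
    }

    /** Merge per node first, then merge the per-node results at the receiver. */
    static List<Integer> twoLevelMerge(List<List<List<Integer>>> fragmentsByNode) {
        List<List<Integer>> perNode = new ArrayList<>();
        for (List<List<Integer>> localFragments : fragmentsByNode) {
            // Local merge; in a real cluster each node would do this independently.
            perNode.add(mergeSorted(localFragments));
        }
        // The receiver now merges one stream per node.
        return mergeSorted(perNode);
    }

    public static void main(String[] args) {
        int nodes = 10;
        int fragmentsPerNode = 14;
        int rowsPerFragment = 5;
        List<List<List<Integer>>> cluster = new ArrayList<>();
        for (int n = 0; n < nodes; n++) {
            List<List<Integer>> local = new ArrayList<>();
            for (int f = 0; f < fragmentsPerNode; f++) {
                int id = n * fragmentsPerNode + f;
                List<Integer> rows = new ArrayList<>();
                for (int r = 0; r < rowsPerFragment; r++) {
                    rows.add(id + r * nodes * fragmentsPerNode); // ascending per fragment
                }
                local.add(rows);
            }
            cluster.add(local);
        }
        List<Integer> merged = twoLevelMerge(cluster);
        System.out.println("rows=" + merged.size()
            + " receiverFanIn=" + cluster.size()
            + " singleLevelFanIn=" + (nodes * fragmentsPerNode));
    }
}
```

With the numbers from the description, the receiver-side fan-in drops from 140 to 10, at the cost of one extra 14-way merge on each node.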



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365069#comment-16365069
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user HanumathRao commented on the issue:

https://github.com/apache/drill/pull/1110
  
@amansinha100 @vrozov Thanks for the review. I have squashed all the 
commits into two.

Please merge these two commits individually into the Apache master branch. 
The first commit refactors existing code; the second fixes this JIRA 
issue.




[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364992#comment-16364992
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user vrozov commented on the issue:

https://github.com/apache/drill/pull/1110
  
LGTM, please squash the last two commits.




[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364969#comment-16364969
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user amansinha100 commented on the issue:

https://github.com/apache/drill/pull/1110
  
LGTM.  +1




[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364908#comment-16364908
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user HanumathRao commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r168338663
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java ---
@@ -93,6 +94,21 @@ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws
     return creator.addMetadata(this, g);
   }
 
+  /**
+   * This method creates a new OrderedMux exchange if mux operators are enabled.
+   * @param child input to the new muxPrel or new SingleMergeExchange node.
+   * @param options options manager to check if mux is enabled.
+   */
+  @Override
+  public Prel constructMuxPrel(Prel child, OptionManager options) throws RuntimeException {
+    Prel outPrel = child;
+    if (options.getOption(PlannerSettings.ORDERED_MUX_EXCHANGE.getOptionName()).bool_val) {
--- End diff --

@amansinha100 Thanks for the review. I have made the needed code changes 
to fix it. Please let me know if anything else is required.




[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363267#comment-16363267
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r168044397
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java ---
@@ -93,6 +94,21 @@ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws
     return creator.addMetadata(this, g);
   }
 
+  /**
+   * This method creates a new OrderedMux exchange if mux operators are enabled.
+   * @param child input to the new muxPrel or new SingleMergeExchange node.
+   * @param options options manager to check if mux is enabled.
+   */
+  @Override
+  public Prel constructMuxPrel(Prel child, OptionManager options) throws RuntimeException {
+    Prel outPrel = child;
+    if (options.getOption(PlannerSettings.ORDERED_MUX_EXCHANGE.getOptionName()).bool_val) {
--- End diff --

@HanumathRao I think the ordered_mux should be created when both mux and 
ordered_mux flags are enabled.  Users who disable the global 'mux' flag would 
very likely expect that all mux exchanges are disabled. 
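
The gating rule suggested here can be sketched as a plain boolean check. This is only an illustration: `shouldInsertOrderedMux` and its two parameters are hypothetical stand-ins for the values of the global mux option and the ordered-mux option, not methods from the pull request.

```java
public class OrderedMuxGate {

    /**
     * Hypothetical helper: the ordered mux is inserted only when the global
     * mux flag and the ordered-mux flag are both enabled.
     */
    static boolean shouldInsertOrderedMux(boolean muxEnabled, boolean orderedMuxEnabled) {
        // The global flag acts as a master switch for every kind of mux exchange.
        return muxEnabled && orderedMuxEnabled;
    }

    public static void main(String[] args) {
        for (boolean mux : new boolean[] {false, true}) {
            for (boolean ordered : new boolean[] {false, true}) {
                System.out.println("mux=" + mux + " ordered_mux=" + ordered
                    + " -> insert=" + shouldInsertOrderedMux(mux, ordered));
            }
        }
    }
}
```

Under this rule, a user who turns off the global flag gets no mux exchanges of any kind, which matches the expectation described in the comment.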




[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363171#comment-16363171
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user HanumathRao commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r168025910
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java ---
@@ -37,14 +37,8 @@ public UnorderedMuxExchange(@JsonProperty("child") PhysicalOperator child) {
   }
 
   @Override
-  public Receiver getReceiver(int minorFragmentId) {
-    createSenderReceiverMapping();
-
-    List senders = receiverToSenderMapping.get(minorFragmentId);
-    if (senders == null || senders.size() <= 0) {
-      throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId));
-    }
-
+  protected Receiver getReceiverInternal(int oppositeMajorFragmentId,
+      List senders, boolean spooling) {
     return new UnorderedReceiver(senderMajorFragmentId, senders, false);
--- End diff --

Done.
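
The refactoring visible in this diff moves the sender lookup and validation out of the concrete exchange and leaves only receiver construction to the subclass. A minimal sketch of that shape follows, under stated assumptions: all class names are hypothetical, strings stand in for endpoints and receivers, and the `MergingReceiver` label for the ordered variant is a guess at what such a subclass would build rather than something shown in the quoted diff.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MuxReceiverSketch {

    /** Shared part: resolve and validate senders, then delegate to the subclass. */
    abstract static class AbstractMux {
        private final Map<Integer, List<String>> receiverToSenders = new HashMap<>();

        void addMapping(int receiverId, List<String> senders) {
            receiverToSenders.put(receiverId, senders);
        }

        final String getReceiver(int minorFragmentId) {
            List<String> senders = receiverToSenders.get(minorFragmentId);
            if (senders == null || senders.isEmpty()) {
                throw new IllegalStateException(
                    String.format("Failed to find senders for receiver [%d]", minorFragmentId));
            }
            return getReceiverInternal(senders);
        }

        /** Only this step differs between the unordered and ordered variants. */
        protected abstract String getReceiverInternal(List<String> senders);
    }

    static class UnorderedMux extends AbstractMux {
        @Override
        protected String getReceiverInternal(List<String> senders) {
            return "UnorderedReceiver" + senders;
        }
    }

    static class OrderedMux extends AbstractMux {
        @Override
        protected String getReceiverInternal(List<String> senders) {
            // Assumption: the ordered variant builds a merging receiver instead.
            return "MergingReceiver" + senders;
        }
    }

    public static void main(String[] args) {
        AbstractMux unordered = new UnorderedMux();
        unordered.addMapping(0, Arrays.asList("s1", "s2"));
        AbstractMux ordered = new OrderedMux();
        ordered.addMapping(0, Arrays.asList("s1", "s2"));
        System.out.println(unordered.getReceiver(0));
        System.out.println(ordered.getReceiver(0));
    }
}
```

Keeping the lookup and the error check in one place means a new exchange type only has to supply the receiver it wants.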




[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363140#comment-16363140
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user HanumathRao commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r168021043
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java ---
@@ -92,24 +92,19 @@ public Sender getSender(int minorFragmentId, PhysicalOperator child) {
     return new SingleSender(receiverMajorFragmentId, receiver.getId(), child, receiver.getEndpoint());
   }
 
-
-  @Override
-  public final Receiver getReceiver(int minorFragmentId) {
+  protected final List getSenders(int minorFragmentId) {
     createSenderReceiverMapping();
 
     List senders = receiverToSenderMapping.get(minorFragmentId);
 
-    logger.debug(String.format("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders));
+    logger.debug("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders);
--- End diff --

Done. 




[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362990#comment-16362990
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167988964
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java ---
@@ -92,24 +92,19 @@ public Sender getSender(int minorFragmentId, PhysicalOperator child) {
     return new SingleSender(receiverMajorFragmentId, receiver.getId(), child, receiver.getEndpoint());
   }
 
-
-  @Override
-  public final Receiver getReceiver(int minorFragmentId) {
+  protected final List getSenders(int minorFragmentId) {
     createSenderReceiverMapping();
 
     List senders = receiverToSenderMapping.get(minorFragmentId);
 
-    logger.debug(String.format("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders));
+    logger.debug("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders);
--- End diff --

use {} in place of %d and %s.
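
The reason for this request is that SLF4J-style loggers substitute arguments only into `{}` placeholders, so printf-style `%d` and `%s` are printed literally. The sketch below uses a simplified stand-in formatter to show the difference; `substitute` is a hypothetical helper written for this illustration, not SLF4J's implementation, and it ignores details such as escaping.

```java
public class PlaceholderSketch {

    /** Simplified stand-in for "{}" substitution; not SLF4J's actual code. */
    static String substitute(String pattern, Object... args) {
        StringBuilder out = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int at = pattern.indexOf("{}", from);
            if (at < 0) {
                break; // no placeholder left: remaining arguments are dropped
            }
            out.append(pattern, from, at).append(arg);
            from = at + 2;
        }
        return out.append(pattern.substring(from)).toString();
    }

    public static void main(String[] args) {
        // printf-style specifiers are not recognized, so the arguments are lost.
        System.out.println(substitute(
            "Minor fragment %d, receives data from following senders %s", 3, "[a, b]"));
        // "{}" placeholders are filled in order.
        System.out.println(substitute(
            "Minor fragment {}, receives data from following senders {}", 3, "[a, b]"));
    }
}
```

The parameterized form also avoids building the message string when debug logging is disabled, which is why the original `String.format` call was dropped in the first place.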




[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362860#comment-16362860
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167956002
  
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java ---
@@ -20,133 +20,41 @@
 import com.google.common.collect.Lists;
 
 import org.apache.drill.exec.planner.physical.ExchangePrel;
-import org.apache.drill.exec.planner.physical.HashPrelUtil;
-import org.apache.drill.exec.planner.physical.HashPrelUtil.HashExpressionCreatorHelper;
-import org.apache.drill.exec.planner.physical.HashToRandomExchangePrel;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.drill.exec.planner.physical.ProjectPrel;
-import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
-import org.apache.drill.exec.planner.physical.UnorderedDeMuxExchangePrel;
-import org.apache.drill.exec.planner.physical.UnorderedMuxExchangePrel;
-import org.apache.drill.exec.planner.sql.DrillSqlOperator;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexUtil;
-
-import java.math.BigDecimal;
-import java.util.Collections;
 import java.util.List;
 
 public class InsertLocalExchangeVisitor extends BasePrelVisitor {
-  private final boolean isMuxEnabled;
-  private final boolean isDeMuxEnabled;
-
-
-  public static class RexNodeBasedHashExpressionCreatorHelper implements HashExpressionCreatorHelper {
-    private final RexBuilder rexBuilder;
+  private final OptionManager options;
 
-    public RexNodeBasedHashExpressionCreatorHelper(RexBuilder rexBuilder) {
-      this.rexBuilder = rexBuilder;
-    }
-
-    @Override
-    public RexNode createCall(String funcName, List inputFields) {
-      final DrillSqlOperator op =
-          new DrillSqlOperator(funcName, inputFields.size(), true, false);
-      return rexBuilder.makeCall(op, inputFields);
+  private static boolean isMuxEnabled(OptionManager options) {
+    if (options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val ||
--- End diff --

Use `return` with the boolean expression directly instead of wrapping it in an `if`.
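
A minimal sketch of the suggestion, not the PR's code. The second operand of `||` is cut off in the diff above, so the two plain boolean parameters here are placeholders, and the class and method names are hypothetical.

```java
public class ReturnInsteadOfIf {
    // Verbose form: branches only to return the value the condition already holds.
    static boolean eitherEnabledVerbose(boolean first, boolean second) {
        if (first || second) {
            return true;
        }
        return false;
    }

    // Equivalent form: return the boolean expression itself.
    static boolean eitherEnabled(boolean first, boolean second) {
        return first || second;
    }

    public static void main(String[] args) {
        System.out.println(eitherEnabled(true, false));
        System.out.println(eitherEnabled(false, false));
    }
}
```

Both methods agree for every input; the second simply has no branch to read.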



[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362862#comment-16362862
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167960149
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java
 ---
@@ -37,14 +37,8 @@ public UnorderedMuxExchange(@JsonProperty("child") PhysicalOperator child) {
   }
 
   @Override
-  public Receiver getReceiver(int minorFragmentId) {
-createSenderReceiverMapping();
-
-List senders = receiverToSenderMapping.get(minorFragmentId);
-if (senders == null || senders.size() <= 0) {
-  throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId));
-}
-
+  protected Receiver getReceiverInternal(int oppositeMajorFragmentId,
+ List senders, boolean spooling) {
 return new UnorderedReceiver(senderMajorFragmentId, senders, false);
--- End diff --

Consider creating a helper method like `getSenders()` that returns a `List`,
instead of `getReceiverInternal()`.
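
One way to read this suggestion, as a rough sketch only: the parent class owns the sender lookup and validation in a `getSenders()` helper, and each subclass calls it from its own `getReceiver()`. The types below are simplified stand-ins (senders and receivers are plain strings), and `addSenders` is a hypothetical method added only so the example can be exercised; none of this is Drill's actual API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SendersHelperSketch {
    // Hypothetical stand-in for the mux exchange base class.
    abstract static class MuxBase {
        private final Map<Integer, List<String>> receiverToSenderMapping = new HashMap<>();

        // Hypothetical: registers the senders feeding one receiving minor fragment.
        void addSenders(int minorFragmentId, List<String> senders) {
            receiverToSenderMapping.put(minorFragmentId, senders);
        }

        // Shared lookup and validation, written once in the parent.
        protected List<String> getSenders(int minorFragmentId) {
            List<String> senders = receiverToSenderMapping.get(minorFragmentId);
            if (senders == null || senders.isEmpty()) {
                throw new IllegalStateException(
                    String.format("Failed to find senders for receiver [%d]", minorFragmentId));
            }
            return senders;
        }

        abstract String getReceiver(int minorFragmentId);
    }

    // Subclass only decides which kind of receiver to build.
    static class UnorderedMux extends MuxBase {
        @Override
        String getReceiver(int minorFragmentId) {
            return "UnorderedReceiver" + getSenders(minorFragmentId);
        }
    }

    public static void main(String[] args) {
        UnorderedMux mux = new UnorderedMux();
        mux.addSenders(0, Arrays.asList("s0", "s1"));
        System.out.println(mux.getReceiver(0));
    }
}
```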


> SingleMergeExchange is not scaling up when many minor fragments are allocated 
> for a query.
> --
>
> Key: DRILL-6115
> URL: https://issues.apache.org/jira/browse/DRILL-6115
> Project: Apache Drill
>  Issue Type: Improvement
>  Components: Execution - Relational Operators
>Affects Versions: 1.12.0
>Reporter: Hanumath Rao Maduri
>Assignee: Hanumath Rao Maduri
>Priority: Major
> Fix For: 1.13.0
>
> Attachments: Enhancing Drill to multiplex ordered merge exchanges.docx
>
>
> SingleMergeExchange is created when a global order is required in the output. 
> The following query produces the SingleMergeExchange.
> {code:java}
> 0: jdbc:drill:zk=local> explain plan for select L_LINENUMBER from 
> dfs.`/drill/tables/lineitem` order by L_LINENUMBER;
> +--+--+
> | text | json |
> +--+--+
> | 00-00 Screen
> 00-01 Project(L_LINENUMBER=[$0])
> 00-02 SingleMergeExchange(sort0=[0])
> 01-01 SelectionVectorRemover
> 01-02 Sort(sort0=[$0], dir0=[ASC])
> 01-03 HashToRandomExchange(dist0=[[$0]])
> 02-01 Scan(table=[[dfs, /drill/tables/lineitem]], 
> groupscan=[JsonTableGroupScan [ScanSpec=JsonScanSpec 
> [tableName=maprfs:///drill/tables/lineitem, condition=null], 
> columns=[`L_LINENUMBER`], maxwidth=15]])
> {code}
> On a 10-node cluster, if the table is huge, Drill can spawn many minor 
> fragments, all of which are merged on a single node by one merge receiver. 
> This creates a lot of memory pressure on the receiver node and is also an 
> execution bottleneck. To address this issue, the merge receiver should be a 
> multiphase merge receiver. 
> Ideally, for a large cluster one could introduce tree merges so that merging 
> can be done in parallel. But as a first step I think it is better to use the 
> existing infrastructure for multiplexing operators to generate an OrderedMux, 
> so that all the minor fragments belonging to one Drillbit are merged locally 
> and the merged data is sent across to the receiver operator.
> For example, on a 10-node cluster where each node processes 14 minor fragments, 
> the current version of the code merges all 140 minor fragments at one receiver. 
> The proposed version has two levels of merging: first a 14-way merge in each 
> Drillbit, run in parallel across Drillbits, 
> and then a 10-way merge of the resulting streams at the receiver node.
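
The two-level merge described above can be sketched in isolation. This is an illustration under simplifying assumptions, not Drill's merging receiver: sorted runs are in-memory integer lists, the per-node merges run sequentially rather than in parallel, and all class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class TwoLevelMergeSketch {
    // K-way merge of already-sorted lists using a min-heap of {value, listIndex, offset}.
    static List<Integer> mergeSorted(List<List<Integer>> inputs) {
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int i = 0; i < inputs.size(); i++) {
            if (!inputs.get(i).isEmpty()) {
                heap.add(new int[] {inputs.get(i).get(0), i, 0});
            }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] head = heap.poll();
            out.add(head[0]);
            List<Integer> src = inputs.get(head[1]);
            int next = head[2] + 1;
            if (next < src.size()) {
                heap.add(new int[] {src.get(next), head[1], next});
            }
        }
        return out;
    }

    // Level 1: merge each node's fragments locally. Level 2: merge one stream per node.
    static List<Integer> twoLevelMerge(List<List<List<Integer>>> fragmentsByNode) {
        List<List<Integer>> perNode = new ArrayList<>();
        for (List<List<Integer>> nodeFragments : fragmentsByNode) {
            perNode.add(mergeSorted(nodeFragments));
        }
        return mergeSorted(perNode);
    }

    public static void main(String[] args) {
        int nodes = 10;
        int fragmentsPerNode = 14;
        List<List<List<Integer>>> cluster = new ArrayList<>();
        for (int n = 0; n < nodes; n++) {
            List<List<Integer>> node = new ArrayList<>();
            for (int f = 0; f < fragmentsPerNode; f++) {
                int id = n * fragmentsPerNode + f;
                // Each fragment holds a short sorted run.
                List<Integer> run = new ArrayList<>();
                for (int k = 0; k < 3; k++) {
                    run.add(id + k * nodes * fragmentsPerNode);
                }
                node.add(run);
            }
            cluster.add(node);
        }
        List<Integer> merged = twoLevelMerge(cluster);
        System.out.println("fan-in at receiver, single level: " + nodes * fragmentsPerNode);
        System.out.println("fan-in at receiver, two levels: " + nodes);
        System.out.println("rows merged: " + merged.size());
    }
}
```

The final receiver's heap holds at most one entry per node instead of one per minor fragment, which is where the reduction in memory pressure on the receiver comes from.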



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362861#comment-16362861
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167954543
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java
 ---
@@ -37,14 +37,8 @@ public UnorderedMuxExchange(@JsonProperty("child") PhysicalOperator child) {
   }
 
   @Override
-  public Receiver getReceiver(int minorFragmentId) {
-createSenderReceiverMapping();
-
-List senders = receiverToSenderMapping.get(minorFragmentId);
-if (senders == null || senders.size() <= 0) {
-  throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId));
-}
-
+  protected Receiver getReceiverInternal(int oppositeMajorFragmentId,
+ List senders, boolean spooling) {
 return new UnorderedReceiver(senderMajorFragmentId, senders, false);
--- End diff --

The `spooling` parameter is ignored. Is this expected?




[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362859#comment-16362859
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167953826
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java
 ---
@@ -90,6 +92,24 @@ public Sender getSender(int minorFragmentId, PhysicalOperator child) {
 return new SingleSender(receiverMajorFragmentId, receiver.getId(), child, receiver.getEndpoint());
   }
 
+
+  @Override
+  public final Receiver getReceiver(int minorFragmentId) {
+createSenderReceiverMapping();
+
+List senders = receiverToSenderMapping.get(minorFragmentId);
+
+logger.debug(String.format("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders));
--- End diff --

Use SLF4J parameterized logging (`{}` placeholders) instead of `String.format`.
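
To illustrate why this matters, here is a self-contained sketch. `MiniLogger` is a hypothetical stand-in that mimics the `{}` placeholder style so the example runs without dependencies; it is not SLF4J. With SLF4J itself the call would look like `logger.debug("Minor fragment {} receives data from senders {}", minorFragmentId, senders)`.

```java
public class DeferredLoggingSketch {
    // Hypothetical stand-in for a logger with "{}" placeholders; not SLF4J itself.
    static class MiniLogger {
        final boolean debugEnabled;
        final StringBuilder sink = new StringBuilder();

        MiniLogger(boolean debugEnabled) {
            this.debugEnabled = debugEnabled;
        }

        void debug(String template, Object... args) {
            if (!debugEnabled) {
                return; // arguments are never converted to strings
            }
            StringBuilder sb = new StringBuilder();
            int from = 0;
            for (Object arg : args) {
                int at = template.indexOf("{}", from);
                if (at < 0) {
                    break;
                }
                sb.append(template, from, at).append(arg);
                from = at + 2;
            }
            sb.append(template.substring(from));
            sink.append(sb).append('\n');
        }
    }

    // Counts toString() calls so the cost of eager formatting is visible.
    static class Senders {
        int toStringCalls = 0;

        @Override
        public String toString() {
            toStringCalls++;
            return "[s0, s1]";
        }
    }

    public static void main(String[] args) {
        MiniLogger logger = new MiniLogger(false); // debug disabled
        Senders senders = new Senders();

        // Eager: String.format runs, and calls toString(), even though nothing is logged.
        logger.debug(String.format("Minor fragment %d receives data from senders %s", 3, senders));
        System.out.println("toString calls after eager call: " + senders.toStringCalls);

        // Deferred: the template and arguments are passed through untouched.
        logger.debug("Minor fragment {} receives data from senders {}", 3, senders);
        System.out.println("toString calls after deferred call: " + senders.toStringCalls);
    }
}
```

Because the formatting is deferred until the level check passes, an explicit `isDebugEnabled()` guard around such a call adds nothing.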




[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362756#comment-16362756
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user HanumathRao commented on the issue:

https://github.com/apache/drill/pull/1110
  
@amansinha100 @vrozov  Thank you for the review. I have addressed all the 
review comments. Please let me know if any changes are required.

The commits are organized such that one commit is for refactoring the 
existing code and the second one is specific to the changes required for this 
JIRA. This is done for ease of reviewing the code. 




[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361119#comment-16361119
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167626024
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ExchangePrel.java
 ---
@@ -34,4 +37,14 @@ public ExchangePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) {
 return logicalVisitor.visitExchange(this, value);
   }
 
+  /**
+   * The derived classes can override this method to create relevant mux exchanges.
+   * If this method is not overrided the default behaviour is to clone itself.
+   * @param child input to the new muxPrel or new Exchange node.
+   * @param options options manager to check if mux is enabled.
+   */
+  public Prel getMuxPrel(Prel child, OptionManager options) {
--- End diff --

Consider renaming to `constructMuxPrel`.




[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361118#comment-16361118
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167587159
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java
 ---
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * OrderedMuxExchange is a version of MuxExchange where the incoming batches are sorted
+ * merge operation is performed to produced a sorted stream as output.
+ */
+@JsonTypeName("ordered-mux-exchange")
+public class OrderedMuxExchange extends AbstractMuxExchange {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedMuxExchange.class);
+
+  private final List orderings;
+
+  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings")List orderings) {
+super(child);
+this.orderings = orderings;
+  }
+
+  @Override
+  public Receiver getReceiver(int minorFragmentId) {
+createSenderReceiverMapping();
+
+List senders = receiverToSenderMapping.get(minorFragmentId);
+if (senders == null || senders.size() <= 0) {
+  throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId));
+}
+
+if (logger.isDebugEnabled()) {
+  logger.debug(String.format("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders));
--- End diff --

Use SLF4J parameterized logging, remove the `isDebugEnabled()` check, and 
consider moving the logging before the exception.



[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361122#comment-16361122
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167625240
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java
 ---
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * OrderedMuxExchange is a version of MuxExchange where the incoming batches are sorted
+ * merge operation is performed to produced a sorted stream as output.
+ */
+@JsonTypeName("ordered-mux-exchange")
+public class OrderedMuxExchange extends AbstractMuxExchange {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedMuxExchange.class);
+
+  private final List orderings;
+
+  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings")List orderings) {
+super(child);
+this.orderings = orderings;
+  }
+
+  @Override
+  public Receiver getReceiver(int minorFragmentId) {
+createSenderReceiverMapping();
--- End diff --

Consider moving this functionality to the parent class and keep only 
creating an instance of concrete `MergingReceiver` in the subclass.
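
A sketch of what this refactoring could look like, assuming a template-method layout: the parent's `final` `getReceiver()` does the lookup and validation, and subclasses implement a single factory hook. The hook name `createReceiver`, the `addSenders` method, and the string-based stand-in types are all hypothetical and chosen only to keep the example runnable.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TemplateMethodMuxSketch {
    // Hypothetical stand-in for the abstract mux exchange.
    abstract static class AbstractMux {
        private final Map<Integer, List<String>> receiverToSenderMapping = new HashMap<>();

        // Hypothetical: registers the senders feeding one receiving minor fragment.
        void addSenders(int minorFragmentId, List<String> senders) {
            receiverToSenderMapping.put(minorFragmentId, senders);
        }

        // Final, so every subclass shares the same lookup and validation.
        final String getReceiver(int minorFragmentId) {
            List<String> senders = receiverToSenderMapping.get(minorFragmentId);
            if (senders == null || senders.isEmpty()) {
                throw new IllegalStateException(
                    String.format("Failed to find senders for receiver [%d]", minorFragmentId));
            }
            return createReceiver(senders);
        }

        // The only step that differs between concrete exchanges.
        protected abstract String createReceiver(List<String> senders);
    }

    static class OrderedMux extends AbstractMux {
        private final List<String> orderings;

        OrderedMux(List<String> orderings) {
            this.orderings = orderings;
        }

        @Override
        protected String createReceiver(List<String> senders) {
            return "MergingReceiver" + senders + " by " + orderings;
        }
    }

    static class UnorderedMux extends AbstractMux {
        @Override
        protected String createReceiver(List<String> senders) {
            return "UnorderedReceiver" + senders;
        }
    }

    public static void main(String[] args) {
        OrderedMux ordered = new OrderedMux(Collections.singletonList("L_LINENUMBER"));
        ordered.addSenders(0, Arrays.asList("s0", "s1"));
        System.out.println(ordered.getReceiver(0));
    }
}
```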



[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361120#comment-16361120
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167589289
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java
 ---
@@ -112,6 +120,73 @@ public RelWriter explainTerms(RelWriter pw) {
 return pw;
   }
 
+  /**
+   * This method creates a new UnorderedMux and Demux exchanges if mux operators are enabled.
+   * @param child input to the new Unordered[Mux/Demux]Prel or new HashToRandomExchange node.
+   * @param options options manager to check if mux is enabled.
+   */
+  @Override
+  public Prel getMuxPrel(Prel child, OptionManager options) {
+boolean isMuxEnabled = options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val;
+Prel newPrel = child;
+
+final List childFields = child.getRowType().getFieldNames();
+
+List  removeUpdatedExpr = null;
+
+if (isMuxEnabled) {
+  // Insert Project Operator with new column that will be a hash for HashToRandomExchange fields
+  final List distFields = getFields();
+  final List outputFieldNames = Lists.newArrayList(childFields);
+  final RexBuilder rexBuilder = getCluster().getRexBuilder();
+  final List childRowTypeFields = child.getRowType().getFieldList();
+
+  final HashPrelUtil.HashExpressionCreatorHelper hashHelper = new HashPrelUtil.RexNodeBasedHashExpressionCreatorHelper(rexBuilder);
+  final List distFieldRefs = Lists.newArrayListWithExpectedSize(distFields.size());
+  for(int i=0; i updatedExpr = Lists.newArrayListWithExpectedSize(childRowTypeFields.size());
+  removeUpdatedExpr = Lists.newArrayListWithExpectedSize(childRowTypeFields.size());
+  for ( RelDataTypeField field : childRowTypeFields) {
+RexNode rex = rexBuilder.makeInputRef(field.getType(), field.getIndex());
+updatedExpr.add(rex);
+removeUpdatedExpr.add(rex);
+  }
+
+  outputFieldNames.add(HashPrelUtil.HASH_EXPR_NAME);
+  final RexNode distSeed = rexBuilder.makeBigintLiteral(BigDecimal.valueOf(HashPrelUtil.DIST_SEED)); // distribution seed
+  updatedExpr.add(HashPrelUtil.createHashBasedPartitionExpression(distFieldRefs, distSeed, hashHelper));
+
+  RelDataType rowType = RexUtil.createStructType(getCluster().getTypeFactory(), updatedExpr, outputFieldNames);
+
+  ProjectPrel addColumnprojectPrel = new ProjectPrel(child.getCluster(), child.getTraitSet(), child, updatedExpr, rowType);
+
+  newPrel = new UnorderedMuxExchangePrel(addColumnprojectPrel.getCluster(), addColumnprojectPrel.getTraitSet(),
+  addColumnprojectPrel);
+}
+
+newPrel = new HashToRandomExchangePrel(getCluster(), getTraitSet(), newPrel, getFields());
+
+if (options.getOption(PlannerSettings.DEMUX_EXCHANGE.getOptionName()).bool_val) {
+  HashToRandomExchangePrel hashExchangePrel = (HashToRandomExchangePrel) newPrel;
+  // Insert a DeMuxExchange to narrow down the number of receivers
+  newPrel = new UnorderedDeMuxExchangePrel(getCluster(), getTraitSet(), hashExchangePrel,
+  hashExchangePrel.getFields());
+}
+
+if ( isMuxEnabled ) {
--- End diff --

Use consistent formatting.



[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361121#comment-16361121
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167616026
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java
 ---
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * OrderedMuxExchange is a version of MuxExchange where the incoming 
batches are sorted
+ * merge operation is performed to produced a sorted stream as output.
+ */
+@JsonTypeName("ordered-mux-exchange")
+public class OrderedMuxExchange extends AbstractMuxExchange {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(OrderedMuxExchange.class);
+
+  private final List<Ordering> orderings;
+
+  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, 
@JsonProperty("orderings") List<Ordering> orderings) {
+super(child);
+this.orderings = orderings;
+  }
+
+  @Override
+  public Receiver getReceiver(int minorFragmentId) {
+createSenderReceiverMapping();
+
+List<MinorFragmentEndpoint> senders = 
receiverToSenderMapping.get(minorFragmentId);
+if (senders == null || senders.size() <= 0) {
+  throw new IllegalStateException(String.format("Failed to find 
senders for receiver [%d]", minorFragmentId));
+}
+
+if (logger.isDebugEnabled()) {
+  logger.debug(String.format("Minor fragment %d, receives data from 
following senders %s", minorFragmentId, senders));
+}
+
+return new MergingReceiverPOP(senderMajorFragmentId, senders, 
orderings, false);
--- End diff --

I don't think locality plays a role in enabling or disabling spooling. If 
spooling is disabled for a remote receiver, it should also be disabled for a 
local receiver.



[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16361117#comment-16361117
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167586208
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
 ---
@@ -119,7 +119,7 @@ public final void setupReceivers(int majorFragmentId, 
List rec
   }
 
   @Override
-  public final <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> 
physicalVisitor, X value) throws E {
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> 
physicalVisitor, X value) throws E {
--- End diff --

Is this change still required?


> SingleMergeExchange is not scaling up when many minor fragments are allocated 
> for a query.
> --
>
> Key: DRILL-6115
> URL: https://issues.apache.org/jira/browse/DRILL-6115
> Project: Apache Drill
>  Issue Type: Improvement
>  Components: Execution - Relational Operators
>Affects Versions: 1.12.0
>Reporter: Hanumath Rao Maduri
>Assignee: Hanumath Rao Maduri
>Priority: Major
> Fix For: 1.13.0
>
> Attachments: Enhancing Drill to multiplex ordered merge exchanges.docx
>
>
> SingleMergeExchange is created when a global order is required in the output. 
> The following query produces the SingleMergeExchange.
> {code:java}
> 0: jdbc:drill:zk=local> explain plan for select L_LINENUMBER from 
> dfs.`/drill/tables/lineitem` order by L_LINENUMBER;
> +--+--+
> | text | json |
> +--+--+
> | 00-00 Screen
> 00-01 Project(L_LINENUMBER=[$0])
> 00-02 SingleMergeExchange(sort0=[0])
> 01-01 SelectionVectorRemover
> 01-02 Sort(sort0=[$0], dir0=[ASC])
> 01-03 HashToRandomExchange(dist0=[[$0]])
> 02-01 Scan(table=[[dfs, /drill/tables/lineitem]], 
> groupscan=[JsonTableGroupScan [ScanSpec=JsonScanSpec 
> [tableName=maprfs:///drill/tables/lineitem, condition=null], 
> columns=[`L_LINENUMBER`], maxwidth=15]])
> {code}
> On a 10-node cluster, if the table is huge, Drill can spawn many minor 
> fragments, which are all merged on a single node by one merge receiver. 
> This creates a lot of memory pressure on the receiver node and also an 
> execution bottleneck. To address this issue, the merge receiver should be 
> a multiphase merge receiver. 
> Ideally, for a large cluster one can introduce tree merges so that merging can 
> be done in parallel. But as a first step I think it is better to use the 
> existing infrastructure for multiplexing operators to generate an OrderedMux, 
> so that all the minor fragments belonging to one drillbit are merged locally 
> and the merged data is sent across to the receiver operator.
> For example, on a 10-node cluster where each node processes 14 minor fragments, 
> the current code merges 140 minor fragments at a single receiver. 
> The proposed version has a two-level merge: a 14-way merge in each drillbit, 
> running in parallel across drillbits, 
> followed by a merge of 10 streams at the receiver node.
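
The fan-in arithmetic in the description can be illustrated with a small, self-contained sketch. It does not use any Drill classes: minor-fragment output is modelled as sorted lists of integers, and the class and method names (TwoLevelMergeSketch, mergeSorted, twoLevelMerge) are hypothetical. It only shows that merging per node first and then merging the per-node streams gives the same ordered output as one 140-way merge, while the final receiver handles 10 inputs instead of 140.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Illustrative only: models each minor fragment's output as a sorted list of ints. */
final class TwoLevelMergeSketch {

  /** k-way merge of already sorted inputs using a min-heap of cursors. */
  static List<Integer> mergeSorted(List<List<Integer>> inputs) {
    // Each heap entry is {value, inputIndex, offsetInInput}.
    PriorityQueue<int[]> heap =
        new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
    for (int i = 0; i < inputs.size(); i++) {
      if (!inputs.get(i).isEmpty()) {
        heap.add(new int[] {inputs.get(i).get(0), i, 0});
      }
    }
    List<Integer> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      int[] head = heap.poll();
      out.add(head[0]);
      List<Integer> source = inputs.get(head[1]);
      int next = head[2] + 1;
      if (next < source.size()) {
        heap.add(new int[] {source.get(next), head[1], next});
      }
    }
    return out;
  }

  /** First merge the fragments of each node, then merge the per-node streams. */
  static List<Integer> twoLevelMerge(List<List<List<Integer>>> fragmentsByNode) {
    List<List<Integer>> perNode = new ArrayList<>();
    for (List<List<Integer>> nodeFragments : fragmentsByNode) {
      // In the proposal this local merge would run on each drillbit in parallel.
      perNode.add(mergeSorted(nodeFragments));
    }
    return mergeSorted(perNode);
  }

  public static void main(String[] args) {
    int nodes = 10;
    int fragmentsPerNode = 14;
    List<List<List<Integer>>> cluster = new ArrayList<>();
    List<List<Integer>> flat = new ArrayList<>();
    for (int n = 0; n < nodes; n++) {
      List<List<Integer>> nodeFragments = new ArrayList<>();
      for (int f = 0; f < fragmentsPerNode; f++) {
        int id = n * fragmentsPerNode + f;
        List<Integer> sorted = new ArrayList<>();
        for (int r = 0; r < 5; r++) {
          sorted.add(id + r * nodes * fragmentsPerNode); // ascending within a fragment
        }
        nodeFragments.add(sorted);
        flat.add(sorted);
      }
      cluster.add(nodeFragments);
    }
    System.out.println("single-level fan-in at receiver: " + flat.size());
    System.out.println("two-level fan-in: " + fragmentsPerNode
        + " per node, then " + cluster.size() + " at receiver");
    System.out.println("same output: " + mergeSorted(flat).equals(twoLevelMerge(cluster)));
  }
}
```

The sketch says nothing about memory or network cost in Drill itself; it only makes the 140 versus 14-then-10 fan-in concrete.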



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360111#comment-16360111
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user amansinha100 commented on the issue:

https://github.com/apache/drill/pull/1110
  
@HanumathRao I have a few comments in the JIRA for the overall design; we 
can discuss. 




[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360110#comment-16360110
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167448218
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java
 ---
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * OrderedMuxExchange is a version of MuxExchange where the incoming 
batches are sorted
+ * merge operation is performed to produced a sorted stream as output.
+ */
+@JsonTypeName("ordered-mux-exchange")
+public class OrderedMuxExchange extends AbstractMuxExchange {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(OrderedMuxExchange.class);
+
+  private final List<Ordering> orderings;
+
+  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, 
@JsonProperty("orderings") List<Ordering> orderings) {
+super(child);
+this.orderings = orderings;
+  }
+
+  @Override
+  public Receiver getReceiver(int minorFragmentId) {
+createSenderReceiverMapping();
+
+List<MinorFragmentEndpoint> senders = 
receiverToSenderMapping.get(minorFragmentId);
+if (senders == null || senders.size() <= 0) {
+  throw new IllegalStateException(String.format("Failed to find 
senders for receiver [%d]", minorFragmentId));
+}
+
+if (logger.isDebugEnabled()) {
+  logger.debug(String.format("Minor fragment %d, receives data from 
following senders %s", minorFragmentId, senders));
+}
+
+return new MergingReceiverPOP(senderMajorFragmentId, senders, 
orderings, false);
--- End diff --

The HashToMergeExchange creates a MergingReceiver with spooling set to TRUE, 
whereas the SingleMergeExchange creates one with spooling set to FALSE. Although we 
don't test spooling, I feel the new OrderedMuxExchange should probably use 
the same spooling setting as the HashToMergeExchange, since both do the merge on 
local drillbits rather than on the foreman. 



[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360109#comment-16360109
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167447863
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOrderedMuxExchange.java
 ---
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClientFixture;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestOrderedMuxExchange extends PlanTestBase {
+
+  private final static String ORDERED_MUX_EXCHANGE = "OrderedMuxExchange";
+
+
+  private void validateResults(BufferAllocator allocator, 
List<QueryDataBatch> results) throws SchemaChangeException {
+long previousBigInt = Long.MIN_VALUE;
+
+for (QueryDataBatch b : results) {
+  RecordBatchLoader loader = new RecordBatchLoader(allocator);
+  if (b.getHeader().getRowCount() > 0) {
+loader.load(b.getHeader().getDef(),b.getData());
+@SuppressWarnings({ "deprecation", "resource" })
+IntVector c1 = (IntVector) 
loader.getValueAccessorById(IntVector.class,
+   loader.getValueVectorId(new SchemaPath("id_i", 
ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
+IntVector.Accessor a1 = c1.getAccessor();
+
+for (int i = 0; i < c1.getAccessor().getValueCount(); i++) {
+  assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), 
previousBigInt <= a1.get(i));
+  previousBigInt = a1.get(i);
+}
+  }
+  loader.clear();
+  b.release();
+}
+  }
+
+  /**
+   * Test case to verify the OrderedMuxExchange created for order by 
clause.
+   * It checks by forcing the plan to create OrderedMuxExchange and also 
verifies the
+   * output column is ordered.
+   *
+   * @throws Exception if anything goes wrong
+   */
+
+  @Test
+  public void testOrderedMuxForOrderBy() throws Exception {
+ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+.maxParallelization(1)
+
.configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true)
+;
+
+try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+  client.alterSession(ExecConstants.SLICE_TARGET, 10);
+  String sql = "SELECT id_i, name_s10 FROM `mock`.`employees_10K` 
ORDER BY id_i limit 10";
--- End diff --

I am not sure how the table is organized. Does it already have an ordered id_i 
column? If so, we should use a different column. 
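
The concern can be illustrated with a minimal sketch in plain Java (no Drill APIs; SortedInputSketch and its methods are hypothetical names). If the input fragments are already ordered relative to one another, even an exchange that forwards fragments back to back without merging would pass a non-decreasing check like the one in validateResults, so the test could not tell a working merge from a missing one. Whether the mock table's id_i column actually behaves this way is not established here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SortedInputSketch {

  /** Same style of check as validateResults: each value must be >= its predecessor. */
  static boolean isNonDecreasing(List<Integer> values) {
    for (int i = 1; i < values.size(); i++) {
      if (values.get(i - 1) > values.get(i)) {
        return false;
      }
    }
    return true;
  }

  /** Stand-in for a broken exchange that forwards fragments back to back without merging. */
  static List<Integer> concatenate(List<List<Integer>> fragments) {
    List<Integer> out = new ArrayList<>();
    for (List<Integer> fragment : fragments) {
      out.addAll(fragment);
    }
    return out;
  }

  public static void main(String[] args) {
    // Fragments whose value ranges do not overlap: concatenation is already sorted.
    List<List<Integer>> alreadyOrdered =
        Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6));
    // Fragments with overlapping ranges: only a real merge yields sorted output.
    List<List<Integer>> interleaved =
        Arrays.asList(Arrays.asList(1, 4, 5), Arrays.asList(2, 3, 6));

    System.out.println(isNonDecreasing(concatenate(alreadyOrdered))); // true
    System.out.println(isNonDecreasing(concatenate(interleaved)));    // false
  }
}
```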



[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360108#comment-16360108
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167448543
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOrderedMuxExchange.java
 ---
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClientFixture;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestOrderedMuxExchange extends PlanTestBase {
+
+  private final static String ORDERED_MUX_EXCHANGE = "OrderedMuxExchange";
+
+
+  private void validateResults(BufferAllocator allocator, 
List<QueryDataBatch> results) throws SchemaChangeException {
+long previousBigInt = Long.MIN_VALUE;
+
+for (QueryDataBatch b : results) {
+  RecordBatchLoader loader = new RecordBatchLoader(allocator);
+  if (b.getHeader().getRowCount() > 0) {
+loader.load(b.getHeader().getDef(),b.getData());
+@SuppressWarnings({ "deprecation", "resource" })
+IntVector c1 = (IntVector) 
loader.getValueAccessorById(IntVector.class,
+   loader.getValueVectorId(new SchemaPath("id_i", 
ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
+IntVector.Accessor a1 = c1.getAccessor();
+
+for (int i = 0; i < c1.getAccessor().getValueCount(); i++) {
+  assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), 
previousBigInt <= a1.get(i));
+  previousBigInt = a1.get(i);
+}
+  }
+  loader.clear();
+  b.release();
+}
+  }
+
+  /**
+   * Test case to verify the OrderedMuxExchange created for order by 
clause.
+   * It checks by forcing the plan to create OrderedMuxExchange and also 
verifies the
+   * output column is ordered.
+   *
+   * @throws Exception if anything goes wrong
+   */
+
+  @Test
+  public void testOrderedMuxForOrderBy() throws Exception {
+ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+.maxParallelization(1)
+
.configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true)
+;
+
+try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+  client.alterSession(ExecConstants.SLICE_TARGET, 10);
+  String sql = "SELECT id_i, name_s10 FROM `mock`.`employees_10K` 
ORDER BY id_i limit 10";
+
+  String explainText = client.queryBuilder().sql(sql).explainText();
+  assertTrue(explainText.contains(ORDERED_MUX_EXCHANGE));
+  validateResults(client.allocator(), 
client.queryBuilder().sql(sql).results());
+}
+  }
+
+  /**
+   * Test case to verify the OrderedMuxExchange created for window 
functions.
+   * It checks by forcing the plan to create OrderedMuxExchange and also 
verifies the
+   * output column is ordered.
+   *
+   * @throws Exception if anything goes wrong
+   */
+
+  @Test
+  public void te

[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360107#comment-16360107
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user amansinha100 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167447232
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java
 ---
@@ -20,133 +20,34 @@
 import com.google.common.collect.Lists;
 
 import org.apache.drill.exec.planner.physical.ExchangePrel;
-import org.apache.drill.exec.planner.physical.HashPrelUtil;
-import 
org.apache.drill.exec.planner.physical.HashPrelUtil.HashExpressionCreatorHelper;
-import org.apache.drill.exec.planner.physical.HashToRandomExchangePrel;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.physical.Prel;
-import org.apache.drill.exec.planner.physical.ProjectPrel;
-import 
org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
-import org.apache.drill.exec.planner.physical.UnorderedDeMuxExchangePrel;
-import org.apache.drill.exec.planner.physical.UnorderedMuxExchangePrel;
-import org.apache.drill.exec.planner.sql.DrillSqlOperator;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexUtil;
-
-import java.math.BigDecimal;
-import java.util.Collections;
 import java.util.List;
 
 public class InsertLocalExchangeVisitor extends BasePrelVisitor {
-  private final boolean isMuxEnabled;
-  private final boolean isDeMuxEnabled;
-
-
-  public static class RexNodeBasedHashExpressionCreatorHelper implements 
HashExpressionCreatorHelper {
-private final RexBuilder rexBuilder;
-
-public RexNodeBasedHashExpressionCreatorHelper(RexBuilder rexBuilder) {
-  this.rexBuilder = rexBuilder;
-}
-
-@Override
-public RexNode createCall(String funcName, List inputFields) {
-  final DrillSqlOperator op =
-  new DrillSqlOperator(funcName, inputFields.size(), true, false);
-  return rexBuilder.makeCall(op, inputFields);
-}
-  }
+  private final OptionManager options;
 
   public static Prel insertLocalExchanges(Prel prel, OptionManager 
options) {
 boolean isMuxEnabled = 
options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val;
 boolean isDeMuxEnabled = 
options.getOption(PlannerSettings.DEMUX_EXCHANGE.getOptionName()).bool_val;
 
 if (isMuxEnabled || isDeMuxEnabled) {
-  return prel.accept(new InsertLocalExchangeVisitor(isMuxEnabled, 
isDeMuxEnabled), null);
+  return prel.accept(new InsertLocalExchangeVisitor(options), null);
--- End diff --

Since the local variables isMuxEnabled and isDeMuxEnabled are not being used 
anymore, you can remove them on lines 33 and 34. 



[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360091#comment-16360091
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user HanumathRao commented on the issue:

https://github.com/apache/drill/pull/1110
  
@vrozov  Thank you for reviewing the code. I have incorporated all the 
review comments. Please let me know if anything needs to be changed.


> SingleMergeExchange is not scaling up when many minor fragments are allocated 
> for a query.
> --
>
> Key: DRILL-6115
> URL: https://issues.apache.org/jira/browse/DRILL-6115
> Project: Apache Drill
>  Issue Type: Improvement
>  Components: Execution - Relational Operators
>Affects Versions: 1.12.0
>Reporter: Hanumath Rao Maduri
>Assignee: Hanumath Rao Maduri
>Priority: Major
> Fix For: 1.13.0
>
> Attachments: Enhancing Drill to multiplex ordered merge exchanges.docx
>
>
> SingleMergeExchange is created when a global order is required in the output. 
> The following query produces the SingleMergeExchange.
> {code:java}
> 0: jdbc:drill:zk=local> explain plan for select L_LINENUMBER from 
> dfs.`/drill/tables/lineitem` order by L_LINENUMBER;
> +--+--+
> | text | json |
> +--+--+
> | 00-00 Screen
> 00-01 Project(L_LINENUMBER=[$0])
> 00-02 SingleMergeExchange(sort0=[0])
> 01-01 SelectionVectorRemover
> 01-02 Sort(sort0=[$0], dir0=[ASC])
> 01-03 HashToRandomExchange(dist0=[[$0]])
> 02-01 Scan(table=[[dfs, /drill/tables/lineitem]], 
> groupscan=[JsonTableGroupScan [ScanSpec=JsonScanSpec 
> [tableName=maprfs:///drill/tables/lineitem, condition=null], 
> columns=[`L_LINENUMBER`], maxwidth=15]])
> {code}
> On a 10-node cluster, if the table is huge, Drill can spawn many minor 
> fragments, all of which are merged on a single node by one merge receiver. 
> This creates a lot of memory pressure on the receiver node and also an 
> execution bottleneck. To address this issue, the merge receiver should be 
> a multiphase merge receiver. 
> Ideally, for a large cluster one could introduce tree merges so that merging 
> is done in parallel. But as a first step I think it is better to use the 
> existing infrastructure for multiplexing operators to generate an OrderedMux, 
> so that all the minor fragments pertaining to one drillbit are merged locally 
> and the merged data is sent across to the receiver operator.
> For example, on a 10-node cluster where each node processes 14 minor fragments:
> * the current version of the code merges 140 minor fragments at the receiver;
> * the proposed version has a two-level merge: a 14-way merge in each drillbit, 
> done in parallel, followed by a merge of 10 minor fragments at the receiver node.
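
The two-level merge described in the issue can be sketched in plain Java. This is only an illustration under stated assumptions, not Drill's implementation: sorted minor-fragment streams are modelled as sorted integer lists, and the class and method names (`TwoLevelMergeSketch`, `mergeSorted`, `twoLevelMerge`, `sampleCluster`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Illustrative only: sorted minor-fragment streams are modelled as sorted integer lists. */
class TwoLevelMergeSketch {

  /** K-way merge of already-sorted lists using a min-heap of stream cursors. */
  static List<Integer> mergeSorted(List<List<Integer>> sortedStreams) {
    // Each heap entry is {value, streamIndex, positionInStream}.
    PriorityQueue<int[]> heap = new PriorityQueue<>(Comparator.comparingInt((int[] e) -> e[0]));
    for (int s = 0; s < sortedStreams.size(); s++) {
      if (!sortedStreams.get(s).isEmpty()) {
        heap.add(new int[] {sortedStreams.get(s).get(0), s, 0});
      }
    }
    List<Integer> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      int[] head = heap.poll();
      out.add(head[0]);
      List<Integer> stream = sortedStreams.get(head[1]);
      int next = head[2] + 1;
      if (next < stream.size()) {
        heap.add(new int[] {stream.get(next), head[1], next});
      }
    }
    return out;
  }

  /** Level 1: merge the streams of each node; level 2: merge the per-node results. */
  static List<Integer> twoLevelMerge(List<List<List<Integer>>> streamsByNode) {
    List<List<Integer>> perNode = new ArrayList<>();
    for (List<List<Integer>> nodeStreams : streamsByNode) {
      perNode.add(mergeSorted(nodeStreams)); // independent per node, so it can run in parallel
    }
    return mergeSorted(perNode); // the receiver now sees one stream per node
  }

  /** Builds nodes x fragmentsPerNode sorted streams whose union is 0..(total rows - 1). */
  static List<List<List<Integer>>> sampleCluster(int nodes, int fragmentsPerNode, int rowsPerFragment) {
    int totalFragments = nodes * fragmentsPerNode;
    List<List<List<Integer>>> cluster = new ArrayList<>();
    for (int n = 0; n < nodes; n++) {
      List<List<Integer>> nodeStreams = new ArrayList<>();
      for (int k = 0; k < fragmentsPerNode; k++) {
        int fragmentId = n * fragmentsPerNode + k;
        List<Integer> rows = new ArrayList<>();
        for (int r = 0; r < rowsPerFragment; r++) {
          rows.add(fragmentId + r * totalFragments); // ascending within the fragment
        }
        nodeStreams.add(rows);
      }
      cluster.add(nodeStreams);
    }
    return cluster;
  }

  public static void main(String[] args) {
    List<List<List<Integer>>> cluster = sampleCluster(10, 14, 3);
    List<Integer> merged = twoLevelMerge(cluster);
    System.out.println("rows merged: " + merged.size());
    System.out.println("receiver fan-in: " + cluster.size() + " instead of " + (10 * 14));
  }
}
```

In this model the final merge has a fan-in of 10 (one stream per node) rather than 140, while the 14-way merges are spread across the nodes.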



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-11 Thread Aman Sinha (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360086#comment-16360086
 ] 

Aman Sinha commented on DRILL-6115:
---

While looking at the PR, I revisited the design doc and had a few comments: 
 * If there is an ORDER BY with LIMIT, we currently push the LIMIT past the 
SingleMergeExchange so that each Sort on a minor fragment outputs a smaller 
set of rows.  I think your changes need to include modifications to the 
LimitExchangeTransposeRule to avoid a regression: it now also needs to push the 
LIMIT past the OrderedMuxExchange, otherwise each local merge will be merging 
a lot more rows than before. 
 * The E_X_P_R_H_A_S_H_F_I_E_L_D=[hash32AsDouble($1, 1301011)] that is 
computed for an UnorderedMuxExchange should be reusable for the 
OrderedMuxExchange, right?  We should not have to compute the hash value on 
the same order-by key twice, since that adds CPU cost. 
 * Is it always cheaper to do this two-level merge, even when there are very few 
(e.g., 2) minor fragments per node?  I think we would want to have some control 
over that, so let's think about adding a knob and doing performance 
experiments. 
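
The first point, about pushing LIMIT below the merge, can be illustrated with a small Java sketch. It is not Drill code: sorted streams are modelled as integer lists, the merge is a stand-in that concatenates and sorts, and every name (`LimitBelowMergeSketch`, `pushLimitBelowMerge`, and so on) is hypothetical. It shows that truncating each sorted input to the limit before merging gives the same first N rows while fewer rows enter the merge.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustrative only: models sorted fragment outputs as sorted integer lists. */
class LimitBelowMergeSketch {

  /** Stand-in for an ordered merge: concatenates the sorted inputs and sorts the result. */
  static List<Integer> merge(List<List<Integer>> sortedStreams) {
    List<Integer> all = new ArrayList<>();
    for (List<Integer> stream : sortedStreams) {
      all.addAll(stream);
    }
    Collections.sort(all);
    return all;
  }

  /** Returns a copy of the first n rows (or all rows if there are fewer than n). */
  static List<Integer> firstN(List<Integer> rows, int n) {
    return new ArrayList<>(rows.subList(0, Math.min(n, rows.size())));
  }

  /** LIMIT applied only above the merge. */
  static List<Integer> mergeThenLimit(List<List<Integer>> sortedStreams, int limit) {
    return firstN(merge(sortedStreams), limit);
  }

  /** LIMIT pushed below the merge: every sorted input is truncated first. */
  static List<List<Integer>> pushLimitBelowMerge(List<List<Integer>> sortedStreams, int limit) {
    List<List<Integer>> truncated = new ArrayList<>();
    for (List<Integer> stream : sortedStreams) {
      truncated.add(firstN(stream, limit));
    }
    return truncated;
  }

  /** Number of rows that would enter the merge. */
  static int totalRows(List<List<Integer>> streams) {
    int total = 0;
    for (List<Integer> stream : streams) {
      total += stream.size();
    }
    return total;
  }

  public static void main(String[] args) {
    List<List<Integer>> streams = Arrays.asList(
        Arrays.asList(1, 4, 7, 10), Arrays.asList(2, 5, 8, 11), Arrays.asList(3, 6, 9, 12));
    List<List<Integer>> limited = pushLimitBelowMerge(streams, 2);
    System.out.println("without pushdown: " + totalRows(streams) + " rows enter the merge");
    System.out.println("with pushdown:    " + totalRows(limited) + " rows enter the merge");
    System.out.println("same result: "
        + mergeThenLimit(streams, 2).equals(mergeThenLimit(limited, 2)));
  }
}
```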



[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357680#comment-16357680
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167089878
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
 ---
@@ -55,10 +56,10 @@
 
 
   public RETURN visitExchange(Exchange exchange, EXTRA value) throws EXCEP;
+  public RETURN visitSingleMergeExchange(SingleMergeExchange exchange, EXTRA value) throws EXCEP;
--- End diff --

The same question as for `PrelVisitor.java`. Is it necessary to have a 
separate `visitSingleMergeExchange`?




[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357681#comment-16357681
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167088606
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java
 ---
@@ -35,7 +38,9 @@
   public RETURN visitScan(ScanPrel prel, EXTRA value) throws EXCEP;
   public RETURN visitJoin(JoinPrel prel, EXTRA value) throws EXCEP;
   public RETURN visitProject(ProjectPrel prel, EXTRA value) throws EXCEP;
-
+  public RETURN visitHashToRandomExchange(HashToRandomExchangePrel prel, EXTRA value) throws EXCEP;
--- End diff --

Are the 3 new methods necessary? Can `visitExchange` delegate to `prel` or use 
`instanceof`? 
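
A minimal sketch of the `instanceof` alternative raised here, assuming a visitor that keeps a single exchange hook. All types below (`Exchange`, `SingleMergeExchange`, `HashToRandomExchange`, `Visitor`, `MuxChoosingVisitor`) are hypothetical stand-ins declared inside the example, not Drill's `Prel` or `PhysicalOperator` classes.

```java
/** Illustrative only: every nested type is a stand-in declared for this example. */
class ExchangeDispatchSketch {

  static class Exchange {}

  static class SingleMergeExchange extends Exchange {}

  static class HashToRandomExchange extends Exchange {}

  /** A visitor interface that exposes only one hook for all exchanges. */
  interface Visitor<R> {
    R visitExchange(Exchange exchange);
  }

  /** Chooses a multiplexer per exchange subtype without adding visitor methods. */
  static class MuxChoosingVisitor implements Visitor<String> {
    @Override
    public String visitExchange(Exchange exchange) {
      if (exchange instanceof SingleMergeExchange) {
        return "ordered-mux";   // order must be preserved, so merge locally
      }
      if (exchange instanceof HashToRandomExchange) {
        return "unordered-mux"; // no ordering requirement
      }
      return "unchanged";       // any other exchange is left as it is
    }
  }

  public static void main(String[] args) {
    Visitor<String> visitor = new MuxChoosingVisitor();
    System.out.println(visitor.visitExchange(new SingleMergeExchange()));
    System.out.println(visitor.visitExchange(new HashToRandomExchange()));
    System.out.println(visitor.visitExchange(new Exchange()));
  }
}
```

The trade-off is the usual one: separate `visitXxx` methods give compile-time dispatch at the cost of a wider interface, while `instanceof` keeps the interface small but moves the case analysis into each visitor that cares.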




[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357683#comment-16357683
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167090991
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java
 ---
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * OrderedMuxExchange is a version of MuxExchange where the incoming batches are sorted
+ * and a merge operation is performed to produce a sorted stream as output.
+ */
+@JsonTypeName("ordered-mux-exchange")
+public class OrderedMuxExchange extends AbstractMuxExchange {
+  private final List<Ordering> orderings;
+
+  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, List<Ordering> orderings) {
--- End diff --

Json annotation for orderings?




[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357682#comment-16357682
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167092037
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
 ---
@@ -59,6 +59,7 @@ public static void setup() throws Exception {
     ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
         .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD, 1) // Unmanaged
         .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE, 1) // Unmanaged
+        .configProperty(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, 10 * 1024 * 1024) //use less memory for sorting.
--- End diff --

Why is the change necessary?




[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357679#comment-16357679
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1110#discussion_r167091413
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java
 ---
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.MinorFragmentEndpoint;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.common.logical.data.Order.Ordering;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * OrderedMuxExchange is a version of MuxExchange where the incoming batches are sorted
+ * and a merge operation is performed to produce a sorted stream as output.
+ */
+@JsonTypeName("ordered-mux-exchange")
+public class OrderedMuxExchange extends AbstractMuxExchange {
+  private final List<Ordering> orderings;
+
+  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, List<Ordering> orderings) {
+    super(child);
+    this.orderings = orderings;
+  }
+
+  @Override
+  public Receiver getReceiver(int minorFragmentId) {
+    createSenderReceiverMapping();
+
+    List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
+    if (senders == null || senders.size() <= 0) {
--- End diff --

Add debug level info for `receiverToSenderMapping` and minorFragmentId.



[jira] [Commented] (DRILL-6115) SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

2018-02-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351858#comment-16351858
 ] 

ASF GitHub Bot commented on DRILL-6115:
---

GitHub user HanumathRao opened a pull request:

https://github.com/apache/drill/pull/1110

DRILL-6115: SingleMergeExchange is not scaling up when many minor fragments 
are allocated for a query.

Currently a SingleMergeExchange merges all the fragment streams on the 
foreman. This can cause a CPU bottleneck and also huge memory consumption at 
the foreman. 

This PR introduces a new multiplex operator called OrderedMuxExchange, which 
merges the minor fragment streams pertaining to one drillbit and sends them 
as one output stream to the foreman. 

The existing multiplex mechanism is used to introduce these operators.

Please review this PR.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HanumathRao/drill DRILL-6115

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/1110.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1110


commit 43a71277aeec9bb377181728b2ce563437d7e46d
Author: hmaduri 
Date:   2018-01-22T00:42:28Z

DRILL-6115: SingleMergeExchange is not scaling up when many minor fragments 
are allocated for a query.



