Dmitry Lychagin has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2277

Change subject: [ASTERIXDB-2244][RTM] Implement micro union-all operator
......................................................................

[ASTERIXDB-2244][RTM] Implement micro union-all operator

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- implement support for binary micro operators in subplans
- implement micro union-all operator
- fix free variables visitor

Change-Id: I11be926f175889978c144dd4483ec565d3d86e2d
---
M asterixdb/LICENSE
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.1.ddl.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.2.update.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.3.query.sqlpp
A 
asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/non_unary_subplan_01/non_unary_subplan_01.1.adm
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M 
asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
A 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnionAllPOperator.java
A 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
M 
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
M 
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
M 
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
A 
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java
M 
hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
42 files changed, 1,007 insertions(+), 612 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/77/2277/1

diff --git a/asterixdb/LICENSE b/asterixdb/LICENSE
index bff778e..18245c5 100644
--- a/asterixdb/LICENSE
+++ b/asterixdb/LICENSE
@@ -273,6 +273,184 @@
    Bootstrap-based projects, I would certainly appreciate any form of support,
    even a nice Tweet is enough. Of course if you want, you can say thank you 
and
    support me by buying more icons on GLYPHICONS.com.
+---
+   Portions of the AsterixDB QueryUI
+       located at:
+         asterix-app/src/main/resources/queryui/js/json-formatter.min.js,
+       and
+         asterix-app/src/main/resources/queryui/css/json-formatter.min.css
+
+   are available under the following license:
+---
+   Copyright 2014 Mohsen Azimi
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+---
+   Portions of the AsterixDB QueryUI
+       located at:
+         asterix-app/src/main/resources/queryui/js/bootstrap.min.js,
+         asterix-app/src/main/resources/queryui/css/bootstrap.min.css,
+         asterix-app/src/main/resources/queryui/css/bootstrap-theme.min.css,
+         asterix-app/src/main/resources/queryui/css/bootstrap-theme.min.css,
+         
asterix-app/src/main/resources/queryui/fonts/glyphicons-halflings-regular.eot,
+         
asterix-app/src/main/resources/queryui/fonts/glyphicons-halflings-regular.svg,
+         
asterix-app/src/main/resources/queryui/fonts/glyphicons-halflings-regular.ttf,
+         
asterix-app/src/main/resources/queryui/fonts/glyphicons-halflings-regular.woff,
+       and
+         
asterix-app/src/main/resources/queryui/fonts/glyphicons-halflings-regular.woff2
+
+   are available under the following license:
+---
+   The MIT License (MIT)
+
+   Copyright (c) 2011-2015 Twitter, Inc
+
+   Permission is hereby granted, free of charge, to any person obtaining a copy
+   of this software and associated documentation files (the "Software"), to 
deal
+   in the Software without restriction, including without limitation the rights
+   to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+   copies of the Software, and to permit persons to whom the Software is
+   furnished to do so, subject to the following conditions:
+
+   The above copyright notice and this permission notice shall be included in
+   all copies or substantial portions of the Software.
+
+   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+   FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+   AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+   LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
FROM,
+   OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+   THE SOFTWARE.
+
+   queryui/css/bootstrap.min.css also contains normalize.css v3.0.3
+   (http://github.com/necolas/normalize.css), having the following copyright:
+   --
+   Copyright © Nicolas Gallagher and Jonathan Neal
+
+   Permission is hereby granted, free of charge, to any person obtaining a 
copy of
+   this software and associated documentation files (the "Software"), to deal 
in
+   the Software without restriction, including without limitation the rights to
+   use, copy, modify, merge, publish, distribute, sublicense, and/or sell 
copies of
+   the Software, and to permit persons to whom the Software is furnished to do 
so,
+   subject to the following conditions:
+
+   The above copyright notice and this permission notice shall be included in 
all
+   copies or substantial portions of the Software.
+
+   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
FITNESS
+   FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS 
OR
+   COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 
WHETHER
+   IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+   CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+   --
+   Credit for font files: queryui/fonts/glyphicons-halflings-regular.eot,
+                          queryui/fonts/glyphicons-halflings-regular.svg,
+                          queryui/fonts/glyphicons-halflings-regular.ttf,
+                          queryui/fonts/glyphicons-halflings-regular.woff,
+                        and
+                          queryui/fonts/glyphicons-halflings-regular.woff2
+
+   GLYPHICONS Halflings font is also released as an extension of a Bootstrap
+   (www.getbootstrap.com) for free and it is released under the same license as
+   Bootstrap. While you are not required to include attribution on your
+   Bootstrap-based projects, I would certainly appreciate any form of support,
+   even a nice Tweet is enough. Of course if you want, you can say thank you 
and
+   support me by buying more icons on GLYPHICONS.com.
+---
+
+   Portions of the AsterixDB QueryUI
+       located at:
+         asterix-app/src/main/resources/queryui/js/angular.min.js
+
+   are available under The MIT License:
+---
+   Copyright (c) 2010-2016 Google, Inc. http://angularjs.org
+
+   Permission is hereby granted, free of charge, to any person obtaining a copy
+   of this software and associated documentation files (the "Software"), to 
deal
+   in the Software without restriction, including without limitation the rights
+   to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+   copies of the Software, and to permit persons to whom the Software is
+   furnished to do so, subject to the following conditions:
+
+   The above copyright notice and this permission notice shall be included in
+   all copies or substantial portions of the Software.
+
+   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+   FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+   AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+   LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
FROM,
+   OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+   THE SOFTWARE.
+---
+   Portions of the AsterixDB QueryUI
+       located at:
+         asterix-app/src/main/resources/queryui/js/codemirror.js,
+         asterix-app/src/main/resources/queryui/js/javascript.js,
+       and
+         asterix-app/src/main/resources/queryui/css/codemirror.css
+
+   are available under The MIT License:
+---
+   Copyright (C) 2016 by Marijn Haverbeke <marijnh@gmail.com> and others
+
+   Permission is hereby granted, free of charge, to any person obtaining a copy
+   of this software and associated documentation files (the "Software"), to 
deal
+   in the Software without restriction, including without limitation the rights
+   to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+   copies of the Software, and to permit persons to whom the Software is
+   furnished to do so, subject to the following conditions:
+
+   The above copyright notice and this permission notice shall be included in
+   all copies or substantial portions of the Software.
+
+   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+   FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+   AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+   LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
FROM,
+   OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+   THE SOFTWARE.
+---
+   Portions of the AsterixDB QueryUI
+       located at:
+         asterix-app/src/main/resources/queryui/js/ui-codemirror.js
+
+   are available under The MIT License:
+---
+   Copyright (c) 2012 the AngularUI Team, http://angular-ui.github.com
+
+   Permission is hereby granted, free of charge, to any person obtaining a copy
+   of this software and associated documentation files (the "Software"), to 
deal
+   in the Software without restriction, including without limitation the rights
+   to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+   copies of the Software, and to permit persons to whom the Software is
+   furnished to do so, subject to the following conditions:
+
+   The above copyright notice and this permission notice shall be included in
+   all copies or substantial portions of the Software.
+
+   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+   FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+   AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+   LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 
FROM,
+   OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+   THE SOFTWARE.
+---
    Portions of the AsterixDB runtime
        located at:
          asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/*
@@ -673,357 +851,3 @@
    such warranty or additional liability.
    END OF TERMS AND CONDITIONS
 ---
-File Saver Portions of the AsterixDB Dashboard located at:
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/output.component.ts
- 
asterix-app/src/main/resources/dashboard/static/main.37b7b7cad656490b195a.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/styles.9f50282210bba5318775.bundle
- 
asterix-app/src/main/resources/dashboard/static/scripts.da68998bdd77aff4e764.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/polyfills.32ca5670d6503e090789.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/inline.66bd6b83f86cf773a001.bundle.js
- asterix-app/src/main/resources/dashboard/static/index.html
-
-  are available under The MIT License:
----
-The MIT License
-
-Copyright © 2016 Eli Grey.
-
-Permission is hereby granted, free of charge, to any person obtaining a copy of
-this software and associated documentation files (the "Software"), to deal in
-the Software without restriction, including without limitation the rights to
-use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 
of
-the Software, and to permit persons to whom the Software is furnished to do so,
-subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
FITNESS
-FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
-COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
-IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
-CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
----
-ReactiveX/rxjs located at:
- asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/datasets-collection/
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/datatypes-collection/
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/dataverses-collection/
- asterix-app/src/main/resources/dashboard/src/app/dashboard/query/
- asterix-app/src/main/resources/dashboard/src/app/dashboard/
- asterix-app/src/main/resources/dashboard/src/app/
- 
asterix-app/src/main/resources/dashboard/static/main.37b7b7cad656490b195a.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/styles.9f50282210bba5318775.bundle
- 
asterix-app/src/main/resources/dashboard/static/scripts.da68998bdd77aff4e764.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/polyfills.32ca5670d6503e090789.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/inline.66bd6b83f86cf773a001.bundle.js
- asterix-app/src/main/resources/dashboard/static/index.html
-is licensed under the Apache License 2.0
----
-PrimeNG Portions of the AsterixDB Dashboard located at:
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/output.component.ts
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/output.component.html
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/output.component.scss
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/metadata.component.ts
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/metadata.component.html
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/metadata.component.scss
- 
asterix-app/src/main/resources/dashboard/static/main.37b7b7cad656490b195a.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/styles.9f50282210bba5318775.bundle
- 
asterix-app/src/main/resources/dashboard/static/scripts.da68998bdd77aff4e764.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/polyfills.32ca5670d6503e090789.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/inline.66bd6b83f86cf773a001.bundle.js
- asterix-app/src/main/resources/dashboard/static/index.html
-
-are available under The MIT License:
----
-The MIT License (MIT)
-
-Copyright (c) 2016-2017 PrimeTek
-
-Permission is hereby granted, free of charge, to any person obtaining a copy of
-this software and associated documentation files (the "Software"), to deal in
-the Software without restriction, including without limitation the rights to
-use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 
of
-the Software, and to permit persons to whom the Software is furnished to do so,
-subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
FITNESS
-FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
-COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
-IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
-CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
----
-
-NGRX Portions of the AsterixDB Dashboard located at:
- asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/datasets-collection/
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/datatypes-collection/
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/dataverses-collection/
- asterix-app/src/main/resources/dashboard/src/app/dashboard/query/
- asterix-app/src/main/resources/dashboard/src/app/dashboard/
- asterix-app/src/main/resources/dashboard/src/app/dashboard/
- asterix-app/src/main/resources/dashboard/src/app/
- 
asterix-app/src/main/resources/dashboard/static/main.37b7b7cad656490b195a.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/styles.9f50282210bba5318775.bundle
- 
asterix-app/src/main/resources/dashboard/static/scripts.da68998bdd77aff4e764.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/polyfills.32ca5670d6503e090789.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/inline.66bd6b83f86cf773a001.bundle.js
- asterix-app/src/main/resources/dashboard/static/index.html
-
-are available under The MIT License:
----
-The MIT License (MIT)
-
-Copyright (c) 2017 Brandon Roberts, Mike Ryan, Victor Savkin, Rob Wormald
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-SOFTWARE.
----
-Codemirror Portions of the AsterixDB Dashboard located at:
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/input.component.ts
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/input.component.html
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/input.component.scss
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/codemirror.component.ts
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/codemirror.component.scss
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/input-metadata.component.ts
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/input-metadata.component.html
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/input-metadata.component.scss
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/codemirror-metadata.component.ts
- 
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/codemirror-metadata.component.scss
- 
asterix-app/src/main/resources/dashboard/static/main.37b7b7cad656490b195a.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/styles.9f50282210bba5318775.bundle
- 
asterix-app/src/main/resources/dashboard/static/scripts.da68998bdd77aff4e764.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/polyfills.32ca5670d6503e090789.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/inline.66bd6b83f86cf773a001.bundle.js
- asterix-app/src/main/resources/dashboard/static/index.html
-
-   are available under The MIT License:
----
-   MIT License
-
-Copyright (C) 2017 by Marijn Haverbeke <marijnh@gmail.com> and others
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in
-all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
-
----
-Angular Portions of the AsterixDB Dashboard located at:
- asterix-app/src/main/resources/dashboard/src/
- 
asterix-app/src/main/resources/dashboard/static/main.37b7b7cad656490b195a.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/styles.9f50282210bba5318775.bundle
- 
asterix-app/src/main/resources/dashboard/static/scripts.da68998bdd77aff4e764.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/polyfills.32ca5670d6503e090789.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/inline.66bd6b83f86cf773a001.bundle.js
- asterix-app/src/main/resources/dashboard/static/index.html
-
-   are available under The MIT License:
----
-The MIT License
-
-Copyright (c) 2014-2017 Google, Inc. http://angular.io
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in
-all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
-
----
-Angular Material, hammerjs Portions of the AsterixDB Dashboard located at:
- asterix-app/src/main/resources/dashboard/src/app/dashboard/
- asterix-app/src/main/resources/dashboard/src/app/material.module.ts
- asterix-app/src/main/resources/dashboard/src/main.ts
- 
asterix-app/src/main/resources/dashboard/static/main.37b7b7cad656490b195a.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/styles.9f50282210bba5318775.bundle
- 
asterix-app/src/main/resources/dashboard/static/scripts.da68998bdd77aff4e764.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/polyfills.32ca5670d6503e090789.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/inline.66bd6b83f86cf773a001.bundle.js
- asterix-app/src/main/resources/dashboard/static/index.html
-
-   are available under The MIT License:
----
-   The MIT License
-
-Copyright (c) 2017 Google LLC.
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in
-all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
-
-
----
-Core JS Material Portions of the AsterixDB Dashboard located at:
- asterix-app/src/main/resources/dashboard/src/
- 
asterix-app/src/main/resources/dashboard/static/main.37b7b7cad656490b195a.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/styles.9f50282210bba5318775.bundle
- 
asterix-app/src/main/resources/dashboard/static/scripts.da68998bdd77aff4e764.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/polyfills.32ca5670d6503e090789.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/inline.66bd6b83f86cf773a001.bundle.js
- asterix-app/src/main/resources/dashboard/static/index.html
-
-   are available under following License:
-   https://github.com/zloirock/core-js/blob/master/LICENSE
----
-
-Copyright (c) 2014-2017 Denis Pushkarev
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in
-all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
----
-Roboto Font Portions of the AsterixDB Dashboard located at:
- asterix-app/src/main/resources/dashboard/src/index.html
- 
asterix-app/src/main/resources/dashboard/static/roboto-v15-latin-regular.3d3a53586bd78d1069ae.svg
- 
asterix-app/src/main/resources/dashboard/static/roboto-v15-latin-regular.7e367be02cd17a96d513.woff2
- 
asterix-app/src/main/resources/dashboard/static/roboto-v15-latin-regular.9f916e330c478bbfa2a0.eot
- 
asterix-app/src/main/resources/dashboard/static/roboto-v15-latin-regular.16e1d930cf13fb7a9563.woff
- 
asterix-app/src/main/resources/dashboard/static/roboto-v15-latin-regular.38861cba61c66739c145.ttf
- asterix-app/src/main/resources/dashboard/static/index.html
-   are available under following License: Apache2.0
-   https://github.com/google/roboto/blob/master/LICENSE
----
-Font Awesome Portions of the AsterixDB Dashboard, used in primeng components 
located at:
- asterix-app/src/main/resources/dashboard/src/index.html
- asterix-app/src/main/resources/dashboard/static/index.html
-
-    http://fontawesome.io/license/ Applies to all CSS and LESS files in the
-    following directories if exist: font-awesome/css/, font-awesome/less/, and
-    font-awesome/scss/.  and downloaded from CDN network services and loaded in
-    index.html License: MIT License URL:
-    http://opensource.org/licenses/mit-license.html
----
[email protected] portions of the AsterixDB Dashboard
-       located at:
-        asterix-app/src/main/resources/dashboard/
-   are available under The MIT License:
----
-MIT
-Copyright JS Foundation and other contributors
-
-Permission is hereby granted, free of charge, to any person obtaining
-a copy of this software and associated documentation files (the
-'Software'), to deal in the Software without restriction, including
-without limitation the rights to use, copy, modify, merge, publish,
-distribute, sublicense, and/or sell copies of the Software, and to
-permit persons to whom the Software is furnished to do so, subject to
-the following conditions:
-
-The above copyright notice and this permission notice shall be
-included in all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
-EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
-IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
-CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
-TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
-SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
----
[email protected] Portions of the AsterixDB Dashboard located at:
- asterix-app/src/main/resources/dashboard/src/
- 
asterix-app/src/main/resources/dashboard/static/main.37b7b7cad656490b195a.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/styles.9f50282210bba5318775.bundle
- 
asterix-app/src/main/resources/dashboard/static/scripts.da68998bdd77aff4e764.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/polyfills.32ca5670d6503e090789.bundle.js
- 
asterix-app/src/main/resources/dashboard/static/inline.66bd6b83f86cf773a001.bundle.js
- asterix-app/src/main/resources/dashboard/static/index.html
-   are available under The MIT License:
----
-MIT
-The MIT License
-
-Copyright (c) 2016 Google, Inc.
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in
-all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
-
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index d22ec54..802635b 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -80,20 +80,20 @@
             return false;
         }
 
-        computeDefaultPhysicalOp(op, context);
+        computeDefaultPhysicalOp(op, true, context);
         context.addToDontApplySet(this, op);
         return true;
     }
 
-    private static void setPhysicalOperators(ILogicalPlan plan, 
IOptimizationContext context)
+    private static void setPhysicalOperators(ILogicalPlan plan, boolean 
topLevelOp, IOptimizationContext context)
             throws AlgebricksException {
         for (Mutable<ILogicalOperator> root : plan.getRoots()) {
-            computeDefaultPhysicalOp((AbstractLogicalOperator) 
root.getValue(), context);
+            computeDefaultPhysicalOp((AbstractLogicalOperator) 
root.getValue(), topLevelOp, context);
         }
     }
 
-    private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, 
IOptimizationContext context)
-            throws AlgebricksException {
+    private static void computeDefaultPhysicalOp(AbstractLogicalOperator op, 
boolean topLevelOp,
+            IOptimizationContext context) throws AlgebricksException {
         PhysicalOptimizationConfig physicalOptimizationConfig = 
context.getPhysicalOptimizationConfig();
         if (op.getOperatorTag().equals(LogicalOperatorTag.GROUP)) {
             GroupByOperator gby = (GroupByOperator) op;
@@ -209,11 +209,11 @@
         if (op.getPhysicalOperator() == null) {
             switch (op.getOperatorTag()) {
                 case INNERJOIN: {
-                    
JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, context);
+                    
JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, topLevelOp, 
context);
                     break;
                 }
                 case LEFTOUTERJOIN: {
-                    
JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, context);
+                    
JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, 
topLevelOp, context);
                     break;
                 }
                 case UNNEST_MAP:
@@ -279,11 +279,11 @@
         if (op.hasNestedPlans()) {
             AbstractOperatorWithNestedPlans nested = 
(AbstractOperatorWithNestedPlans) op;
             for (ILogicalPlan p : nested.getNestedPlans()) {
-                setPhysicalOperators(p, context);
+                setPhysicalOperators(p, false, context);
             }
         }
         for (Mutable<ILogicalOperator> opRef : op.getInputs()) {
-            computeDefaultPhysicalOp((AbstractLogicalOperator) 
opRef.getValue(), context);
+            computeDefaultPhysicalOp((AbstractLogicalOperator) 
opRef.getValue(), topLevelOp, context);
         }
     }
 
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
index 8ab8b1d..461614a 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.optimizer.rules.subplan;
 
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -56,10 +57,8 @@
 import 
org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
-import com.google.common.collect.ImmutableSet;
-
 /*
-This rule  is to remove SubplanOperators containing DataScan, InnerJoin, 
LeftOuterJoin, UnionAll or Distinct. Given a qualified Subplan operator called 
S1,
+This rule removes SubplanOperators containing DataScan, InnerJoin, or 
LeftOuterJoin. Given a qualified Subplan operator called S1,
 Let's call its input operator O1.
 
 General Cases
@@ -348,9 +347,8 @@
             IOptimizationContext context) throws AlgebricksException {
         SubplanOperator subplanOp = (SubplanOperator) opRef.getValue();
         if (!SubplanFlatteningUtil.containsOperators(subplanOp,
-                ImmutableSet.of(LogicalOperatorTag.DATASOURCESCAN, 
LogicalOperatorTag.INNERJOIN,
-                        // We don't have nested runtime for union-all and 
distinct hence we have to include them here.
-                        LogicalOperatorTag.LEFTOUTERJOIN, 
LogicalOperatorTag.UNIONALL, LogicalOperatorTag.DISTINCT))) {
+                EnumSet.of(LogicalOperatorTag.DATASOURCESCAN, 
LogicalOperatorTag.INNERJOIN,
+                        LogicalOperatorTag.LEFTOUTERJOIN))) {
             return new Pair<>(false, new LinkedHashMap<>());
         }
         Mutable<ILogicalOperator> inputOpRef = subplanOp.getInputs().get(0);
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
index 377e96d..fa70c1e 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.optimizer.rules.subplan;
 
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -208,7 +209,7 @@
         if (!OperatorManipulationUtil.ancestorOfOperators(
                 subplanOp.getNestedPlans().get(0).getRoots().get(0).getValue(),
                 // we don't need to check recursively for this special 
rewriting.
-                ImmutableSet.of(LogicalOperatorTag.INNERJOIN, 
LogicalOperatorTag.LEFTOUTERJOIN))) {
+                EnumSet.of(LogicalOperatorTag.INNERJOIN, 
LogicalOperatorTag.LEFTOUTERJOIN))) {
             return new Pair<Boolean, ILogicalOperator>(false, null);
         }
         SubplanSpecialFlatteningCheckVisitor visitor = new 
SubplanSpecialFlatteningCheckVisitor();
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 23a3eda..bd02fd0 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -233,7 +233,7 @@
                     projectionList[projCount++] = i;
                 }
                 IPushRuntime assignOp = new AssignRuntimeFactory(outColumns, 
secondaryFieldAccessEvalFactories,
-                        projectionList, true).createPushRuntime(ctx);
+                        projectionList, true).createPushRuntime(ctx)[0];
                 insertOp.setOutputFrameWriter(0, assignOp, 
primaryIndexInfo.rDesc);
                 assignOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
                 SecondaryIndexInfo secondaryIndexInfo = new 
SecondaryIndexInfo(primaryIndexInfo, secondaryIndex);
@@ -268,7 +268,7 @@
             NoMergePolicyFactory mergePolicyFactory, Map<String, String> 
mergePolicyProperties, int[] filterFields,
             int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
             StorageComponentProvider storageComponentProvider) throws 
HyracksDataException, AlgebricksException {
-        IPushRuntime emptyTupleOp = new 
EmptyTupleSourceRuntimeFactory().createPushRuntime(ctx);
+        IPushRuntime emptyTupleOp = new 
EmptyTupleSourceRuntimeFactory().createPushRuntime(ctx)[0];
         JobSpecification spec = new JobSpecification();
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, 
primaryKeyTypes, recordType, metaType,
                 mergePolicyFactory, mergePolicyProperties, filterFields, 
primaryKeyIndexes, primaryKeyIndicators);
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.1.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.1.ddl.sqlpp
new file mode 100644
index 0000000..c1ce4a3
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.1.ddl.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+create type TType as open
+{ id: bigint };
+
+create dataset TData (TType) primary key id;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.2.update.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.2.update.sqlpp
new file mode 100644
index 0000000..3ce9554
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.2.update.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into TData ( [
+{'id':1, 'x':1, 'f':19},
+{'id':2, 'x':2, 'f':12},
+{'id':3, 'x':1, 'f':10},
+{'id':4, 'x':2, 'f':17},
+{'id':5, 'x':1, 'f':12}
+]);
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.3.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.3.query.sqlpp
new file mode 100644
index 0000000..b825157
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.3.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+select x,
+array_sum((
+   select value a.f
+   from g as p
+   union all
+   select value a.f
+   from g as w
+)) s
+from TData as a
+group by a.x as x group as g
+order by x
+;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/non_unary_subplan_01/non_unary_subplan_01.1.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/non_unary_subplan_01/non_unary_subplan_01.1.adm
new file mode 100644
index 0000000..5071183
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/non_unary_subplan_01/non_unary_subplan_01.1.adm
@@ -0,0 +1,2 @@
+{ "x": 1, "s": 82 }
+{ "x": 2, "s": 58 }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 14784d6..6779bad 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -6422,6 +6422,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="subquery">
+      <compilation-unit name="non_unary_subplan_01">
+        <output-dir compare="Text">non_unary_subplan_01</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="subquery">
       <compilation-unit name="query-ASTERIXDB-1571">
         <output-dir compare="Text">query-ASTERIXDB-1571</output-dir>
       </compilation-unit>
diff --git 
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties 
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 3ad7fd5..8341a33 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -125,7 +125,7 @@
 1060 = Field \"%1$s\" in the with clause must be of type %2$s
 1061 = Field \"%1$s\" in the with clause must contain sub field \"%2$s\"
 1062 = Merge policy parameters cannot be of type %1$s
-1063 = There is no dataverse with name %1$s
+1063 = There is no dataverse with name \"%1$s\"
 
 # Feed Errors
 3001 = Illegal state.
diff --git 
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
 
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
index 1a54b92..f7c4425 100644
--- 
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
+++ 
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
@@ -23,7 +23,6 @@
 import java.util.List;
 
 import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.lang.common.base.Clause.ClauseType;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.clause.GroupbyClause;
 import org.apache.asterix.lang.common.clause.LetClause;
@@ -105,19 +104,15 @@
 
         // Visits join/unnest/nest clauses.
         for (AbstractBinaryCorrelateClause correlateClause : 
fromTerm.getCorrelateClauses()) {
+            // Adds binding variables.
+            bindingVariables.add(correlateClause.getRightVariable());
+            if (correlateClause.hasPositionalVariable()) {
+                bindingVariables.add(correlateClause.getPositionalVariable());
+            }
+
             Collection<VariableExpr> correlateFreeVars = new HashSet<>();
             correlateClause.accept(this, correlateFreeVars);
-            if (correlateClause.getClauseType() != ClauseType.JOIN_CLAUSE) {
-                // Correlation is allowed if the clause is not a join clause,
-                // therefore we remove left-side binding variables for these 
cases.
-                correlateFreeVars.removeAll(bindingVariables);
-
-                // Adds binding variables.
-                bindingVariables.add(correlateClause.getRightVariable());
-                if (correlateClause.hasPositionalVariable()) {
-                    
bindingVariables.add(correlateClause.getPositionalVariable());
-                }
-            }
+            correlateFreeVars.removeAll(bindingVariables);
             freeVars.addAll(correlateFreeVars);
         }
         return null;
@@ -410,7 +405,7 @@
     public Void visit(IndexAccessor ia, Collection<VariableExpr> freeVars) 
throws CompilationException {
         ia.getExpr().accept(this, freeVars);
         if (ia.getIndexExpr() != null) {
-            ia.getIndexExpr();
+            ia.getIndexExpr().accept(this, freeVars);
         }
         return null;
     }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 445ad4a..1971eb0 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -51,10 +51,11 @@
     }
 
     @Override
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
+    public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
         IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
-        return new CommitRuntime(ctx, ((IJobEventListenerFactory) 
fact).getTxnId(datasetId), datasetId,
-                primaryKeyFields, isWriteTransaction,
-                
datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
+        return new IPushRuntime[] {
+                new CommitRuntime(ctx, ((IJobEventListenerFactory) 
fact).getTxnId(datasetId), datasetId,
+                        primaryKeyFields, isWriteTransaction,
+                        
datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink) };
     }
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
 
b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 79b8f38..2d6123e 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -104,7 +104,7 @@
                                 clusterLocations);
 
                         PlanCompiler pc = new PlanCompiler(context);
-                        return pc.compilePlan(plan, null, 
jobEventListenerFactory);
+                        return pc.compilePlan(plan, jobEventListenerFactory);
                     }
                 };
             }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 75b63f1..db9728b 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -44,6 +44,7 @@
     MATERIALIZE,
     MICRO_PRE_CLUSTERED_GROUP_BY,
     MICRO_PRE_SORTED_DISTINCT_BY,
+    MICRO_UNION_ALL,
     NESTED_LOOP,
     NESTED_TUPLE_SOURCE,
     ONE_TO_ONE_EXCHANGE,
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
index 0fb667a..4b9ae89 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
@@ -72,8 +74,8 @@
     }
 
     protected PhysicalRequirements emptyUnaryRequirements() {
-        StructuralPropertiesVector[] req = new StructuralPropertiesVector[] {
-                StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR };
+        StructuralPropertiesVector[] req =
+                new StructuralPropertiesVector[] { 
StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR };
         return new PhysicalRequirements(req, 
IPartitioningRequirementsCoordinator.NO_COORDINATION);
     }
 
@@ -97,7 +99,7 @@
 
     /**
      * @return labels (0 or 1) for each input and output indicating the 
dependency between them.
-     *         The edges labeled as 1 must wait for the edges with label 0.
+     * The edges labeled as 1 must wait for the edges with label 0.
      */
     @Override
     public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator 
op) {
@@ -117,48 +119,63 @@
 
     protected AlgebricksPipeline[] compileSubplans(IOperatorSchema 
outerPlanSchema,
             AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, 
JobGenContext context)
-                    throws AlgebricksException {
-        AlgebricksPipeline[] subplans = new 
AlgebricksPipeline[npOp.getNestedPlans().size()];
+            throws AlgebricksException {
+        List<List<AlgebricksPipeline>> subplans = 
compileSubplansImpl(outerPlanSchema, npOp, opSchema, context);
+        int n = subplans.size();
+        AlgebricksPipeline[] result = new AlgebricksPipeline[n];
+        for (int i = 0; i < n; i++) {
+            List<AlgebricksPipeline> subplanOps = subplans.get(i);
+            if (subplanOps.size() != 1) {
+                throw new AlgebricksException("Attempting to construct a 
nested plan with " + subplanOps.size()
+                        + " operator descriptors. Currently, nested plans can 
only consist in linear pipelines of Asterix micro operators.");
+            }
+            result[i] = subplanOps.get(0);
+        }
+        return result;
+    }
+
+    protected List<List<AlgebricksPipeline>> 
compileSubplansImpl(IOperatorSchema outerPlanSchema,
+            AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, 
JobGenContext context)
+            throws AlgebricksException {
+        List<List<AlgebricksPipeline>> subplans = new 
ArrayList<>(npOp.getNestedPlans().size());
         PlanCompiler pc = new PlanCompiler(context);
-        int i = 0;
         for (ILogicalPlan p : npOp.getNestedPlans()) {
-            subplans[i++] = buildPipelineWithProjection(p, outerPlanSchema, 
npOp, opSchema, pc);
+            subplans.add(buildPipelineWithProjection(p, outerPlanSchema, npOp, 
opSchema, pc));
         }
         return subplans;
     }
 
-    private AlgebricksPipeline buildPipelineWithProjection(ILogicalPlan p, 
IOperatorSchema outerPlanSchema,
+    private List<AlgebricksPipeline> buildPipelineWithProjection(ILogicalPlan 
p, IOperatorSchema outerPlanSchema,
             AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, 
PlanCompiler pc)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         if (p.getRoots().size() > 1) {
             throw new NotImplementedException("Nested plans with several roots 
are not supported.");
         }
-        JobSpecification nestedJob = pc.compilePlan(p, outerPlanSchema, null);
+        JobSpecification nestedJob = pc.compileNestedPlan(p, outerPlanSchema);
         ILogicalOperator topOpInSubplan = p.getRoots().get(0).getValue();
         JobGenContext context = pc.getContext();
         IOperatorSchema topOpInSubplanScm = context.getSchema(topOpInSubplan);
         opSchema.addAllVariables(topOpInSubplanScm);
 
         Map<OperatorDescriptorId, IOperatorDescriptor> opMap = 
nestedJob.getOperatorMap();
-        if (opMap.size() != 1) {
-            throw new AlgebricksException("Attempting to construct a nested 
plan with " + opMap.size()
-                    + " operator descriptors. Currently, nested plans can only 
consist in linear pipelines of Asterix micro operators.");
-        }
-
-        for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> opEntry : 
opMap.entrySet()) {
-            IOperatorDescriptor opd = opEntry.getValue();
-            if (!(opd instanceof AlgebricksMetaOperatorDescriptor)) {
-                throw new AlgebricksException(
-                        "Can only generate Hyracks jobs for pipelinable 
Asterix nested plans, not for "
-                                + opd.getClass().getName());
+        List<? extends IOperatorDescriptor> metaOps = nestedJob.getMetaOps();
+        if (opMap.size() != metaOps.size()) {
+            for (IOperatorDescriptor opd : opMap.values()) {
+                if (!(opd instanceof AlgebricksMetaOperatorDescriptor)) {
+                    throw new AlgebricksException(
+                            "Can only generate Hyracks jobs for pipelinable 
Asterix nested plans, not for " + opd
+                                    .getClass().getName());
+                }
             }
-            AlgebricksMetaOperatorDescriptor amod = 
(AlgebricksMetaOperatorDescriptor) opd;
-
-            return amod.getPipeline();
-            // we suppose that the top operator in the subplan already does the
-            // projection for us
+            throw new IllegalStateException();
         }
 
-        throw new IllegalStateException();
+        List<AlgebricksPipeline> result = new ArrayList<>(metaOps.size());
+        for (IOperatorDescriptor opd : metaOps) {
+            AlgebricksMetaOperatorDescriptor amod = 
(AlgebricksMetaOperatorDescriptor) opd;
+            result.add(amod.getPipeline());
+
+        }
+        return result;
     }
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnionAllPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnionAllPOperator.java
new file mode 100644
index 0000000..5586d8e
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnionAllPOperator.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+
+/**
+ * Common base class of the physical UNION ALL operators. It holds the physical property computation, while
+ * subclasses decide how the runtime is contributed to the job.
+ */
+public abstract class AbstractUnionAllPOperator extends AbstractPhysicalOperator {
+
+    /**
+     * Sets the delivered properties to the partitioning property delivered by the first input, combined with
+     * an empty property list.
+     */
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        IPartitioningProperty pp = op2.getDeliveredPhysicalProperties().getPartitioningProperty();
+        this.deliveredProperties = new StructuralPropertiesVector(pp, new ArrayList<>(0));
+    }
+
+    /**
+     * Builds one requirement vector per input of {@code op}, each derived from a random partitioning over the
+     * computation node domain, and asks for no coordination between the inputs.
+     *
+     * @param op           the union operator
+     * @param reqdByParent properties required by the parent (not used here)
+     * @param context      optimization context supplying the computation node domain
+     * @return the requirements for all inputs of {@code op}
+     */
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+        // One entry per input, so the array always matches the number of edges wired up at job generation.
+        int nInputs = op.getInputs().size();
+        StructuralPropertiesVector[] childRequirements = new StructuralPropertiesVector[nInputs];
+        for (int i = 0; i < nInputs; i++) {
+            childRequirements[i] = OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op,
+                    new StructuralPropertiesVector(
+                            new RandomPartitioningProperty(context.getComputationNodeDomain()), null));
+        }
+        return new PhysicalRequirements(childRequirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}
\ No newline at end of file
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java
new file mode 100644
index 0000000..bbefaaa
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import 
org.apache.hyracks.algebricks.runtime.operators.union.MicroUnionAllRuntimeFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+/**
+ * Physical UNION ALL operator that is contributed to the job as a micro operator
+ * (see {@link #isMicroOperator()}).
+ */
+public class MicroUnionAllPOperator extends AbstractUnionAllPOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.MICRO_UNION_ALL;
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return true;
+    }
+
+    /**
+     * Contributes a {@link MicroUnionAllRuntimeFactory} created for the number of inputs of {@code op}, then
+     * adds one graph edge per input, passing {@code 0} as the source index and the position of the input as
+     * the destination index.
+     */
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        RecordDescriptor outputDesc =
+                JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
+        List<Mutable<ILogicalOperator>> children = op.getInputs();
+        builder.contributeMicroOperator(op, new MicroUnionAllRuntimeFactory(children.size()), outputDesc);
+
+        int position = 0;
+        for (Mutable<ILogicalOperator> childRef : children) {
+            builder.contributeGraphEdge(childRef.getValue(), 0, op, position++);
+        }
+    }
+}
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
index d43ddab..6f7a300 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
@@ -89,18 +89,17 @@
         if (subplan.getNestedPlans().size() != 1) {
             throw new NotImplementedException("Subplan currently works only 
for one nested plan with one root.");
         }
-        AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0], 
subplan, opSchema, context);
-        assert subplans.length == 1;
-        AlgebricksPipeline np = subplans[0];
+        List<List<AlgebricksPipeline>> subplans = 
compileSubplansImpl(inputSchemas[0], subplan, opSchema, context);
+        assert subplans.size() == 1;
+        List<AlgebricksPipeline> np = subplans.get(0);
         RecordDescriptor inputRecordDesc = JobGenHelper.mkRecordDescriptor(
                 context.getTypeEnvironment(op.getInputs().get(0).getValue()), 
inputSchemas[0], context);
-        IMissingWriterFactory[] missingWriterFactories = new 
IMissingWriterFactory[np.getOutputWidth()];
+        IMissingWriterFactory[] missingWriterFactories = new 
IMissingWriterFactory[np.get(0).getOutputWidth()];
         for (int i = 0; i < missingWriterFactories.length; i++) {
             missingWriterFactories[i] = context.getMissingWriterFactory();
         }
-        SubplanRuntimeFactory runtime = new SubplanRuntimeFactory(np, 
missingWriterFactories, inputRecordDesc, null);
-
         RecordDescriptor recDesc = 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, 
context);
+        SubplanRuntimeFactory runtime = new SubplanRuntimeFactory(np, 
missingWriterFactories, inputRecordDesc, recDesc, null);
         builder.contributeMicroOperator(subplan, runtime, recDesc);
 
         ILogicalOperator src = op.getInputs().get(0).getValue();
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
index d184161..85dc0e0 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
@@ -18,29 +18,22 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
-import java.util.ArrayList;
+import java.util.List;
 
+import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;
 
-public class UnionAllPOperator extends AbstractPhysicalOperator {
+public class UnionAllPOperator extends AbstractUnionAllPOperator {
 
     @Override
     public PhysicalOperatorTag getOperatorTag() {
@@ -53,26 +46,6 @@
     }
 
     @Override
-    public void computeDeliveredProperties(ILogicalOperator op, 
IOptimizationContext context) {
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) 
op.getInputs().get(0).getValue();
-        IPartitioningProperty pp = 
op2.getDeliveredPhysicalProperties().getPartitioningProperty();
-        this.deliveredProperties = new StructuralPropertiesVector(pp, new 
ArrayList<>(0));
-    }
-
-    @Override
-    public PhysicalRequirements 
getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext 
context) {
-        StructuralPropertiesVector pv0 = 
OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op,
-                new StructuralPropertiesVector(new 
RandomPartitioningProperty(context.getComputationNodeDomain()),
-                        null));
-        StructuralPropertiesVector pv1 = 
OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op,
-                new StructuralPropertiesVector(new 
RandomPartitioningProperty(context.getComputationNodeDomain()),
-                        null));
-        return new PhysicalRequirements(new StructuralPropertiesVector[] { 
pv0, pv1 },
-                IPartitioningRequirementsCoordinator.NO_COORDINATION);
-    }
-
-    @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, 
JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, 
IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
@@ -81,20 +54,15 @@
         RecordDescriptor recordDescriptor =
                 
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, 
context);
 
-        // at algebricks level, union all only accepts two inputs, although at
-        // hyracks
-        // level, there is no restrictions
-        UnionAllOperatorDescriptor opDesc = new 
UnionAllOperatorDescriptor(spec, 2, recordDescriptor);
+        List<Mutable<ILogicalOperator>> inputs = op.getInputs();
+        int nInputs = inputs.size();
+
+        UnionAllOperatorDescriptor opDesc = new 
UnionAllOperatorDescriptor(spec, nInputs, recordDescriptor);
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
-        ILogicalOperator src1 = op.getInputs().get(0).getValue();
-        builder.contributeGraphEdge(src1, 0, op, 0);
-        ILogicalOperator src2 = op.getInputs().get(1).getValue();
-        builder.contributeGraphEdge(src2, 0, op, 1);
-    }
 
-    @Override
-    public boolean expensiveThanMaterialization() {
-        return false;
+        for (int i = 0; i < nInputs; i++) {
+            ILogicalOperator src = inputs.get(i).getValue();
+            builder.contributeGraphEdge(src, 0, op, i);
+        }
     }
-
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 8d00696..6c79466 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -18,7 +18,10 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.util;
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -40,8 +43,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.
-        LogicalOperatorDeepCopyWithNewVariablesVisitor;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.OperatorDeepCopyVisitor;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
@@ -202,8 +204,8 @@
 
     public static Pair<ILogicalOperator, Map<LogicalVariable, 
LogicalVariable>> deepCopyWithNewVars(
             ILogicalOperator root, IOptimizationContext ctx) throws 
AlgebricksException {
-        LogicalOperatorDeepCopyWithNewVariablesVisitor deepCopyVisitor = new
-                LogicalOperatorDeepCopyWithNewVariablesVisitor(ctx, null, 
true);
+        LogicalOperatorDeepCopyWithNewVariablesVisitor deepCopyVisitor =
+                new LogicalOperatorDeepCopyWithNewVariablesVisitor(ctx, ctx, 
true);
         ILogicalOperator newRoot = deepCopyVisitor.deepCopy(root);
         return Pair.of(newRoot, 
deepCopyVisitor.getInputToOutputVariableMapping());
     }
@@ -328,4 +330,45 @@
         return false;
     }
 
+    /**
+     * Collects the leaf operators (operators without inputs) that can be reached from the given operator.
+     * An operator that has no inputs is its own only leaf.
+     *
+     * @param opRef given operator
+     * @return list containing all leaf descendants
+     */
+    public static List<Mutable<ILogicalOperator>> findLeafDescendants(Mutable<ILogicalOperator> opRef) {
+        List<Mutable<ILogicalOperator>> leaves = null;
+
+        // Used as a stack: references are appended to and taken from the tail.
+        Deque<Mutable<ILogicalOperator>> pending = new ArrayDeque<>();
+        pending.addLast(opRef);
+        while (!pending.isEmpty()) {
+            Mutable<ILogicalOperator> visited = pending.removeLast();
+            List<Mutable<ILogicalOperator>> children = visited.getValue().getInputs();
+            if (!children.isEmpty()) {
+                pending.addAll(children);
+                continue;
+            }
+            // The result list is only allocated once the first leaf shows up.
+            if (leaves == null) {
+                leaves = new ArrayList<>();
+            }
+            leaves.add(visited);
+        }
+        if (leaves == null) {
+            return Collections.emptyList();
+        }
+        return leaves;
+    }
+
+    /**
+     * Locates an operator in a list of operator references. Operators are compared by identity
+     * ({@code ==}), not by {@code equals}.
+     *
+     * @param list list to search in
+     * @param op   operator to find
+     * @return position of the first reference whose value is {@code op}, or {@code -1} if there is none
+     */
+    public static int indexOf(List<Mutable<ILogicalOperator>> list, ILogicalOperator op) {
+        int size = list.size();
+        int pos = 0;
+        while (pos < size && list.get(pos).getValue() != op) {
+            pos++;
+        }
+        return pos < size ? pos : -1;
+    }
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index 4c42db8..6281124 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang3.ArrayUtils;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -34,6 +35,8 @@
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import 
org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
@@ -95,7 +98,7 @@
     @Override
     public void contributeMicroOperator(ILogicalOperator op, 
IPushRuntimeFactory runtime, RecordDescriptor recDesc,
             AlgebricksPartitionConstraint pc) {
-        microOps.put(op, new Pair<IPushRuntimeFactory, 
RecordDescriptor>(runtime, recDesc));
+        microOps.put(op, new Pair<>(runtime, recDesc));
         revMicroOpMap.put(runtime, op);
         if (pc != null) {
             pcForMicroOps.put(op, pc);
@@ -168,6 +171,17 @@
             jobSpec.addRoot(opDesc);
         }
         setAllPartitionConstraints(tgtConstraints);
+    }
+
+    /**
+     * Returns the meta operators generated for this job, ordered so that a meta operator always comes before
+     * every meta operator that (directly or transitively) sends output to it.
+     * <p>
+     * "Sends output to" is only a partial order, so it cannot be expressed as a {@link java.util.Comparator}
+     * for {@code List.sort}: unrelated operators would compare as equal, which breaks the comparator contract
+     * and can leave a producer in front of its consumer. The order is therefore built by repeatedly taking the
+     * first remaining operator that does not feed any other remaining operator.
+     *
+     * @return the generated meta operators, consumers first
+     * @throws IllegalStateException if the meta operators feed each other in a cycle
+     */
+    public List<AlgebricksMetaOperatorDescriptor> getGeneratedMetaOps() {
+        List<AlgebricksMetaOperatorDescriptor> pendingOps = new ArrayList<>();
+        for (IOperatorDescriptor opd : jobSpec.getOperatorMap().values()) {
+            if (opd instanceof AlgebricksMetaOperatorDescriptor) {
+                pendingOps.add((AlgebricksMetaOperatorDescriptor) opd);
+            }
+        }
+        List<AlgebricksMetaOperatorDescriptor> resultOps = new ArrayList<>(pendingOps.size());
+        while (!pendingOps.isEmpty()) {
+            // Pick the first pending operator whose output does not reach any other pending operator.
+            int next = -1;
+            for (int i = 0, ln = pendingOps.size(); i < ln && next < 0; i++) {
+                boolean feedsPendingOp = false;
+                for (int j = 0; j < ln && !feedsPendingOp; j++) {
+                    feedsPendingOp = i != j && sendsOutput(pendingOps.get(i), pendingOps.get(j));
+                }
+                if (!feedsPendingOp) {
+                    next = i;
+                }
+            }
+            if (next < 0) {
+                throw new IllegalStateException("Cyclic data flow between meta operators");
+            }
+            resultOps.add(pendingOps.remove(next));
+        }
+        return resultOps;
     }
 
     private void setAllPartitionConstraints(Map<IConnectorDescriptor, 
TargetConstraint> tgtConstraints) {
@@ -316,20 +330,30 @@
         int n = opContents.size();
         IPushRuntimeFactory[] runtimeFactories = new IPushRuntimeFactory[n];
         RecordDescriptor[] internalRecordDescriptors = new RecordDescriptor[n];
-        int i = 0;
-        for (Pair<IPushRuntimeFactory, RecordDescriptor> p : opContents) {
+        for (int i = 0, ln = opContents.size(); i < ln; i++) {
+            Pair<IPushRuntimeFactory, RecordDescriptor> p = opContents.get(i);
             runtimeFactories[i] = p.first;
             internalRecordDescriptors[i] = p.second;
-            i++;
         }
         ILogicalOperator lastLogicalOp = revMicroOpMap.get(runtimeFactories[n 
- 1]);
         ArrayList<ILogicalOperator> outOps = outEdges.get(lastLogicalOp);
-        int outArity = (outOps == null) ? 0 : outOps.size();
+        int outArity = outOps == null ? 0 : outOps.size();
+        int[] outPositions = new int[outArity];
+        IPushRuntimeFactory[] outRuntimeFactories = new 
IPushRuntimeFactory[outArity];
+        if (outOps != null) {
+            for (int i = 0, ln = outOps.size(); i < ln; i++ ) {
+                ILogicalOperator outOp = outOps.get(i);
+                outPositions[i] = 
OperatorManipulationUtil.indexOf(outOp.getInputs(), lastLogicalOp);
+                Pair<IPushRuntimeFactory, RecordDescriptor> microOpPair = 
microOps.get(outOp);
+                outRuntimeFactories[i] = microOpPair != null ? 
microOpPair.first : null;
+            }
+        }
+
         ILogicalOperator firstLogicalOp = 
revMicroOpMap.get(runtimeFactories[0]);
         ArrayList<ILogicalOperator> inOps = inEdges.get(firstLogicalOp);
         int inArity = (inOps == null) ? 0 : inOps.size();
         return new AlgebricksMetaOperatorDescriptor(jobSpec, inArity, 
outArity, runtimeFactories,
-                internalRecordDescriptors);
+                internalRecordDescriptors, outRuntimeFactories, outPositions);
     }
 
     private void addMicroOpToMetaRuntimeOp(ILogicalOperator aop) {
@@ -343,7 +367,12 @@
             return;
         }
         ILogicalOperator dest = destList.get(0);
+        int destInputPos = OperatorManipulationUtil.indexOf(dest.getInputs(), 
aop);
         Integer j = algebraicOpBelongingToMetaAsterixOp.get(dest);
+        if (destInputPos != 0) {
+            return;
+        }
+
         if (j == null && microOps.get(dest) != null) {
             algebraicOpBelongingToMetaAsterixOp.put(dest, k);
             List<Pair<IPushRuntimeFactory, RecordDescriptor>> aodContent1 = 
metaAsterixOpSkeletons.get(k);
@@ -361,7 +390,6 @@
                 }
             }
         }
-
     }
 
     private int createNewMetaOpInfo(ILogicalOperator aop) {
@@ -386,4 +414,28 @@
         }
     }
 
+    /**
+     * Checks whether output of {@code src} reaches {@code trg}: either one of the output runtime factories of
+     * {@code src} is part of the pipeline of {@code trg}, or the meta operator that such an output runtime
+     * factory belongs to sends output to {@code trg} itself (checked recursively).
+     *
+     * @param src meta operator producing the output
+     * @param trg meta operator that may consume it
+     * @return {@code true} if a direct or transitive connection from {@code src} to {@code trg} was found
+     */
+    private boolean sendsOutput(AlgebricksMetaOperatorDescriptor src, AlgebricksMetaOperatorDescriptor trg) {
+        IPushRuntimeFactory[] consumers = src.getPipeline().getOutputRuntimeFactories();
+        if (consumers == null) {
+            return false;
+        }
+        IPushRuntimeFactory[] trgRuntimes = trg.getPipeline().getRuntimeFactories();
+        for (IPushRuntimeFactory consumer : consumers) {
+            if (ArrayUtils.contains(trgRuntimes, consumer)) {
+                return true;
+            }
+            ILogicalOperator consumerOp = revMicroOpMap.get(consumer);
+            if (consumerOp == null) {
+                continue;
+            }
+            Integer metaOpIdx = algebraicOpBelongingToMetaAsterixOp.get(consumerOp);
+            if (metaOpIdx == null) {
+                continue;
+            }
+            // Follow the connection through the meta operator that contains the consumer.
+            AlgebricksMetaOperatorDescriptor via = metaAsterixOps.get(metaOpIdx);
+            if (via != null && sendsOutput(via, trg)) {
+                return true;
+            }
+        }
+        return false;
+    }
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index f817cd6..4a89d15 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -47,14 +47,24 @@
         return context;
     }
 
-    public JobSpecification compilePlan(ILogicalPlan plan, IOperatorSchema 
outerPlanSchema,
+    /**
+     * Compiles a plan that is not a nested plan. Delegates to {@code compilePlanImpl} with the nested-plan
+     * flag off and without an outer plan schema.
+     *
+     * @param plan                    plan to compile
+     * @param jobEventListenerFactory handed on to {@code compilePlanImpl}; may be {@code null}
+     * @return the job specification built by {@code compilePlanImpl}
+     * @throws AlgebricksException if {@code compilePlanImpl} fails
+     */
+    public JobSpecification compilePlan(ILogicalPlan plan, 
IJobletEventListenerFactory jobEventListenerFactory)
+            throws AlgebricksException {
+        return compilePlanImpl(plan, false, null, jobEventListenerFactory);
+    }
+
+    /**
+     * Compiles a nested plan. Delegates to {@code compilePlanImpl} with the nested-plan flag on and without a
+     * job event listener factory; with that flag set, {@code compilePlanImpl} also stores the meta operators
+     * generated by the job builder on the returned specification.
+     *
+     * @param plan            nested plan to compile
+     * @param outerPlanSchema schema of the outer plan, handed on to {@code compilePlanImpl}
+     * @return the job specification built by {@code compilePlanImpl}
+     * @throws AlgebricksException if {@code compilePlanImpl} fails
+     */
+    public JobSpecification compileNestedPlan(ILogicalPlan plan, 
IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        return compilePlanImpl(plan, true, outerPlanSchema, null);
+    }
+
+    private JobSpecification compilePlanImpl(ILogicalPlan plan, boolean 
isNestedPlan, IOperatorSchema outerPlanSchema,
             IJobletEventListenerFactory jobEventListenerFactory) throws 
AlgebricksException {
         JobSpecification spec = new JobSpecification(context.getFrameSize());
         if (jobEventListenerFactory != null) {
             spec.setJobletEventListenerFactory(jobEventListenerFactory);
         }
-        List<ILogicalOperator> rootOps = new ArrayList<ILogicalOperator>();
-        IHyracksJobBuilder builder = new JobBuilder(spec, 
context.getClusterLocations());
+        List<ILogicalOperator> rootOps = new ArrayList<>();
+        JobBuilder builder = new JobBuilder(spec, 
context.getClusterLocations());
         for (Mutable<ILogicalOperator> opRef : plan.getRoots()) {
             compileOpRef(opRef, spec, builder, outerPlanSchema);
             rootOps.add(opRef.getValue());
@@ -65,6 +75,9 @@
         spec.setConnectorPolicyAssignmentPolicy(new 
ConnectorPolicyAssignmentPolicy());
         // Do not do activity cluster planning because it is slow on large 
clusters
         spec.setUseConnectorPolicyForScheduling(false);
+        if (isNestedPlan) {
+            spec.setMetaOps(builder.getGeneratedMetaOps());
+        }
         return spec;
     }
 
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 84961d6..72c0928 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -67,6 +67,7 @@
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.LeftOuterUnnestPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreSortedDistinctByPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreclusteredGroupByPOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroUnionAllPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedTupleSourcePOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
@@ -201,11 +202,11 @@
                     break;
                 }
                 case INNERJOIN: {
-                    
JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, context);
+                    
JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, topLevelOp, 
context);
                     break;
                 }
                 case LEFTOUTERJOIN: {
-                    
JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, context);
+                    
JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, 
topLevelOp, context);
                     break;
                 }
                 case LIMIT: {
@@ -260,11 +261,19 @@
                     break;
                 }
                 case UNIONALL: {
-                    op.setPhysicalOperator(new UnionAllPOperator());
+                    if (topLevelOp) {
+                        op.setPhysicalOperator(new UnionAllPOperator());
+                    } else {
+                        op.setPhysicalOperator(new MicroUnionAllPOperator());
+                    }
                     break;
                 }
                 case INTERSECT: {
-                    op.setPhysicalOperator(new IntersectPOperator());
+                    if (topLevelOp) {
+                        op.setPhysicalOperator(new IntersectPOperator());
+                    } else {
+                        throw new IllegalStateException("Micro operator not 
implemented for: " + op.getOperatorTag());
+                    }
                     break;
                 }
                 case UNNEST: {
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
index af95ecd..ebcfba4 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
@@ -139,6 +139,10 @@
                 while (upperSubplanRootRefIterator.hasNext()) {
                     Mutable<ILogicalOperator> rootOpRef = 
upperSubplanRootRefIterator.next();
 
+                    if (downToNts(rootOpRef) == null) {
+                        continue;
+                    }
+
                     // Collects free variables in the root operator of a 
nested plan and its descent.
                     Set<LogicalVariable> freeVars = new ListSet<>();
                     
OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc((AbstractLogicalOperator) 
rootOpRef.getValue(),
@@ -265,11 +269,13 @@
     }
 
     private Mutable<ILogicalOperator> downToNts(Mutable<ILogicalOperator> 
opRef) {
-        Mutable<ILogicalOperator> currentOpRef = opRef;
-        while (currentOpRef.getValue().getInputs().size() > 0) {
-            currentOpRef = currentOpRef.getValue().getInputs().get(0);
+        List<Mutable<ILogicalOperator>> leafOps = 
OperatorManipulationUtil.findLeafDescendants(opRef);
+        if (leafOps.size() == 1) {
+            Mutable<ILogicalOperator> leafOp = leafOps.get(0);
+            if (leafOp.getValue().getOperatorTag() == 
LogicalOperatorTag.NESTEDTUPLESOURCE) {
+                return leafOp;
+            }
         }
-        return currentOpRef;
+        return null;
     }
-
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
index 6efda52..33862e3 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
@@ -51,8 +51,12 @@
     private JoinUtils() {
     }
 
-    public static void 
setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, 
IOptimizationContext context)
+    public static void 
setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, boolean 
topLevelOp,
+            IOptimizationContext context)
             throws AlgebricksException {
+        if (!topLevelOp) {
+            throw new IllegalStateException("Micro operator not implemented for: " + op.getOperatorTag());
+        }
         List<LogicalVariable> sideLeft = new LinkedList<>();
         List<LogicalVariable> sideRight = new LinkedList<>();
         List<LogicalVariable> varsLeft = 
op.getInputs().get(0).getValue().getSchema();
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
index 379944b..f24d38d 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
@@ -27,10 +27,15 @@
     private static final long serialVersionUID = 1L;
     private final IPushRuntimeFactory[] runtimeFactories;
     private final RecordDescriptor[] recordDescriptors;
+    private final IPushRuntimeFactory[] outputRuntimeFactories;
+    private final int[] outputPositions;
 
-    public AlgebricksPipeline(IPushRuntimeFactory[] runtimeFactories, 
RecordDescriptor[] recordDescriptors) {
+    public AlgebricksPipeline(IPushRuntimeFactory[] runtimeFactories, 
RecordDescriptor[] recordDescriptors,
+            IPushRuntimeFactory[] outputRuntimeFactories, int[] 
outputPositions) {
         this.runtimeFactories = runtimeFactories;
         this.recordDescriptors = recordDescriptors;
+        this.outputRuntimeFactories = outputRuntimeFactories;
+        this.outputPositions = outputPositions;
         // this.projectedColumns = projectedColumns;
     }
 
@@ -46,8 +51,15 @@
         return recordDescriptors[recordDescriptors.length - 1].getFieldCount();
     }
 
+    public IPushRuntimeFactory[] getOutputRuntimeFactories() {
+        return outputRuntimeFactories;
+    }
+
+    public int[] getOutputPositions() {
+        return outputPositions;
+    }
+
     // public int[] getProjectedColumns() {
     // return projectedColumns;
     // }
-
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
index de6cddd..f90de81 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
@@ -24,5 +24,5 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IPushRuntimeFactory extends Serializable {
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException;
+    IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException;
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index 94af04f..0a578f6 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -168,7 +168,7 @@
         // should enforce protocol
         boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
         for (int i = runtimeFactories.length - 1; i >= 0; i--) {
-            IPushRuntime newRuntime = 
runtimeFactories[i].createPushRuntime(ctx);
+            IPushRuntime newRuntime = 
runtimeFactories[i].createPushRuntime(ctx)[0];
             newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : 
newRuntime;
             start = enforce ? EnforcePushRuntime.enforce(start) : start;
             newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index c261df8..75b2fb2 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -149,7 +149,7 @@
         IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
         RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
         for (int i = runtimeFactories.length - 1; i >= 0; i--) {
-            IPushRuntime newRuntime = 
runtimeFactories[i].createPushRuntime(ctx);
+            IPushRuntime newRuntime = 
runtimeFactories[i].createPushRuntime(ctx)[0];
             newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : 
newRuntime;
             start = enforce ? EnforceFrameWriter.enforce(start) : start;
             newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
index 32eff3a..7b3fb46 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
@@ -34,8 +34,8 @@
     }
 
     @Override
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
-        return createOneOutputPushRuntime(ctx);
+    public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
+        return new IPushRuntime[] { createOneOutputPushRuntime(ctx) };
     }
 
     public abstract AbstractOneInputOneOutputPushRuntime 
createOneOutputPushRuntime(IHyracksTaskContext ctx)
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
index a838557..f0e9406 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
@@ -38,8 +38,8 @@
     }
 
     @Override
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
-        return new AbstractOneInputSinkPushRuntime() {
+    public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
+        return new IPushRuntime[] { new AbstractOneInputSinkPushRuntime() {
 
             @Override
             public void open() throws HyracksDataException {
@@ -61,7 +61,6 @@
             public void flush() throws HyracksDataException {
                 // flush() is meaningless for sink operators
             }
-        };
+        } };
     }
-
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 25a4aea..6e4ecce 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -45,11 +45,18 @@
 
     public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec, 
int inputArity, int outputArity,
             IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] 
internalRecordDescriptors) {
+        this(spec, inputArity, outputArity, runtimeFactories, 
internalRecordDescriptors, null, null);
+    }
+
+    public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec, 
int inputArity, int outputArity,
+            IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[] 
internalRecordDescriptors,
+            IPushRuntimeFactory[] outputRuntimeFactories, int[] 
outputPositions) {
         super(spec, inputArity, outputArity);
         if (outputArity == 1) {
             this.outRecDescs[0] = 
internalRecordDescriptors[internalRecordDescriptors.length - 1];
         }
-        this.pipeline = new AlgebricksPipeline(runtimeFactories, 
internalRecordDescriptors);
+        this.pipeline = new AlgebricksPipeline(runtimeFactories, 
internalRecordDescriptors, outputRuntimeFactories,
+                outputPositions);
     }
 
     public AlgebricksPipeline getPipeline() {
@@ -68,7 +75,7 @@
         StringBuilder sb = new StringBuilder();
         sb.append("Asterix { \n");
         for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
-            sb.append("  " + f.toString() + ";\n");
+            sb.append("  ").append(f).append(";\n");
         }
         sb.append("}");
         return sb.toString();
@@ -87,7 +94,7 @@
     private class SourcePushRuntime extends 
AbstractUnaryOutputSourceOperatorNodePushable {
         private final IHyracksTaskContext ctx;
 
-        public SourcePushRuntime(IHyracksTaskContext ctx) {
+        SourcePushRuntime(IHyracksTaskContext ctx) {
             this.ctx = ctx;
         }
 
@@ -136,8 +143,9 @@
                             outputArity > 0 ? 
AlgebricksMetaOperatorDescriptor.this.outRecDescs[0] : null;
                     RecordDescriptor pipelineInputRecordDescriptor = 
recordDescProvider
                             
.getInputRecordDescriptor(AlgebricksMetaOperatorDescriptor.this.getActivityId(),
 0);
-                    PipelineAssembler pa = new PipelineAssembler(pipeline, 
inputArity, outputArity,
-                            pipelineInputRecordDescriptor, 
pipelineOutputRecordDescriptor);
+                    PipelineAssembler pa =
+                            new PipelineAssembler(pipeline, inputArity, 
outputArity, pipelineInputRecordDescriptor,
+                                    pipelineOutputRecordDescriptor);
                     startOfPipeline = pa.assemblePipeline(writer, ctx);
                 }
                 opened = true;
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index e1081e0..a717794 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -18,9 +18,13 @@
  */
 package org.apache.hyracks.algebricks.runtime.operators.meta;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
 import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
@@ -37,6 +41,7 @@
     private final int inputArity;
     private final int outputArity;
     private final AlgebricksPipeline pipeline;
+    private final Map<IPushRuntimeFactory, IPushRuntime[]> runtimeMap;
 
     public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int 
outputArity,
             RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor 
pipelineOutputRecordDescriptor) {
@@ -45,6 +50,7 @@
         this.pipelineOutputRecordDescriptor = pipelineOutputRecordDescriptor;
         this.inputArity = inputArity;
         this.outputArity = outputArity;
+        this.runtimeMap = new HashMap<>();
     }
 
     public IFrameWriter assemblePipeline(IFrameWriter writer, 
IHyracksTaskContext ctx) throws HyracksDataException {
@@ -52,19 +58,30 @@
         boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
         // plug the operators
         IFrameWriter start = writer;// this.writer;
-        for (int i = pipeline.getRuntimeFactories().length - 1; i >= 0; i--) {
-            IPushRuntime newRuntime = 
pipeline.getRuntimeFactories()[i].createPushRuntime(ctx);
-            newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) : 
newRuntime;
+        IPushRuntimeFactory[] runtimeFactories = 
pipeline.getRuntimeFactories();
+        RecordDescriptor[] recordDescriptors = pipeline.getRecordDescriptors();
+        for (int i = runtimeFactories.length - 1; i >= 0; i--) {
             start = enforce ? EnforceFrameWriter.enforce(start) : start;
-            if (i == pipeline.getRuntimeFactories().length - 1) {
-                if (outputArity == 1) {
-                    newRuntime.setOutputFrameWriter(0, start, 
pipelineOutputRecordDescriptor);
+
+            IPushRuntimeFactory runtimeFactory = runtimeFactories[i];
+            IPushRuntime[] newRuntimes = runtimeFactory.createPushRuntime(ctx);
+            for (int j = 0; j < newRuntimes.length; j++) {
+                if (enforce) {
+                    newRuntimes[j] = 
EnforcePushRuntime.enforce(newRuntimes[j]);
                 }
-            } else {
-                newRuntime.setOutputFrameWriter(0, start, 
pipeline.getRecordDescriptors()[i]);
+                if (i == runtimeFactories.length - 1) {
+                    if (outputArity == 1) {
+                        newRuntimes[j].setOutputFrameWriter(0, start, 
pipelineOutputRecordDescriptor);
+                    }
+                } else {
+                    newRuntimes[j].setOutputFrameWriter(0, start, 
recordDescriptors[i]);
+                }
             }
+            runtimeMap.put(runtimeFactory, newRuntimes);
+
+            IPushRuntime newRuntime = newRuntimes[0];
             if (i > 0) {
-                newRuntime.setInputRecordDescriptor(0, 
pipeline.getRecordDescriptors()[i - 1]);
+                newRuntime.setInputRecordDescriptor(0, recordDescriptors[i - 
1]);
             } else if (inputArity > 0) {
                 newRuntime.setInputRecordDescriptor(0, 
pipelineInputRecordDescriptor);
             }
@@ -72,4 +89,8 @@
         }
         return start;
     }
+
+    public IPushRuntime[] getPushRuntime(IPushRuntimeFactory runtimeFactory) {
+        return runtimeMap.get(runtimeFactory);
+    }
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index f6a349f..5f255f0 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -20,9 +20,11 @@
 
 import java.io.DataOutput;
 import java.nio.ByteBuffer;
+import java.util.List;
 
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
 import 
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
@@ -41,16 +43,21 @@
 
     private static final long serialVersionUID = 1L;
 
-    private final AlgebricksPipeline pipeline;
+    private final List<AlgebricksPipeline> pipelines;
+
     private final RecordDescriptor inputRecordDesc;
+
+    private final RecordDescriptor outputRecordDesc;
+
     private final IMissingWriterFactory[] missingWriterFactories;
 
-    public SubplanRuntimeFactory(AlgebricksPipeline pipeline, 
IMissingWriterFactory[] missingWriterFactories,
-            RecordDescriptor inputRecordDesc, int[] projectionList) {
+    public SubplanRuntimeFactory(List<AlgebricksPipeline> pipelines, 
IMissingWriterFactory[] missingWriterFactories,
+            RecordDescriptor inputRecordDesc, RecordDescriptor 
outputRecordDesc, int[] projectionList) {
         super(projectionList);
-        this.pipeline = pipeline;
+        this.pipelines = pipelines;
         this.missingWriterFactories = missingWriterFactories;
         this.inputRecordDesc = inputRecordDesc;
+        this.outputRecordDesc = outputRecordDesc;
         if (projectionList != null) {
             throw new NotImplementedException();
         }
@@ -60,8 +67,12 @@
     public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("Subplan { \n");
-        for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
-            sb.append("  " + f.toString() + ";\n");
+        for (AlgebricksPipeline pipeline : pipelines) {
+            sb.append('{');
+            for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
+                sb.append("  ").append(f).append(";\n");
+            }
+            sb.append('}');
         }
         sb.append("}");
         return sb.toString();
@@ -70,110 +81,177 @@
     @Override
     public AbstractOneInputOneOutputPushRuntime 
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
             throws HyracksDataException {
+        return new SubplanPushRuntime(ctx);
+    }
 
-        RecordDescriptor pipelineOutputRecordDescriptor = null;
+    private class SubplanPushRuntime extends 
AbstractOneInputOneOutputOneFramePushRuntime {
 
-        final PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, 
inputRecordDesc,
-                pipelineOutputRecordDescriptor);
-        final IMissingWriter[] nullWriters = new 
IMissingWriter[missingWriterFactories.length];
-        for (int i = 0; i < missingWriterFactories.length; i++) {
-            nullWriters[i] = missingWriterFactories[i].createMissingWriter();
+        final IHyracksTaskContext ctx;
+
+        final NestedTupleSourceRuntime[] startOfPipelines;
+
+        boolean first;
+
+        SubplanPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
+            this.ctx = ctx;
+            this.first = true;
+
+            IMissingWriter[] missingWriters = new 
IMissingWriter[missingWriterFactories.length];
+            for (int i = 0; i < missingWriterFactories.length; i++) {
+                missingWriters[i] = 
missingWriterFactories[i].createMissingWriter();
+            }
+
+            int pipelineCount = pipelines.size();
+            startOfPipelines = new NestedTupleSourceRuntime[pipelineCount];
+            PipelineAssembler[] pipelineAssemblers = new 
PipelineAssembler[pipelineCount];
+            for (int i = 0; i < pipelineCount; i++) {
+                AlgebricksPipeline pipeline = pipelines.get(i);
+                RecordDescriptor pipelineLastRecordDescriptor =
+                        
pipeline.getRecordDescriptors()[pipeline.getRecordDescriptors().length - 1];
+
+                RecordDescriptor outputRecordDescriptor;
+                IFrameWriter outputWriter;
+                if (i == 0) {
+                    // primary pipeline
+                    outputWriter = new 
TupleOuterProduct(pipelineLastRecordDescriptor, missingWriters);
+                    outputRecordDescriptor = 
SubplanRuntimeFactory.this.outputRecordDesc;
+                } else {
+                    // secondary pipeline
+                    IPushRuntime outputPushRuntime = 
linkSecondaryPipeline(pipeline, pipelineAssemblers, i);
+                    if (outputPushRuntime == null) {
+                        throw new IllegalStateException();
+                    }
+                    outputPushRuntime.setInputRecordDescriptor(0, 
pipelineLastRecordDescriptor);
+                    outputWriter = outputPushRuntime;
+                    outputRecordDescriptor = pipelineLastRecordDescriptor;
+                }
+
+                PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1, 
inputRecordDesc, outputRecordDescriptor);
+                startOfPipelines[i] = (NestedTupleSourceRuntime) 
pa.assemblePipeline(outputWriter, ctx);
+                pipelineAssemblers[i] = pa;
+            }
         }
 
-        return new AbstractOneInputOneOutputOneFramePushRuntime() {
+        IPushRuntime linkSecondaryPipeline(AlgebricksPipeline pipeline, 
PipelineAssembler[] pipelineAssemblers,
+                int pipelineAssemblersCount) {
+            IPushRuntimeFactory[] outputRuntimeFactories = 
pipeline.getOutputRuntimeFactories();
+            if (outputRuntimeFactories == null || 
outputRuntimeFactories.length != 1) {
+                throw new IllegalStateException();
+            }
+            IPushRuntimeFactory outRuntimeFactory = outputRuntimeFactories[0];
+            int outputPosition = pipeline.getOutputPositions()[0];
+            for (int i = 0; i < pipelineAssemblersCount; i++) {
+                IPushRuntime[] p = 
pipelineAssemblers[i].getPushRuntime(outRuntimeFactory);
+                if (p != null) {
+                    return p[outputPosition];
+                }
+            }
+            return null;
+        }
 
-            /**
-             * Computes the outer product between a given tuple and the frames
-             * passed.
-             */
-            class TupleOuterProduct implements IFrameWriter {
+        @Override
+        public void open() throws HyracksDataException {
+            writer.open();
+            if (first) {
+                first = false;
+                initAccessAppendRef(ctx);
+            }
+        }
 
-                private boolean smthWasWritten = false;
-                private FrameTupleAccessor ta = new FrameTupleAccessor(
-                        
pipeline.getRecordDescriptors()[pipeline.getRecordDescriptors().length - 1]);
-                private ArrayTupleBuilder tb = new ArrayTupleBuilder(
-                        nullWriters.length + 
SubplanRuntimeFactory.this.inputRecordDesc.getFieldCount());
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            tAccess.reset(buffer);
+            int nTuple = tAccess.getTupleCount();
+            for (int t = 0; t < nTuple; t++) {
+                tRef.reset(tAccess, t);
 
-                @Override
-                public void open() throws HyracksDataException {
-                    smthWasWritten = false;
+                for (NestedTupleSourceRuntime nts : startOfPipelines) {
+                    nts.writeTuple(buffer, t);
                 }
 
-                @Override
-                public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
-                    ta.reset(buffer);
-                    int nTuple = ta.getTupleCount();
-                    for (int t = 0; t < nTuple; t++) {
-                        appendConcat(tRef.getFrameTupleAccessor(), 
tRef.getTupleIndex(), ta, t);
+                int n = 0;
+                try {
+                    for (; n < startOfPipelines.length; n++) {
+                        NestedTupleSourceRuntime nts = startOfPipelines[n];
+                        try {
+                            nts.open();
+                        } catch (Exception e) {
+                            nts.fail();
+                            throw e;
+                        }
                     }
-                    smthWasWritten = true;
-                }
-
-                @Override
-                public void close() throws HyracksDataException {
-                    if (!smthWasWritten && !failed) {
-                        // the case when we need to write nulls
-                        appendNullsToTuple();
-                        appendToFrameFromTupleBuilder(tb);
-                    }
-                }
-
-                @Override
-                public void fail() throws HyracksDataException {
-                    // writer.fail() is called by the outer class' writer.fail().
-                }
-
-                private void appendNullsToTuple() throws HyracksDataException {
-                    tb.reset();
-                    int n0 = tRef.getFieldCount();
-                    for (int f = 0; f < n0; f++) {
-                        tb.addField(tRef.getFrameTupleAccessor(), 
tRef.getTupleIndex(), f);
-                    }
-                    DataOutput dos = tb.getDataOutput();
-                    for (int i = 0; i < nullWriters.length; i++) {
-                        nullWriters[i].writeMissing(dos);
-                        tb.addFieldEndOffset();
+                } finally {
+                    for (int i = n - 1; i >= 0; i--) {
+                        startOfPipelines[i].close();
                     }
                 }
             }
+        }
 
-            IFrameWriter endPipe = new TupleOuterProduct();
+        @Override
+        public void flush() throws HyracksDataException {
+            writer.flush();
+        }
 
-            NestedTupleSourceRuntime startOfPipeline = 
(NestedTupleSourceRuntime) pa.assemblePipeline(endPipe, ctx);
+        /**
+         * Computes the outer product between a given tuple and the frames
+         * passed.
+         */
+        class TupleOuterProduct implements IFrameWriter {
 
-            boolean first = true;
+            private boolean smthWasWritten;
+            private final FrameTupleAccessor ta;
+            private final ArrayTupleBuilder tb;
+            private final IMissingWriter[] missingWriters;
+
+            private TupleOuterProduct(RecordDescriptor recordDescriptor, 
IMissingWriter[] missingWriters) {
+                ta = new FrameTupleAccessor(recordDescriptor);
+                tb = new ArrayTupleBuilder(
+                        missingWriters.length + 
SubplanRuntimeFactory.this.inputRecordDesc.getFieldCount());
+                this.missingWriters = missingWriters;
+            }
 
             @Override
             public void open() throws HyracksDataException {
-                writer.open();
-                if (first) {
-                    first = false;
-                    initAccessAppendRef(ctx);
-                }
+                smthWasWritten = false;
             }
 
             @Override
             public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
-                tAccess.reset(buffer);
-                int nTuple = tAccess.getTupleCount();
+                ta.reset(buffer);
+                int nTuple = ta.getTupleCount();
                 for (int t = 0; t < nTuple; t++) {
-                    tRef.reset(tAccess, t);
-                    startOfPipeline.writeTuple(buffer, t);
-                    try {
-                        startOfPipeline.open();
-                    } catch (Exception e) {
-                        startOfPipeline.fail();
-                        throw e;
-                    } finally {
-                        startOfPipeline.close();
-                    }
+                    appendConcat(tRef.getFrameTupleAccessor(), 
tRef.getTupleIndex(), ta, t);
+                }
+                smthWasWritten = true;
+            }
+
+            @Override
+            public void close() throws HyracksDataException {
+                if (!smthWasWritten && !failed) {
+                    // the case when we need to write nulls
+                    appendNullsToTuple();
+                    appendToFrameFromTupleBuilder(tb);
                 }
             }
 
             @Override
-            public void flush() throws HyracksDataException {
-                writer.flush();
+            public void fail() throws HyracksDataException {
+                // writer.fail() is called by the outer class' writer.fail().
             }
-        };
+
+            private void appendNullsToTuple() throws HyracksDataException {
+                tb.reset();
+                int n0 = tRef.getFieldCount();
+                for (int f = 0; f < n0; f++) {
+                    tb.addField(tRef.getFrameTupleAccessor(), 
tRef.getTupleIndex(), f);
+                }
+                DataOutput dos = tb.getDataOutput();
+                for (IMissingWriter missingWriter : missingWriters) {
+                    missingWriter.writeMissing(dos);
+                    tb.addFieldEndOffset();
+                }
+            }
+        }
     }
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index 3ccceed..67f4a77 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -40,8 +40,8 @@
     }
 
     @Override
-    public IPushRuntime createPushRuntime(final IHyracksTaskContext ctx) 
throws HyracksDataException {
-        return new AbstractOneInputSourcePushRuntime() {
+    public IPushRuntime[] createPushRuntime(final IHyracksTaskContext ctx) 
throws HyracksDataException {
+        return new IPushRuntime[] { new AbstractOneInputSourcePushRuntime() {
 
             private final ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
             private final FrameTupleAppender appender = new 
FrameTupleAppender(new VSizeFrame(ctx));
@@ -69,6 +69,6 @@
             public void flush() throws HyracksDataException {
                 appender.flush(writer);
             }
-        };
+        } };
     }
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index 496679f..8e64092 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -39,8 +39,8 @@
     }
 
     @Override
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
-        return new NestedTupleSourceRuntime(ctx);
+    public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
+        return new IPushRuntime[] { new NestedTupleSourceRuntime(ctx) };
     }
 
     public static class NestedTupleSourceRuntime extends 
AbstractOneInputOneOutputOneFramePushRuntime {
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
index 021784a..8a06ecf 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
@@ -56,9 +56,9 @@
     }
 
     @Override
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) {
+    public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) {
         IAWriter w = 
PrinterBasedWriterFactory.INSTANCE.createWriter(printColumns, System.out, 
printerFactories,
                 inputRecordDesc);
-        return new SinkWriterRuntime(w, System.out, inputRecordDesc);
+        return new IPushRuntime[] { new SinkWriterRuntime(w, System.out, 
inputRecordDesc) };
     }
 }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
index d52ceee..536a769 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
@@ -67,11 +67,11 @@
     }
 
     @Override
-    public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
+    public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
         try {
             PrintStream filePrintStream = new PrintStream(new 
BufferedOutputStream(new FileOutputStream(outputFile)));
             IAWriter w = writerFactory.createWriter(fields, filePrintStream, 
printerFactories, inputRecordDesc);
-            return new SinkWriterRuntime(w, filePrintStream, inputRecordDesc, 
true);
+            return new IPushRuntime[] { new SinkWriterRuntime(w, 
filePrintStream, inputRecordDesc, true) };
         } catch (IOException e) {
             throw new HyracksDataException(e);
         }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java
new file mode 100644
index 0000000..5d94cdb
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.union;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Creates the push runtimes of the micro union-all operator: one runtime per input branch, each of
+ * which forwards the frames it receives, unchanged, to its output writer.
+ * <p>
+ * The runtimes returned by one {@link #createPushRuntime(IHyracksTaskContext)} call are assumed to be
+ * wired to the same output writer. They share unsynchronized state which makes sure that this writer is
+ * opened by the first branch that is opened, closed by the last open branch that is closed, and failed at
+ * most once. Because the state is unsynchronized the runtimes must not be driven concurrently.
+ */
+public class MicroUnionAllRuntimeFactory implements IPushRuntimeFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Number of input branches; one push runtime is created for each of them. */
+    private final int inputArity;
+
+    public MicroUnionAllRuntimeFactory(int inputArity) {
+        this.inputArity = inputArity;
+    }
+
+    /**
+     * Creates the runtimes of all input branches and gives them a fresh shared state.
+     *
+     * @param ctx the task context (not used)
+     * @return an array of {@code inputArity} runtimes, one per input branch
+     */
+    @Override
+    public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) {
+        MicroUnionAllState sharedState = new MicroUnionAllState();
+        IPushRuntime[] result = new IPushRuntime[inputArity];
+        for (int i = 0; i < inputArity; i++) {
+            result[i] = new MicroUnionAllPushRuntime(sharedState);
+        }
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "union-all";
+    }
+
+    /** State shared by the runtimes created by a single createPushRuntime() call. */
+    private static final class MicroUnionAllState {
+        /** Number of branches that were opened and have not been closed yet. */
+        private int openCount;
+
+        /** Set once a failure has been propagated to the output writer. */
+        private boolean failed;
+    }
+
+    /** Runtime of one input branch. Static, so it keeps no reference to the enclosing factory. */
+    private static final class MicroUnionAllPushRuntime implements IPushRuntime {
+
+        private final MicroUnionAllState sharedState;
+
+        private IFrameWriter writer;
+
+        MicroUnionAllPushRuntime(MicroUnionAllState sharedState) {
+            this.sharedState = sharedState;
+        }
+
+        @Override
+        public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+            if (index != 0) {
+                throw new IllegalArgumentException(String.valueOf(index));
+            }
+            this.writer = writer;
+        }
+
+        @Override
+        public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
+            // not needed: frames are forwarded without being inspected
+        }
+
+        /** Opens the output writer if this is the first branch to be opened. */
+        @Override
+        public void open() throws HyracksDataException {
+            if (sharedState.openCount++ == 0) {
+                writer.open();
+            }
+        }
+
+        @Override
+        public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+            writer.nextFrame(buffer);
+        }
+
+        /** Propagates the failure to the output writer unless another branch has already done so. */
+        @Override
+        public void fail() throws HyracksDataException {
+            boolean alreadyFailed = sharedState.failed;
+            sharedState.failed = true;
+            if (!alreadyFailed) {
+                writer.fail();
+            }
+        }
+
+        /**
+         * Closes the output writer once the last open branch is closed, so that branches which are closed
+         * later than the first one (and may still emit frames while closing) never write to a closed writer.
+         */
+        @Override
+        public void close() throws HyracksDataException {
+            if (--sharedState.openCount == 0) {
+                writer.close();
+            }
+        }
+    }
+}
diff --git 
a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
 
b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index cc4c1b9..1113ab6 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -22,6 +22,7 @@
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
@@ -439,7 +440,7 @@
         RecordDescriptor aggDesc = new RecordDescriptor(
                 new ISerializerDeserializer[] { 
IntegerSerializerDeserializer.INSTANCE });
         AlgebricksPipeline pipeline = new AlgebricksPipeline(new 
IPushRuntimeFactory[] { nts, agg },
-                new RecordDescriptor[] { ntsDesc, aggDesc });
+                new RecordDescriptor[] { ntsDesc, aggDesc }, null, null);
         NestedPlansAccumulatingAggregatorFactory npaaf = new 
NestedPlansAccumulatingAggregatorFactory(
                 new AlgebricksPipeline[] { pipeline }, new int[] { 3 }, new 
int[] {});
         RecordDescriptor gbyDesc = new RecordDescriptor(new 
ISerializerDeserializer[] {
@@ -779,10 +780,10 @@
                 new ISerializerDeserializer[] { 
IntegerSerializerDeserializer.INSTANCE });
 
         AlgebricksPipeline pipeline = new AlgebricksPipeline(new 
IPushRuntimeFactory[] { nts, assign2, project1 },
-                new RecordDescriptor[] { assign1Desc, assign2Desc, 
project1Desc });
+                new RecordDescriptor[] { assign1Desc, assign2Desc, 
project1Desc }, null, null);
 
-        SubplanRuntimeFactory subplan = new SubplanRuntimeFactory(pipeline,
-                new IMissingWriterFactory[] { 
NoopMissingWriterFactory.INSTANCE }, assign1Desc, null);
+        SubplanRuntimeFactory subplan = new 
SubplanRuntimeFactory(Collections.singletonList(pipeline),
+                new IMissingWriterFactory[] { 
NoopMissingWriterFactory.INSTANCE }, assign1Desc, null, null);
 
         RecordDescriptor subplanDesc = new RecordDescriptor(new 
ISerializerDeserializer[] {
                 IntegerSerializerDeserializer.INSTANCE, 
IntegerSerializerDeserializer.INSTANCE });
@@ -850,7 +851,7 @@
         RecordDescriptor aggDesc = new RecordDescriptor(
                 new ISerializerDeserializer[] { 
IntegerSerializerDeserializer.INSTANCE });
         AlgebricksPipeline pipeline = new AlgebricksPipeline(new 
IPushRuntimeFactory[] { nts, agg },
-                new RecordDescriptor[] { ntsDesc, aggDesc });
+                new RecordDescriptor[] { ntsDesc, aggDesc }, null, null);
         NestedPlansAccumulatingAggregatorFactory npaaf = new 
NestedPlansAccumulatingAggregatorFactory(
                 new AlgebricksPipeline[] { pipeline }, new int[] { 3 }, new 
int[] {});
         RecordDescriptor gbyDesc = new RecordDescriptor(new 
ISerializerDeserializer[] {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index 9f66080..13ebd6e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -89,6 +89,8 @@
 
     private transient int connectorIdCounter;
 
+    private transient List<? extends IOperatorDescriptor> metaOps;
+
     // This constructor uses the default frame size. It is for test purposes 
only.
     // For other use cases, use the one which sets the frame size.
     public JobSpecification() {
@@ -308,6 +310,14 @@
         return requiredClusterCapacity;
     }
 
+    public void setMetaOps(List<? extends IOperatorDescriptor> metaOps) {
+        this.metaOps = metaOps;
+    }
+
+    public List<? extends IOperatorDescriptor> getMetaOps() {
+        return metaOps;
+    }
+
     private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int 
index, V value) {
         List<V> vList = map.computeIfAbsent(key, k -> new ArrayList<>());
         extend(vList, index);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2277
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I11be926f175889978c144dd4483ec565d3d86e2d
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Dmitry Lychagin <[email protected]>

Reply via email to