Dmitry Lychagin has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2277
Change subject: [ASTERIXDB-2244][RTM] Implement micro union-all operator
......................................................................
[ASTERIXDB-2244][RTM] Implement micro union-all operator
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- implement support for binary micro operators in subplans
- implement micro union-all operator
- fix free variables visitor
Change-Id: I11be926f175889978c144dd4483ec565d3d86e2d
---
M asterixdb/LICENSE
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.1.ddl.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.2.update.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.3.query.sqlpp
A
asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/non_unary_subplan_01/non_unary_subplan_01.1.adm
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M
asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
M
hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
A
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnionAllPOperator.java
A
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
M
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
M
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
M
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
M
hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
A
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java
M
hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
42 files changed, 1,007 insertions(+), 612 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/77/2277/1
diff --git a/asterixdb/LICENSE b/asterixdb/LICENSE
index bff778e..18245c5 100644
--- a/asterixdb/LICENSE
+++ b/asterixdb/LICENSE
@@ -273,6 +273,184 @@
Bootstrap-based projects, I would certainly appreciate any form of support,
even a nice Tweet is enough. Of course if you want, you can say thank you
and
support me by buying more icons on GLYPHICONS.com.
+---
+ Portions of the AsterixDB QueryUI
+ located at:
+ asterix-app/src/main/resources/queryui/js/json-formatter.min.js,
+ and
+ asterix-app/src/main/resources/queryui/css/json-formatter.min.css
+
+ are available under the following license:
+---
+ Copyright 2014 Mohsen Azimi
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+---
+ Portions of the AsterixDB QueryUI
+ located at:
+ asterix-app/src/main/resources/queryui/js/bootstrap.min.js,
+ asterix-app/src/main/resources/queryui/css/bootstrap.min.css,
+ asterix-app/src/main/resources/queryui/css/bootstrap-theme.min.css,
+ asterix-app/src/main/resources/queryui/css/bootstrap-theme.min.css,
+
asterix-app/src/main/resources/queryui/fonts/glyphicons-halflings-regular.eot,
+
asterix-app/src/main/resources/queryui/fonts/glyphicons-halflings-regular.svg,
+
asterix-app/src/main/resources/queryui/fonts/glyphicons-halflings-regular.ttf,
+
asterix-app/src/main/resources/queryui/fonts/glyphicons-halflings-regular.woff,
+ and
+
asterix-app/src/main/resources/queryui/fonts/glyphicons-halflings-regular.woff2
+
+ are available under the following license:
+---
+ The MIT License (MIT)
+
+ Copyright (c) 2011-2015 Twitter, Inc
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to
deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in
+ all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ THE SOFTWARE.
+
+ queryui/css/bootstrap.min.css also contains normalize.css v3.0.3
+ (http://github.com/necolas/normalize.css), having the following copyright:
+ --
+ Copyright © Nicolas Gallagher and Jonathan Neal
+
+ Permission is hereby granted, free of charge, to any person obtaining a
copy of
+ this software and associated documentation files (the "Software"), to deal
in
+ the Software without restriction, including without limitation the rights to
+ use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of
+ the Software, and to permit persons to whom the Software is furnished to do
so,
+ subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in
all
+ copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS
+ FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS
OR
+ COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER
+ IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ --
+ Credit for font files: queryui/fonts/glyphicons-halflings-regular.eot,
+ queryui/fonts/glyphicons-halflings-regular.svg,
+ queryui/fonts/glyphicons-halflings-regular.ttf,
+ queryui/fonts/glyphicons-halflings-regular.woff,
+ and
+ queryui/fonts/glyphicons-halflings-regular.woff2
+
+ GLYPHICONS Halflings font is also released as an extension of a Bootstrap
+ (www.getbootstrap.com) for free and it is released under the same license as
+ Bootstrap. While you are not required to include attribution on your
+ Bootstrap-based projects, I would certainly appreciate any form of support,
+ even a nice Tweet is enough. Of course if you want, you can say thank you
and
+ support me by buying more icons on GLYPHICONS.com.
+---
+
+ Portions of the AsterixDB QueryUI
+ located at:
+ asterix-app/src/main/resources/queryui/js/angular.min.js
+
+ are available under The MIT License:
+---
+ Copyright (c) 2010-2016 Google, Inc. http://angularjs.org
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to
deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in
+ all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ THE SOFTWARE.
+---
+ Portions of the AsterixDB QueryUI
+ located at:
+ asterix-app/src/main/resources/queryui/js/codemirror.js,
+ asterix-app/src/main/resources/queryui/js/javascript.js,
+ and
+ asterix-app/src/main/resources/queryui/css/codemirror.css
+
+ are available under The MIT License:
+---
+ Copyright (C) 2016 by Marijn Haverbeke <[email protected]> and others
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to
deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in
+ all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ THE SOFTWARE.
+---
+ Portions of the AsterixDB QueryUI
+ located at:
+ asterix-app/src/main/resources/queryui/js/ui-codemirror.js
+
+ are available under The MIT License:
+---
+ Copyright (c) 2012 the AngularUI Team, http://angular-ui.github.com
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to
deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in
+ all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ THE SOFTWARE.
+---
Portions of the AsterixDB runtime
located at:
asterix-hivecompat/src/main/java/org/apache/asterix/hivecompat/io/*
@@ -673,357 +851,3 @@
such warranty or additional liability.
END OF TERMS AND CONDITIONS
---
-File Saver Portions of the AsterixDB Dashboard located at:
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/output.component.ts
-
asterix-app/src/main/resources/dashboard/static/main.37b7b7cad656490b195a.bundle.js
-
asterix-app/src/main/resources/dashboard/static/styles.9f50282210bba5318775.bundle
-
asterix-app/src/main/resources/dashboard/static/scripts.da68998bdd77aff4e764.bundle.js
-
asterix-app/src/main/resources/dashboard/static/polyfills.32ca5670d6503e090789.bundle.js
-
asterix-app/src/main/resources/dashboard/static/inline.66bd6b83f86cf773a001.bundle.js
- asterix-app/src/main/resources/dashboard/static/index.html
-
- are available under The MIT License:
----
-The MIT License
-
-Copyright © 2016 Eli Grey.
-
-Permission is hereby granted, free of charge, to any person obtaining a copy of
-this software and associated documentation files (the "Software"), to deal in
-the Software without restriction, including without limitation the rights to
-use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of
-the Software, and to permit persons to whom the Software is furnished to do so,
-subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS
-FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
-COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
-IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
-CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
----
-ReactiveX/rxjs located at:
- asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/datasets-collection/
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/datatypes-collection/
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/dataverses-collection/
- asterix-app/src/main/resources/dashboard/src/app/dashboard/query/
- asterix-app/src/main/resources/dashboard/src/app/dashboard/
- asterix-app/src/main/resources/dashboard/src/app/
-
asterix-app/src/main/resources/dashboard/static/main.37b7b7cad656490b195a.bundle.js
-
asterix-app/src/main/resources/dashboard/static/styles.9f50282210bba5318775.bundle
-
asterix-app/src/main/resources/dashboard/static/scripts.da68998bdd77aff4e764.bundle.js
-
asterix-app/src/main/resources/dashboard/static/polyfills.32ca5670d6503e090789.bundle.js
-
asterix-app/src/main/resources/dashboard/static/inline.66bd6b83f86cf773a001.bundle.js
- asterix-app/src/main/resources/dashboard/static/index.html
-is licensed under the Apache License 2.0
----
-PrimeNG Portions of the AsterixDB Dashboard located at:
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/output.component.ts
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/output.component.html
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/output.component.scss
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/metadata.component.ts
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/metadata.component.html
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/metadata.component.scss
-
asterix-app/src/main/resources/dashboard/static/main.37b7b7cad656490b195a.bundle.js
-
asterix-app/src/main/resources/dashboard/static/styles.9f50282210bba5318775.bundle
-
asterix-app/src/main/resources/dashboard/static/scripts.da68998bdd77aff4e764.bundle.js
-
asterix-app/src/main/resources/dashboard/static/polyfills.32ca5670d6503e090789.bundle.js
-
asterix-app/src/main/resources/dashboard/static/inline.66bd6b83f86cf773a001.bundle.js
- asterix-app/src/main/resources/dashboard/static/index.html
-
-are available under The MIT License:
----
-The MIT License (MIT)
-
-Copyright (c) 2016-2017 PrimeTek
-
-Permission is hereby granted, free of charge, to any person obtaining a copy of
-this software and associated documentation files (the "Software"), to deal in
-the Software without restriction, including without limitation the rights to
-use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of
-the Software, and to permit persons to whom the Software is furnished to do so,
-subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS
-FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
-COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
-IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
-CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
----
-
-NGRX Portions of the AsterixDB Dashboard located at:
- asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/datasets-collection/
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/datatypes-collection/
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/dataverses-collection/
- asterix-app/src/main/resources/dashboard/src/app/dashboard/query/
- asterix-app/src/main/resources/dashboard/src/app/dashboard/
- asterix-app/src/main/resources/dashboard/src/app/dashboard/
- asterix-app/src/main/resources/dashboard/src/app/
-
asterix-app/src/main/resources/dashboard/static/main.37b7b7cad656490b195a.bundle.js
-
asterix-app/src/main/resources/dashboard/static/styles.9f50282210bba5318775.bundle
-
asterix-app/src/main/resources/dashboard/static/scripts.da68998bdd77aff4e764.bundle.js
-
asterix-app/src/main/resources/dashboard/static/polyfills.32ca5670d6503e090789.bundle.js
-
asterix-app/src/main/resources/dashboard/static/inline.66bd6b83f86cf773a001.bundle.js
- asterix-app/src/main/resources/dashboard/static/index.html
-
-are available under The MIT License:
----
-The MIT License (MIT)
-
-Copyright (c) 2017 Brandon Roberts, Mike Ryan, Victor Savkin, Rob Wormald
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-SOFTWARE.
----
-Codemirror Portions of the AsterixDB Dashboard located at:
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/input.component.ts
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/input.component.html
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/input.component.scss
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/codemirror.component.ts
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/query/codemirror.component.scss
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/input-metadata.component.ts
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/input-metadata.component.html
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/input-metadata.component.scss
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/codemirror-metadata.component.ts
-
asterix-app/src/main/resources/dashboard/src/app/dashboard/metadata/codemirror-metadata.component.scss
-
asterix-app/src/main/resources/dashboard/static/main.37b7b7cad656490b195a.bundle.js
-
asterix-app/src/main/resources/dashboard/static/styles.9f50282210bba5318775.bundle
-
asterix-app/src/main/resources/dashboard/static/scripts.da68998bdd77aff4e764.bundle.js
-
asterix-app/src/main/resources/dashboard/static/polyfills.32ca5670d6503e090789.bundle.js
-
asterix-app/src/main/resources/dashboard/static/inline.66bd6b83f86cf773a001.bundle.js
- asterix-app/src/main/resources/dashboard/static/index.html
-
- are available under The MIT License:
----
- MIT License
-
-Copyright (C) 2017 by Marijn Haverbeke <[email protected]> and others
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in
-all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
-
----
-Angular Portions of the AsterixDB Dashboard located at:
- asterix-app/src/main/resources/dashboard/src/
-
asterix-app/src/main/resources/dashboard/static/main.37b7b7cad656490b195a.bundle.js
-
asterix-app/src/main/resources/dashboard/static/styles.9f50282210bba5318775.bundle
-
asterix-app/src/main/resources/dashboard/static/scripts.da68998bdd77aff4e764.bundle.js
-
asterix-app/src/main/resources/dashboard/static/polyfills.32ca5670d6503e090789.bundle.js
-
asterix-app/src/main/resources/dashboard/static/inline.66bd6b83f86cf773a001.bundle.js
- asterix-app/src/main/resources/dashboard/static/index.html
-
- are available under The MIT License:
----
-The MIT License
-
-Copyright (c) 2014-2017 Google, Inc. http://angular.io
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in
-all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
-
----
-Angular Material, hammerjs Portions of the AsterixDB Dashboard located at:
- asterix-app/src/main/resources/dashboard/src/app/dashboard/
- asterix-app/src/main/resources/dashboard/src/app/material.module.ts
- asterix-app/src/main/resources/dashboard/src/main.ts
-
asterix-app/src/main/resources/dashboard/static/main.37b7b7cad656490b195a.bundle.js
-
asterix-app/src/main/resources/dashboard/static/styles.9f50282210bba5318775.bundle
-
asterix-app/src/main/resources/dashboard/static/scripts.da68998bdd77aff4e764.bundle.js
-
asterix-app/src/main/resources/dashboard/static/polyfills.32ca5670d6503e090789.bundle.js
-
asterix-app/src/main/resources/dashboard/static/inline.66bd6b83f86cf773a001.bundle.js
- asterix-app/src/main/resources/dashboard/static/index.html
-
- are available under The MIT License:
----
- The MIT License
-
-Copyright (c) 2017 Google LLC.
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in
-all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
-
-
----
-Core JS Material Portions of the AsterixDB Dashboard located at:
- asterix-app/src/main/resources/dashboard/src/
-
asterix-app/src/main/resources/dashboard/static/main.37b7b7cad656490b195a.bundle.js
-
asterix-app/src/main/resources/dashboard/static/styles.9f50282210bba5318775.bundle
-
asterix-app/src/main/resources/dashboard/static/scripts.da68998bdd77aff4e764.bundle.js
-
asterix-app/src/main/resources/dashboard/static/polyfills.32ca5670d6503e090789.bundle.js
-
asterix-app/src/main/resources/dashboard/static/inline.66bd6b83f86cf773a001.bundle.js
- asterix-app/src/main/resources/dashboard/static/index.html
-
- are available under following License:
- https://github.com/zloirock/core-js/blob/master/LICENSE
----
-
-Copyright (c) 2014-2017 Denis Pushkarev
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in
-all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
----
-Roboto Font Portions of the AsterixDB Dashboard located at:
- asterix-app/src/main/resources/dashboard/src/index.html
-
asterix-app/src/main/resources/dashboard/static/roboto-v15-latin-regular.3d3a53586bd78d1069ae.svg
-
asterix-app/src/main/resources/dashboard/static/roboto-v15-latin-regular.7e367be02cd17a96d513.woff2
-
asterix-app/src/main/resources/dashboard/static/roboto-v15-latin-regular.9f916e330c478bbfa2a0.eot
-
asterix-app/src/main/resources/dashboard/static/roboto-v15-latin-regular.16e1d930cf13fb7a9563.woff
-
asterix-app/src/main/resources/dashboard/static/roboto-v15-latin-regular.38861cba61c66739c145.ttf
- asterix-app/src/main/resources/dashboard/static/index.html
- are available under following License: Apache2.0
- https://github.com/google/roboto/blob/master/LICENSE
----
-Font Awesome Portions of the AsterixDB Dashboard, used in primeng components
located at:
- asterix-app/src/main/resources/dashboard/src/index.html
- asterix-app/src/main/resources/dashboard/static/index.html
-
- http://fontawesome.io/license/ Applies to all CSS and LESS files in the
- following directories if exist: font-awesome/css/, font-awesome/less/, and
- font-awesome/scss/. and downloaded from CDN network services and loaded in
- index.html License: MIT License URL:
- http://opensource.org/licenses/mit-license.html
----
[email protected] portions of the AsterixDB Dashboard
- located at:
- asterix-app/src/main/resources/dashboard/
- are available under The MIT License:
----
-MIT
-Copyright JS Foundation and other contributors
-
-Permission is hereby granted, free of charge, to any person obtaining
-a copy of this software and associated documentation files (the
-'Software'), to deal in the Software without restriction, including
-without limitation the rights to use, copy, modify, merge, publish,
-distribute, sublicense, and/or sell copies of the Software, and to
-permit persons to whom the Software is furnished to do so, subject to
-the following conditions:
-
-The above copyright notice and this permission notice shall be
-included in all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
-EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
-IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
-CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
-TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
-SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
----
[email protected] Portions of the AsterixDB Dashboard located at:
- asterix-app/src/main/resources/dashboard/src/
-
asterix-app/src/main/resources/dashboard/static/main.37b7b7cad656490b195a.bundle.js
-
asterix-app/src/main/resources/dashboard/static/styles.9f50282210bba5318775.bundle
-
asterix-app/src/main/resources/dashboard/static/scripts.da68998bdd77aff4e764.bundle.js
-
asterix-app/src/main/resources/dashboard/static/polyfills.32ca5670d6503e090789.bundle.js
-
asterix-app/src/main/resources/dashboard/static/inline.66bd6b83f86cf773a001.bundle.js
- asterix-app/src/main/resources/dashboard/static/index.html
- are available under The MIT License:
----
-MIT
-The MIT License
-
-Copyright (c) 2016 Google, Inc.
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in
-all copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
-
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
index d22ec54..802635b 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetAsterixPhysicalOperatorsRule.java
@@ -80,20 +80,20 @@
return false;
}
- computeDefaultPhysicalOp(op, context);
+ computeDefaultPhysicalOp(op, true, context);
context.addToDontApplySet(this, op);
return true;
}
- private static void setPhysicalOperators(ILogicalPlan plan,
IOptimizationContext context)
+ private static void setPhysicalOperators(ILogicalPlan plan, boolean
topLevelOp, IOptimizationContext context)
throws AlgebricksException {
for (Mutable<ILogicalOperator> root : plan.getRoots()) {
- computeDefaultPhysicalOp((AbstractLogicalOperator)
root.getValue(), context);
+ computeDefaultPhysicalOp((AbstractLogicalOperator)
root.getValue(), topLevelOp, context);
}
}
- private static void computeDefaultPhysicalOp(AbstractLogicalOperator op,
IOptimizationContext context)
- throws AlgebricksException {
+ private static void computeDefaultPhysicalOp(AbstractLogicalOperator op,
boolean topLevelOp,
+ IOptimizationContext context) throws AlgebricksException {
PhysicalOptimizationConfig physicalOptimizationConfig =
context.getPhysicalOptimizationConfig();
if (op.getOperatorTag().equals(LogicalOperatorTag.GROUP)) {
GroupByOperator gby = (GroupByOperator) op;
@@ -209,11 +209,11 @@
if (op.getPhysicalOperator() == null) {
switch (op.getOperatorTag()) {
case INNERJOIN: {
-
JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, context);
+
JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, topLevelOp,
context);
break;
}
case LEFTOUTERJOIN: {
-
JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, context);
+
JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op,
topLevelOp, context);
break;
}
case UNNEST_MAP:
@@ -279,11 +279,11 @@
if (op.hasNestedPlans()) {
AbstractOperatorWithNestedPlans nested =
(AbstractOperatorWithNestedPlans) op;
for (ILogicalPlan p : nested.getNestedPlans()) {
- setPhysicalOperators(p, context);
+ setPhysicalOperators(p, false, context);
}
}
for (Mutable<ILogicalOperator> opRef : op.getInputs()) {
- computeDefaultPhysicalOp((AbstractLogicalOperator)
opRef.getValue(), context);
+ computeDefaultPhysicalOp((AbstractLogicalOperator)
opRef.getValue(), topLevelOp, context);
}
}
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
index 8ab8b1d..461614a 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
@@ -19,6 +19,7 @@
package org.apache.asterix.optimizer.rules.subplan;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -56,10 +57,8 @@
import
org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-import com.google.common.collect.ImmutableSet;
-
/*
-This rule is to remove SubplanOperators containing DataScan, InnerJoin,
LeftOuterJoin, UnionAll or Distinct. Given a qualified Subplan operator called
S1,
+This rule is to remove SubplanOperators containing DataScan, InnerJoin,
LeftOuterJoin. Given a qualified Subplan operator called S1,
Let's call its input operator O1.
General Cases
@@ -348,9 +347,8 @@
IOptimizationContext context) throws AlgebricksException {
SubplanOperator subplanOp = (SubplanOperator) opRef.getValue();
if (!SubplanFlatteningUtil.containsOperators(subplanOp,
- ImmutableSet.of(LogicalOperatorTag.DATASOURCESCAN,
LogicalOperatorTag.INNERJOIN,
- // We don't have nested runtime for union-all and
distinct hence we have to include them here.
- LogicalOperatorTag.LEFTOUTERJOIN,
LogicalOperatorTag.UNIONALL, LogicalOperatorTag.DISTINCT))) {
+ EnumSet.of(LogicalOperatorTag.DATASOURCESCAN,
LogicalOperatorTag.INNERJOIN,
+ LogicalOperatorTag.LEFTOUTERJOIN))) {
return new Pair<>(false, new LinkedHashMap<>());
}
Mutable<ILogicalOperator> inputOpRef = subplanOp.getInputs().get(0);
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
index 377e96d..fa70c1e 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.optimizer.rules.subplan;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -208,7 +209,7 @@
if (!OperatorManipulationUtil.ancestorOfOperators(
subplanOp.getNestedPlans().get(0).getRoots().get(0).getValue(),
// we don't need to check recursively for this special
rewriting.
- ImmutableSet.of(LogicalOperatorTag.INNERJOIN,
LogicalOperatorTag.LEFTOUTERJOIN))) {
+ EnumSet.of(LogicalOperatorTag.INNERJOIN,
LogicalOperatorTag.LEFTOUTERJOIN))) {
return new Pair<Boolean, ILogicalOperator>(false, null);
}
SubplanSpecialFlatteningCheckVisitor visitor = new
SubplanSpecialFlatteningCheckVisitor();
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 23a3eda..bd02fd0 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -233,7 +233,7 @@
projectionList[projCount++] = i;
}
IPushRuntime assignOp = new AssignRuntimeFactory(outColumns,
secondaryFieldAccessEvalFactories,
- projectionList, true).createPushRuntime(ctx);
+ projectionList, true).createPushRuntime(ctx)[0];
insertOp.setOutputFrameWriter(0, assignOp,
primaryIndexInfo.rDesc);
assignOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
SecondaryIndexInfo secondaryIndexInfo = new
SecondaryIndexInfo(primaryIndexInfo, secondaryIndex);
@@ -268,7 +268,7 @@
NoMergePolicyFactory mergePolicyFactory, Map<String, String>
mergePolicyProperties, int[] filterFields,
int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
StorageComponentProvider storageComponentProvider) throws
HyracksDataException, AlgebricksException {
- IPushRuntime emptyTupleOp = new
EmptyTupleSourceRuntimeFactory().createPushRuntime(ctx);
+ IPushRuntime emptyTupleOp = new
EmptyTupleSourceRuntimeFactory().createPushRuntime(ctx)[0];
JobSpecification spec = new JobSpecification();
PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset,
primaryKeyTypes, recordType, metaType,
mergePolicyFactory, mergePolicyProperties, filterFields,
primaryKeyIndexes, primaryKeyIndicators);
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.1.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.1.ddl.sqlpp
new file mode 100644
index 0000000..c1ce4a3
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.1.ddl.sqlpp
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use test;
+
+create type TType as open
+{ id: bigint };
+
+create dataset TData (TType) primary key id;
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.2.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.2.update.sqlpp
new file mode 100644
index 0000000..3ce9554
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.2.update.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+insert into TData ( [
+{'id':1, 'x':1, 'f':19},
+{'id':2, 'x':2, 'f':12},
+{'id':3, 'x':1, 'f':10},
+{'id':4, 'x':2, 'f':17},
+{'id':5, 'x':1, 'f':12}
+]);
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.3.query.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.3.query.sqlpp
new file mode 100644
index 0000000..b825157
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/non_unary_subplan_01/non_unary_subplan_01.3.query.sqlpp
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+select x,
+array_sum((
+ select value a.f
+ from g as p
+ union all
+ select value a.f
+ from g as w
+)) s
+from TData as a
+group by a.x as x group as g
+order by x
+;
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/non_unary_subplan_01/non_unary_subplan_01.1.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/non_unary_subplan_01/non_unary_subplan_01.1.adm
new file mode 100644
index 0000000..5071183
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/subquery/non_unary_subplan_01/non_unary_subplan_01.1.adm
@@ -0,0 +1,2 @@
+{ "x": 1, "s": 82 }
+{ "x": 2, "s": 58 }
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 14784d6..6779bad 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -6422,6 +6422,11 @@
</compilation-unit>
</test-case>
<test-case FilePath="subquery">
+ <compilation-unit name="non_unary_subplan_01">
+ <output-dir compare="Text">non_unary_subplan_01</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="subquery">
<compilation-unit name="query-ASTERIXDB-1571">
<output-dir compare="Text">query-ASTERIXDB-1571</output-dir>
</compilation-unit>
diff --git
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 3ad7fd5..8341a33 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -125,7 +125,7 @@
1060 = Field \"%1$s\" in the with clause must be of type %2$s
1061 = Field \"%1$s\" in the with clause must contain sub field \"%2$s\"
1062 = Merge policy parameters cannot be of type %1$s
-1063 = There is no dataverse with name %1$s
+1063 = There is no dataverse with name \"%1$s\"
# Feed Errors
3001 = Illegal state.
diff --git
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
index 1a54b92..f7c4425 100644
---
a/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
+++
b/asterixdb/asterix-lang-sqlpp/src/main/java/org/apache/asterix/lang/sqlpp/visitor/FreeVariableVisitor.java
@@ -23,7 +23,6 @@
import java.util.List;
import org.apache.asterix.common.exceptions.CompilationException;
-import org.apache.asterix.lang.common.base.Clause.ClauseType;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.clause.GroupbyClause;
import org.apache.asterix.lang.common.clause.LetClause;
@@ -105,19 +104,15 @@
// Visits join/unnest/nest clauses.
for (AbstractBinaryCorrelateClause correlateClause :
fromTerm.getCorrelateClauses()) {
+ // Adds binding variables.
+ bindingVariables.add(correlateClause.getRightVariable());
+ if (correlateClause.hasPositionalVariable()) {
+ bindingVariables.add(correlateClause.getPositionalVariable());
+ }
+
Collection<VariableExpr> correlateFreeVars = new HashSet<>();
correlateClause.accept(this, correlateFreeVars);
- if (correlateClause.getClauseType() != ClauseType.JOIN_CLAUSE) {
- // Correlation is allowed if the clause is not a join clause,
- // therefore we remove left-side binding variables for these
cases.
- correlateFreeVars.removeAll(bindingVariables);
-
- // Adds binding variables.
- bindingVariables.add(correlateClause.getRightVariable());
- if (correlateClause.hasPositionalVariable()) {
-
bindingVariables.add(correlateClause.getPositionalVariable());
- }
- }
+ correlateFreeVars.removeAll(bindingVariables);
freeVars.addAll(correlateFreeVars);
}
return null;
@@ -410,7 +405,7 @@
public Void visit(IndexAccessor ia, Collection<VariableExpr> freeVars)
throws CompilationException {
ia.getExpr().accept(this, freeVars);
if (ia.getIndexExpr() != null) {
- ia.getIndexExpr();
+ ia.getIndexExpr().accept(this, freeVars);
}
return null;
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 445ad4a..1971eb0 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -51,10 +51,11 @@
}
@Override
- public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException {
+ public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException {
IJobletEventListenerFactory fact =
ctx.getJobletContext().getJobletEventListenerFactory();
- return new CommitRuntime(ctx, ((IJobEventListenerFactory)
fact).getTxnId(datasetId), datasetId,
- primaryKeyFields, isWriteTransaction,
-
datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
+ return new IPushRuntime[] {
+ new CommitRuntime(ctx, ((IJobEventListenerFactory)
fact).getTxnId(datasetId), datasetId,
+ primaryKeyFields, isWriteTransaction,
+
datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink) };
}
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
index 79b8f38..2d6123e 100644
---
a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
+++
b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java
@@ -104,7 +104,7 @@
clusterLocations);
PlanCompiler pc = new PlanCompiler(context);
- return pc.compilePlan(plan, null,
jobEventListenerFactory);
+ return pc.compilePlan(plan, jobEventListenerFactory);
}
};
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 75b63f1..db9728b 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -44,6 +44,7 @@
MATERIALIZE,
MICRO_PRE_CLUSTERED_GROUP_BY,
MICRO_PRE_SORTED_DISTINCT_BY,
+ MICRO_UNION_ALL,
NESTED_LOOP,
NESTED_TUPLE_SOURCE,
ONE_TO_ONE_EXCHANGE,
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
index 0fb667a..4b9ae89 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
@@ -72,8 +74,8 @@
}
protected PhysicalRequirements emptyUnaryRequirements() {
- StructuralPropertiesVector[] req = new StructuralPropertiesVector[] {
- StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR };
+ StructuralPropertiesVector[] req =
+ new StructuralPropertiesVector[] {
StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR };
return new PhysicalRequirements(req,
IPartitioningRequirementsCoordinator.NO_COORDINATION);
}
@@ -97,7 +99,7 @@
/**
* @return labels (0 or 1) for each input and output indicating the
dependency between them.
- * The edges labeled as 1 must wait for the edges with label 0.
+ * The edges labeled as 1 must wait for the edges with label 0.
*/
@Override
public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator
op) {
@@ -117,48 +119,63 @@
protected AlgebricksPipeline[] compileSubplans(IOperatorSchema
outerPlanSchema,
AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema,
JobGenContext context)
- throws AlgebricksException {
- AlgebricksPipeline[] subplans = new
AlgebricksPipeline[npOp.getNestedPlans().size()];
+ throws AlgebricksException {
+ List<List<AlgebricksPipeline>> subplans =
compileSubplansImpl(outerPlanSchema, npOp, opSchema, context);
+ int n = subplans.size();
+ AlgebricksPipeline[] result = new AlgebricksPipeline[n];
+ for (int i = 0; i < n; i++) {
+ List<AlgebricksPipeline> subplanOps = subplans.get(i);
+ if (subplanOps.size() != 1) {
+ throw new AlgebricksException("Attempting to construct a
nested plan with " + subplanOps.size()
+ + " operator descriptors. Currently, nested plans can
only consist in linear pipelines of Asterix micro operators.");
+ }
+ result[i] = subplanOps.get(0);
+ }
+ return result;
+ }
+
+ protected List<List<AlgebricksPipeline>>
compileSubplansImpl(IOperatorSchema outerPlanSchema,
+ AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema,
JobGenContext context)
+ throws AlgebricksException {
+ List<List<AlgebricksPipeline>> subplans = new
ArrayList<>(npOp.getNestedPlans().size());
PlanCompiler pc = new PlanCompiler(context);
- int i = 0;
for (ILogicalPlan p : npOp.getNestedPlans()) {
- subplans[i++] = buildPipelineWithProjection(p, outerPlanSchema,
npOp, opSchema, pc);
+ subplans.add(buildPipelineWithProjection(p, outerPlanSchema, npOp,
opSchema, pc));
}
return subplans;
}
- private AlgebricksPipeline buildPipelineWithProjection(ILogicalPlan p,
IOperatorSchema outerPlanSchema,
+ private List<AlgebricksPipeline> buildPipelineWithProjection(ILogicalPlan
p, IOperatorSchema outerPlanSchema,
AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema,
PlanCompiler pc)
- throws AlgebricksException {
+ throws AlgebricksException {
if (p.getRoots().size() > 1) {
throw new NotImplementedException("Nested plans with several roots
are not supported.");
}
- JobSpecification nestedJob = pc.compilePlan(p, outerPlanSchema, null);
+ JobSpecification nestedJob = pc.compileNestedPlan(p, outerPlanSchema);
ILogicalOperator topOpInSubplan = p.getRoots().get(0).getValue();
JobGenContext context = pc.getContext();
IOperatorSchema topOpInSubplanScm = context.getSchema(topOpInSubplan);
opSchema.addAllVariables(topOpInSubplanScm);
Map<OperatorDescriptorId, IOperatorDescriptor> opMap =
nestedJob.getOperatorMap();
- if (opMap.size() != 1) {
- throw new AlgebricksException("Attempting to construct a nested
plan with " + opMap.size()
- + " operator descriptors. Currently, nested plans can only
consist in linear pipelines of Asterix micro operators.");
- }
-
- for (Map.Entry<OperatorDescriptorId, IOperatorDescriptor> opEntry :
opMap.entrySet()) {
- IOperatorDescriptor opd = opEntry.getValue();
- if (!(opd instanceof AlgebricksMetaOperatorDescriptor)) {
- throw new AlgebricksException(
- "Can only generate Hyracks jobs for pipelinable
Asterix nested plans, not for "
- + opd.getClass().getName());
+ List<? extends IOperatorDescriptor> metaOps = nestedJob.getMetaOps();
+ if (opMap.size() != metaOps.size()) {
+ for (IOperatorDescriptor opd : opMap.values()) {
+ if (!(opd instanceof AlgebricksMetaOperatorDescriptor)) {
+ throw new AlgebricksException(
+ "Can only generate Hyracks jobs for pipelinable
Asterix nested plans, not for " + opd
+ .getClass().getName());
+ }
}
- AlgebricksMetaOperatorDescriptor amod =
(AlgebricksMetaOperatorDescriptor) opd;
-
- return amod.getPipeline();
- // we suppose that the top operator in the subplan already does the
- // projection for us
+ throw new IllegalStateException();
}
- throw new IllegalStateException();
+ List<AlgebricksPipeline> result = new ArrayList<>(metaOps.size());
+ for (IOperatorDescriptor opd : metaOps) {
+ AlgebricksMetaOperatorDescriptor amod =
(AlgebricksMetaOperatorDescriptor) opd;
+ result.add(amod.getPipeline());
+
+ }
+ return result;
}
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnionAllPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnionAllPOperator.java
new file mode 100644
index 0000000..5586d8e
--- /dev/null
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractUnionAllPOperator.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import
org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+
+public abstract class AbstractUnionAllPOperator extends
AbstractPhysicalOperator {
+
+ @Override
+ public void computeDeliveredProperties(ILogicalOperator op,
IOptimizationContext context) {
+ AbstractLogicalOperator op2 = (AbstractLogicalOperator)
op.getInputs().get(0).getValue();
+ IPartitioningProperty pp =
op2.getDeliveredPhysicalProperties().getPartitioningProperty();
+ this.deliveredProperties = new StructuralPropertiesVector(pp, new
ArrayList<>(0));
+ }
+
+ @Override
+ public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
+ IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
+ StructuralPropertiesVector pv0 =
OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op,
+ new StructuralPropertiesVector(new
RandomPartitioningProperty(context.getComputationNodeDomain()),
+ null));
+ StructuralPropertiesVector pv1 =
OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op,
+ new StructuralPropertiesVector(new
RandomPartitioningProperty(context.getComputationNodeDomain()),
+ null));
+ return new PhysicalRequirements(new StructuralPropertiesVector[] {
pv0, pv1 },
+ IPartitioningRequirementsCoordinator.NO_COORDINATION);
+ }
+
+ @Override
+ public boolean expensiveThanMaterialization() {
+ return false;
+ }
+}
\ No newline at end of file
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java
new file mode 100644
index 0000000..bbefaaa
--- /dev/null
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MicroUnionAllPOperator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import
org.apache.hyracks.algebricks.runtime.operators.union.MicroUnionAllRuntimeFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+
+public class MicroUnionAllPOperator extends AbstractUnionAllPOperator {
+
+ @Override
+ public PhysicalOperatorTag getOperatorTag() {
+ return PhysicalOperatorTag.MICRO_UNION_ALL;
+ }
+
+ @Override
+ public boolean isMicroOperator() {
+ return true;
+ }
+
+ @Override
+ public void contributeRuntimeOperator(IHyracksJobBuilder builder,
JobGenContext context, ILogicalOperator op,
+ IOperatorSchema opSchema, IOperatorSchema[] inputSchemas,
IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+
+ RecordDescriptor recordDescriptor =
+
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
context);
+
+ List<Mutable<ILogicalOperator>> inputs = op.getInputs();
+ int nInputs = inputs.size();
+
+ MicroUnionAllRuntimeFactory runtime = new
MicroUnionAllRuntimeFactory(nInputs);
+ builder.contributeMicroOperator(op, runtime, recordDescriptor);
+
+ for (int i = 0; i < nInputs; i++) {
+ ILogicalOperator src = inputs.get(i).getValue();
+ builder.contributeGraphEdge(src, 0, op, i);
+ }
+ }
+}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
index d43ddab..6f7a300 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SubplanPOperator.java
@@ -89,18 +89,17 @@
if (subplan.getNestedPlans().size() != 1) {
throw new NotImplementedException("Subplan currently works only
for one nested plan with one root.");
}
- AlgebricksPipeline[] subplans = compileSubplans(inputSchemas[0],
subplan, opSchema, context);
- assert subplans.length == 1;
- AlgebricksPipeline np = subplans[0];
+ List<List<AlgebricksPipeline>> subplans =
compileSubplansImpl(inputSchemas[0], subplan, opSchema, context);
+ assert subplans.size() == 1;
+ List<AlgebricksPipeline> np = subplans.get(0);
RecordDescriptor inputRecordDesc = JobGenHelper.mkRecordDescriptor(
context.getTypeEnvironment(op.getInputs().get(0).getValue()),
inputSchemas[0], context);
- IMissingWriterFactory[] missingWriterFactories = new
IMissingWriterFactory[np.getOutputWidth()];
+ IMissingWriterFactory[] missingWriterFactories = new
IMissingWriterFactory[np.get(0).getOutputWidth()];
for (int i = 0; i < missingWriterFactories.length; i++) {
missingWriterFactories[i] = context.getMissingWriterFactory();
}
- SubplanRuntimeFactory runtime = new SubplanRuntimeFactory(np,
missingWriterFactories, inputRecordDesc, null);
-
RecordDescriptor recDesc =
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
context);
+ SubplanRuntimeFactory runtime = new SubplanRuntimeFactory(np,
missingWriterFactories, inputRecordDesc, recDesc, null);
builder.contributeMicroOperator(subplan, runtime, recDesc);
ILogicalOperator src = op.getInputs().get(0).getValue();
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
index d184161..85dc0e0 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
@@ -18,29 +18,22 @@
*/
package org.apache.hyracks.algebricks.core.algebra.operators.physical;
-import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
-import
org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
-import
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import
org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import
org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
-import
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.union.UnionAllOperatorDescriptor;
-public class UnionAllPOperator extends AbstractPhysicalOperator {
+public class UnionAllPOperator extends AbstractUnionAllPOperator {
@Override
public PhysicalOperatorTag getOperatorTag() {
@@ -53,26 +46,6 @@
}
@Override
- public void computeDeliveredProperties(ILogicalOperator op,
IOptimizationContext context) {
- AbstractLogicalOperator op2 = (AbstractLogicalOperator)
op.getInputs().get(0).getValue();
- IPartitioningProperty pp =
op2.getDeliveredPhysicalProperties().getPartitioningProperty();
- this.deliveredProperties = new StructuralPropertiesVector(pp, new
ArrayList<>(0));
- }
-
- @Override
- public PhysicalRequirements
getRequiredPropertiesForChildren(ILogicalOperator op,
- IPhysicalPropertiesVector reqdByParent, IOptimizationContext
context) {
- StructuralPropertiesVector pv0 =
OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op,
- new StructuralPropertiesVector(new
RandomPartitioningProperty(context.getComputationNodeDomain()),
- null));
- StructuralPropertiesVector pv1 =
OperatorPropertiesUtil.checkUnpartitionedAndGetPropertiesVector(op,
- new StructuralPropertiesVector(new
RandomPartitioningProperty(context.getComputationNodeDomain()),
- null));
- return new PhysicalRequirements(new StructuralPropertiesVector[] {
pv0, pv1 },
- IPartitioningRequirementsCoordinator.NO_COORDINATION);
- }
-
- @Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder,
JobGenContext context, ILogicalOperator op,
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas,
IOperatorSchema outerPlanSchema)
throws AlgebricksException {
@@ -81,20 +54,15 @@
RecordDescriptor recordDescriptor =
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
context);
- // at algebricks level, union all only accepts two inputs, although at
- // hyracks
- // level, there is no restrictions
- UnionAllOperatorDescriptor opDesc = new
UnionAllOperatorDescriptor(spec, 2, recordDescriptor);
+ List<Mutable<ILogicalOperator>> inputs = op.getInputs();
+ int nInputs = inputs.size();
+
+ UnionAllOperatorDescriptor opDesc = new
UnionAllOperatorDescriptor(spec, nInputs, recordDescriptor);
contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
- ILogicalOperator src1 = op.getInputs().get(0).getValue();
- builder.contributeGraphEdge(src1, 0, op, 0);
- ILogicalOperator src2 = op.getInputs().get(1).getValue();
- builder.contributeGraphEdge(src2, 0, op, 1);
- }
- @Override
- public boolean expensiveThanMaterialization() {
- return false;
+ for (int i = 0; i < nInputs; i++) {
+ ILogicalOperator src = inputs.get(i).getValue();
+ builder.contributeGraphEdge(src, 0, op, i);
+ }
}
-
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index 8d00696..6c79466 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -18,7 +18,10 @@
*/
package org.apache.hyracks.algebricks.core.algebra.util;
+import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -40,8 +43,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.
- LogicalOperatorDeepCopyWithNewVariablesVisitor;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.OperatorDeepCopyVisitor;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
@@ -202,8 +204,8 @@
public static Pair<ILogicalOperator, Map<LogicalVariable,
LogicalVariable>> deepCopyWithNewVars(
ILogicalOperator root, IOptimizationContext ctx) throws
AlgebricksException {
- LogicalOperatorDeepCopyWithNewVariablesVisitor deepCopyVisitor = new
- LogicalOperatorDeepCopyWithNewVariablesVisitor(ctx, null,
true);
+ LogicalOperatorDeepCopyWithNewVariablesVisitor deepCopyVisitor =
+ new LogicalOperatorDeepCopyWithNewVariablesVisitor(ctx, ctx,
true);
ILogicalOperator newRoot = deepCopyVisitor.deepCopy(root);
return Pair.of(newRoot,
deepCopyVisitor.getInputToOutputVariableMapping());
}
@@ -328,4 +330,45 @@
return false;
}
+ /**
+ * Returns all descendants of an operator that are leaf operators
+ *
+ * @param opRef given operator
+ * @return list containing all leaf descendants
+ */
+ public static List<Mutable<ILogicalOperator>>
findLeafDescendants(Mutable<ILogicalOperator> opRef) {
+ List<Mutable<ILogicalOperator>> result = Collections.emptyList();
+
+ Deque<Mutable<ILogicalOperator>> queue = new ArrayDeque<>();
+ queue.add(opRef);
+ Mutable<ILogicalOperator> currentOpRef;
+ while ((currentOpRef = queue.pollLast()) != null) {
+ List<Mutable<ILogicalOperator>> inputs =
currentOpRef.getValue().getInputs();
+ if (inputs.size() == 0) {
+ if (result.isEmpty()) {
+ result = new ArrayList<>();
+ }
+ result.add(currentOpRef);
+ } else {
+ queue.addAll(inputs);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Find operator in a given list of operator references
+ *
+ * @param list list to search in
+ * @param op operator to find
+ * @return operator position in the given list or {@code -1} if not found
+ */
+ public static int indexOf(List<Mutable<ILogicalOperator>> list,
ILogicalOperator op) {
+ for (int i = 0, ln = list.size(); i < ln; i++) {
+ if (list.get(i).getValue() == op) {
+ return i;
+ }
+ }
+ return -1;
+ }
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index 4c42db8..6281124 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -34,6 +35,8 @@
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import
org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
@@ -95,7 +98,7 @@
@Override
public void contributeMicroOperator(ILogicalOperator op,
IPushRuntimeFactory runtime, RecordDescriptor recDesc,
AlgebricksPartitionConstraint pc) {
- microOps.put(op, new Pair<IPushRuntimeFactory,
RecordDescriptor>(runtime, recDesc));
+ microOps.put(op, new Pair<>(runtime, recDesc));
revMicroOpMap.put(runtime, op);
if (pc != null) {
pcForMicroOps.put(op, pc);
@@ -168,6 +171,17 @@
jobSpec.addRoot(opDesc);
}
setAllPartitionConstraints(tgtConstraints);
+ }
+
+ public List<AlgebricksMetaOperatorDescriptor> getGeneratedMetaOps() {
+ List<AlgebricksMetaOperatorDescriptor> resultOps = new ArrayList<>();
+ for (IOperatorDescriptor opd : jobSpec.getOperatorMap().values()) {
+ if (opd instanceof AlgebricksMetaOperatorDescriptor) {
+ resultOps.add((AlgebricksMetaOperatorDescriptor)opd);
+ }
+ }
+ resultOps.sort((op1, op2) -> sendsOutput(op1, op2) ? 1 :
sendsOutput(op2, op1) ? -1 : 0);
+ return resultOps;
}
private void setAllPartitionConstraints(Map<IConnectorDescriptor,
TargetConstraint> tgtConstraints) {
@@ -316,20 +330,30 @@
int n = opContents.size();
IPushRuntimeFactory[] runtimeFactories = new IPushRuntimeFactory[n];
RecordDescriptor[] internalRecordDescriptors = new RecordDescriptor[n];
- int i = 0;
- for (Pair<IPushRuntimeFactory, RecordDescriptor> p : opContents) {
+ for (int i = 0, ln = opContents.size(); i < ln; i++) {
+ Pair<IPushRuntimeFactory, RecordDescriptor> p = opContents.get(i);
runtimeFactories[i] = p.first;
internalRecordDescriptors[i] = p.second;
- i++;
}
ILogicalOperator lastLogicalOp = revMicroOpMap.get(runtimeFactories[n
- 1]);
ArrayList<ILogicalOperator> outOps = outEdges.get(lastLogicalOp);
- int outArity = (outOps == null) ? 0 : outOps.size();
+ int outArity = outOps == null ? 0 : outOps.size();
+ int[] outPositions = new int[outArity];
+ IPushRuntimeFactory[] outRuntimeFactories = new
IPushRuntimeFactory[outArity];
+ if (outOps != null) {
+ for (int i = 0, ln = outOps.size(); i < ln; i++ ) {
+ ILogicalOperator outOp = outOps.get(i);
+ outPositions[i] =
OperatorManipulationUtil.indexOf(outOp.getInputs(), lastLogicalOp);
+ Pair<IPushRuntimeFactory, RecordDescriptor> microOpPair =
microOps.get(outOp);
+ outRuntimeFactories[i] = microOpPair != null ?
microOpPair.first : null;
+ }
+ }
+
ILogicalOperator firstLogicalOp =
revMicroOpMap.get(runtimeFactories[0]);
ArrayList<ILogicalOperator> inOps = inEdges.get(firstLogicalOp);
int inArity = (inOps == null) ? 0 : inOps.size();
return new AlgebricksMetaOperatorDescriptor(jobSpec, inArity,
outArity, runtimeFactories,
- internalRecordDescriptors);
+ internalRecordDescriptors, outRuntimeFactories, outPositions);
}
private void addMicroOpToMetaRuntimeOp(ILogicalOperator aop) {
@@ -343,7 +367,12 @@
return;
}
ILogicalOperator dest = destList.get(0);
+ int destInputPos = OperatorManipulationUtil.indexOf(dest.getInputs(),
aop);
Integer j = algebraicOpBelongingToMetaAsterixOp.get(dest);
+ if (destInputPos != 0) {
+ return;
+ }
+
if (j == null && microOps.get(dest) != null) {
algebraicOpBelongingToMetaAsterixOp.put(dest, k);
List<Pair<IPushRuntimeFactory, RecordDescriptor>> aodContent1 =
metaAsterixOpSkeletons.get(k);
@@ -361,7 +390,6 @@
}
}
}
-
}
private int createNewMetaOpInfo(ILogicalOperator aop) {
@@ -386,4 +414,28 @@
}
}
+ private boolean sendsOutput(AlgebricksMetaOperatorDescriptor src,
AlgebricksMetaOperatorDescriptor trg) {
+ AlgebricksPipeline srcPipeline = src.getPipeline();
+ IPushRuntimeFactory[] srcOutRts =
srcPipeline.getOutputRuntimeFactories();
+ if (srcOutRts == null) {
+ return false;
+ }
+ IPushRuntimeFactory[] trgRts = trg.getPipeline().getRuntimeFactories();
+ for (IPushRuntimeFactory srcOutRt : srcOutRts) {
+ if (ArrayUtils.contains(trgRts, srcOutRt)) {
+ return true;
+ }
+ ILogicalOperator srcOutOp = revMicroOpMap.get(srcOutRt);
+ if (srcOutOp != null) {
+ Integer k = algebraicOpBelongingToMetaAsterixOp.get(srcOutOp);
+ if (k != null) {
+ AlgebricksMetaOperatorDescriptor srcOutMetaOp =
metaAsterixOps.get(k);
+ if (srcOutMetaOp != null && sendsOutput(srcOutMetaOp,
trg)) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index f817cd6..4a89d15 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -47,14 +47,24 @@
return context;
}
- public JobSpecification compilePlan(ILogicalPlan plan, IOperatorSchema
outerPlanSchema,
+ public JobSpecification compilePlan(ILogicalPlan plan,
IJobletEventListenerFactory jobEventListenerFactory)
+ throws AlgebricksException {
+ return compilePlanImpl(plan, false, null, jobEventListenerFactory);
+ }
+
+ public JobSpecification compileNestedPlan(ILogicalPlan plan,
IOperatorSchema outerPlanSchema)
+ throws AlgebricksException {
+ return compilePlanImpl(plan, true, outerPlanSchema, null);
+ }
+
+ private JobSpecification compilePlanImpl(ILogicalPlan plan, boolean
isNestedPlan, IOperatorSchema outerPlanSchema,
IJobletEventListenerFactory jobEventListenerFactory) throws
AlgebricksException {
JobSpecification spec = new JobSpecification(context.getFrameSize());
if (jobEventListenerFactory != null) {
spec.setJobletEventListenerFactory(jobEventListenerFactory);
}
- List<ILogicalOperator> rootOps = new ArrayList<ILogicalOperator>();
- IHyracksJobBuilder builder = new JobBuilder(spec,
context.getClusterLocations());
+ List<ILogicalOperator> rootOps = new ArrayList<>();
+ JobBuilder builder = new JobBuilder(spec,
context.getClusterLocations());
for (Mutable<ILogicalOperator> opRef : plan.getRoots()) {
compileOpRef(opRef, spec, builder, outerPlanSchema);
rootOps.add(opRef.getValue());
@@ -65,6 +75,9 @@
spec.setConnectorPolicyAssignmentPolicy(new
ConnectorPolicyAssignmentPolicy());
// Do not do activity cluster planning because it is slow on large
clusters
spec.setUseConnectorPolicyForScheduling(false);
+ if (isNestedPlan) {
+ spec.setMetaOps(builder.getGeneratedMetaOps());
+ }
return spec;
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index 84961d6..72c0928 100644
---
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -67,6 +67,7 @@
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.LeftOuterUnnestPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreSortedDistinctByPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreclusteredGroupByPOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroUnionAllPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedTupleSourcePOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
@@ -201,11 +202,11 @@
break;
}
case INNERJOIN: {
-
JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, context);
+
JoinUtils.setJoinAlgorithmAndExchangeAlgo((InnerJoinOperator) op, topLevelOp,
context);
break;
}
case LEFTOUTERJOIN: {
-
JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op, context);
+
JoinUtils.setJoinAlgorithmAndExchangeAlgo((LeftOuterJoinOperator) op,
topLevelOp, context);
break;
}
case LIMIT: {
@@ -260,11 +261,19 @@
break;
}
case UNIONALL: {
- op.setPhysicalOperator(new UnionAllPOperator());
+ if (topLevelOp) {
+ op.setPhysicalOperator(new UnionAllPOperator());
+ } else {
+ op.setPhysicalOperator(new MicroUnionAllPOperator());
+ }
break;
}
case INTERSECT: {
- op.setPhysicalOperator(new IntersectPOperator());
+ if (topLevelOp) {
+ op.setPhysicalOperator(new IntersectPOperator());
+ } else {
+ throw new IllegalStateException("Micro operator not
implemented for: " + op.getOperatorTag());
+ }
break;
}
case UNNEST: {
diff --git
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
index af95ecd..ebcfba4 100644
---
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
+++
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/PushSubplanIntoGroupByRule.java
@@ -139,6 +139,10 @@
while (upperSubplanRootRefIterator.hasNext()) {
Mutable<ILogicalOperator> rootOpRef =
upperSubplanRootRefIterator.next();
+ if (downToNts(rootOpRef) == null) {
+ continue;
+ }
+
// Collects free variables in the root operator of a
nested plan and its descent.
Set<LogicalVariable> freeVars = new ListSet<>();
OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc((AbstractLogicalOperator)
rootOpRef.getValue(),
@@ -265,11 +269,13 @@
}
private Mutable<ILogicalOperator> downToNts(Mutable<ILogicalOperator>
opRef) {
- Mutable<ILogicalOperator> currentOpRef = opRef;
- while (currentOpRef.getValue().getInputs().size() > 0) {
- currentOpRef = currentOpRef.getValue().getInputs().get(0);
+ List<Mutable<ILogicalOperator>> leafOps =
OperatorManipulationUtil.findLeafDescendants(opRef);
+ if (leafOps.size() == 1) {
+ Mutable<ILogicalOperator> leafOp = leafOps.get(0);
+ if (leafOp.getValue().getOperatorTag() ==
LogicalOperatorTag.NESTEDTUPLESOURCE) {
+ return leafOp;
+ }
}
- return currentOpRef;
+ return null;
}
-
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
index 6efda52..33862e3 100644
---
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
+++
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/util/JoinUtils.java
@@ -51,8 +51,12 @@
private JoinUtils() {
}
- public static void
setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op,
IOptimizationContext context)
+ public static void
setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, boolean
topLevelOp,
+ IOptimizationContext context)
throws AlgebricksException {
+ if (!topLevelOp) {
+ throw new IllegalStateException("Micro operator not implemented
for: " + op.getOperatorTag());
+ }
List<LogicalVariable> sideLeft = new LinkedList<>();
List<LogicalVariable> sideRight = new LinkedList<>();
List<LogicalVariable> varsLeft =
op.getInputs().get(0).getValue().getSchema();
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
index 379944b..f24d38d 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/AlgebricksPipeline.java
@@ -27,10 +27,15 @@
private static final long serialVersionUID = 1L;
private final IPushRuntimeFactory[] runtimeFactories;
private final RecordDescriptor[] recordDescriptors;
+ private final IPushRuntimeFactory[] outputRuntimeFactories;
+ private final int[] outputPositions;
- public AlgebricksPipeline(IPushRuntimeFactory[] runtimeFactories,
RecordDescriptor[] recordDescriptors) {
+ public AlgebricksPipeline(IPushRuntimeFactory[] runtimeFactories,
RecordDescriptor[] recordDescriptors,
+ IPushRuntimeFactory[] outputRuntimeFactories, int[]
outputPositions) {
this.runtimeFactories = runtimeFactories;
this.recordDescriptors = recordDescriptors;
+ this.outputRuntimeFactories = outputRuntimeFactories;
+ this.outputPositions = outputPositions;
// this.projectedColumns = projectedColumns;
}
@@ -46,8 +51,15 @@
return recordDescriptors[recordDescriptors.length - 1].getFieldCount();
}
+ public IPushRuntimeFactory[] getOutputRuntimeFactories() {
+ return outputRuntimeFactories;
+ }
+
+ public int[] getOutputPositions() {
+ return outputPositions;
+ }
+
// public int[] getProjectedColumns() {
// return projectedColumns;
// }
-
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
index de6cddd..f90de81 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/base/IPushRuntimeFactory.java
@@ -24,5 +24,5 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IPushRuntimeFactory extends Serializable {
- public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException;
+ IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException;
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
index 94af04f..0a578f6 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansAccumulatingAggregatorFactory.java
@@ -168,7 +168,7 @@
// should enforce protocol
boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
for (int i = runtimeFactories.length - 1; i >= 0; i--) {
- IPushRuntime newRuntime =
runtimeFactories[i].createPushRuntime(ctx);
+ IPushRuntime newRuntime =
runtimeFactories[i].createPushRuntime(ctx)[0];
newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) :
newRuntime;
start = enforce ? EnforcePushRuntime.enforce(start) : start;
newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
index c261df8..75b2fb2 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/aggreg/NestedPlansRunningAggregatorFactory.java
@@ -149,7 +149,7 @@
IPushRuntimeFactory[] runtimeFactories = subplan.getRuntimeFactories();
RecordDescriptor[] recordDescriptors = subplan.getRecordDescriptors();
for (int i = runtimeFactories.length - 1; i >= 0; i--) {
- IPushRuntime newRuntime =
runtimeFactories[i].createPushRuntime(ctx);
+ IPushRuntime newRuntime =
runtimeFactories[i].createPushRuntime(ctx)[0];
newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) :
newRuntime;
start = enforce ? EnforceFrameWriter.enforce(start) : start;
newRuntime.setOutputFrameWriter(0, start, recordDescriptors[i]);
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
index 32eff3a..7b3fb46 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputRuntimeFactory.java
@@ -34,8 +34,8 @@
}
@Override
- public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException {
- return createOneOutputPushRuntime(ctx);
+ public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException {
+ return new IPushRuntime[] { createOneOutputPushRuntime(ctx) };
}
public abstract AbstractOneInputOneOutputPushRuntime
createOneOutputPushRuntime(IHyracksTaskContext ctx)
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
index a838557..f0e9406 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/SinkRuntimeFactory.java
@@ -38,8 +38,8 @@
}
@Override
- public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException {
- return new AbstractOneInputSinkPushRuntime() {
+ public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException {
+ return new IPushRuntime[] { new AbstractOneInputSinkPushRuntime() {
@Override
public void open() throws HyracksDataException {
@@ -61,7 +61,6 @@
public void flush() throws HyracksDataException {
// flush() is meaningless for sink operators
}
- };
+ } };
}
-
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
index 25a4aea..6e4ecce 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/AlgebricksMetaOperatorDescriptor.java
@@ -45,11 +45,18 @@
public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec,
int inputArity, int outputArity,
IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[]
internalRecordDescriptors) {
+ this(spec, inputArity, outputArity, runtimeFactories,
internalRecordDescriptors, null, null);
+ }
+
+ public AlgebricksMetaOperatorDescriptor(IOperatorDescriptorRegistry spec,
int inputArity, int outputArity,
+ IPushRuntimeFactory[] runtimeFactories, RecordDescriptor[]
internalRecordDescriptors,
+ IPushRuntimeFactory[] outputRuntimeFactories, int[]
outputPositions) {
super(spec, inputArity, outputArity);
if (outputArity == 1) {
this.outRecDescs[0] =
internalRecordDescriptors[internalRecordDescriptors.length - 1];
}
- this.pipeline = new AlgebricksPipeline(runtimeFactories,
internalRecordDescriptors);
+ this.pipeline = new AlgebricksPipeline(runtimeFactories,
internalRecordDescriptors, outputRuntimeFactories,
+ outputPositions);
}
public AlgebricksPipeline getPipeline() {
@@ -68,7 +75,7 @@
StringBuilder sb = new StringBuilder();
sb.append("Asterix { \n");
for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
- sb.append(" " + f.toString() + ";\n");
+ sb.append(" ").append(f).append(";\n");
}
sb.append("}");
return sb.toString();
@@ -87,7 +94,7 @@
private class SourcePushRuntime extends
AbstractUnaryOutputSourceOperatorNodePushable {
private final IHyracksTaskContext ctx;
- public SourcePushRuntime(IHyracksTaskContext ctx) {
+ SourcePushRuntime(IHyracksTaskContext ctx) {
this.ctx = ctx;
}
@@ -136,8 +143,9 @@
outputArity > 0 ?
AlgebricksMetaOperatorDescriptor.this.outRecDescs[0] : null;
RecordDescriptor pipelineInputRecordDescriptor =
recordDescProvider
.getInputRecordDescriptor(AlgebricksMetaOperatorDescriptor.this.getActivityId(),
0);
- PipelineAssembler pa = new PipelineAssembler(pipeline,
inputArity, outputArity,
- pipelineInputRecordDescriptor,
pipelineOutputRecordDescriptor);
+ PipelineAssembler pa =
+ new PipelineAssembler(pipeline, inputArity,
outputArity, pipelineInputRecordDescriptor,
+ pipelineOutputRecordDescriptor);
startOfPipeline = pa.assemblePipeline(writer, ctx);
}
opened = true;
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
index e1081e0..a717794 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/PipelineAssembler.java
@@ -18,9 +18,13 @@
*/
package org.apache.hyracks.algebricks.runtime.operators.meta;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.EnforcePushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.EnforceFrameWriter;
@@ -37,6 +41,7 @@
private final int inputArity;
private final int outputArity;
private final AlgebricksPipeline pipeline;
+ private final Map<IPushRuntimeFactory, IPushRuntime[]> runtimeMap;
public PipelineAssembler(AlgebricksPipeline pipeline, int inputArity, int
outputArity,
RecordDescriptor pipelineInputRecordDescriptor, RecordDescriptor
pipelineOutputRecordDescriptor) {
@@ -45,6 +50,7 @@
this.pipelineOutputRecordDescriptor = pipelineOutputRecordDescriptor;
this.inputArity = inputArity;
this.outputArity = outputArity;
+ this.runtimeMap = new HashMap<>();
}
public IFrameWriter assemblePipeline(IFrameWriter writer,
IHyracksTaskContext ctx) throws HyracksDataException {
@@ -52,19 +58,30 @@
boolean enforce = ctx.getJobFlags().contains(JobFlag.ENFORCE_CONTRACT);
// plug the operators
IFrameWriter start = writer;// this.writer;
- for (int i = pipeline.getRuntimeFactories().length - 1; i >= 0; i--) {
- IPushRuntime newRuntime =
pipeline.getRuntimeFactories()[i].createPushRuntime(ctx);
- newRuntime = enforce ? EnforcePushRuntime.enforce(newRuntime) :
newRuntime;
+ IPushRuntimeFactory[] runtimeFactories =
pipeline.getRuntimeFactories();
+ RecordDescriptor[] recordDescriptors = pipeline.getRecordDescriptors();
+ for (int i = runtimeFactories.length - 1; i >= 0; i--) {
start = enforce ? EnforceFrameWriter.enforce(start) : start;
- if (i == pipeline.getRuntimeFactories().length - 1) {
- if (outputArity == 1) {
- newRuntime.setOutputFrameWriter(0, start,
pipelineOutputRecordDescriptor);
+
+ IPushRuntimeFactory runtimeFactory = runtimeFactories[i];
+ IPushRuntime[] newRuntimes = runtimeFactory.createPushRuntime(ctx);
+ for (int j = 0; j < newRuntimes.length; j++) {
+ if (enforce) {
+ newRuntimes[j] =
EnforcePushRuntime.enforce(newRuntimes[j]);
}
- } else {
- newRuntime.setOutputFrameWriter(0, start,
pipeline.getRecordDescriptors()[i]);
+ if (i == runtimeFactories.length - 1) {
+ if (outputArity == 1) {
+ newRuntimes[j].setOutputFrameWriter(0, start,
pipelineOutputRecordDescriptor);
+ }
+ } else {
+ newRuntimes[j].setOutputFrameWriter(0, start,
recordDescriptors[i]);
+ }
}
+ runtimeMap.put(runtimeFactory, newRuntimes);
+
+ IPushRuntime newRuntime = newRuntimes[0];
if (i > 0) {
- newRuntime.setInputRecordDescriptor(0,
pipeline.getRecordDescriptors()[i - 1]);
+ newRuntime.setInputRecordDescriptor(0, recordDescriptors[i -
1]);
} else if (inputArity > 0) {
newRuntime.setInputRecordDescriptor(0,
pipelineInputRecordDescriptor);
}
@@ -72,4 +89,8 @@
}
return start;
}
+
+ public IPushRuntime[] getPushRuntime(IPushRuntimeFactory runtimeFactory) {
+ return runtimeMap.get(runtimeFactory);
+ }
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
index f6a349f..5f255f0 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/meta/SubplanRuntimeFactory.java
@@ -20,9 +20,11 @@
import java.io.DataOutput;
import java.nio.ByteBuffer;
+import java.util.List;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputPushRuntime;
@@ -41,16 +43,21 @@
private static final long serialVersionUID = 1L;
- private final AlgebricksPipeline pipeline;
+ private final List<AlgebricksPipeline> pipelines;
+
private final RecordDescriptor inputRecordDesc;
+
+ private final RecordDescriptor outputRecordDesc;
+
private final IMissingWriterFactory[] missingWriterFactories;
- public SubplanRuntimeFactory(AlgebricksPipeline pipeline,
IMissingWriterFactory[] missingWriterFactories,
- RecordDescriptor inputRecordDesc, int[] projectionList) {
+ public SubplanRuntimeFactory(List<AlgebricksPipeline> pipelines,
IMissingWriterFactory[] missingWriterFactories,
+ RecordDescriptor inputRecordDesc, RecordDescriptor
outputRecordDesc, int[] projectionList) {
super(projectionList);
- this.pipeline = pipeline;
+ this.pipelines = pipelines;
this.missingWriterFactories = missingWriterFactories;
this.inputRecordDesc = inputRecordDesc;
+ this.outputRecordDesc = outputRecordDesc;
if (projectionList != null) {
throw new NotImplementedException();
}
@@ -60,8 +67,12 @@
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Subplan { \n");
- for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
- sb.append(" " + f.toString() + ";\n");
+ for (AlgebricksPipeline pipeline : pipelines) {
+ sb.append('{');
+ for (IPushRuntimeFactory f : pipeline.getRuntimeFactories()) {
+ sb.append(" ").append(f).append(";\n");
+ }
+ sb.append('}');
}
sb.append("}");
return sb.toString();
@@ -70,110 +81,177 @@
@Override
public AbstractOneInputOneOutputPushRuntime
createOneOutputPushRuntime(final IHyracksTaskContext ctx)
throws HyracksDataException {
+ return new SubplanPushRuntime(ctx);
+ }
- RecordDescriptor pipelineOutputRecordDescriptor = null;
+ private class SubplanPushRuntime extends
AbstractOneInputOneOutputOneFramePushRuntime {
- final PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1,
inputRecordDesc,
- pipelineOutputRecordDescriptor);
- final IMissingWriter[] nullWriters = new
IMissingWriter[missingWriterFactories.length];
- for (int i = 0; i < missingWriterFactories.length; i++) {
- nullWriters[i] = missingWriterFactories[i].createMissingWriter();
+ final IHyracksTaskContext ctx;
+
+ final NestedTupleSourceRuntime[] startOfPipelines;
+
+ boolean first;
+
+ SubplanPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException {
+ this.ctx = ctx;
+ this.first = true;
+
+ IMissingWriter[] missingWriters = new
IMissingWriter[missingWriterFactories.length];
+ for (int i = 0; i < missingWriterFactories.length; i++) {
+ missingWriters[i] =
missingWriterFactories[i].createMissingWriter();
+ }
+
+ int pipelineCount = pipelines.size();
+ startOfPipelines = new NestedTupleSourceRuntime[pipelineCount];
+ PipelineAssembler[] pipelineAssemblers = new
PipelineAssembler[pipelineCount];
+ for (int i = 0; i < pipelineCount; i++) {
+ AlgebricksPipeline pipeline = pipelines.get(i);
+ RecordDescriptor pipelineLastRecordDescriptor =
+
pipeline.getRecordDescriptors()[pipeline.getRecordDescriptors().length - 1];
+
+ RecordDescriptor outputRecordDescriptor;
+ IFrameWriter outputWriter;
+ if (i == 0) {
+ // primary pipeline
+ outputWriter = new
TupleOuterProduct(pipelineLastRecordDescriptor, missingWriters);
+ outputRecordDescriptor =
SubplanRuntimeFactory.this.outputRecordDesc;
+ } else {
+ // secondary pipeline
+ IPushRuntime outputPushRuntime =
linkSecondaryPipeline(pipeline, pipelineAssemblers, i);
+ if (outputPushRuntime == null) {
+ throw new IllegalStateException();
+ }
+ outputPushRuntime.setInputRecordDescriptor(0,
pipelineLastRecordDescriptor);
+ outputWriter = outputPushRuntime;
+ outputRecordDescriptor = pipelineLastRecordDescriptor;
+ }
+
+ PipelineAssembler pa = new PipelineAssembler(pipeline, 1, 1,
inputRecordDesc, outputRecordDescriptor);
+ startOfPipelines[i] = (NestedTupleSourceRuntime)
pa.assemblePipeline(outputWriter, ctx);
+ pipelineAssemblers[i] = pa;
+ }
}
- return new AbstractOneInputOneOutputOneFramePushRuntime() {
+ IPushRuntime linkSecondaryPipeline(AlgebricksPipeline pipeline,
PipelineAssembler[] pipelineAssemblers,
+ int pipelineAssemblersCount) {
+ IPushRuntimeFactory[] outputRuntimeFactories =
pipeline.getOutputRuntimeFactories();
+ if (outputRuntimeFactories == null ||
outputRuntimeFactories.length != 1) {
+ throw new IllegalStateException();
+ }
+ IPushRuntimeFactory outRuntimeFactory = outputRuntimeFactories[0];
+ int outputPosition = pipeline.getOutputPositions()[0];
+ for (int i = 0; i < pipelineAssemblersCount; i++) {
+ IPushRuntime[] p =
pipelineAssemblers[i].getPushRuntime(outRuntimeFactory);
+ if (p != null) {
+ return p[outputPosition];
+ }
+ }
+ return null;
+ }
- /**
- * Computes the outer product between a given tuple and the frames
- * passed.
- */
- class TupleOuterProduct implements IFrameWriter {
+ @Override
+ public void open() throws HyracksDataException {
+ writer.open();
+ if (first) {
+ first = false;
+ initAccessAppendRef(ctx);
+ }
+ }
- private boolean smthWasWritten = false;
- private FrameTupleAccessor ta = new FrameTupleAccessor(
-
pipeline.getRecordDescriptors()[pipeline.getRecordDescriptors().length - 1]);
- private ArrayTupleBuilder tb = new ArrayTupleBuilder(
- nullWriters.length +
SubplanRuntimeFactory.this.inputRecordDesc.getFieldCount());
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tAccess.reset(buffer);
+ int nTuple = tAccess.getTupleCount();
+ for (int t = 0; t < nTuple; t++) {
+ tRef.reset(tAccess, t);
- @Override
- public void open() throws HyracksDataException {
- smthWasWritten = false;
+ for (NestedTupleSourceRuntime nts : startOfPipelines) {
+ nts.writeTuple(buffer, t);
}
- @Override
- public void nextFrame(ByteBuffer buffer) throws
HyracksDataException {
- ta.reset(buffer);
- int nTuple = ta.getTupleCount();
- for (int t = 0; t < nTuple; t++) {
- appendConcat(tRef.getFrameTupleAccessor(),
tRef.getTupleIndex(), ta, t);
+ int n = 0;
+ try {
+ for (; n < startOfPipelines.length; n++) {
+ NestedTupleSourceRuntime nts = startOfPipelines[n];
+ try {
+ nts.open();
+ } catch (Exception e) {
+ nts.fail();
+ throw e;
+ }
}
- smthWasWritten = true;
- }
-
- @Override
- public void close() throws HyracksDataException {
- if (!smthWasWritten && !failed) {
- // the case when we need to write nulls
- appendNullsToTuple();
- appendToFrameFromTupleBuilder(tb);
- }
- }
-
- @Override
- public void fail() throws HyracksDataException {
- // writer.fail() is called by the outer class'
writer.fail().
- }
-
- private void appendNullsToTuple() throws HyracksDataException {
- tb.reset();
- int n0 = tRef.getFieldCount();
- for (int f = 0; f < n0; f++) {
- tb.addField(tRef.getFrameTupleAccessor(),
tRef.getTupleIndex(), f);
- }
- DataOutput dos = tb.getDataOutput();
- for (int i = 0; i < nullWriters.length; i++) {
- nullWriters[i].writeMissing(dos);
- tb.addFieldEndOffset();
+ } finally {
+ for (int i = n - 1; i >= 0; i--) {
+ startOfPipelines[i].close();
}
}
}
+ }
- IFrameWriter endPipe = new TupleOuterProduct();
+ @Override
+ public void flush() throws HyracksDataException {
+ writer.flush();
+ }
- NestedTupleSourceRuntime startOfPipeline =
(NestedTupleSourceRuntime) pa.assemblePipeline(endPipe, ctx);
+ /**
+ * Computes the outer product between a given tuple and the frames
+ * passed.
+ */
+ class TupleOuterProduct implements IFrameWriter {
- boolean first = true;
+ private boolean smthWasWritten;
+ private final FrameTupleAccessor ta;
+ private final ArrayTupleBuilder tb;
+ private final IMissingWriter[] missingWriters;
+
+ private TupleOuterProduct(RecordDescriptor recordDescriptor,
IMissingWriter[] missingWriters) {
+ ta = new FrameTupleAccessor(recordDescriptor);
+ tb = new ArrayTupleBuilder(
+ missingWriters.length +
SubplanRuntimeFactory.this.inputRecordDesc.getFieldCount());
+ this.missingWriters = missingWriters;
+ }
@Override
public void open() throws HyracksDataException {
- writer.open();
- if (first) {
- first = false;
- initAccessAppendRef(ctx);
- }
+ smthWasWritten = false;
}
@Override
public void nextFrame(ByteBuffer buffer) throws
HyracksDataException {
- tAccess.reset(buffer);
- int nTuple = tAccess.getTupleCount();
+ ta.reset(buffer);
+ int nTuple = ta.getTupleCount();
for (int t = 0; t < nTuple; t++) {
- tRef.reset(tAccess, t);
- startOfPipeline.writeTuple(buffer, t);
- try {
- startOfPipeline.open();
- } catch (Exception e) {
- startOfPipeline.fail();
- throw e;
- } finally {
- startOfPipeline.close();
- }
+ appendConcat(tRef.getFrameTupleAccessor(),
tRef.getTupleIndex(), ta, t);
+ }
+ smthWasWritten = true;
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (!smthWasWritten && !failed) {
+ // the case when we need to write nulls
+ appendNullsToTuple();
+ appendToFrameFromTupleBuilder(tb);
}
}
@Override
- public void flush() throws HyracksDataException {
- writer.flush();
+ public void fail() throws HyracksDataException {
+ // writer.fail() is called by the outer class' writer.fail().
}
- };
+
+ private void appendNullsToTuple() throws HyracksDataException {
+ tb.reset();
+ int n0 = tRef.getFieldCount();
+ for (int f = 0; f < n0; f++) {
+ tb.addField(tRef.getFrameTupleAccessor(),
tRef.getTupleIndex(), f);
+ }
+ DataOutput dos = tb.getDataOutput();
+ for (IMissingWriter missingWriter : missingWriters) {
+ missingWriter.writeMissing(dos);
+ tb.addFieldEndOffset();
+ }
+ }
+ }
}
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
index 3ccceed..67f4a77 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/EmptyTupleSourceRuntimeFactory.java
@@ -40,8 +40,8 @@
}
@Override
- public IPushRuntime createPushRuntime(final IHyracksTaskContext ctx)
throws HyracksDataException {
- return new AbstractOneInputSourcePushRuntime() {
+ public IPushRuntime[] createPushRuntime(final IHyracksTaskContext ctx)
throws HyracksDataException {
+ return new IPushRuntime[] { new AbstractOneInputSourcePushRuntime() {
private final ArrayTupleBuilder tb = new ArrayTupleBuilder(0);
private final FrameTupleAppender appender = new
FrameTupleAppender(new VSizeFrame(ctx));
@@ -69,6 +69,6 @@
public void flush() throws HyracksDataException {
appender.flush(writer);
}
- };
+ } };
}
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
index 496679f..8e64092 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/NestedTupleSourceRuntimeFactory.java
@@ -39,8 +39,8 @@
}
@Override
- public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException {
- return new NestedTupleSourceRuntime(ctx);
+ public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException {
+ return new IPushRuntime[] { new NestedTupleSourceRuntime(ctx) };
}
public static class NestedTupleSourceRuntime extends
AbstractOneInputOneOutputOneFramePushRuntime {
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
index 021784a..8a06ecf 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PrinterRuntimeFactory.java
@@ -56,9 +56,9 @@
}
@Override
- public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) {
+ public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) {
IAWriter w =
PrinterBasedWriterFactory.INSTANCE.createWriter(printColumns, System.out,
printerFactories,
inputRecordDesc);
- return new SinkWriterRuntime(w, System.out, inputRecordDesc);
+ return new IPushRuntime[] { new SinkWriterRuntime(w, System.out,
inputRecordDesc) };
}
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
index d52ceee..536a769 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SinkWriterRuntimeFactory.java
@@ -67,11 +67,11 @@
}
@Override
- public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException {
+ public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException {
try {
PrintStream filePrintStream = new PrintStream(new
BufferedOutputStream(new FileOutputStream(outputFile)));
IAWriter w = writerFactory.createWriter(fields, filePrintStream,
printerFactories, inputRecordDesc);
- return new SinkWriterRuntime(w, filePrintStream, inputRecordDesc,
true);
+ return new IPushRuntime[] { new SinkWriterRuntime(w,
filePrintStream, inputRecordDesc, true) };
} catch (IOException e) {
throw new HyracksDataException(e);
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java
new file mode 100644
index 0000000..5d94cdb
--- /dev/null
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/union/MicroUnionAllRuntimeFactory.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.runtime.operators.union;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class MicroUnionAllRuntimeFactory implements IPushRuntimeFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ private final int inputArity;
+
+ public MicroUnionAllRuntimeFactory(int inputArity) {
+ this.inputArity = inputArity;
+ }
+
+ @Override
+ public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) {
+ MicroUnionAllState sharedState = new MicroUnionAllState();
+ IPushRuntime[] result = new IPushRuntime[inputArity];
+ for (int i = 0; i < inputArity; i++) {
+ result[i] = new MicroUnionAllPushRuntime(i, sharedState);
+ }
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "union-all";
+ }
+
+ private final class MicroUnionAllState {
+ private boolean failed;
+ }
+
+ private final class MicroUnionAllPushRuntime implements IPushRuntime {
+
+ private final int idx;
+
+ private final MicroUnionAllState sharedState;
+
+ private IFrameWriter writer;
+
+ MicroUnionAllPushRuntime(int idx, MicroUnionAllState sharedState) {
+ this.idx = idx;
+ this.sharedState = sharedState;
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer,
RecordDescriptor recordDesc) {
+ if (index != 0) {
+ throw new IllegalArgumentException(String.valueOf(index));
+ }
+ this.writer = writer;
+ }
+
+ @Override
+ public void setInputRecordDescriptor(int index, RecordDescriptor
recordDescriptor) {
+ }
+
+ @Override
+ public void open() throws HyracksDataException {
+ if (idx == 0) {
+ writer.open();
+ }
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ writer.nextFrame(buffer);
+ }
+
+ @Override
+ public void fail() throws HyracksDataException {
+ boolean failed = sharedState.failed;
+ sharedState.failed = true;
+ if (!failed) {
+ writer.fail();
+ }
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ if (idx == 0) {
+ writer.close();
+ }
+ }
+ }
+}
diff --git
a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index cc4c1b9..1113ab6 100644
---
a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++
b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -22,6 +22,7 @@
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
+import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
@@ -439,7 +440,7 @@
RecordDescriptor aggDesc = new RecordDescriptor(
new ISerializerDeserializer[] {
IntegerSerializerDeserializer.INSTANCE });
AlgebricksPipeline pipeline = new AlgebricksPipeline(new
IPushRuntimeFactory[] { nts, agg },
- new RecordDescriptor[] { ntsDesc, aggDesc });
+ new RecordDescriptor[] { ntsDesc, aggDesc }, null, null);
NestedPlansAccumulatingAggregatorFactory npaaf = new
NestedPlansAccumulatingAggregatorFactory(
new AlgebricksPipeline[] { pipeline }, new int[] { 3 }, new
int[] {});
RecordDescriptor gbyDesc = new RecordDescriptor(new
ISerializerDeserializer[] {
@@ -779,10 +780,10 @@
new ISerializerDeserializer[] {
IntegerSerializerDeserializer.INSTANCE });
AlgebricksPipeline pipeline = new AlgebricksPipeline(new
IPushRuntimeFactory[] { nts, assign2, project1 },
- new RecordDescriptor[] { assign1Desc, assign2Desc,
project1Desc });
+ new RecordDescriptor[] { assign1Desc, assign2Desc,
project1Desc }, null, null);
- SubplanRuntimeFactory subplan = new SubplanRuntimeFactory(pipeline,
- new IMissingWriterFactory[] {
NoopMissingWriterFactory.INSTANCE }, assign1Desc, null);
+ SubplanRuntimeFactory subplan = new
SubplanRuntimeFactory(Collections.singletonList(pipeline),
+ new IMissingWriterFactory[] {
NoopMissingWriterFactory.INSTANCE }, assign1Desc, null, null);
RecordDescriptor subplanDesc = new RecordDescriptor(new
ISerializerDeserializer[] {
IntegerSerializerDeserializer.INSTANCE,
IntegerSerializerDeserializer.INSTANCE });
@@ -850,7 +851,7 @@
RecordDescriptor aggDesc = new RecordDescriptor(
new ISerializerDeserializer[] {
IntegerSerializerDeserializer.INSTANCE });
AlgebricksPipeline pipeline = new AlgebricksPipeline(new
IPushRuntimeFactory[] { nts, agg },
- new RecordDescriptor[] { ntsDesc, aggDesc });
+ new RecordDescriptor[] { ntsDesc, aggDesc }, null, null);
NestedPlansAccumulatingAggregatorFactory npaaf = new
NestedPlansAccumulatingAggregatorFactory(
new AlgebricksPipeline[] { pipeline }, new int[] { 3 }, new
int[] {});
RecordDescriptor gbyDesc = new RecordDescriptor(new
ISerializerDeserializer[] {
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
index 9f66080..13ebd6e 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java
@@ -89,6 +89,8 @@
private transient int connectorIdCounter;
+ private transient List<? extends IOperatorDescriptor> metaOps;
+
// This constructor uses the default frame size. It is for test purposes
only.
// For other use cases, use the one which sets the frame size.
public JobSpecification() {
@@ -308,6 +310,14 @@
return requiredClusterCapacity;
}
+ public void setMetaOps(List<? extends IOperatorDescriptor> metaOps) {
+ this.metaOps = metaOps;
+ }
+
+ public List<? extends IOperatorDescriptor> getMetaOps() {
+ return metaOps;
+ }
+
private <K, V> void insertIntoIndexedMap(Map<K, List<V>> map, K key, int
index, V value) {
List<V> vList = map.computeIfAbsent(key, k -> new ArrayList<>());
extend(vList, index);
--
To view, visit https://asterix-gerrit.ics.uci.edu/2277
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I11be926f175889978c144dd4483ec565d3d86e2d
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Dmitry Lychagin <[email protected]>