[
https://issues.apache.org/jira/browse/ASTERIXDB-1343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yingyi Bu resolved ASTERIXDB-1343.
----------------------------------
Resolution: Fixed
> Queries over nodegroup-based datasets do not work
> -------------------------------------------------
>
> Key: ASTERIXDB-1343
> URL: https://issues.apache.org/jira/browse/ASTERIXDB-1343
> Project: Apache AsterixDB
> Issue Type: Bug
> Reporter: Yingyi Bu
> Assignee: Yingyi Bu
>
> The following query fails if the datasets are created on a node group.
> DDL:
> {noformat}
> drop dataverse tpch if exists;
> create dataverse tpch;
> use dataverse tpch;
> create type LineItemType as closed {
> l_orderkey: int64,
> l_partkey: int64,
> l_suppkey: int64,
> l_linenumber: int64,
> l_quantity: int64,
> l_extendedprice: double,
> l_discount: double,
> l_tax: double,
> l_returnflag: string,
> l_linestatus: string,
> l_shipdate: string,
> l_commitdate: string,
> l_receiptdate: string,
> l_shipinstruct: string,
> l_shipmode: string,
> l_comment: string
> }
> create type OrderType as closed {
> o_orderkey: int64,
> o_custkey: int64,
> o_orderstatus: string,
> o_totalprice: double,
> o_orderdate: string,
> o_orderpriority: string,
> o_clerk: string,
> o_shippriority: int64,
> o_comment: string
> }
> create nodegroup group1 if not exists on
> asterix_nc1;
> create dataset LineItem(LineItemType)
> primary key l_orderkey, l_linenumber on group1;
> create dataset Orders(OrderType)
> primary key o_orderkey on group1;
> {noformat}
> Query:
> {noformat}
> use dataverse tpch;
> declare function tmp()
> {
> for $l in dataset('LineItem')
> where $l.l_commitdate < $l.l_receiptdate
> distinct by $l.l_orderkey
> return { "o_orderkey": $l.l_orderkey }
> }
> for $o in dataset('Orders')
> for $t in tmp()
> where $o.o_orderkey = $t.o_orderkey and
> $o.o_orderdate >= '1993-07-01' and $o.o_orderdate < '1993-10-01'
> group by $o_orderpriority := $o.o_orderpriority with $o
> order by $o_orderpriority
> return {
> "order_priority": $o_orderpriority,
> "count": count($o)
> }
> {noformat}
> Exception:
> {noformat}
> Exception in thread "Thread-1" java.lang.AssertionError: Dependency activity
> partitioned differently from dependent: 4 != 2
> at
> org.apache.hyracks.control.cc.scheduler.ActivityClusterPlanner.buildActivityPlanMap(ActivityClusterPlanner.java:109)
> at
> org.apache.hyracks.control.cc.scheduler.ActivityClusterPlanner.planActivityCluster(ActivityClusterPlanner.java:71)
> at
> org.apache.hyracks.control.cc.scheduler.JobScheduler.findRunnableTaskClusterRoots(JobScheduler.java:139)
> at
> org.apache.hyracks.control.cc.scheduler.JobScheduler.findRunnableTaskClusterRoots(JobScheduler.java:118)
> at
> org.apache.hyracks.control.cc.scheduler.JobScheduler.findRunnableTaskClusterRoots(JobScheduler.java:108)
> at
> org.apache.hyracks.control.cc.scheduler.JobScheduler.startRunnableActivityClusters(JobScheduler.java:164)
> at
> org.apache.hyracks.control.cc.scheduler.JobScheduler.notifyTaskComplete(JobScheduler.java:617)
> at
> org.apache.hyracks.control.cc.work.TaskCompleteWork.performEvent(TaskCompleteWork.java:56)
> at
> org.apache.hyracks.control.cc.work.AbstractTaskLifecycleWork.runWork(AbstractTaskLifecycleWork.java:70)
> at
> org.apache.hyracks.control.cc.work.AbstractHeartbeatWork.doRun(AbstractHeartbeatWork.java:48)
> at
> org.apache.hyracks.control.common.work.SynchronizableWork.run(SynchronizableWork.java:36)
> at
> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread.run(WorkQueue.java:132)
> {noformat}
> Optimized query plan:
> {noformat}
> distribute result [%0->$$25]
> -- DISTRIBUTE_RESULT |PARTITIONED|
> exchange
> -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
> project ([$$25])
> -- STREAM_PROJECT |PARTITIONED|
> assign [$$25] <- [function-call: asterix:closed-record-constructor,
> Args:[AString: {order_priority}, %0->$$3, AString: {count}, %0->$$33]]
> -- ASSIGN |PARTITIONED|
> exchange
> -- SORT_MERGE_EXCHANGE [$$3(ASC) ] |PARTITIONED|
> group by ([$$3 := %0->$$39]) decor ([]) {
> aggregate [$$33] <- [function-call: asterix:agg-sum,
> Args:[%0->$$38]]
> -- AGGREGATE |LOCAL|
> nested tuple source
> -- NESTED_TUPLE_SOURCE |LOCAL|
> }
> -- PRE_CLUSTERED_GROUP_BY[$$39] |PARTITIONED|
> exchange
> -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$39(ASC)] HASH:[$$39]
> |PARTITIONED|
> group by ([$$39 := %0->$$27]) decor ([]) {
> aggregate [$$38] <- [function-call:
> asterix:agg-count, Args:[AInt64: {1}]]
> -- AGGREGATE |LOCAL|
> nested tuple source
> -- NESTED_TUPLE_SOURCE |LOCAL|
> }
> -- SORT_GROUP_BY[$$27] |PARTITIONED|
> exchange
> -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
> project ([$$27])
> -- STREAM_PROJECT |PARTITIONED|
> exchange
> -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
> join (function-call: algebricks:eq, Args:[%0->$$30,
> %0->$$31])
> -- HYBRID_HASH_JOIN [$$30][$$31] |PARTITIONED|
> exchange
> -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
> project ([$$27, $$30])
> -- STREAM_PROJECT |PARTITIONED|
> select (function-call: algebricks:and,
> Args:[function-call: algebricks:lt, Args:[%0->$$29, AString: {1993-10-01}],
> function-call: algebricks:ge, Args:[%0->$$29, AString: {1993-07-01}]])
> -- STREAM_SELECT |PARTITIONED|
> project ([$$27, $$29, $$30])
> -- STREAM_PROJECT |PARTITIONED|
> assign [$$27, $$29] <- [function-call:
> asterix:field-access-by-index, Args:[%0->$$4, AInt32: {5}], function-call:
> asterix:field-access-by-index, Args:[%0->$$4, AInt32: {4}]]
> -- ASSIGN |PARTITIONED|
> exchange
> -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
> data-scan []<-[$$30, $$4] <- tpch:Orders
> -- DATASOURCE_SCAN |PARTITIONED|
> exchange
> -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
> empty-tuple-source
> -- EMPTY_TUPLE_SOURCE |PARTITIONED|
> exchange
> -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
> distinct ([%0->$$31])
> -- PRE_SORTED_DISTINCT_BY |PARTITIONED|
> exchange
> -- HASH_PARTITION_MERGE_EXCHANGE
> MERGE:[$$31(ASC)] HASH:[$$31] |PARTITIONED|
> project ([$$31])
> -- STREAM_PROJECT |PARTITIONED|
> select (function-call: algebricks:lt,
> Args:[function-call: asterix:field-access-by-index, Args:[%0->$$5, AInt32:
> {11}], function-call: asterix:field-access-by-index, Args:[%0->$$5, AInt32:
> {12}]])
> -- STREAM_SELECT |PARTITIONED|
> project ([$$5, $$31])
> -- STREAM_PROJECT |PARTITIONED|
> exchange
> -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
> data-scan []<-[$$31, $$32, $$5] <-
> tpch:LineItem
> -- DATASOURCE_SCAN |PARTITIONED|
> exchange
> -- ONE_TO_ONE_EXCHANGE |PARTITIONED|
> empty-tuple-source
> -- EMPTY_TUPLE_SOURCE |PARTITIONED|
> {noformat}
> The reason is that the optimized query plan assumes that the computation nodes are
> the same as the storage nodes and therefore uses the wrong exchange strategies.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)