Ritik Raj created ASTERIXDB-3582:
------------------------------------
Summary: Issues in values and filter pushdown for column collections
Key: ASTERIXDB-3582
URL: https://issues.apache.org/jira/browse/ASTERIXDB-3582
Project: Apache AsterixDB
Issue Type: Bug
Components: COMP - Compiler, STO - Storage
Reporter: Ritik Raj
Assignee: Ritik Raj
Fix For: 0.9.10
A few issues have been identified in value and filter pushdown for column collections.
1. Consider the following query:
{code:sql}
USE commerce.marketing;
CREATE FUNCTION sent(txt) {
  LET pos = ["bomb", "needs"],
      neg = ["shrinks", "shrunk", "smaller"],
      exp = split(txt, " ")
  SELECT CASE
    WHEN (
      (SOME w IN pos SATISFIES (w IN exp))
      AND
      (EVERY w IN neg SATISFIES (w NOT IN exp))
    )
    THEN "positive"
    WHEN (
      (SOME w IN neg SATISFIES (w IN exp))
      AND
      (EVERY w IN pos SATISFIES (w NOT IN exp))
    )
    THEN "negative"
    ELSE "neutral"
  END
};

USE commerce.marketing;
SELECT r.*, sent(r.text) FROM reviews r;
{code}
The query fails with an internal error and the following stack trace:
{code:java}
2025-03-17T17:13:50.133+00:00 INFO CBAS.translator.QueryTranslator [QueryTranslator:4ba4e252-4fde-4a6e-8b97-039b11366919] null
java.util.ConcurrentModificationException: null
    at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1597) ~[?:?]
    at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1630) ~[?:?]
    at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1628) ~[?:?]
    at org.apache.asterix.optimizer.rules.pushdown.processor.AbstractFilterPushdownProcessor.putPotentialSelects(AbstractFilterPushdownProcessor.java:194) ~[asterix-algebra-1.0.3-2467.jar:1.0.3-2467]
    at org.apache.asterix.optimizer.rules.pushdown.processor.AbstractFilterPushdownProcessor.process(AbstractFilterPushdownProcessor.java:79) ~[asterix-algebra-1.0.3-2467.jar:1.0.3-2467]
    at org.apache.asterix.optimizer.rules.pushdown.PushdownProcessorsExecutor.execute(PushdownProcessorsExecutor.java:63) ~[asterix-algebra-1.0.3-2467.jar:1.0.3-2467]
    at org.apache.asterix.optimizer.rules.PushValueAccessAndFilterDownRule.rewritePre(PushValueAccessAndFilterDownRule.java:102) ~[asterix-algebra-1.0.3-2467.jar:1.0.3-2467]
    at org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController.rewriteOperatorRef(AbstractRuleController.java:79) ~[algebricks-core-1.0.3-2467.jar:1.0.3-2467]
    at org.apache.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialOnceRuleController.rewriteWithRuleCollection(SequentialOnceRuleController.java:43) ~[algebricks-compiler-1.0.3-2467.jar:1.0.3-2467]
    at org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer.runOptimizationSets(HeuristicOptimizer.java:92) ~[algebricks-core-1.0.3-2467.jar:1.0.3-2467]
    at org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer.runPhysicalOptimizationSets(HeuristicOptimizer.java:122) ~[algebricks-core-1.0.3-2467.jar:1.0.3-2467]
    at org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer.optimize(HeuristicOptimizer.java:66) ~[algebricks-core-1.0.3-2467.jar:1.0.3-2467]
    at org.apache.hyracks.algebricks.compiler.api.HeuristicCompilerFactoryBuilder$CompilerImpl.optimize(HeuristicCompilerFactoryBuilder.java:165) ~[algebricks-compiler-1.0.3-2467.jar:1.0.3-2467]
    at org.apache.asterix.api.common.APIFramework.compileQuery(APIFramework.java:289) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
    at org.apache.asterix.app.translator.QueryTranslator.rewriteCompileQuery(QueryTranslator.java:4322) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
    at org.apache.asterix.app.translator.QueryTranslator.lambda$handleQuery$3(QueryTranslator.java:5280) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
    at org.apache.asterix.app.translator.QueryTranslator.createAndRunJob(QueryTranslator.java:5433) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
    at org.apache.asterix.app.translator.QueryTranslator.deliverResult(QueryTranslator.java:5326) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
    at org.apache.asterix.app.translator.QueryTranslator.handleQuery(QueryTranslator.java:5296) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
    at org.apache.asterix.app.translator.QueryTranslator.compileAndExecute(QueryTranslator.java:534) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
    at org.apache.asterix.app.message.ExecuteStatementRequestMessage.handle(ExecuteStatementRequestMessage.java:181) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
    at org.apache.asterix.messaging.CCMessageBroker.receivedMessage(CCMessageBroker.java:64) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
    at org.apache.hyracks.control.cc.work.ApplicationMessageWork.lambda$notifyMessageBroker$0(ApplicationMessageWork.java:74) ~[hyracks-control-cc-1.0.3-2467.jar:1.0.3-2467]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
    at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
{code}
The failure occurs because a map that tracks the subplan operators is modified while it is being iterated.
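The failure mode can be reproduced outside AsterixDB with a few lines of plain Java. This is an illustrative sketch only: the class name and map contents are hypothetical and are not taken from AbstractFilterPushdownProcessor. It inserts a new key into a java.util.HashMap while iterating over entrySet(); in OpenJDK the iterator's next() then detects the structural modification and throws ConcurrentModificationException, matching the HashMap$HashIterator.nextNode frame at the top of the trace.

{code:java}
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

public class CmeDemo {
    // Hypothetical illustration: returns true if adding a new key while
    // iterating entrySet() makes the iterator throw a CME.
    static boolean putDuringIterationThrows() {
        Map<String, Integer> subplans = new HashMap<>();
        subplans.put("subplan-1", 1);
        subplans.put("subplan-2", 2);
        try {
            for (Map.Entry<String, Integer> e : subplans.entrySet()) {
                // Simulates "discovering" a new subplan during the scan and
                // recording it in the same map (a structural modification).
                subplans.put("discovered-from-" + e.getKey(), e.getValue() + 10);
            }
        } catch (ConcurrentModificationException ex) {
            // The iterator's next() noticed the map changed behind its back.
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(putDuringIterationThrows());
    }
}
{code}

Running main prints true: the first put happens while one original entry is still pending, so the following call to next() sees a changed modification count and throws. The HashMap documentation describes this fail-fast check as best-effort, so it only surfaces the bug; it is not something to rely on for correctness.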
This can be seen in the following plan:
{code:java}
distribute result [$$163] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DISTRIBUTE_RESULT |PARTITIONED|
  project ([$$163]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
  -- STREAM_PROJECT |PARTITIONED|
    assign [$$163] <- [{"$1": $$162}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
    -- ASSIGN |PARTITIONED|
      project ([$$162]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
      -- |UNPARTITIONED|
        subplan {
          aggregate [$$162] <- [listify($$161)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
          -- AGGREGATE |LOCAL|
            assign [$$161] <- [{"$2": switch-case(true, and($$140, $$146), "positive", and($$153, $$159), "negative", "neutral")}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
            -- ASSIGN |LOCAL|
              subplan {
                aggregate [$$159] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                -- AGGREGATE |LOCAL|
                  select (not(if-missing-or-null($$158, false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                  -- STREAM_SELECT |LOCAL|
                    subplan {
                      aggregate [$$158] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                      -- AGGREGATE |LOCAL|
                        select (not(if-missing-or-null(neq($$w, $#6), false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                        -- STREAM_SELECT |LOCAL|
                          unnest $#6 <- scan-collection(split($$166, " ")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                          -- UNNEST |LOCAL|
                            nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                            -- NESTED_TUPLE_SOURCE |LOCAL|
                    } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                    -- SUBPLAN |LOCAL|
                      unnest $$w <- scan-collection(array: [ "bomb", "needs" ]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                      -- UNNEST |LOCAL|
                        nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                        -- NESTED_TUPLE_SOURCE |LOCAL|
              } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
              -- SUBPLAN |LOCAL|
                subplan {
                  aggregate [$$153] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                  -- AGGREGATE |LOCAL|
                    select ($$152) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                    -- STREAM_SELECT |LOCAL|
                      subplan {
                        aggregate [$$152] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                        -- AGGREGATE |LOCAL|
                          select (eq($$w, $#5)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                          -- STREAM_SELECT |LOCAL|
                            unnest $#5 <- scan-collection(split($$166, " ")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                            -- UNNEST |LOCAL|
                              nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                              -- NESTED_TUPLE_SOURCE |LOCAL|
                      } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                      -- SUBPLAN |LOCAL|
                        unnest $$w <- scan-collection(array: [ "shrinks", "shrunk", "smaller" ]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                        -- UNNEST |LOCAL|
                          nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                          -- NESTED_TUPLE_SOURCE |LOCAL|
                } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                -- SUBPLAN |LOCAL|
                  subplan {
                    aggregate [$$146] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                    -- AGGREGATE |LOCAL|
                      select (not(if-missing-or-null($$145, false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                      -- STREAM_SELECT |LOCAL|
                        subplan {
                          aggregate [$$145] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                          -- AGGREGATE |LOCAL|
                            select (not(if-missing-or-null(neq($$w, $#4), false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                            -- STREAM_SELECT |LOCAL|
                              unnest $#4 <- scan-collection(split($$166, " ")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                              -- UNNEST |LOCAL|
                                nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                -- NESTED_TUPLE_SOURCE |LOCAL|
                        } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                        -- SUBPLAN |LOCAL|
                          unnest $$w <- scan-collection(array: [ "shrinks", "shrunk", "smaller" ]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                          -- UNNEST |LOCAL|
                            nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                            -- NESTED_TUPLE_SOURCE |LOCAL|
                  } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                  -- SUBPLAN |LOCAL|
                    subplan {
                      aggregate [$$140] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                      -- AGGREGATE |LOCAL|
                        select ($$139) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                        -- STREAM_SELECT |LOCAL|
                          subplan {
                            aggregate [$$139] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                            -- AGGREGATE |LOCAL|
                              select (eq($$w, $#3)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                              -- STREAM_SELECT |LOCAL|
                                unnest $#3 <- scan-collection(split($$166, " ")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                -- UNNEST |LOCAL|
                                  nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                  -- NESTED_TUPLE_SOURCE |LOCAL|
                          } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                          -- SUBPLAN |LOCAL|
                            unnest $$w <- scan-collection(array: [ "bomb", "needs" ]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                            -- UNNEST |LOCAL|
                              nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                              -- NESTED_TUPLE_SOURCE |LOCAL|
                    } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                    -- SUBPLAN |LOCAL|
                      nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                      -- NESTED_TUPLE_SOURCE |LOCAL|
        } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
        -- SUBPLAN |PARTITIONED|
          project ([$$166]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
          -- STREAM_PROJECT |PARTITIONED|
            assign [$$166] <- [$$r.getField("text")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
            -- ASSIGN |PARTITIONED|
              project ([$$r]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
              -- |UNPARTITIONED|
                data-scan []<-[$$164, $$165, $$r] <- marketing.reviews [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                -- DATASOURCE_SCAN |PARTITIONED|
                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                  -- EMPTY_TUPLE_SOURCE |PARTITIONED|
{code}
The issue arises because the plan contains subplans nested within subplans. While iterating over the subplan map to identify filters that can be pushed down, the processor encounters new subplans that consume the output of the current subplan. To account for these newly discovered subplans, it adds them to the map during the iteration. Because the map is a regular HashMap, whose iterators are fail-fast, this structural modification causes the iterator's next call to next() to throw a ConcurrentModificationException (the AbstractFilterPushdownProcessor.putPotentialSelects frame in the trace above).
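A common way to handle items discovered during a traversal is a worklist: rather than inserting newly found subplans into the map being iterated, push them onto a queue and keep draining it until it is empty. The following is a minimal sketch of that pattern under stated assumptions: subplans are modeled as plain string ids, and the hypothetical discoveredFrom map stands in for whatever analysis finds nested or chained subplans. It is not taken from the AsterixDB code base.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SubplanWorklist {
    // Hypothetical model: discoveredFrom maps a subplan id to the ids of
    // subplans found while examining it. Returns ids in processing order.
    static List<String> processAll(Map<String, List<String>> discoveredFrom, List<String> initial) {
        Deque<String> worklist = new ArrayDeque<>(initial);
        Set<String> seen = new LinkedHashSet<>(initial); // guards against reprocessing
        List<String> processed = new ArrayList<>();
        while (!worklist.isEmpty()) {
            String current = worklist.poll();
            processed.add(current); // placeholder for "collect pushable selects"
            for (String found : discoveredFrom.getOrDefault(current, List.of())) {
                // New subplans go on the queue; nothing that is currently
                // being iterated is structurally modified.
                if (seen.add(found)) {
                    worklist.add(found);
                }
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        Map<String, List<String>> plan = new HashMap<>();
        plan.put("A", List.of("A'", "B"));
        plan.put("B", List.of("B'", "C"));
        plan.put("C", List.of("C'", "D"));
        plan.put("D", List.of("D'"));
        System.out.println(processAll(plan, List.of("A")));
    }
}
{code}

Running main prints [A, A', B, B', C, C', D, D']. Iterating over a snapshot (for example new ArrayList<>(map.keySet())) would also avoid the exception, but subplans added during the loop would then be skipped; the worklist processes them too. A ConcurrentHashMap would not throw either, but its iterators are only weakly consistent and may or may not observe entries added during iteration.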