Ritik Raj created ASTERIXDB-3582:
------------------------------------

             Summary: Issues in values and filter pushdown for column collections
                 Key: ASTERIXDB-3582
                 URL: https://issues.apache.org/jira/browse/ASTERIXDB-3582
             Project: Apache AsterixDB
          Issue Type: Bug
          Components: COMP - Compiler, STO - Storage
            Reporter: Ritik Raj
            Assignee: Ritik Raj
             Fix For: 0.9.10


A few issues have been identified in value and filter pushdown for column collections.

1. The following query fails:
{code:sql}
USE commerce.marketing;
CREATE FUNCTION sent(txt) {
LET pos = ["bomb","needs"], neg=["shrinks","shrunk","smaller"], exp = split(txt, " ")
SELECT CASE 
         WHEN (
               (SOME w IN pos SATISFIES (w IN exp))
               AND
               (EVERY w IN neg SATISFIES (w NOT IN exp))
              )     
         THEN "positive" 
         WHEN (
               (SOME w IN neg SATISFIES (w IN exp)) 
               AND 
               (EVERY w IN pos SATISFIES (w NOT IN exp))
              ) 
         THEN "negative"
         ELSE "neutral"
       END
}; 

USE commerce.marketing;
SELECT r.*, sent(r.text) FROM reviews r;{code}

The query fails with an internal error and the following stack trace:


{code:java}
2025-03-17T17:13:50.133+00:00 INFO CBAS.translator.QueryTranslator [QueryTranslator:4ba4e252-4fde-4a6e-8b97-039b11366919] null
java.util.ConcurrentModificationException: null
        at java.base/java.util.HashMap$HashIterator.nextNode(HashMap.java:1597) ~[?:?]
        at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1630) ~[?:?]
        at java.base/java.util.HashMap$EntryIterator.next(HashMap.java:1628) ~[?:?]
        at org.apache.asterix.optimizer.rules.pushdown.processor.AbstractFilterPushdownProcessor.putPotentialSelects(AbstractFilterPushdownProcessor.java:194) ~[asterix-algebra-1.0.3-2467.jar:1.0.3-2467]
        at org.apache.asterix.optimizer.rules.pushdown.processor.AbstractFilterPushdownProcessor.process(AbstractFilterPushdownProcessor.java:79) ~[asterix-algebra-1.0.3-2467.jar:1.0.3-2467]
        at org.apache.asterix.optimizer.rules.pushdown.PushdownProcessorsExecutor.execute(PushdownProcessorsExecutor.java:63) ~[asterix-algebra-1.0.3-2467.jar:1.0.3-2467]
        at org.apache.asterix.optimizer.rules.PushValueAccessAndFilterDownRule.rewritePre(PushValueAccessAndFilterDownRule.java:102) ~[asterix-algebra-1.0.3-2467.jar:1.0.3-2467]
        at org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController.rewriteOperatorRef(AbstractRuleController.java:79) ~[algebricks-core-1.0.3-2467.jar:1.0.3-2467]
        at org.apache.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialOnceRuleController.rewriteWithRuleCollection(SequentialOnceRuleController.java:43) ~[algebricks-compiler-1.0.3-2467.jar:1.0.3-2467]
        at org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer.runOptimizationSets(HeuristicOptimizer.java:92) ~[algebricks-core-1.0.3-2467.jar:1.0.3-2467]
        at org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer.runPhysicalOptimizationSets(HeuristicOptimizer.java:122) ~[algebricks-core-1.0.3-2467.jar:1.0.3-2467]
        at org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer.optimize(HeuristicOptimizer.java:66) ~[algebricks-core-1.0.3-2467.jar:1.0.3-2467]
        at org.apache.hyracks.algebricks.compiler.api.HeuristicCompilerFactoryBuilder$CompilerImpl.optimize(HeuristicCompilerFactoryBuilder.java:165) ~[algebricks-compiler-1.0.3-2467.jar:1.0.3-2467]
        at org.apache.asterix.api.common.APIFramework.compileQuery(APIFramework.java:289) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
        at org.apache.asterix.app.translator.QueryTranslator.rewriteCompileQuery(QueryTranslator.java:4322) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
        at org.apache.asterix.app.translator.QueryTranslator.lambda$handleQuery$3(QueryTranslator.java:5280) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
        at org.apache.asterix.app.translator.QueryTranslator.createAndRunJob(QueryTranslator.java:5433) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
        at org.apache.asterix.app.translator.QueryTranslator.deliverResult(QueryTranslator.java:5326) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
        at org.apache.asterix.app.translator.QueryTranslator.handleQuery(QueryTranslator.java:5296) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
        at org.apache.asterix.app.translator.QueryTranslator.compileAndExecute(QueryTranslator.java:534) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
        at org.apache.asterix.app.message.ExecuteStatementRequestMessage.handle(ExecuteStatementRequestMessage.java:181) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
        at org.apache.asterix.messaging.CCMessageBroker.receivedMessage(CCMessageBroker.java:64) ~[asterix-app-1.0.3-2467.jar:1.0.3-2467]
        at org.apache.hyracks.control.cc.work.ApplicationMessageWork.lambda$notifyMessageBroker$0(ApplicationMessageWork.java:74) ~[hyracks-control-cc-1.0.3-2467.jar:1.0.3-2467]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
        at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
{code}

The failure occurs because the map that tracks subplan operators is modified while it is being iterated.
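The failing pattern can be reproduced outside the optimizer. The sketch below is a hypothetical, self-contained model (the names `SubplanMapCme`, `CONSUMERS`, `subplanA`, and so on are illustrative and are not AsterixDB code): it tracks subplans in a `HashMap` and adds each newly discovered consumer subplan to that same map inside the loop that iterates it, which makes the map's fail-fast iterator throw on its next advance.

{code:java}
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: string names stand in for AsterixDB subplan operators.
class SubplanMapCme {
    // Which subplans consume the output of a given subplan (illustrative data).
    static final Map<String, List<String>> CONSUMERS = Map.of(
            "subplanA", List.of("subplanA1"),
            "subplanB", List.of("subplanB1"),
            "subplanA1", List.of(),
            "subplanB1", List.of());

    // Mirrors the failing pattern: newly discovered subplans are added to the
    // same HashMap that is currently being iterated.
    static boolean addWhileIterating() {
        Map<String, Boolean> tracked = new HashMap<>();
        tracked.put("subplanA", Boolean.FALSE);
        tracked.put("subplanB", Boolean.FALSE);
        try {
            for (Map.Entry<String, Boolean> entry : tracked.entrySet()) {
                for (String consumer : CONSUMERS.get(entry.getKey())) {
                    // Adding a new key is a structural modification (bumps modCount).
                    tracked.put(consumer, Boolean.FALSE);
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            // The fail-fast iterator detects the change on its next call to next().
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + addWhileIterating()); // prints "CME thrown: true"
    }
}
{code}

The two seed entries matter: `HashMap`'s iterator checks for concurrent modification only when it advances, so adding an entry while the iterator is positioned on the last element would end the loop silently instead of throwing.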

Consider the following plan for the query:


{code:java}
distribute result [$$163] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
-- DISTRIBUTE_RESULT  |PARTITIONED|
  project ([$$163]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
  -- STREAM_PROJECT  |PARTITIONED|
    assign [$$163] <- [{"$1": $$162}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
    -- ASSIGN  |PARTITIONED|
      project ([$$162]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- |UNPARTITIONED|
        subplan {
                  aggregate [$$162] <- [listify($$161)] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                  -- AGGREGATE  |LOCAL|
                    assign [$$161] <- [{"$2": switch-case(true, and($$140, $$146), "positive", and($$153, $$159), "negative", "neutral")}] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                    -- ASSIGN  |LOCAL|
                      subplan {
                                aggregate [$$159] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                -- AGGREGATE  |LOCAL|
                                  select (not(if-missing-or-null($$158, false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                  -- STREAM_SELECT  |LOCAL|
                                    subplan {
                                              aggregate [$$158] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                              -- AGGREGATE  |LOCAL|
                                                select (not(if-missing-or-null(neq($$w, $#6), false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                -- STREAM_SELECT  |LOCAL|
                                                  unnest $#6 <- scan-collection(split($$166, " ")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                  -- UNNEST  |LOCAL|
                                                    nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
                                           } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                    -- SUBPLAN  |LOCAL|
                                      unnest $$w <- scan-collection(array: [ "bomb", "needs" ]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                      -- UNNEST  |LOCAL|
                                        nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
                             } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                      -- SUBPLAN  |LOCAL|
                        subplan {
                                  aggregate [$$153] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                  -- AGGREGATE  |LOCAL|
                                    select ($$152) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                    -- STREAM_SELECT  |LOCAL|
                                      subplan {
                                                aggregate [$$152] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                -- AGGREGATE  |LOCAL|
                                                  select (eq($$w, $#5)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                  -- STREAM_SELECT  |LOCAL|
                                                    unnest $#5 <- scan-collection(split($$166, " ")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                    -- UNNEST  |LOCAL|
                                                      nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
                                             } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                      -- SUBPLAN  |LOCAL|
                                        unnest $$w <- scan-collection(array: [ "shrinks", "shrunk", "smaller" ]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                        -- UNNEST  |LOCAL|
                                          nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
                               } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                        -- SUBPLAN  |LOCAL|
                          subplan {
                                    aggregate [$$146] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                    -- AGGREGATE  |LOCAL|
                                      select (not(if-missing-or-null($$145, false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                      -- STREAM_SELECT  |LOCAL|
                                        subplan {
                                                  aggregate [$$145] <- [empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                  -- AGGREGATE  |LOCAL|
                                                    select (not(if-missing-or-null(neq($$w, $#4), false))) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                    -- STREAM_SELECT  |LOCAL|
                                                      unnest $#4 <- scan-collection(split($$166, " ")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                      -- UNNEST  |LOCAL|
                                                        nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                        -- NESTED_TUPLE_SOURCE  |LOCAL|
                                               } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                        -- SUBPLAN  |LOCAL|
                                          unnest $$w <- scan-collection(array: [ "shrinks", "shrunk", "smaller" ]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                          -- UNNEST  |LOCAL|
                                            nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
                                 } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                          -- SUBPLAN  |LOCAL|
                            subplan {
                                      aggregate [$$140] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                      -- AGGREGATE  |LOCAL|
                                        select ($$139) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                        -- STREAM_SELECT  |LOCAL|
                                          subplan {
                                                    aggregate [$$139] <- [non-empty-stream()] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                    -- AGGREGATE  |LOCAL|
                                                      select (eq($$w, $#3)) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                      -- STREAM_SELECT  |LOCAL|
                                                        unnest $#3 <- scan-collection(split($$166, " ")) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                        -- UNNEST  |LOCAL|
                                                          nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                 } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                          -- SUBPLAN  |LOCAL|
                                            unnest $$w <- scan-collection(array: [ "bomb", "needs" ]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                            -- UNNEST  |LOCAL|
                                              nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                                              -- NESTED_TUPLE_SOURCE  |LOCAL|
                                   } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                            -- SUBPLAN  |LOCAL|
                              nested tuple source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                              -- NESTED_TUPLE_SOURCE  |LOCAL|
               } [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
        -- SUBPLAN  |PARTITIONED|
          project ([$$166]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
          -- STREAM_PROJECT  |PARTITIONED|
            assign [$$166] <- [$$r.getField("text")] [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
            -- ASSIGN  |PARTITIONED|
              project ([$$r]) [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0] -- |UNPARTITIONED|
                data-scan []<-[$$164, $$165, $$r] <- marketing.reviews [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                -- DATASOURCE_SCAN  |PARTITIONED|
                  empty-tuple-source [cardinality: 0.0, op-cost: 0.0, total-cost: 0.0]
                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
{code}
 

The issue arises because the plan contains subplans nested within subplans. While iterating over the subplan map to identify filters that can be pushed down (AbstractFilterPushdownProcessor.putPotentialSelects in the trace above), the processor encounters new subplans that consume the output of the current subplan and adds them to the map during the same iteration. Because the map is a regular HashMap, whose iterators are fail-fast and do not tolerate structural modification during iteration, the next step of the iteration throws a ConcurrentModificationException.
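One way to avoid the exception while still processing the newly discovered subplans is to drive discovery from a worklist instead of the map being iterated. The sketch below reuses a hypothetical model (`SubplanWorklist`, `CONSUMERS`, and the subplan names are illustrative, not AsterixDB code) and is not necessarily how this issue is fixed in AsterixDB: subplans are taken from a queue, recorded in a map, and their consumers are queued rather than inserted into a map under iteration.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not the actual AsterixDB fix.
class SubplanWorklist {
    // Which subplans consume the output of a given subplan (illustrative data).
    static final Map<String, List<String>> CONSUMERS = Map.of(
            "subplanA", List.of("subplanA1"),
            "subplanB", List.of("subplanB1"),
            "subplanA1", List.of("subplanA1x"),
            "subplanB1", List.of(),
            "subplanA1x", List.of());

    // Visits every subplan reachable from the roots in discovery order,
    // without ever modifying a map while iterating over it.
    static Map<String, Boolean> collect(List<String> roots) {
        Map<String, Boolean> tracked = new LinkedHashMap<>();
        Deque<String> worklist = new ArrayDeque<>(roots);
        while (!worklist.isEmpty()) {
            String subplan = worklist.poll();
            if (tracked.putIfAbsent(subplan, Boolean.FALSE) != null) {
                continue; // already recorded; skip duplicates
            }
            // Newly discovered consumers are queued, not added to 'tracked' mid-iteration.
            worklist.addAll(CONSUMERS.getOrDefault(subplan, List.of()));
        }
        return tracked;
    }

    public static void main(String[] args) {
        // prints [subplanA, subplanB, subplanA1, subplanB1, subplanA1x]
        System.out.println(collect(List.of("subplanA", "subplanB")).keySet());
    }
}
{code}

A worklist is used here instead of iterating over a snapshot copy (for example `new ArrayList<>(map.keySet())`), because a snapshot would silently skip subplans discovered during the pass. Switching to a `ConcurrentHashMap` would also avoid the exception, but its iterators are only weakly consistent and are not guaranteed to reflect entries added mid-iteration, so deeply nested subplans could still be missed.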


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
