Re: aggregate EIP: wrong correlation key set for aggregate exchanges

2023-11-20 Thread Dinu Pavithran
This appears to be a bug. The attached test case fails at line #95.

The issue appears to be in the following line of code:

onCompletion(batchKey, originalExchange, batchAnswer, false, aggregateFailed);

https://github.com/apache/camel/blob/cd5c790e18f00288d1ac62aca909efb99a7a4846/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java#L633

This line is called for every correlation key in the batch, each time with the 
same originalExchange. The onCompletion method sets the property 
CamelAggregatedCorrelationKey on originalExchange, so originalExchange is left 
holding whichever key was processed last. originalExchange appears to be the 
stored aggregate for the message that triggered completion, and there does not 
appear to be any reason for it to be associated with the other keys.
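A simplified, hypothetical Java model of that loop may make the overwrite easier to see. SimpleExchange, completeBatch and sampleRepository are stand-ins invented for this sketch, not Camel's API or source. Three assumptions are layered on top of the description above: onCompletion writes the key to the aggregate being completed as well as to the shared original, completed aggregates are emitted only after the loop ends, and the message that triggers completion belongs to batch-1 (chosen to match the reported symptom).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the batch-completion loop; not Camel's actual code or API.
public class BatchCompletionDemo {

    static final String KEY_PROPERTY = "CamelAggregatedCorrelationKey";

    // Stand-in for org.apache.camel.Exchange: just an id and a property map.
    static final class SimpleExchange {
        final String id;
        final Map<String, Object> properties = new HashMap<>();

        SimpleExchange(String id) {
            this.id = id;
        }
    }

    // Assumption: the key is written to the shared original AND to the aggregate being completed.
    static void onCompletion(String key, SimpleExchange original, SimpleExchange aggregated,
                             List<SimpleExchange> completed) {
        original.properties.put(KEY_PROPERTY, key);
        aggregated.properties.put(KEY_PROPERTY, key);
        completed.add(aggregated); // assumption: emitted only after the loop below has finished
    }

    // Completes every stored group at once, modelling completion at the end of a batch.
    static List<SimpleExchange> completeBatch(Map<String, SimpleExchange> repository, String currentKey) {
        // The stored aggregate for the key of the message that triggered completion.
        SimpleExchange originalExchange = repository.get(currentKey);
        List<SimpleExchange> completed = new ArrayList<>();
        for (Map.Entry<String, SimpleExchange> group : repository.entrySet()) {
            // The SAME originalExchange is passed on every iteration, so its key is overwritten.
            onCompletion(group.getKey(), originalExchange, group.getValue(), completed);
        }
        return completed;
    }

    // Four groups, batch-1 .. batch-4, kept in insertion order.
    static Map<String, SimpleExchange> sampleRepository() {
        Map<String, SimpleExchange> repository = new LinkedHashMap<>();
        for (int i = 1; i <= 4; i++) {
            repository.put("batch-" + i, new SimpleExchange("aggregate-" + i));
        }
        return repository;
    }

    public static void main(String[] args) {
        // Assumption: the message that triggers completion belongs to batch-1.
        for (SimpleExchange emitted : completeBatch(sampleRepository(), "batch-1")) {
            System.out.println(emitted.id + " -> " + emitted.properties.get(KEY_PROPERTY));
        }
    }
}
```

Running main prints aggregate-1 -> batch-4 first, while aggregate-2 to aggregate-4 keep their own keys, which mirrors the symptom in the original message. In this toy model, the overwrite disappears if each iteration writes the key only to its own aggregate.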



On Monday, 20 November, 2023 at 11:01:15 am GST, Dinu Pavithran wrote:

aggregate EIP: wrong correlation key set for aggregate exchanges

2023-11-19 Thread Dinu Pavithran
I am seeing the wrong correlation key set on the first exchange emitted from the 
aggregate EIP.

In the log below, Exchange[7F40FFF506AA26C-000D] is aggregated with correlation 
key batch-1, but when this exchange is emitted, its CamelAggregatedCorrelationKey 
property is set to batch-4.

I saw the same result with Camel 4.2.0 run via camel-jbang, and with Camel 3.18.4 
using camel-main started from Groovy or Java. I also saw the same result with 
different aggregation strategies.

Is there something wrong with the way I have configured the EIP?


--- zip-files-2.yaml ---

- route:
    id: zip-files
    from:
      uri: file:{{indir}}
      parameters:
        sort-by: reverse:file:modified # most recently modified file is consumed first
        pre-sort: true
        recursive: true
        max-depth: 3
        min-depth: 3
        antInclude: batches/**
        antExclude: '*/processing-*/*'
        include-ext: csv
        repeatCount: 1
        synchronous: true      
      steps:
      - setHeader:
          name: Batch
          simple: ${header.CamelFileParent.replaceFirst('{{inDir}}/batches/','')}

      - aggregate: 
          correlation-expression: 
            header: Batch
          aggregation-strategy: '#class:org.apache.camel.processor.aggregate.zipfile.ZipAggregationStrategy'
          completion-from-batch-consumer: true
          eager-check-completion: true

      #simplified
      - setHeader:   
          name: LatestBatch
          constant: batch-4
      - setHeader:   
          name: RunID
          constant: 20231120063433118 


      - setHeader:
          name: Batch
          exchangeProperty: CamelAggregatedCorrelationKey

      - log: 
          loggingLevel: debug
          message: Processing batch ${headers.Batch} from exchange ${exchangeId}

      - filter:
          simple: ${headers.LatestBatch} == ${headers.Batch}
          steps:
            # for tracing
            - to: file:{{indir}}/{{sentDir}}?file-name=Processing-${header.RunID}-${header.Batch}.zip
            - log: Wrote ${headers.Batch} to ${header.CamelFileNameProduced}
        
            - log: Send ${headers.Batch}

--- tree {{indir}}/batches ---
in
├── batches
│   ├── batch-1
│   │   ├── events.csv
│   │   └── users.csv
│   ├── batch-2
│   │   ├── events.csv
│   │   └── users.csv
│   ├── batch-3
│   │   ├── events.csv
│   │   └── users.csv
│   ├── batch-4
│   │   ├── events.csv
│   │   └── users.csv
│   └── processing-batch-5
│       ├── events.csv
│       └── users.csv
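The route derives each file's correlation key from CamelFileParent. The plain-Java sketch below (class and method names are hypothetical, not Camel code) replays that derivation over the tree above. It assumes CamelFileParent is the relative parent path such as in/batches/batch-1, and it approximates antExclude and include-ext with simple string checks instead of Camel's Ant-style path matching. It suggests the consumer picks up eight files in four correlation groups, batch-1 to batch-4; with completion-from-batch-consumer, all four groups presumably complete together once the last of the eight files arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of how the route's input files map to correlation keys; not Camel code.
public class BatchKeyDemo {

    // Mirrors indir=in from the property-placeholders summary in the log.
    static final String IN_DIR = "in";

    // Same idea as the route's simple expression on CamelFileParent.
    // String.replaceFirst treats its first argument as a regex; this prefix has no metacharacters.
    static String batchKey(String fileParent) {
        return fileParent.replaceFirst(IN_DIR + "/batches/", "");
    }

    // Paths relative to IN_DIR, matching the directory tree in the message.
    static List<String> sampleTree() {
        List<String> paths = new ArrayList<>();
        for (String dir : List.of("batch-1", "batch-2", "batch-3", "batch-4", "processing-batch-5")) {
            paths.add("batches/" + dir + "/events.csv");
            paths.add("batches/" + dir + "/users.csv");
        }
        return paths;
    }

    static Map<String, List<String>> groupByBatch(List<String> relativePaths) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String path : relativePaths) {
            int slash = path.lastIndexOf('/');
            String parentDir = path.substring(0, slash);   // e.g. batches/batch-1
            String fileName = path.substring(slash + 1);   // e.g. events.csv
            String dirName = parentDir.substring(parentDir.lastIndexOf('/') + 1);
            // Rough string checks standing in for antExclude '*/processing-*/*' and include-ext: csv.
            if (dirName.startsWith("processing-") || !fileName.endsWith(".csv")) {
                continue;
            }
            // Assumption: CamelFileParent is the relative parent path, e.g. in/batches/batch-1.
            String key = batchKey(IN_DIR + "/" + parentDir);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(fileName);
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<String, List<String>> groups = groupByBatch(sampleTree());
        groups.forEach((key, files) -> System.out.println(key + " " + files));
        // prints:
        // batch-1 [events.csv, users.csv]
        // batch-2 [events.csv, users.csv]
        // batch-3 [events.csv, users.csv]
        // batch-4 [events.csv, users.csv]
    }
}
```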

--- log ---
06:34:29.845  INFO [      main] mel.main.MainSupport : Apache Camel (JBang) 4.2.0 is starting
06:34:30.061  INFO [      main] mel.main.MainSupport : Using Java 17.0.8.1 with PID 481790. Started by vscode in /home/vscode/camel
06:34:30.956  INFO [      main] main.BaseMainSupport : Auto-configuration summary
06:34:30.956  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.main.durationMaxIdleSeconds=1
06:34:30.956  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.main.shutdownTimeout=5
06:34:30.957  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.server.enabled=false
06:34:30.957  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.server.healthCheckEnabled=true
06:34:30.957  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.server.devConsoleEnabled=true
06:34:30.958  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.health.enabled=false
06:34:30.958  INFO [      main] main.BaseMainSupport :     [application.properties]       camel.health.exposureLevel=full
06:34:31.364  INFO [      main] or.LocalCliConnector : Camel CLI enabled (local)
06:34:31.744  INFO [      main] AbstractCamelContext : Apache Camel 4.2.0 (zip-files-2) is starting
06:34:31.915  INFO [      main] e.AggregateProcessor : Defaulting to MemoryAggregationRepository
06:34:32.058  INFO [      main] main.BaseMainSupport : Property-placeholders summary
06:34:32.058  INFO [      main] main.BaseMainSupport :     [application.properties]       indir=in
06:34:32.059  INFO [      main] main.BaseMainSupport :     [application.properties]       batchDir=in/batches
06:34:32.059  INFO [      main] main.BaseMainSupport :     [application.properties]       sentDir=send
06:34:32.060  INFO [      main] main.BaseMainSupport :     [application.properties]       skippedDir=skipped
06:34:32.096  INFO [      main] AbstractCamelContext : Routes startup (started:1)
06:34:32.097  INFO [      main] AbstractCamelContext :     Started zip-files (file://in)
06:34:32.097  INFO [      main] AbstractCamelContext : Apache Camel 4.2.0 (zip-files-2) started in 352ms (build:0ms init:0ms start:352ms)
06:34:32.100  INFO [      main] mel.main.MainSupport : Waiting until complete: Duration idle 1 seconds
06:34:33.140 DEBUG [ file://in]  zip-files-2.yaml:62 : File