Re: aggregate EIP: wrong correlation key set for aggregate exchanges
This appears to be a bug. The attached test case fails at line 95.

The issue appears to be around the following line of code:

    onCompletion(batchKey, originalExchange, batchAnswer, false, aggregateFailed);

https://github.com/apache/camel/blob/cd5c790e18f00288d1ac62aca909efb99a7a4846/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java#L633

This is called for every correlation key with the same originalExchange. The method onCompletion sets the property CamelAggregatedCorrelationKey on originalExchange, so originalExchange is left holding the key from the last call. originalExchange is (or appears to be) the stored aggregate for the current message, the one that triggered completion. There does not appear to be any reason to associate it with the other keys.

On Monday, 20 November, 2023 at 11:01:15 am GST, Dinu Pavithran wrote:

I am seeing the wrong correlation key set on the first exchange emitted from the aggregate EIP. In the log below, Exchange[7F40FFF506AA26C-000D] is aggregated under correlation key batch-1, but when this exchange is emitted, the property CamelAggregatedCorrelationKey is set to batch-4.

I saw the same result with Camel v4.2.0 run using camel-jbang, and with Camel v3.18.4 using camel-main started from Groovy or Java. I also saw the same result when using different aggregation strategies.

Is there something wrong with the way I have configured the EIP?
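The overwrite described in the reply above can be illustrated with a small stand-alone sketch. It is a simplified model, not Camel's actual code: exchanges are plain maps of properties, the class name `CorrelationKeyOverwriteSketch` and the method `completeAll` are hypothetical, and the key iteration order (batch-1 to batch-4) is an assumption. The only behaviour taken from the thread is that onCompletion is called once per correlation key with the same original exchange, and that it sets CamelAggregatedCorrelationKey on that original.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CorrelationKeyOverwriteSketch {

    public static final String KEY_PROPERTY = "CamelAggregatedCorrelationKey";

    // Simplified stand-in for onCompletion: tags BOTH the original and the
    // aggregated exchange with the key being completed (assumed behaviour,
    // based on the description in the thread).
    public static void onCompletion(String key, Map<String, Object> original,
                                    Map<String, Object> aggregated) {
        original.put(KEY_PROPERTY, key);
        aggregated.put(KEY_PROPERTY, key);
    }

    // Completes every key in order, always passing the same original exchange.
    // Returns the emitted aggregates indexed by the key they were stored under.
    public static Map<String, Map<String, Object>> completeAll(List<String> keys,
                                                               String triggeringKey) {
        Map<String, Map<String, Object>> repository = new LinkedHashMap<>();
        for (String key : keys) {
            repository.put(key, new HashMap<>());
        }
        // The exchange that triggered completion is also the stored aggregate
        // for its own key, so it is the same object as repository.get(triggeringKey).
        Map<String, Object> original = repository.get(triggeringKey);
        for (String batchKey : keys) {
            onCompletion(batchKey, original, repository.get(batchKey));
        }
        return repository;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("batch-1", "batch-2", "batch-3", "batch-4");
        Map<String, Map<String, Object>> emitted = completeAll(keys, "batch-1");
        for (String key : keys) {
            System.out.println(key + " -> " + emitted.get(key).get(KEY_PROPERTY));
        }
    }
}
```

In this model the aggregate stored under batch-1 ends up carrying batch-4, while the other three keep their own keys, which matches the symptom in the report. Whether the real AggregateProcessor iterates the keys in this order is not confirmed here.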
aggregate EIP: wrong correlation key set for aggregate exchanges
I am seeing the wrong correlation key set on the first exchange emitted from the aggregate EIP. In the log below, Exchange[7F40FFF506AA26C-000D] is aggregated under correlation key batch-1, but when this exchange is emitted, the property CamelAggregatedCorrelationKey is set to batch-4.

I saw the same result with Camel v4.2.0 run using camel-jbang, and with Camel v3.18.4 using camel-main started from Groovy or Java. I also saw the same result when using different aggregation strategies.

Is there something wrong with the way I have configured the EIP?

--- zip-files-2.yaml ---
- route:
    id: zip-files
    from:
      uri: file:{{indir}}
      parameters:
        sort-by: reverse:file:modified #last modified will be the first
        pre-sort: true
        recursive: true
        max-depth: 3
        min-depth: 3
        antInclude: batches/**
        antExclude: '*/processing-*/*'
        include-ext: csv
        repeatCount: 1
        synchronous: true
      steps:
        - setHeader:
            name: Batch
            simple: ${header.CamelFileParent.replaceFirst('{{inDir}}/batches/','')}
        - aggregate:
            correlation-expression:
              header: Batch
            aggregation-strategy: '#class:org.apache.camel.processor.aggregate.zipfile.ZipAggregationStrategy'
            completion-from-batch-consumer: true
            eager-check-completion: true #simplified
        - setHeader:
            name: LatestBatch
            constant: batch-4
        - setHeader:
            name: RunID
            constant: 20231120063433118
        - setHeader:
            name: Batch
            exchangeProperty: CamelAggregatedCorrelationKey
        - log:
            loggingLevel: debug
            message: Processing batch ${headers.Batch} from exchange ${exchangeId}
        - filter:
            simple: ${headers.LatestBatch} == ${headers.Batch}
            steps: # for tracing
              - to: file:{{indir}}/{{sentDir}}?file-name=Processing-${header.RunID}-${header.Batch}.zip
              - log: Wrote ${headers.Batch} to ${header.CamelFileNameProduced}
        - log: Send ${headers.Batch}

--- tree {{indir}}/batches --
in
├── batches
│   ├── batch-1
│   │   ├── events.csv
│   │   └── users.csv
│   ├── batch-2
│   │   ├── events.csv
│   │   └── users.csv
│   ├── batch-3
│   │   ├── events.csv
│   │   └── users.csv
│   ├── batch-4
│   │   ├── events.csv
│   │   └── users.csv
│   └── processing-batch-5
│       ├── events.csv
│       └── users.csv

-- log ---
06:34:29.845 INFO [ main] mel.main.MainSupport : Apache Camel (JBang) 4.2.0 is starting
06:34:30.061 INFO [ main] mel.main.MainSupport : Using Java 17.0.8.1 with PID 481790. Started by vscode in /home/vscode/camel
06:34:30.956 INFO [ main] main.BaseMainSupport : Auto-configuration summary
06:34:30.956 INFO [ main] main.BaseMainSupport : [application.properties] camel.main.durationMaxIdleSeconds=1
06:34:30.956 INFO [ main] main.BaseMainSupport : [application.properties] camel.main.shutdownTimeout=5
06:34:30.957 INFO [ main] main.BaseMainSupport : [application.properties] camel.server.enabled=false
06:34:30.957 INFO [ main] main.BaseMainSupport : [application.properties] camel.server.healthCheckEnabled=true
06:34:30.957 INFO [ main] main.BaseMainSupport : [application.properties] camel.server.devConsoleEnabled=true
06:34:30.958 INFO [ main] main.BaseMainSupport : [application.properties] camel.health.enabled=false
06:34:30.958 INFO [ main] main.BaseMainSupport : [application.properties] camel.health.exposureLevel=full
06:34:31.364 INFO [ main] or.LocalCliConnector : Camel CLI enabled (local)
06:34:31.744 INFO [ main] AbstractCamelContext : Apache Camel 4.2.0 (zip-files-2) is starting
06:34:31.915 INFO [ main] e.AggregateProcessor : Defaulting to MemoryAggregationRepository
06:34:32.058 INFO [ main] main.BaseMainSupport : Property-placeholders summary
06:34:32.058 INFO [ main] main.BaseMainSupport : [application.properties] indir=in
06:34:32.059 INFO [ main] main.BaseMainSupport : [application.properties] batchDir=in/batches
06:34:32.059 INFO [ main] main.BaseMainSupport : [application.properties] sentDir=send
06:34:32.060 INFO [ main] main.BaseMainSupport : [application.properties] skippedDir=skipped
06:34:32.096 INFO [ main] AbstractCamelContext : Routes startup (started:1)
06:34:32.097 INFO [ main] AbstractCamelContext : Started zip-files (file://in)
06:34:32.097 INFO [ main] AbstractCamelContext : Apache Camel 4.2.0 (zip-files-2) started in 352ms (build:0ms init:0ms start:352ms)
06:34:32.100 INFO [ main] mel.main.MainSupport : Waiting until complete: Duration idle 1 seconds
06:34:33.140 DEBUG [ file://in] zip-files-2.yaml:62 : File