Hi

Thank you very much for reporting this, creating the JIRA, and sending the PR.

Nice to see the small reproducer app running so easily with camel-jbang.

On Mon, Nov 20, 2023 at 8:01 AM Dinu Pavithran <din...@yahoo.com.invalid>
wrote:

> I am seeing the wrong correlation key set on the first exchange emitted
> from the aggregate EIP.
>
> In the log below,
> Exchange[7F40FFF506AA26C-000000000000000D] is aggregated with correlation
> key batch-1, but when this exchange is emitted, the
> property CamelAggregatedCorrelationKey is set to batch-4.
>
> I saw the same result with Camel 4.2.0 run using camel-jbang and with
> Camel 3.18.4 using camel-main started from Groovy or Java.
> I also saw the same result when using different aggregation strategies.
>
> Is there something wrong with the way I have configured the EIP?
>
>
> --- zip-files-2.yaml ---
>
> - route:
>     id: zip-files
>     from:
>       uri: file:{{indir}}
>       parameters:
>         sort-by: reverse:file:modified #last modified will be the first
>         pre-sort: true
>         recursive: true
>         max-depth: 3
>         min-depth: 3
>         antInclude: batches/**
>         antExclude: '*/processing-*/*'
>         include-ext: csv
>         repeatCount: 1
>         synchronous: true
>       steps:
>       - setHeader:
>           name: Batch
>           simple: ${header.CamelFileParent.replaceFirst('{{inDir}}/batches/','')}
>
>       - aggregate:
>           correlation-expression:
>             header: Batch
>           aggregation-strategy: '#class:org.apache.camel.processor.aggregate.zipfile.ZipAggregationStrategy'
>           completion-from-batch-consumer: true
>           eager-check-completion: true
>
>       #simplified
>       - setHeader:
>           name: LatestBatch
>           constant: batch-4
>       - setHeader:
>           name: RunID
>           constant: 20231120063433118
>
>
>       - setHeader:
>           name: Batch
>           exchangeProperty: CamelAggregatedCorrelationKey
>
>       - log:
>           loggingLevel: debug
>           message: Processing batch ${headers.Batch} from exchange ${exchangeId}
>
>       - filter:
>           simple: ${headers.LatestBatch} == ${headers.Batch}
>           steps:
>             # for tracing
>             - to: file:{{indir}}/{{sentDir}}?file-name=Processing-${header.RunID}-${header.Batch}.zip
>             - log: Wrote ${headers.Batch} to ${header.CamelFileNameProduced}
>
>             - log: Send ${headers.Batch}
>
> --- tree {{indir}}/batches ---
> in
> ├── batches
> │   ├── batch-1
> │   │   ├── events.csv
> │   │   └── users.csv
> │   ├── batch-2
> │   │   ├── events.csv
> │   │   └── users.csv
> │   ├── batch-3
> │   │   ├── events.csv
> │   │   └── users.csv
> │   ├── batch-4
> │   │   ├── events.csv
> │   │   └── users.csv
> │   └── processing-batch-5
> │       ├── events.csv
> │       └── users.csv
>
> --- log ---
> 06:34:29.845  INFO [      main] mel.main.MainSupport : Apache Camel
> (JBang) 4.2.0 is starting
> 06:34:30.061  INFO [      main] mel.main.MainSupport : Using Java 17.0.8.1
> with PID 481790. Started by vscode in /home/vscode/camel
> 06:34:30.956  INFO [      main] main.BaseMainSupport : Auto-configuration
> summary
> 06:34:30.956  INFO [      main] main.BaseMainSupport :
>  [application.properties]       camel.main.durationMaxIdleSeconds=1
> 06:34:30.956  INFO [      main] main.BaseMainSupport :
>  [application.properties]       camel.main.shutdownTimeout=5
> 06:34:30.957  INFO [      main] main.BaseMainSupport :
>  [application.properties]       camel.server.enabled=false
> 06:34:30.957  INFO [      main] main.BaseMainSupport :
>  [application.properties]       camel.server.healthCheckEnabled=true
> 06:34:30.957  INFO [      main] main.BaseMainSupport :
>  [application.properties]       camel.server.devConsoleEnabled=true
> 06:34:30.958  INFO [      main] main.BaseMainSupport :
>  [application.properties]       camel.health.enabled=false
> 06:34:30.958  INFO [      main] main.BaseMainSupport :
>  [application.properties]       camel.health.exposureLevel=full
> 06:34:31.364  INFO [      main] or.LocalCliConnector : Camel CLI enabled
> (local)
> 06:34:31.744  INFO [      main] AbstractCamelContext : Apache Camel 4.2.0
> (zip-files-2) is starting
> 06:34:31.915  INFO [      main] e.AggregateProcessor : Defaulting to
> MemoryAggregationRepository
> 06:34:32.058  INFO [      main] main.BaseMainSupport :
> Property-placeholders summary
> 06:34:32.058  INFO [      main] main.BaseMainSupport :
>  [application.properties]       indir=in
> 06:34:32.059  INFO [      main] main.BaseMainSupport :
>  [application.properties]       batchDir=in/batches
> 06:34:32.059  INFO [      main] main.BaseMainSupport :
>  [application.properties]       sentDir=send
> 06:34:32.060  INFO [      main] main.BaseMainSupport :
>  [application.properties]       skippedDir=skipped
> 06:34:32.096  INFO [      main] AbstractCamelContext : Routes startup
> (started:1)
> 06:34:32.097  INFO [      main] AbstractCamelContext :     Started
> zip-files (file://in)
> 06:34:32.097  INFO [      main] AbstractCamelContext : Apache Camel 4.2.0
> (zip-files-2) started in 352ms (build:0ms init:0ms start:352ms)
> 06:34:32.100  INFO [      main] mel.main.MainSupport : Waiting until
> complete: Duration idle 1 seconds
> 06:34:33.140 DEBUG [ file://in]  zip-files-2.yaml:62 : File
> batches/batch-4/events.csv is for batch  batch-4
> 06:34:33.157 TRACE [ file://in] e.AggregateProcessor : onAggregation +++
> start +++ with correlation key: batch-4
> 06:34:33.185 TRACE [ file://in] e.AggregateProcessor : In progress
> aggregated oldExchange: null, newExchange:
> Exchange[7F40FFF506AA26C-0000000000000001] with correlation key: batch-4
> 06:34:33.186 TRACE [ file://in] e.AggregateProcessor : onAggregation +++
> end  +++ with correlation key: batch-4
> 06:34:33.205 DEBUG [ file://in]  zip-files-2.yaml:62 : File
> batches/batch-4/users.csv is for batch  batch-4
> 06:34:33.214 TRACE [ file://in] e.AggregateProcessor : onAggregation +++
> start +++ with correlation key: batch-4
> 06:34:33.217 TRACE [ file://in] e.AggregateProcessor : In progress
> aggregated oldExchange: Exchange[7F40FFF506AA26C-0000000000000001],
> newExchange: Exchange[7F40FFF506AA26C-0000000000000001] with correlation
> key: batch-4
> 06:34:33.217 TRACE [ file://in] e.AggregateProcessor : onAggregation +++
> end  +++ with correlation key: batch-4
> 06:34:33.232 DEBUG [ file://in]  zip-files-2.yaml:62 : File
> batches/batch-3/users.csv is for batch  batch-3
> 06:34:33.238 TRACE [ file://in] e.AggregateProcessor : onAggregation +++
> start +++ with correlation key: batch-3
> 06:34:33.242 TRACE [ file://in] e.AggregateProcessor : In progress
> aggregated oldExchange: null, newExchange:
> Exchange[7F40FFF506AA26C-0000000000000005] with correlation key: batch-3
> 06:34:33.242 TRACE [ file://in] e.AggregateProcessor : onAggregation +++
> end  +++ with correlation key: batch-3
> 06:34:33.261 DEBUG [ file://in]  zip-files-2.yaml:62 : File
> batches/batch-3/events.csv is for batch  batch-3
> 06:34:33.273 TRACE [ file://in] e.AggregateProcessor : onAggregation +++
> start +++ with correlation key: batch-3
> 06:34:33.276 TRACE [ file://in] e.AggregateProcessor : In progress
> aggregated oldExchange: Exchange[7F40FFF506AA26C-0000000000000005],
> newExchange: Exchange[7F40FFF506AA26C-0000000000000005] with correlation
> key: batch-3
> 06:34:33.277 TRACE [ file://in] e.AggregateProcessor : onAggregation +++
> end  +++ with correlation key: batch-3
> 06:34:33.291 DEBUG [ file://in]  zip-files-2.yaml:62 : File
> batches/batch-2/events.csv is for batch  batch-2
> 06:34:33.296 TRACE [ file://in] e.AggregateProcessor : onAggregation +++
> start +++ with correlation key: batch-2
> 06:34:33.299 TRACE [ file://in] e.AggregateProcessor : In progress
> aggregated oldExchange: null, newExchange:
> Exchange[7F40FFF506AA26C-0000000000000009] with correlation key: batch-2
> 06:34:33.299 TRACE [ file://in] e.AggregateProcessor : onAggregation +++
> end  +++ with correlation key: batch-2
> 06:34:33.312 DEBUG [ file://in]  zip-files-2.yaml:62 : File
> batches/batch-2/users.csv is for batch  batch-2
> 06:34:33.317 TRACE [ file://in] e.AggregateProcessor : onAggregation +++
> start +++ with correlation key: batch-2
> 06:34:33.320 TRACE [ file://in] e.AggregateProcessor : In progress
> aggregated oldExchange: Exchange[7F40FFF506AA26C-0000000000000009],
> newExchange: Exchange[7F40FFF506AA26C-0000000000000009] with correlation
> key: batch-2
> 06:34:33.320 TRACE [ file://in] e.AggregateProcessor : onAggregation +++
> end  +++ with correlation key: batch-2
> 06:34:33.333 DEBUG [ file://in]  zip-files-2.yaml:62 : File
> batches/batch-1/users.csv is for batch  batch-1
> 06:34:33.340 TRACE [ file://in] e.AggregateProcessor : onAggregation +++
> start +++ with correlation key: batch-1
> 06:34:33.343 TRACE [ file://in] e.AggregateProcessor : In progress
> aggregated oldExchange: null, newExchange:
> Exchange[7F40FFF506AA26C-000000000000000D] with correlation key: batch-1
> 06:34:33.343 TRACE [ file://in] e.AggregateProcessor : onAggregation +++
> end  +++ with correlation key: batch-1
> 06:34:33.357 DEBUG [ file://in]  zip-files-2.yaml:62 : File
> batches/batch-1/events.csv is for batch  batch-1
> 06:34:33.364 TRACE [ file://in] e.AggregateProcessor : onAggregation +++
> start +++ with correlation key: batch-1
> 06:34:33.367 TRACE [ file://in] e.AggregateProcessor : onAggregation +++
> end  +++ with correlation key: batch-1
> 06:34:33.367 DEBUG [ file://in] e.AggregateProcessor : Aggregation
> complete for correlation key batch-1 sending aggregated exchange:
> Exchange[7F40FFF506AA26C-000000000000000D]
> 06:34:33.367 DEBUG [ file://in] e.AggregateProcessor : Processing
> aggregated exchange: Exchange[7F40FFF506AA26C-000000000000000D]
> 06:34:33.370 DEBUG [ file://in] e.AggregateProcessor : Aggregation
> complete for correlation key batch-1 sending aggregated exchange:
> Exchange[7F40FFF506AA26C-0000000000000009]
> 06:34:33.370 DEBUG [ file://in] e.AggregateProcessor : Processing
> aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000009]
> 06:34:33.371 DEBUG [ file://in] e.AggregateProcessor : Aggregation
> complete for correlation key batch-1 sending aggregated exchange:
> Exchange[7F40FFF506AA26C-0000000000000005]
> 06:34:33.371 DEBUG [ file://in] e.AggregateProcessor : Processing
> aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000005]
> 06:34:33.371 DEBUG [ file://in] e.AggregateProcessor : Aggregation
> complete for correlation key batch-1 sending aggregated exchange:
> Exchange[7F40FFF506AA26C-0000000000000001]
> 06:34:33.371 DEBUG [ file://in] e.AggregateProcessor : Processing
> aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000001]
> 06:34:33.392 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch
> batch-4 from exchange 7F40FFF506AA26C-000000000000000D
> 06:34:33.411  INFO [Aggregator] zip-files-2.yaml:132 : Wrote batch-4 to
> in/send/Processing-20231120063433118-batch-4.zip
> 06:34:33.413  INFO [Aggregator] zip-files-2.yaml:135 : Send batch-4
> 06:34:33.414 TRACE [Aggregator] e.AggregateProcessor : Aggregated exchange
> onComplete: Exchange[7F40FFF506AA26C-000000000000000D]
> 06:34:33.415 TRACE [Aggregator] e.AggregateProcessor : Processing
> aggregated exchange: Exchange[7F40FFF506AA26C-000000000000000D] complete.
> 06:34:33.424 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch
> batch-2 from exchange 7F40FFF506AA26C-0000000000000009
> 06:34:33.435 TRACE [Aggregator] e.AggregateProcessor : Aggregated exchange
> onComplete: Exchange[7F40FFF506AA26C-0000000000000009]
> 06:34:33.435 TRACE [Aggregator] e.AggregateProcessor : Processing
> aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000009] complete.
> 06:34:33.445 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch
> batch-3 from exchange 7F40FFF506AA26C-0000000000000005
> 06:34:33.460 TRACE [Aggregator] e.AggregateProcessor : Aggregated exchange
> onComplete: Exchange[7F40FFF506AA26C-0000000000000005]
> 06:34:33.460 TRACE [Aggregator] e.AggregateProcessor : Processing
> aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000005] complete.
> 06:34:33.466 DEBUG [Aggregator] zip-files-2.yaml:106 : Processing batch
> batch-4 from exchange 7F40FFF506AA26C-0000000000000001
> 06:34:33.483  INFO [Aggregator] zip-files-2.yaml:132 : Wrote batch-4 to
> in/send/Processing-20231120063433118-batch-4.zip
> 06:34:33.485  INFO [Aggregator] zip-files-2.yaml:135 : Send batch-4
> 06:34:33.486 TRACE [Aggregator] e.AggregateProcessor : Aggregated exchange
> onComplete: Exchange[7F40FFF506AA26C-0000000000000001]
> 06:34:33.486 TRACE [Aggregator] e.AggregateProcessor : Processing
> aggregated exchange: Exchange[7F40FFF506AA26C-0000000000000001] complete.
> 06:34:34.962  INFO [2-thread-1] ainLifecycleStrategy : Duration max idle
> triggering shutdown of the JVM
> 06:34:34.965  INFO [melContext] AbstractCamelContext : Apache Camel 4.2.0
> (zip-files-2) is shutting down (timeout:5s0ms)
> 06:34:34.982  INFO [melContext] AbstractCamelContext : Routes stopped
> (stopped:1)
> 06:34:34.982  INFO [melContext] AbstractCamelContext :     Stopped
> zip-files (file://in)
> 06:34:34.994  INFO [melContext] AbstractCamelContext : Apache Camel 4.2.0
> (zip-files-2) shutdown in 28ms (uptime:3s)
> 06:34:34.995  INFO [      main] mel.main.MainSupport : Apache Camel
> (JBang) 4.2.0 shutdown
>
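
For readers following the thread, the outcome the report expects can be sketched in plain Java without any Camel dependency: files are grouped by their batch directory, and each completed group is emitted with its own key. This is only a model of the expectation, not Camel's AggregateProcessor. The class name CorrelationKeySketch and the helpers batchOf and groupByBatch are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the expected behaviour described in the report.
// Assumption: this does not use or reproduce Camel's AggregateProcessor.
class CorrelationKeySketch {

    // Hypothetical helper: takes a relative path such as
    // "batches/batch-4/events.csv" and returns its parent directory name.
    // Assumes the path has at least two segments.
    static String batchOf(String relativePath) {
        String[] parts = relativePath.split("/");
        return parts[parts.length - 2];
    }

    // Groups paths by batch name, keeping the order in which each batch
    // was first seen. The map key plays the role of the correlation key.
    static Map<String, List<String>> groupByBatch(List<String> paths) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String path : paths) {
            groups.computeIfAbsent(batchOf(path), k -> new ArrayList<>()).add(path);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Same consumption order as the DEBUG lines in the log above.
        List<String> consumed = List.of(
                "batches/batch-4/events.csv", "batches/batch-4/users.csv",
                "batches/batch-3/users.csv", "batches/batch-3/events.csv",
                "batches/batch-2/events.csv", "batches/batch-2/users.csv",
                "batches/batch-1/users.csv", "batches/batch-1/events.csv");

        // When the whole batch is done, every group is emitted together
        // with the key it was collected under.
        groupByBatch(consumed).forEach((key, files) ->
                System.out.println(key + " -> " + files));
    }
}
```

In this model a group collected under batch-1 can only ever be emitted with the key batch-1, which is the invariant the reporter expected CamelAggregatedCorrelationKey to satisfy.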


-- 
Claus Ibsen
-----------------
@davsclaus
Camel in Action 2: https://www.manning.com/ibsen2
