Hello Team,

Recently we have developed one file processing solution using Camel and
wanted to get some review comments or any better alternative on the current
implementation if any.

Trying to add in detail to explain but if any queries would try to clarify
again.

So the the requirement we had -

1) Our input file is around 1.5 GB arranged in message blocks which we
would be reading from AWS -S3 bucket

2) After reading the file we need to process the data as per some business
rules and create a pipe separated output file which need to upload in
destination s3 bucket.

# Our Current Solution

In order to achieve that we have used 2 routes as mentioned below -

a) Route -1

Route Definition -

            from(timer("startTimer").repeatCount(1))

           .noStreamCaching()

           .process(e -> {

                     e.getIn().setBody(createRange());

           })

           .to(direct("start"))

           .end();

   1.

   The reason for writing this route is that as it is a large file we had
   to use the S3 range object to get selective bytes from the file for
   processing.
   2.

   Here we are getting file size by sending additional HeadObjectRequest to
   s3.
   3.

   Then In the body we are setting the byte range list like - let's say if
   the file size is 819200 bytes then the range array list contains 4 objects-
   (0, 2047999) (2048000 - 4095999).. till end of bytes.
   4.

   Then sending that list of ranges to the route -2 direct endpoint for
   further processing and to fetch the real data from s3 based on the range
   that we created.
   5.

   As we had to start this route automatically, we used a timer component
   which automatically starts and can call route-2 directly.
   6.

   We tried to get rid of this additional init route with the help of
   ProducerTemplate option but we are getting exception "Caused by:
   java.util.concurrent.RejectedExecutionException: CamelContext is stopped"
   if we used outside the camel Processor.


b) Route -2

   1.

   For reading the data we have used the camel getObjectRange option in
   aws-s3 component and got the ResponseInputStream in the exchange.
   2.

   Inside the fileProcessor we process the data in parallel by using the
   executor framework.
   3.

   Once the data has been processed we need to marshal it.
   4.

   After that in order to upload it back to s3 we used the multi part
   option of aws-s3 component. But as it needs the whole file before upload
   starts hence we need to create the file locally.
   5.

   Finally once all the ranges are processed then inside the postProcessor
   we pass that file object to the exchange body and send it to s3 using
   multipart.


Route Definition -

from(direct("start"))

 .noStreamCaching()

 .onCompletion()

 .process(postProcessor)

 
.to("aws2-s3://test-bucket?s3Client=#client&multiPartUpload=true&partSize=10485760")

 .end()

 .split(body())

 .streaming()

 .process(exchange -> {

 ItemDto item = (ItemDto) exchange.getIn().getBody();

 exchange.getIn().setHeader(AWS2S3Constants.RANGE_START, item.getFrom());

 exchange.getIn().setHeader(AWS2S3Constants.RANGE_END, item.getTo());

 exchange.getIn().setHeader(BLOCK_SEQUENCE, item.getBlockSeq());

 exchange.getIn().setHeader(AWS2S3Constants.KEY, config.getFileName());

 })
.to("aws2-s3://test-bucket?s3Client=#client&repeatCount=1&deleteAfterRead=false&fileName=testfile.dat&operation=getObjectRange")

 .process(FileProcessor)

 .marshal(bindy)

 .to(file(tempFilePath).fileExist("Append").fileName(TEMP_FILE_NAME))

 .end();

The above code is working fine as expected so far but request you to kindly
review the above route definitions and let us know any
suggestions/improvements we can try?

Thanks in advance.

Reply via email to