Hello,

I am trying to use the Camel elasticsearch component to push a list of objects to Elasticsearch with a bulk operation (Camel with Spring Boot, version 4.0.3). After reading the documentation and searching for samples, I still cannot get the Elasticsearch bulk operation to work with Camel. Has anyone used the bulk operation successfully?

Details :

I push a list of Elasticsearch index operations (BulkOperation) to the component (code at the end), but nothing is indexed: the index stays empty. I have tried sending a BulkRequest.Builder as well as a list of operations to the elasticsearch component. The operations look correct in the debugger. Elasticsearch itself is configured correctly (I can push an empty document to it).

I get this error:

    2024-02-13T15:31:29,258 [Camel (camel-1) thread #8 - Aggregator] INFO DefaultErrorHandler - Failed delivery for (MessageId: 02649700127A6F9-00000000000004CA on ExchangeId: 02649700127A6F9-00000000000004CA). On delivery attempt: 0 caught: org.apache.camel.CamelExchangeException: An error occurred while executing the action. Exchange[02649700127A6F9-00000000000004CA]. Caused by: [java.util.concurrent.CompletionException - jakarta.json.JsonException: Jackson exception]

I can serialize my class to JSON with Jackson in the rest of the project (this requires the jackson-datatype-jdk8 and jackson-datatype-jsr310 modules, which are on the classpath).

Code that generates the index operation list ==

    public void createElasticsearchBulkExchange(Exchange aggregatedExchange) {
        List<Exchange> exchangeOperations = aggregatedExchange.getMessage().getBody(List.class);
        List<BulkOperation> operationsList = new ArrayList<>(200);
        for (Exchange exchange : exchangeOperations) {
            // exchange operation contains a list of objects to push to elastic
            BulkOperation indexOperation = (new BulkOperation.Builder()).index(operation ->
                    operation.id(getEntityId(exchange))
                            .version(getVersion(exchange))
                            .versionType(VersionType.ExternalGte)
                            .index(elasticPjConfiguration.getIngestAlias())
                            .document(exchange.getMessage().getBody(MyObj.class))).build(); // MyObj is th
            operationsList.add(indexOperation);
        }
        aggregatedExchange.getMessage().setHeader(ElasticsearchConstants.PARAM_INDEX_NAME, elasticPjConfiguration.getIngestAlias());
        aggregatedExchange.getMessage().setHeader(ElasticsearchConstants.PARAM_DOCUMENT_MODE, true);
        aggregatedExchange.getMessage().setHeader(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchOperation.Bulk);
        aggregatedExchange.getMessage().setHeader(ElasticsearchConstants.PARAM_DOCUMENT_CLASS, MyObj.class);
        aggregatedExchange.getMessage().setHeader(ElasticsearchConstants.PARAM_SIZE, operationsList.size());
        aggregatedExchange.getMessage().setBody(operationsList);
    }

Part of the Camel route (the Elasticsearch index seems to be configured correctly; I was able to add an empty document to it) ==

    .process(elasticsearchIndexUtilsService::createElasticsearchBulkExchange)
    .to(elasticsearch("elasticsearch")
            .operation(ElasticsearchOperation.Bulk)
            .indexName(elasticIndexPbConfigurationManager.getIngestAliasName()))
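
The log line above only shows the outer "Jackson exception" message, so the underlying cause is not visible. One way to narrow it down might be to walk the cause chain of the caught exception and print every level. The following is a minimal sketch in plain Java, not something taken from Camel or the Elasticsearch client: the class name CauseChain and its two helper methods are hypothetical, and the exception built in main is a simulated stand-in for the real one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionException;

// Hypothetical helper (not part of Camel or the Elasticsearch client).
public class CauseChain {

    // Returns "ClassName: message" for the throwable and each nested cause, outermost first.
    public static List<String> describe(Throwable error) {
        List<String> lines = new ArrayList<>();
        for (Throwable current = error; current != null; current = current.getCause()) {
            lines.add(current.getClass().getName() + ": " + current.getMessage());
        }
        return lines;
    }

    // Returns the innermost cause, or the throwable itself if it has no cause.
    public static Throwable rootCause(Throwable error) {
        Throwable current = error;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Simulated chain that only mimics the shape of the logged error;
        // the innermost exception and its message are invented for illustration.
        Throwable simulated = new CompletionException(
                new RuntimeException("Jackson exception",
                        new IllegalStateException("hypothetical underlying cause")));
        describe(simulated).forEach(System.out::println);
    }
}
```

In a Camel route, the caught exception should be readable from the exchange property Exchange.EXCEPTION_CAUGHT inside an onException or doCatch block, and could then be passed to a helper like this one to log the innermost message.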