This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
     new a7e38a7d527 Change aws2 kinesis record body type to byte array (#14179)
a7e38a7d527 is described below

commit a7e38a7d52758351e07fb244c14e9e8e61a6cf10
Author: fanyang <ko...@qq.com>
AuthorDate: Sat May 18 16:16:40 2024 +0800

    Change aws2 kinesis record body type to byte array (#14179)

    Co-authored-by: Fan Yang <fan...@microsoft.com>
---
 .../org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java | 2 +-
 .../org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java | 6 ++----
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 93209375dcd..fd9b1ee098d 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -303,7 +303,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
     protected Exchange createExchange(Shard shard, Record dataRecord) {
         LOG.debug("Received Kinesis record with partition_key={}", dataRecord.partitionKey());
         Exchange exchange = createExchange(true);
-        exchange.getIn().setBody(dataRecord.data().asInputStream());
+        exchange.getIn().setBody(dataRecord.data().asByteArray());
         exchange.getIn().setHeader(Kinesis2Constants.APPROX_ARRIVAL_TIME, dataRecord.approximateArrivalTimestamp());
         exchange.getIn().setHeader(Kinesis2Constants.PARTITION_KEY, dataRecord.partitionKey());
         exchange.getIn().setHeader(Kinesis2Constants.SEQUENCE_NUMBER, dataRecord.sequenceNumber());
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
index 1763dd3fe83..ce8903aa4ac 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.aws2.kinesis;

-import java.nio.ByteBuffer;
-
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.support.DefaultProducer;
@@ -57,12 +55,12 @@ public class Kinesis2Producer extends DefaultProducer {
     }

     private PutRecordRequest createRequest(Exchange exchange) {
-        ByteBuffer body = exchange.getIn().getBody(ByteBuffer.class);
+        byte[] body = exchange.getIn().getBody(byte[].class);
         Object partitionKey = exchange.getIn().getHeader(Kinesis2Constants.PARTITION_KEY);
         Object sequenceNumber = exchange.getIn().getHeader(Kinesis2Constants.SEQUENCE_NUMBER);

         PutRecordRequest.Builder putRecordRequest = PutRecordRequest.builder();
-        putRecordRequest.data(SdkBytes.fromByteBuffer(body));
+        putRecordRequest.data(SdkBytes.fromByteArray(body));
         putRecordRequest.streamName(getEndpoint().getConfiguration().getStreamName());
         putRecordRequest.partitionKey(partitionKey.toString());
         if (sequenceNumber != null) {