This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new a7e38a7d527 Change aws2 kinesis record body type to byte array (#14179)
a7e38a7d527 is described below

commit a7e38a7d52758351e07fb244c14e9e8e61a6cf10
Author: fanyang <ko...@qq.com>
AuthorDate: Sat May 18 16:16:40 2024 +0800

    Change aws2 kinesis record body type to byte array (#14179)
    
    Co-authored-by: Fan Yang <fan...@microsoft.com>
---
 .../org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java   | 2 +-
 .../org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java   | 6 ++----
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 93209375dcd..fd9b1ee098d 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -303,7 +303,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
     protected Exchange createExchange(Shard shard, Record dataRecord) {
         LOG.debug("Received Kinesis record with partition_key={}", 
dataRecord.partitionKey());
         Exchange exchange = createExchange(true);
-        exchange.getIn().setBody(dataRecord.data().asInputStream());
+        exchange.getIn().setBody(dataRecord.data().asByteArray());
         exchange.getIn().setHeader(Kinesis2Constants.APPROX_ARRIVAL_TIME, 
dataRecord.approximateArrivalTimestamp());
         exchange.getIn().setHeader(Kinesis2Constants.PARTITION_KEY, 
dataRecord.partitionKey());
         exchange.getIn().setHeader(Kinesis2Constants.SEQUENCE_NUMBER, 
dataRecord.sequenceNumber());
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
index 1763dd3fe83..ce8903aa4ac 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.aws2.kinesis;
 
-import java.nio.ByteBuffer;
-
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.support.DefaultProducer;
@@ -57,12 +55,12 @@ public class Kinesis2Producer extends DefaultProducer {
     }
 
     private PutRecordRequest createRequest(Exchange exchange) {
-        ByteBuffer body = exchange.getIn().getBody(ByteBuffer.class);
+        byte[] body = exchange.getIn().getBody(byte[].class);
         Object partitionKey = 
exchange.getIn().getHeader(Kinesis2Constants.PARTITION_KEY);
         Object sequenceNumber = 
exchange.getIn().getHeader(Kinesis2Constants.SEQUENCE_NUMBER);
 
         PutRecordRequest.Builder putRecordRequest = PutRecordRequest.builder();
-        putRecordRequest.data(SdkBytes.fromByteBuffer(body));
+        putRecordRequest.data(SdkBytes.fromByteArray(body));
         
putRecordRequest.streamName(getEndpoint().getConfiguration().getStreamName());
         putRecordRequest.partitionKey(partitionKey.toString());
         if (sequenceNumber != null) {

Reply via email to