orpiske edited a comment on pull request #1262:
URL: 
https://github.com/apache/camel-kafka-connector/pull/1262#issuecomment-933633337


   I understand the motivation for the fix, but I think that adding a fix that 
is somewhat specific to Kinesis in the core code is not the right way. 
Otherwise, the header `CamelAwsKinesisPartitionKey` as added on the PR would 
leak for other connectors that are not related to AWS Kinesis in any way. 
   
   The proposed solution to use the SMT transformation is more adequate 
precisely because it can be implemented at the connector level. For instance, 
if you have an transformation such as: 
   
   ```
   @Override
       public R apply(R r) {
           Object value = r.value();
   
           if (value instanceof MyTypeMessage) {
   
               // do any other specific transformation that you need and create 
the record using API that allows it to include extra headers
               return r.newRecord(r.topic(), r.kafkaPartition(), null, r.key(), 
buildSchemaBuilderForType(// ... code));
   
           } else {
               LOG.debug("Unexpected message type: {}", value == null ? "null 
instance" : value.getClass());
   
               return r;
           }
       }
   ```
   
   I think that if you create a SMT that uses this [connect record 
API](https://kafka.apache.org/23/javadoc/org/apache/kafka/connect/connector/ConnectRecord.html#newRecord-java.lang.String-java.lang.Integer-org.apache.kafka.connect.data.Schema-java.lang.Object-org.apache.kafka.connect.data.Schema-java.lang.Object-java.lang.Long-java.lang.Iterable-)
 to include the header as you need, it would do the trick. 
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to