rhauch commented on a change in pull request #7496:
URL: https://github.com/apache/kafka/pull/7496#discussion_r438497632



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -507,6 +508,18 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[]
         return transformationChain.apply(origRecord);
     }
 
+    private SchemaAndValue convertKeyValue(ConsumerRecord<byte[], byte[]> msg, boolean isKey) {
+        try {
+            byte[] value = isKey ? msg.key() : msg.value();
+            Converter converter = isKey ? keyConverter : valueConverter;
+            return converter.toConnectData(msg.topic(), msg.headers(), value);
+        } catch (Exception e) {
+            log.error("Error converting message {} in topic '{}' partition {} at offset {}",
+                    isKey ? ConverterType.KEY.getName() : ConverterType.VALUE.getName(), msg.topic(), msg.partition(), msg.offset());
+            throw e;
+        }
+    }
+

Review comment:
       Since the calling code already knows whether it's converting a key or a value, how 
about just having two separate methods? Yeah, they'd be mostly the same, but we 
could drop the `isKey` branching and simplify things a bit.
   
   Also, would it be better to wrap the exception rather than just log the 
error and rethrow? Especially with the retry operator, the exception may not 
get logged anywhere near this log message, so we'd lose the correlation.
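
   A rough sketch of what the two suggestions could look like together, not the 
actual `WorkerSinkTask` code. To keep it compilable without Kafka on the classpath, 
`BytesConverter`, `Msg`, and `ConversionException` are hypothetical stand-ins for 
`Converter`, `ConsumerRecord<byte[], byte[]>`, and whichever exception type the 
PR would actually use; the stand-in converter also omits the headers argument.

```java
// Hypothetical, self-contained sketch: separate key/value methods that wrap
// the failure instead of logging it and rethrowing the original exception.
final class ConversionSketch {

    /** Stand-in for Connect's Converter (assumption: simplified signature, no headers). */
    interface BytesConverter {
        Object toConnectData(String topic, byte[] data);
    }

    /** Stand-in for ConsumerRecord<byte[], byte[]>, holding only the fields used here. */
    static final class Msg {
        final String topic;
        final int partition;
        final long offset;
        final byte[] key;
        final byte[] value;

        Msg(String topic, int partition, long offset, byte[] key, byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    /** Hypothetical wrapper exception carrying the record coordinates in its message. */
    static final class ConversionException extends RuntimeException {
        ConversionException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    private final BytesConverter keyConverter;
    private final BytesConverter valueConverter;

    ConversionSketch(BytesConverter keyConverter, BytesConverter valueConverter) {
        this.keyConverter = keyConverter;
        this.valueConverter = valueConverter;
    }

    /** Converts the record key; no isKey flag is needed because the caller picks the method. */
    Object convertKey(Msg msg) {
        try {
            return keyConverter.toConnectData(msg.topic, msg.key);
        } catch (Exception e) {
            // The context travels with the exception, so it is reported wherever
            // the exception is eventually logged, even after retries.
            throw new ConversionException(describe("key", msg), e);
        }
    }

    /** Converts the record value, mirroring convertKey. */
    Object convertValue(Msg msg) {
        try {
            return valueConverter.toConnectData(msg.topic, msg.value);
        } catch (Exception e) {
            throw new ConversionException(describe("value", msg), e);
        }
    }

    /** Builds the same context string that the original log statement produced. */
    private static String describe(String what, Msg msg) {
        return "Error converting message " + what + " in topic '" + msg.topic
                + "' partition " + msg.partition + " at offset " + msg.offset;
    }
}
```

   The original exception stays attached as the cause, so nothing is lost compared 
with logging, and the topic, partition, and offset show up in the same stack trace.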





