vvcephei commented on a change in pull request #11099:
URL: https://github.com/apache/kafka/pull/11099#discussion_r674847014



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##########
@@ -92,33 +98,62 @@ public void init(final 
org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // if the key is null, then ignore the record
-            if (key == null) {
-                LOG.warn(
-                    "Skipping record due to null key. topic=[{}] 
partition=[{}] offset=[{}]",
-                    context().topic(), context().partition(), 
context().offset()
-                );
+            if (record.key() == null) {
+                if (context.recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = 
context.recordMetadata().get();
+                    LOG.warn(
+                        "Skipping record due to null key. "
+                            + "value=[{}] topic=[{}] partition=[{}] 
offset=[{}]",
+                        record.value(),
+                        recordMetadata.topic(), recordMetadata.partition(), 
recordMetadata.offset()
+                    );
+                } else {
+                    LOG.warn(
+                        "Skipping record due to null key. "
+                            + "value=[{}]. Topic, partition, and offset not 
known.",
+                        record.value()
+                    );
+                }
                 droppedRecordsSensor.record();
                 return;
             }
 
             if (queryableName != null) {
-                final ValueAndTimestamp<V> oldValueAndTimestamp = 
store.get(key);
-                final V oldValue;
+                final ValueAndTimestamp<VIn> oldValueAndTimestamp = 
store.get(record.key());
+                final VIn oldValue;
                 if (oldValueAndTimestamp != null) {
                     oldValue = oldValueAndTimestamp.value();
-                    if (context().timestamp() < 
oldValueAndTimestamp.timestamp()) {
-                        LOG.warn("Detected out-of-order KTable update for {} 
at offset {}, partition {}.",
-                            store.name(), context().offset(), 
context().partition());
+                    if (record.timestamp() < oldValueAndTimestamp.timestamp()) 
{
+                        if (context.recordMetadata().isPresent()) {
+                            final RecordMetadata recordMetadata = 
context.recordMetadata().get();
+                            LOG.warn(
+                                "Detected out-of-order KTable update for {}, "
+                                    + "old timestamp=[{}] new timestamp=[{}]. "
+                                    + "value=[{}] topic=[{}] partition=[{}] 
offset=[{}].",

Review comment:
       also here

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##########
@@ -92,33 +98,62 @@ public void init(final 
org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // if the key is null, then ignore the record
-            if (key == null) {
-                LOG.warn(
-                    "Skipping record due to null key. topic=[{}] 
partition=[{}] offset=[{}]",
-                    context().topic(), context().partition(), 
context().offset()
-                );
+            if (record.key() == null) {
+                if (context.recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = 
context.recordMetadata().get();
+                    LOG.warn(
+                        "Skipping record due to null key. "
+                            + "value=[{}] topic=[{}] partition=[{}] 
offset=[{}]",

Review comment:
       Oh, I'm sorry, but it looks like we need one more revision. Useful as it 
would be at times, we can't log any data (keys, values, or headers) because it 
might leak sensitive information into the logs.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
##########
@@ -156,7 +156,7 @@ public void kTableShouldLogAndMeterOnSkippedRecords() {
                     .filter(e -> e.getLevel().equals("WARN"))
                     .map(Event::getMessage)
                     .collect(Collectors.toList()),
-                hasItem("Skipping record due to null key. topic=[topic] 
partition=[0] offset=[0]")
+                hasItem("Skipping record due to null key. value=[value] 
topic=[topic] partition=[0] offset=[0]")

Review comment:
       I probably don't need to point this out, but this will have to change 
back when you remove the value from the production code.
   
   On another note, I guess we could add a test for the other (new) code path 
when the metadata is absent. I'll leave it up to you.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -704,7 +696,7 @@ public void validateCopartition() {
     private void validateGlobalStoreArguments(final String sourceName,
                                               final String topic,
                                               final String processorName,
-                                              final ProcessorSupplier<?, ?, 
Void, Void> stateUpdateSupplier,
+                                              final ProcessorSupplier<?, ?, ?, 
?> stateUpdateSupplier,

Review comment:
       This should be able to roll back as well, right?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
##########
@@ -92,33 +98,62 @@ public void init(final 
org.apache.kafka.streams.processor.ProcessorContext conte
         }
 
         @Override
-        public void process(final K key, final V value) {
+        public void process(final Record<KIn, VIn> record) {
             // if the key is null, then ignore the record
-            if (key == null) {
-                LOG.warn(
-                    "Skipping record due to null key. topic=[{}] 
partition=[{}] offset=[{}]",
-                    context().topic(), context().partition(), 
context().offset()
-                );
+            if (record.key() == null) {
+                if (context.recordMetadata().isPresent()) {
+                    final RecordMetadata recordMetadata = 
context.recordMetadata().get();
+                    LOG.warn(
+                        "Skipping record due to null key. "
+                            + "value=[{}] topic=[{}] partition=[{}] 
offset=[{}]",
+                        record.value(),
+                        recordMetadata.topic(), recordMetadata.partition(), 
recordMetadata.offset()
+                    );
+                } else {
+                    LOG.warn(
+                        "Skipping record due to null key. "
+                            + "value=[{}]. Topic, partition, and offset not 
known.",
+                        record.value()
+                    );
+                }
                 droppedRecordsSensor.record();
                 return;
             }
 
             if (queryableName != null) {
-                final ValueAndTimestamp<V> oldValueAndTimestamp = 
store.get(key);
-                final V oldValue;
+                final ValueAndTimestamp<VIn> oldValueAndTimestamp = 
store.get(record.key());
+                final VIn oldValue;
                 if (oldValueAndTimestamp != null) {
                     oldValue = oldValueAndTimestamp.value();
-                    if (context().timestamp() < 
oldValueAndTimestamp.timestamp()) {
-                        LOG.warn("Detected out-of-order KTable update for {} 
at offset {}, partition {}.",
-                            store.name(), context().offset(), 
context().partition());
+                    if (record.timestamp() < oldValueAndTimestamp.timestamp()) 
{
+                        if (context.recordMetadata().isPresent()) {
+                            final RecordMetadata recordMetadata = 
context.recordMetadata().get();
+                            LOG.warn(
+                                "Detected out-of-order KTable update for {}, "
+                                    + "old timestamp=[{}] new timestamp=[{}]. "
+                                    + "value=[{}] topic=[{}] partition=[{}] 
offset=[{}].",
+                                store.name(),
+                                oldValueAndTimestamp.timestamp(), 
record.timestamp(),
+                                record.value(),
+                                recordMetadata.topic(), 
recordMetadata.offset(), recordMetadata.partition()
+                            );
+                        } else {
+                            LOG.warn(
+                                "Detected out-of-order KTable update for {}, "
+                                    + "old timestamp=[{}] new timestamp=[{}]. "
+                                    + "value=[{}]. Topic, partition and offset 
not known.",

Review comment:
       and here

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
##########
@@ -90,6 +92,13 @@ public String newStoreName(final String prefix) {
                 "store-"
             );
 
+        final ProcessorSupplier<Object, Object, Void, Void> processorSupplier 
= () ->
+            new ContextualProcessor<Object, Object, Void, Void>() {
+                @Override
+                public void process(final Record<Object, Object> record) {
+                }

Review comment:
       Huh, I'm surprised this works; I would have expected that the processor 
has to put the record in the store. If the test does still pass this way, it 
might reveal that the test is actually not evaluating anything.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to