[ https://issues.apache.org/jira/browse/KAFKA-10173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17142460#comment-17142460 ]

John Roesler commented on KAFKA-10173:
--------------------------------------

Wow, thanks for that great information, [~karsten.schnitter]!

I was able to reproduce your exception with this code:
{code:java}
public final class BufferValue {
...

public static void main(String[] args) {
    // Hex dump of the failing changelog value, as captured above.
    final String str =
        "00000172D14346CF00000000000000000000000C6C6F67732D696E6772657373000000390000000000000059FFFFFFFF00000051080110AC051A2436346139616437662D313838352D346234342D613163302D633134346565633162636665222436346139616437662D313838352D346234342D613163302D633134346565633162636665FFFFFFFF00000172D14346CF";
    final byte[] arr = hexStringToByteArray(str);

    final BufferValue deserialize = deserialize(ByteBuffer.wrap(arr));
    System.out.println(deserialize);
}

// Converts a hex dump (two hex characters per byte) back into raw bytes.
public static byte[] hexStringToByteArray(String s) {
    int len = s.length();
    byte[] data = new byte[len / 2];
    for (int i = 0; i < len; i += 2) {
        data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
            + Character.digit(s.charAt(i+1), 16));
    }
    return data;
}

...
}
{code}
I also see the context deserialized as:
{code:java}
ProcessorRecordContext{topic='logs-ingress', partition=57, offset=0, timestamp=1592648746703, headers=RecordHeaders(headers = [], isReadOnly = false)}
{code}
Note that this is not the record context of the suppression changelog; it's the context of the input record that got suppressed.

Incidentally, the context I get for that second record is:
{code:java}
ProcessorRecordContext{topic='logs-ingress', partition=24, offset=111, timestamp=1592648746581, headers=RecordHeaders(headers = [], isReadOnly = false)}
{code}
One hypothesis I was hoping to verify was that we really are attempting to deserialize a "version 2" record, rather than erroneously interpreting some other record as "version 2". It looks like this is the case; otherwise, we'd be unlikely to get a valid-looking ProcessorRecordContext. Still, maybe you can confirm that your topic "logs-ingress" really has a partition 57, and that the record at offset 0 has timestamp 1592648746703? And also, I guess, that it doesn't have any headers?

At least, that looks like a plausible timestamp:
{code:java}
jshell> new Date(1592648746703L)
$1 ==> Sat Jun 20 05:25:46 CDT 2020
{code}
Just looking at what is getting unpacked from this buffer (which ultimately results in that exception):
 # the context above
 # a "prior value" for this key (a byte array of length 89) of (hex): "FFFFFFFF00000051080110AC051A2436346139616437662D313838352D346234342D613163302D633134346565633162636665222436346139616437662D313838352D346234342D613163302D633134346565633162636665"
 ** Surprisingly, the "prior value" for the other record you gave me is exactly the same! It comes from a different partition of logs-ingress, though, so I don't think it could be the same record.
 # a null "old value"
 # When we come to extract the "new value", we read its length prefix as 370, but at that point we're at position 141 in a buffer that is only 145 bytes long. Trying to read 370 bytes when only 4 remain is what throws the underflow (see the sketch after this list).
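For illustration, here's a minimal, self-contained sketch of that last step (the buffer contents are stand-ins; this is not the actual BufferValue code):
{code:java}
import java.nio.ByteBuffer;

public final class UnderflowSketch {
    public static void main(final String[] args) {
        // Stand-in for the failing changelog value: 145 bytes total, positioned
        // at 141 once the context, prior value, old value, and the "new value"
        // length prefix (claiming 370 bytes) have been consumed.
        final ByteBuffer buffer = ByteBuffer.allocate(145);
        buffer.position(141);

        // Only 4 bytes remain, but the length prefix said 370:
        final byte[] newValue = new byte[370];
        buffer.get(newValue); // throws java.nio.BufferUnderflowException
    }
}
{code}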

Unfortunately, I still don't have any good answers at this point.

I think it's suspicious that we are reading a record that is apparently from 
offset zero of its topic partition, yet it has a "prior value".

[~karsten.schnitter], I'm sorry to put this back on you, but can you try to:
 # Verify that the context we got above looks legitimate? If not, it'll help narrow down where my serializer went wrong.
 # If you're able, take a look at the input record at the partition and offset we see encoded here, to check whether it has the same key as the record we're trying to restore from the suppression changelog?
 # In addition to the information you've captured so far, dump out the whole record we're restoring in `restoreBatch` (a hex-dump helper is sketched after this list)? That will let me look at the metadata of the changelog entry itself, not just the record we suppressed.
 # I'm really sorry to do this to you, but perhaps you can add some more debugging to the BufferValue#serialize path as well, so that we can see what actually went in when we wrote these records?

I'm still a little mystified about how we even wind up in this position... 
We're deserializing a version 2 changelog record, which must have been written 
by the _upgraded_ application, since version 2 was introduced in 2.4.0. But if all you're doing to reproduce this is running the app on 2.3.1 and then restarting it on 2.5.0, it shouldn't even be possible to encounter a version 2 record in the restore path.
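(For reference, the upgrade path I'm assuming is the standard two-bounce procedure, i.e., something like the following sketch; the application id and bootstrap servers are placeholders:)
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public final class UpgradeConfigSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Set upgrade.from=2.3 for the first rolling bounce to 2.5.0,
        // then remove it and bounce again, per the upgrade guide.
        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23);
    }
}
{code}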

I'll keep trying to figure it out, but any further assistance you can provide 
is greatly appreciated. Thanks (and sorry).

-John

> BufferUnderflowException during Kafka Streams Upgrade
> -----------------------------------------------------
>
>                 Key: KAFKA-10173
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10173
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Karsten Schnitter
>            Assignee: John Roesler
>            Priority: Major
>              Labels: suppress
>             Fix For: 2.5.1
>
>
> I migrated a Kafka Streams application from version 2.3.1 to 2.5.0. I 
> followed the steps described in the upgrade guide and set the property 
> {{upgrade.from=2.3}}. On my dev system with just one running instance I got 
> the following exception:
> {noformat}
> stream-thread [0-StreamThread-2] Encountered the following error during processing:
> java.nio.BufferUnderflowException: null
>       at java.base/java.nio.HeapByteBuffer.get(Unknown Source)
>       at java.base/java.nio.ByteBuffer.get(Unknown Source)
>       at org.apache.kafka.streams.state.internals.BufferValue.extractValue(BufferValue.java:94)
>       at org.apache.kafka.streams.state.internals.BufferValue.deserialize(BufferValue.java:83)
>       at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:368)
>       at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
>       at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92)
>       at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:350)
>       at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94)
>       at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:401)
>       at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779)
>       at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>       at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {noformat}
> I figured out that this problem only occurs for stores where I use the 
> suppress feature. If I rename the changelog topics during the migration, the 
> problem does not occur.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
