[ https://issues.apache.org/jira/browse/KAFKA-10173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17141983#comment-17141983 ]

Karsten Schnitter commented on KAFKA-10173:
-------------------------------------------

Hi [~vvcephei],

after trying the whole weekend, I finally found a way to capture the failing messages. My approach is to modify {{org.apache.kafka.streams.state.internals.BufferValue}} so that it logs during deserialization:

{code:java}
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class BufferValue {
    // omitted

    private static final Logger LOG = LoggerFactory.getLogger(BufferValue.class);

    static BufferValue deserialize(final ByteBuffer buffer) {
        final ProcessorRecordContext context = ProcessorRecordContext.deserialize(buffer);

        LOG.debug("Deserialize with context <{}>", context);

        try {
            final byte[] priorValue = extractValue(buffer);

            final byte[] oldValue;
            final int oldValueLength = buffer.getInt();
            if (oldValueLength == NULL_VALUE_SENTINEL) {
                oldValue = null;
            } else if (oldValueLength == OLD_PREV_DUPLICATE_VALUE_SENTINEL) {
                oldValue = priorValue;
            } else {
                oldValue = new byte[oldValueLength];
                buffer.get(oldValue);
            }

            final byte[] newValue = extractValue(buffer);

            return new BufferValue(priorValue, oldValue, newValue, context);
        } catch (final BufferUnderflowException underflow) {
            // Dump the entire backing array as hex so the failing record can be inspected.
            LOG.error("Error deserializing buffer <{}>", bytesToHex(buffer.array()));
            throw underflow;
        }
    }

    private static final char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();

    private static String bytesToHex(final byte[] bytes) {
        final char[] hexChars = new char[bytes.length * 2];
        for (int j = 0; j < bytes.length; j++) {
            final int v = bytes[j] & 0xFF;
            hexChars[j * 2] = HEX_ARRAY[v >>> 4];      // high nibble
            hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F]; // low nibble
        }
        return new String(hexChars);
    }

    // omitted
}
{code}

The result of this is somewhat confusing to me:

{noformat}
DEBUG.   Deserialize with context <ProcessorRecordContext{topic='logs-ingress', partition=57, offset=0, timestamp=1592648746703, headers=RecordHeaders(headers = [], isReadOnly = false)}>
ERROR.   Error deserializing buffer <00000172D14346CF00000000000000000000000C6C6F67732D696E6772657373000000390000000000000059FFFFFFFF00000051080110AC051A2436346139616437662D313838352D346234342D613163302D633134346565633162636665222436346139616437662D313838352D346234342D613163302D633134346565633162636665FFFFFFFF00000172D14346CF>
{noformat}

# The DEBUG log of the context tells me that a message from my main topic was read. I would have expected a message from a store changelog. I guess the context is misleading here? (See the decoding sketch after this list.)
# The ERROR log contains a rather short message that fits a store changelog much better. For background: this is most likely a protobuf message containing two UUIDs encoded as strings, together with some integer or long values.
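
To make sure I read the dump correctly, I cross-checked it with a small stand-alone snippet (the class name {{HexProbe}} is just for this example): the first eight bytes of the failing buffer decode to 1592648746703, which is exactly the timestamp from the DEBUG context, so the context above really was deserialized from this very buffer:

{code:java}
import java.nio.ByteBuffer;

// Hypothetical stand-alone check, not part of the patch above.
public class HexProbe {
    public static void main(final String[] args) {
        // First 16 hex characters of the failing buffer from the ERROR log
        final String hex = "00000172D14346CF";
        final byte[] bytes = new byte[hex.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        // ByteBuffer reads big-endian by default, like the serialized record
        System.out.println(ByteBuffer.wrap(bytes).getLong()); // prints 1592648746703
    }
}
{code}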

The system I used was freshly created on Saturday, June 20th. It runs a Kafka cluster on version 2.4.1 and the Kafka Streams application on version 2.3.1. The error was recorded with the same Kafka Streams application updated to v2.5.0 with the modification above. Since there are two stream threads, I can provide another buffer that fails:

{noformat}
ERROR.   Error deserializing buffer <00000172D1434655000000000000006F0000000C6C6F67732D696E6772657373000000180000000000000059FFFFFFFF00000051080110AC051A2436346139616437662D313838352D346234342D613163302D633134346565633162636665222436346139616437662D313838352D346234342D613163302D633134346565633162636665FFFFFFFF00000172D1434655>
{noformat}
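
To dig further, a buffer like this can be replayed offline. The following harness is hypothetical (the class name and the {{main}} plumbing are mine); it has to live in package {{org.apache.kafka.streams.state.internals}} because {{deserialize}} is package-private, and it should reproduce the underflow for the dumps above:

{code:java}
package org.apache.kafka.streams.state.internals; // deserialize(...) is package-private

import java.nio.ByteBuffer;

// Hypothetical replay harness: feeds a dumped buffer back through the
// patched deserialize() to reproduce the failure outside the application.
public class BufferDumpReplay {
    public static void main(final String[] args) {
        final String dump = args[0]; // one of the hex strings from the ERROR logs
        final byte[] bytes = new byte[dump.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = (byte) Integer.parseInt(dump.substring(2 * i, 2 * i + 2), 16);
        }
        // Expected to throw java.nio.BufferUnderflowException for the dumps above
        BufferValue.deserialize(ByteBuffer.wrap(bytes));
    }
}
{code}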

Best Regards,
Karsten

> BufferUnderflowException during Kafka Streams Upgrade
> -----------------------------------------------------
>
>                 Key: KAFKA-10173
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10173
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.5.0
>            Reporter: Karsten Schnitter
>            Assignee: John Roesler
>            Priority: Major
>              Labels: suppress
>             Fix For: 2.5.1
>
>
> I migrated a Kafka Streams application from version 2.3.1 to 2.5.0. I followed the steps described in the upgrade guide and set the property {{upgrade.from=2.3}}. On my dev system with just one running instance I got the following exception:
> {noformat}
> stream-thread [0-StreamThread-2] Encountered the following error during processing:
> java.nio.BufferUnderflowException: null
>       at java.base/java.nio.HeapByteBuffer.get(Unknown Source)
>       at java.base/java.nio.ByteBuffer.get(Unknown Source)
>       at org.apache.kafka.streams.state.internals.BufferValue.extractValue(BufferValue.java:94)
>       at org.apache.kafka.streams.state.internals.BufferValue.deserialize(BufferValue.java:83)
>       at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:368)
>       at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89)
>       at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92)
>       at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:350)
>       at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94)
>       at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:401)
>       at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779)
>       at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>       at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {noformat}
> I figured out that this problem only occurs for stores where I use the suppress feature. If I rename the changelog topics during the migration, the problem does not occur.
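>
> For illustration, here is a hypothetical topology sketch (topic names, serdes, and windowing are made up; my real topology differs): any suppressed KTable of this kind is backed by an in-memory suppression buffer with its own changelog topic, and it is exactly these changelogs that fail to restore.
> {code:java}
> import java.time.Duration;
>
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.Produced;
> import org.apache.kafka.streams.kstream.Suppressed;
> import org.apache.kafka.streams.kstream.TimeWindows;
>
> public class SuppressExample {
>     public static void main(final String[] args) {
>         final StreamsBuilder builder = new StreamsBuilder();
>         builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
>                .groupByKey()
>                .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
>                .count()
>                // The suppression buffer behind this call is the store whose
>                // changelog could not be restored after the upgrade.
>                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>                .toStream((windowedKey, count) -> windowedKey.toString())
>                .to("output", Produced.with(Serdes.String(), Serdes.Long()));
>         // builder.build() now contains the suppression buffer and its changelog.
>     }
> }
> {code}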



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
