This is an automated email from the ASF dual-hosted git repository. showuon pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 30d7d3b5ce4 MINOR: add size check for tagged fields (#13100) 30d7d3b5ce4 is described below commit 30d7d3b5ce42ed7663a90193be47e30aab75537f Author: Luke Chen <show...@gmail.com> AuthorDate: Wed Feb 22 11:52:21 2023 +0800 MINOR: add size check for tagged fields (#13100) Add size check for taggedFields of a tag, and add tests. Reviewers: Mickael Maison <mickael.mai...@gmail.com>, Divij Vaidya <di...@amazon.com> --- .../kafka/common/protocol/types/TaggedFields.java | 5 +++++ .../apache/kafka/common/protocol/types/Type.java | 1 + .../protocol/types/ProtocolSerializationTest.java | 22 ++++++++++++++++++++++ 3 files changed, 28 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java index 26fb65830e6..901bf3f85aa 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java @@ -100,6 +100,11 @@ public class TaggedFields extends DocumentedType { } prevTag = tag; int size = ByteUtils.readUnsignedVarint(buffer); + if (size < 0) + throw new SchemaException("field size " + size + " cannot be negative"); + if (size > buffer.remaining()) + throw new SchemaException("Error reading field of size " + size + ", only " + buffer.remaining() + " bytes available"); + Field field = fields.get(tag); if (field == null) { byte[] bytes = new byte[size]; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java index 4af74dbf4cc..bd4cb41c7e7 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java @@ -39,6 +39,7 @@ public abstract class Type { /** * Read the typed object from the buffer + * Please remember to do size validation before creating the container (ex: array) for the following data * * @throws SchemaException If the object is not valid for its type */ diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java index 811bcf4f88a..f96112dd806 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java @@ -220,6 +220,28 @@ public class ProtocolSerializationTest { } } + @Test + public void testReadTaggedFieldsSizeTooLarge() { + int tag = 1; + Type type = TaggedFields.of(tag, new Field("field", Type.NULLABLE_STRING)); + int size = 10; + ByteBuffer buffer = ByteBuffer.allocate(size); + int numTaggedFields = 1; + // write the number of tagged fields + ByteUtils.writeUnsignedVarint(numTaggedFields, buffer); + // write the tag of the first tagged fields + ByteUtils.writeUnsignedVarint(tag, buffer); + // write the size of tagged fields for this tag, using a large number for testing + ByteUtils.writeUnsignedVarint(Integer.MAX_VALUE, buffer); + int expectedRemaining = buffer.remaining(); + buffer.rewind(); + + // should throw SchemaException while reading the buffer, instead of OOM + Throwable e = assertThrows(SchemaException.class, () -> type.read(buffer)); + assertEquals("Error reading field of size " + Integer.MAX_VALUE + ", only " + expectedRemaining + " bytes available", + e.getMessage()); + } + @Test public void testReadNegativeArraySize() { Type type = new ArrayOf(Type.INT8);