This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new a68a8b6 KAFKA-6511; Corrected connect list/map parsing logic (#4516) a68a8b6 is described below commit a68a8b6b6cc47d488ada99e4bfe87cc7467621bb Author: Randall Hauch <rha...@gmail.com> AuthorDate: Tue Feb 13 13:44:12 2018 -0600 KAFKA-6511; Corrected connect list/map parsing logic (#4516) Corrected the parsing of invalid list values. A list can only be parsed if it contains elements that have a common type, and a map can only be parsed if it contains keys with a common type and values with a common type. Reviewers: Arjun Satish <ar...@confluent.io>, Magesh Nandakumar <magesh.n.ku...@gmail.com>, Konstantine Karantasis <konstant...@confluent.io>, Jason Gustafson <ja...@confluent.io> --- .../java/org/apache/kafka/connect/data/Values.java | 29 ++++-- .../org/apache/kafka/connect/data/ValuesTest.java | 112 +++++++++++++++++++++ 2 files changed, 132 insertions(+), 9 deletions(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java index 41040c7..05248ef 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java @@ -749,10 +749,16 @@ public class Values { Schema elementSchema = null; while (parser.hasNext()) { if (parser.canConsume(ARRAY_END_DELIMITER)) { - Schema listSchema = elementSchema == null ? 
null : SchemaBuilder.array(elementSchema).schema(); + Schema listSchema = null; + if (elementSchema != null) { + listSchema = SchemaBuilder.array(elementSchema).schema(); + } result = alignListEntriesWithSchema(listSchema, result); return new SchemaAndValue(listSchema, result); } + if (parser.canConsume(COMMA_DELIMITER)) { + throw new DataException("Unable to parse an empty array element: " + parser.original()); + } SchemaAndValue element = parse(parser, true); elementSchema = commonSchemaFor(elementSchema, element); result.add(element.value()); @@ -760,9 +766,9 @@ public class Values { } // Missing either a comma or an end delimiter if (COMMA_DELIMITER.equals(parser.previous())) { - throw new DataException("Malformed array: missing element after ','"); + throw new DataException("Array is missing element after ',': " + parser.original()); } - throw new DataException("Malformed array: missing terminating ']'"); + throw new DataException("Array is missing terminating ']': " + parser.original()); } if (parser.canConsume(MAP_BEGIN_DELIMITER)) { @@ -771,17 +777,22 @@ public class Values { Schema valueSchema = null; while (parser.hasNext()) { if (parser.canConsume(MAP_END_DELIMITER)) { - Schema mapSchema = - keySchema == null || valueSchema == null ? 
null : SchemaBuilder.map(keySchema, valueSchema).schema(); + Schema mapSchema = null; + if (keySchema != null && valueSchema != null) { + mapSchema = SchemaBuilder.map(keySchema, valueSchema).schema(); + } result = alignMapKeysAndValuesWithSchema(mapSchema, result); return new SchemaAndValue(mapSchema, result); } + if (parser.canConsume(COMMA_DELIMITER)) { + throw new DataException("Unable to parse a map entry has no key or value: " + parser.original()); + } SchemaAndValue key = parse(parser, true); if (key == null || key.value() == null) { - throw new DataException("Malformed map entry: null key"); + throw new DataException("Map entry may not have a null key: " + parser.original()); } if (!parser.canConsume(ENTRY_DELIMITER)) { - throw new DataException("Malformed map entry: missing '='"); + throw new DataException("Map entry is missing '=': " + parser.original()); } SchemaAndValue value = parse(parser, true); Object entryValue = value != null ? value.value() : null; @@ -792,9 +803,9 @@ public class Values { } // Missing either a comma or an end delimiter if (COMMA_DELIMITER.equals(parser.previous())) { - throw new DataException("Malformed map: missing element after ','"); + throw new DataException("Map is missing element after ',': " + parser.original()); } - throw new DataException("Malformed array: missing terminating ']'"); + throw new DataException("Map is missing terminating ']': " + parser.original()); } } catch (DataException e) { LOG.debug("Unable to parse the value as a map; reverting to string", e); diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java index 70835c8..c2caf08 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java @@ -16,7 +16,9 @@ */ package org.apache.kafka.connect.data; +import org.apache.kafka.connect.data.Schema.Type; import 
org.apache.kafka.connect.data.Values.Parser; +import org.apache.kafka.connect.errors.DataException; import org.junit.Test; import java.util.ArrayList; @@ -159,6 +161,116 @@ public class ValuesTest { assertRoundTrip(INT_LIST_SCHEMA, INT_LIST_SCHEMA, INT_LIST); } + /** + * The parsed array has byte values and one int value, so we should return a list with a single unified type of integers. + */ + @Test + public void shouldConvertStringOfListWithOnlyNumericElementTypesIntoListOfLargestNumericType() { + int thirdValue = Short.MAX_VALUE + 1; + List<?> list = Values.convertToList(Schema.STRING_SCHEMA, "[1, 2, " + thirdValue + "]"); + assertEquals(3, list.size()); + assertEquals(1, ((Number) list.get(0)).intValue()); + assertEquals(2, ((Number) list.get(1)).intValue()); + assertEquals(thirdValue, ((Number) list.get(2)).intValue()); + } + + /** + * The parsed array has two numeric values and one string value, so there is no common element type and the returned list keeps each element's own type. + */ + @Test + public void shouldConvertStringOfListWithMixedElementTypesIntoListWithDifferentElementTypes() { + String str = "[1, 2, \"three\"]"; + List<?> list = Values.convertToList(Schema.STRING_SCHEMA, str); + assertEquals(3, list.size()); + assertEquals(1, ((Number) list.get(0)).intValue()); + assertEquals(2, ((Number) list.get(1)).intValue()); + assertEquals("three", list.get(2)); + } + + /** + * We parse into different element types, but cannot infer a common element schema. 
+ */ + @Test + public void shouldParseStringListWithMultipleElementTypesAndReturnListWithNoSchema() { + String str = "[1, 2, 3, \"four\"]"; + SchemaAndValue result = Values.parseString(str); + assertNull(result.schema()); + List<?> list = (List<?>) result.value(); + assertEquals(4, list.size()); + assertEquals(1, ((Number) list.get(0)).intValue()); + assertEquals(2, ((Number) list.get(1)).intValue()); + assertEquals(3, ((Number) list.get(2)).intValue()); + assertEquals("four", list.get(3)); + } + + /** + * We can't infer or successfully parse into a different type, so this returns the same string. + */ + @Test + public void shouldParseStringListWithExtraDelimitersAndReturnString() { + String str = "[1, 2, 3,,,]"; + SchemaAndValue result = Values.parseString(str); + assertEquals(Type.STRING, result.schema().type()); + assertEquals(str, result.value()); + } + + /** + * This is technically invalid JSON, and we don't want to simply ignore the blank elements. + */ + @Test(expected = DataException.class) + public void shouldFailToConvertToListFromStringWithExtraDelimiters() { + Values.convertToList(Schema.STRING_SCHEMA, "[1, 2, 3,,,]"); + } + + /** + * Schema of type ARRAY requires a schema for the values, but Connect has no union or "any" schema type. + * Therefore, we can't represent this. + */ + @Test(expected = DataException.class) + public void shouldFailToConvertToListFromStringWithNonCommonElementTypeAndBlankElement() { + Values.convertToList(Schema.STRING_SCHEMA, "[1, 2, 3, \"four\",,,]"); + } + + /** + * This is technically invalid JSON, and we don't want to simply ignore the blank entry. + */ + @Test(expected = DataException.class) + public void shouldFailToParseStringOfMapWithIntValuesWithBlankEntry() { + Values.convertToList(Schema.STRING_SCHEMA, " { \"foo\" : 1234567890 ,, \"bar\" : 0, \"baz\" : -987654321 } "); + } + + /** + * This is technically invalid JSON, and we don't want to simply ignore the malformed entry. 
+ */ + @Test(expected = DataException.class) + public void shouldFailToParseStringOfMalformedMap() { + Values.convertToList(Schema.STRING_SCHEMA, " { \"foo\" : 1234567890 , \"a\", \"bar\" : 0, \"baz\" : -987654321 } "); + } + + /** + * This is technically invalid JSON, and we don't want to simply ignore the blank entries. + */ + @Test(expected = DataException.class) + public void shouldFailToParseStringOfMapWithIntValuesWithOnlyBlankEntries() { + Values.convertToList(Schema.STRING_SCHEMA, " { ,, , , } "); + } + + /** + * This is technically invalid JSON, and we don't want to simply ignore the blank entry. + */ + @Test(expected = DataException.class) + public void shouldFailToParseStringOfMapWithIntValuesWithBlankEntries() { + Values.convertToList(Schema.STRING_SCHEMA, " { \"foo\" : \"1234567890\" ,, \"bar\" : \"0\", \"baz\" : \"boz\" } "); + } + + /** + * Schema for Map requires a schema for key and value, but we have no key or value and Connect has no "any" type + */ + @Test(expected = DataException.class) + public void shouldFailToParseStringOfEmptyMap() { + Values.convertToList(Schema.STRING_SCHEMA, " { } "); + } + @Test public void shouldParseStringsWithoutDelimiters() { //assertParsed(""); -- To stop receiving notification emails like this one, please contact j...@apache.org.