This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a68a8b6  KAFKA-6511; Corrected connect list/map parsing logic (#4516)
a68a8b6 is described below

commit a68a8b6b6cc47d488ada99e4bfe87cc7467621bb
Author: Randall Hauch <rha...@gmail.com>
AuthorDate: Tue Feb 13 13:44:12 2018 -0600

    KAFKA-6511; Corrected connect list/map parsing logic (#4516)
    
    Corrected the parsing of invalid list values. A list can only be parsed if 
it contains elements that have a common type, and a map can only be parsed if 
it contains keys with a common type and values with a common type.
    
    Reviewers: Arjun Satish <ar...@confluent.io>, Magesh Nandakumar 
<magesh.n.ku...@gmail.com>, Konstantine Karantasis <konstant...@confluent.io>, 
Jason Gustafson <ja...@confluent.io>
---
 .../java/org/apache/kafka/connect/data/Values.java |  29 ++++--
 .../org/apache/kafka/connect/data/ValuesTest.java  | 112 +++++++++++++++++++++
 2 files changed, 132 insertions(+), 9 deletions(-)

diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java 
b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
index 41040c7..05248ef 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
@@ -749,10 +749,16 @@ public class Values {
                 Schema elementSchema = null;
                 while (parser.hasNext()) {
                     if (parser.canConsume(ARRAY_END_DELIMITER)) {
-                        Schema listSchema = elementSchema == null ? null : 
SchemaBuilder.array(elementSchema).schema();
+                        Schema listSchema = null;
+                        if (elementSchema != null) {
+                            listSchema = 
SchemaBuilder.array(elementSchema).schema();
+                        }
                         result = alignListEntriesWithSchema(listSchema, 
result);
                         return new SchemaAndValue(listSchema, result);
                     }
+                    if (parser.canConsume(COMMA_DELIMITER)) {
+                        throw new DataException("Unable to parse an empty 
array element: " + parser.original());
+                    }
                     SchemaAndValue element = parse(parser, true);
                     elementSchema = commonSchemaFor(elementSchema, element);
                     result.add(element.value());
@@ -760,9 +766,9 @@ public class Values {
                 }
                 // Missing either a comma or an end delimiter
                 if (COMMA_DELIMITER.equals(parser.previous())) {
-                    throw new DataException("Malformed array: missing element 
after ','");
+                    throw new DataException("Array is missing element after 
',': " + parser.original());
                 }
-                throw new DataException("Malformed array: missing terminating 
']'");
+                throw new DataException("Array is missing terminating ']': " + 
parser.original());
             }
 
             if (parser.canConsume(MAP_BEGIN_DELIMITER)) {
@@ -771,17 +777,22 @@ public class Values {
                 Schema valueSchema = null;
                 while (parser.hasNext()) {
                     if (parser.canConsume(MAP_END_DELIMITER)) {
-                        Schema mapSchema =
-                                keySchema == null || valueSchema == null ? 
null : SchemaBuilder.map(keySchema, valueSchema).schema();
+                        Schema mapSchema = null;
+                        if (keySchema != null && valueSchema != null) {
+                            mapSchema = SchemaBuilder.map(keySchema, 
valueSchema).schema();
+                        }
                         result = alignMapKeysAndValuesWithSchema(mapSchema, 
result);
                         return new SchemaAndValue(mapSchema, result);
                     }
+                    if (parser.canConsume(COMMA_DELIMITER)) {
+                        throw new DataException("Unable to parse a map entry 
has no key or value: " + parser.original());
+                    }
                     SchemaAndValue key = parse(parser, true);
                     if (key == null || key.value() == null) {
-                        throw new DataException("Malformed map entry: null 
key");
+                        throw new DataException("Map entry may not have a null 
key: " + parser.original());
                     }
                     if (!parser.canConsume(ENTRY_DELIMITER)) {
-                        throw new DataException("Malformed map entry: missing 
'='");
+                        throw new DataException("Map entry is missing '=': " + 
parser.original());
                     }
                     SchemaAndValue value = parse(parser, true);
                     Object entryValue = value != null ? value.value() : null;
@@ -792,9 +803,9 @@ public class Values {
                 }
                 // Missing either a comma or an end delimiter
                 if (COMMA_DELIMITER.equals(parser.previous())) {
-                    throw new DataException("Malformed map: missing element 
after ','");
+                    throw new DataException("Map is missing element after ',': 
" + parser.original());
                 }
-                throw new DataException("Malformed array: missing terminating 
']'");
+                throw new DataException("Map is missing terminating ']': " + 
parser.original());
             }
         } catch (DataException e) {
             LOG.debug("Unable to parse the value as a map; reverting to 
string", e);
diff --git 
a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java 
b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
index 70835c8..c2caf08 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.connect.data;
 
+import org.apache.kafka.connect.data.Schema.Type;
 import org.apache.kafka.connect.data.Values.Parser;
+import org.apache.kafka.connect.errors.DataException;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -159,6 +161,116 @@ public class ValuesTest {
         assertRoundTrip(INT_LIST_SCHEMA, INT_LIST_SCHEMA, INT_LIST);
     }
 
+    /**
+     * The parsed array has byte values and one int value, so we should return a list with a single unified type of integers.
+     */
+    @Test
+    public void 
shouldConvertStringOfListWithOnlyNumericElementTypesIntoListOfLargestNumericType()
 {
+        int thirdValue = Short.MAX_VALUE + 1;
+        List<?> list = Values.convertToList(Schema.STRING_SCHEMA, "[1, 2, " + 
thirdValue + "]");
+        assertEquals(3, list.size());
+        assertEquals(1, ((Number) list.get(0)).intValue());
+        assertEquals(2, ((Number) list.get(1)).intValue());
+        assertEquals(thirdValue, ((Number) list.get(2)).intValue());
+    }
+
+    /**
+     * The parsed array has two numeric values and one string value, so we should return a list whose elements keep their different types.
+     */
+    @Test
+    public void 
shouldConvertStringOfListWithMixedElementTypesIntoListWithDifferentElementTypes()
 {
+        String str = "[1, 2, \"three\"]";
+        List<?> list = Values.convertToList(Schema.STRING_SCHEMA, str);
+        assertEquals(3, list.size());
+        assertEquals(1, ((Number) list.get(0)).intValue());
+        assertEquals(2, ((Number) list.get(1)).intValue());
+        assertEquals("three", list.get(2));
+    }
+
+    /**
+     * We parse into different element types, but cannot infer a common 
element schema.
+     */
+    @Test
+    public void 
shouldParseStringListWithMultipleElementTypesAndReturnListWithNoSchema() {
+        String str = "[1, 2, 3, \"four\"]";
+        SchemaAndValue result = Values.parseString(str);
+        assertNull(result.schema());
+        List<?> list = (List<?>) result.value();
+        assertEquals(4, list.size());
+        assertEquals(1, ((Number) list.get(0)).intValue());
+        assertEquals(2, ((Number) list.get(1)).intValue());
+        assertEquals(3, ((Number) list.get(2)).intValue());
+        assertEquals("four", list.get(3));
+    }
+
+    /**
+     * We can't infer or successfully parse into a different type, so this 
returns the same string.
+     */
+    @Test
+    public void shouldParseStringListWithExtraDelimitersAndReturnString() {
+        String str = "[1, 2, 3,,,]";
+        SchemaAndValue result = Values.parseString(str);
+        assertEquals(Type.STRING, result.schema().type());
+        assertEquals(str, result.value());
+    }
+
+    /**
+     * This is technically invalid JSON, and we don't want to simply ignore 
the blank elements.
+     */
+    @Test(expected = DataException.class)
+    public void shouldFailToConvertToListFromStringWithExtraDelimiters() {
+        Values.convertToList(Schema.STRING_SCHEMA, "[1, 2, 3,,,]");
+    }
+
+    /**
+     * Schema of type ARRAY requires a schema for the values, but Connect has 
no union or "any" schema type.
+     * Therefore, we can't represent this.
+     */
+    @Test(expected = DataException.class)
+    public void 
shouldFailToConvertToListFromStringWithNonCommonElementTypeAndBlankElement() {
+        Values.convertToList(Schema.STRING_SCHEMA, "[1, 2, 3, \"four\",,,]");
+    }
+
+    /**
+     * This is technically invalid JSON, and we don't want to simply ignore 
the blank entry.
+     */
+    @Test(expected = DataException.class)
+    public void shouldFailToParseStringOfMapWithIntValuesWithBlankEntry() {
+        Values.convertToList(Schema.STRING_SCHEMA, " { \"foo\" :  1234567890 
,, \"bar\" : 0,  \"baz\" : -987654321 }  ");
+    }
+
+    /**
+     * This is technically invalid JSON, and we don't want to simply ignore 
the malformed entry.
+     */
+    @Test(expected = DataException.class)
+    public void shouldFailToParseStringOfMalformedMap() {
+        Values.convertToList(Schema.STRING_SCHEMA, " { \"foo\" :  1234567890 , 
\"a\", \"bar\" : 0,  \"baz\" : -987654321 }  ");
+    }
+
+    /**
+     * This is technically invalid JSON, and we don't want to simply ignore 
the blank entries.
+     */
+    @Test(expected = DataException.class)
+    public void 
shouldFailToParseStringOfMapWithIntValuesWithOnlyBlankEntries() {
+        Values.convertToList(Schema.STRING_SCHEMA, " { ,,  , , }  ");
+    }
+
+    /**
+     * This is technically invalid JSON, and we don't want to simply ignore 
the blank entry.
+     */
+    @Test(expected = DataException.class)
+    public void shouldFailToParseStringOfMapWithIntValuesWithBlankEntries() {
+        Values.convertToList(Schema.STRING_SCHEMA, " { \"foo\" :  
\"1234567890\" ,, \"bar\" : \"0\",  \"baz\" : \"boz\" }  ");
+    }
+
+    /**
+     * Schema for Map requires a schema for key and value, but we have no key 
or value and Connect has no "any" type
+     */
+    @Test(expected = DataException.class)
+    public void shouldFailToParseStringOfEmptyMap() {
+        Values.convertToList(Schema.STRING_SCHEMA, " { }  ");
+    }
+
     @Test
     public void shouldParseStringsWithoutDelimiters() {
         //assertParsed("");

-- 
To stop receiving notification emails like this one, please contact
j...@apache.org.

Reply via email to