This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 4491a0d  KAFKA-9074: Correct Connect’s `Values.parseString` to 
properly parse a time and timestamp literal (#7568)
4491a0d is described below

commit 4491a0dbb8fb183c21343b3d7145574b9c728485
Author: Randall Hauch <rha...@gmail.com>
AuthorDate: Tue Feb 4 11:15:04 2020 -0600

    KAFKA-9074: Correct Connect’s `Values.parseString` to properly parse a time 
and timestamp literal (#7568)
    
    * KAFKA-9074: Correct Connect’s `Values.parseString` to properly parse a 
time and timestamp literal
    
    Time and timestamp literal strings contain a `:` character, but the 
internal parser used in the `Values.parseString(String)` method treats the 
colon as a delimiter when tokenizing and parsing map entries. The colon could 
be escaped, but then the backslash character used to escape the colon is not 
removed and the parser fails to match the literal as a time or timestamp value.
    
    This fix corrects the parsing logic to properly parse timestamp and time 
literal strings whose colon characters are either escaped or unescaped. 
Additional unit tests were added to first verify the incorrect behavior and 
then to validate the correction.
    
    Author: Randall Hauch <rha...@gmail.com>
    Reviewers: Chris Egerton <chr...@confluent.io>, Nigel Liang 
<ni...@nigelliang.com>, Jason Gustafson <ja...@confluent.io>
---
 .../java/org/apache/kafka/connect/data/Values.java | 133 +++++++++++++-----
 .../org/apache/kafka/connect/data/ValuesTest.java  | 155 +++++++++++++++++++++
 2 files changed, 257 insertions(+), 31 deletions(-)

diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java 
b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
index 93c320a..d99fbca 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
@@ -30,13 +30,17 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.text.StringCharacterIterator;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Base64;
 import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Set;
 import java.util.TimeZone;
 import java.util.regex.Pattern;
 
@@ -70,9 +74,18 @@ public class Values {
     private static final String FALSE_LITERAL = Boolean.FALSE.toString();
     private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
     private static final String NULL_VALUE = "null";
-    private static final String ISO_8601_DATE_FORMAT_PATTERN = "yyyy-MM-dd";
-    private static final String ISO_8601_TIME_FORMAT_PATTERN = 
"HH:mm:ss.SSS'Z'";
-    private static final String ISO_8601_TIMESTAMP_FORMAT_PATTERN = 
ISO_8601_DATE_FORMAT_PATTERN + "'T'" + ISO_8601_TIME_FORMAT_PATTERN;
+    static final String ISO_8601_DATE_FORMAT_PATTERN = "yyyy-MM-dd";
+    static final String ISO_8601_TIME_FORMAT_PATTERN = "HH:mm:ss.SSS'Z'";
+    static final String ISO_8601_TIMESTAMP_FORMAT_PATTERN = 
ISO_8601_DATE_FORMAT_PATTERN + "'T'" + ISO_8601_TIME_FORMAT_PATTERN;
+    private static final Set<String> TEMPORAL_LOGICAL_TYPE_NAMES =
+            Collections.unmodifiableSet(
+                    new HashSet<>(
+                            Arrays.asList(Time.LOGICAL_NAME,
+                            Timestamp.LOGICAL_NAME,
+                            Date.LOGICAL_NAME
+                            )
+                    )
+            );
 
     private static final String QUOTE_DELIMITER = "\"";
     private static final String COMMA_DELIMITER = ",";
@@ -467,6 +480,9 @@ public class Values {
                                 int days = (int) (millis / MILLIS_PER_DAY); // 
truncates
                                 return Date.toLogical(toSchema, days);
                             }
+                        } else {
+                            // There is no fromSchema, so no conversion is 
needed
+                            return value;
                         }
                     }
                     long numeric = asLong(value, fromSchema, null);
@@ -492,6 +508,9 @@ public class Values {
                                 calendar.set(Calendar.DAY_OF_MONTH, 1);
                                 return Time.toLogical(toSchema, (int) 
calendar.getTimeInMillis());
                             }
+                        } else {
+                            // There is no fromSchema, so no conversion is 
needed
+                            return value;
                         }
                     }
                     long numeric = asLong(value, fromSchema, null);
@@ -523,6 +542,9 @@ public class Values {
                             if (Timestamp.LOGICAL_NAME.equals(fromSchemaName)) 
{
                                 return value;
                             }
+                        } else {
+                            // There is no fromSchema, so no conversion is 
needed
+                            return value;
                         }
                     }
                     long numeric = asLong(value, fromSchema, null);
@@ -755,7 +777,14 @@ public class Values {
                     }
                     sb.append(parser.next());
                 }
-                return new SchemaAndValue(Schema.STRING_SCHEMA, sb.toString());
+                String content = sb.toString();
+                // We can parse string literals as temporal logical types, but 
all others
+                // are treated as strings
+                SchemaAndValue parsed = parseString(content);
+                if (parsed != null && 
TEMPORAL_LOGICAL_TYPE_NAMES.contains(parsed.schema().name())) {
+                    return parsed;
+                }
+                return new SchemaAndValue(Schema.STRING_SCHEMA, content);
             }
         }
 
@@ -838,7 +867,9 @@ public class Values {
                     }
 
                     if (!parser.canConsume(ENTRY_DELIMITER)) {
-                        throw new DataException("Map entry is missing '=': " + 
parser.original());
+                        throw new DataException("Map entry is missing '" + 
ENTRY_DELIMITER
+                                                + "' at " + parser.position()
+                                                + " in " + parser.original());
                     }
                     SchemaAndValue value = parse(parser, true);
                     Object entryValue = value != null ? value.value() : null;
@@ -855,7 +886,7 @@ public class Values {
                 throw new DataException("Map is missing terminating '}': " + 
parser.original());
             }
         } catch (DataException e) {
-            LOG.debug("Unable to parse the value as a map or an array; 
reverting to string", e);
+            LOG.trace("Unable to parse the value as a map or an array; 
reverting to string", e);
             parser.rewindTo(startPosition);
         }
 
@@ -867,6 +898,27 @@ public class Values {
 
         char firstChar = token.charAt(0);
         boolean firstCharIsDigit = Character.isDigit(firstChar);
+
+        // Temporal types are more restrictive, so try them first
+        if (firstCharIsDigit) {
+            // The time and timestamp literals may be split into 5 tokens 
since an unescaped colon
+            // is a delimiter. Check these first since the first of these 
tokens is a simple numeric
+            int position = parser.mark();
+            String remainder = parser.next(4);
+            if (remainder != null) {
+                String timeOrTimestampStr = token + remainder;
+                SchemaAndValue temporal = parseAsTemporal(timeOrTimestampStr);
+                if (temporal != null) {
+                    return temporal;
+                }
+            }
+            // No match was found using the 5 tokens, so rewind and see if the 
current token has a date, time, or timestamp
+            parser.rewindTo(position);
+            SchemaAndValue temporal = parseAsTemporal(token);
+            if (temporal != null) {
+                return temporal;
+            }
+        }
         if (firstCharIsDigit || firstChar == '+' || firstChar == '-') {
             try {
                 // Try to parse as a number ...
@@ -901,37 +953,42 @@ public class Values {
                 // can't parse as a number
             }
         }
-        if (firstCharIsDigit) {
-            // Check for a date, time, or timestamp ...
-            int tokenLength = token.length();
-            if (tokenLength == ISO_8601_DATE_LENGTH) {
-                try {
-                    return new SchemaAndValue(Date.SCHEMA, new 
SimpleDateFormat(ISO_8601_DATE_FORMAT_PATTERN).parse(token));
-                } catch (ParseException e) {
-                    // not a valid date
-                }
-            } else if (tokenLength == ISO_8601_TIME_LENGTH) {
-                try {
-                    return new SchemaAndValue(Time.SCHEMA, new 
SimpleDateFormat(ISO_8601_TIME_FORMAT_PATTERN).parse(token));
-                } catch (ParseException e) {
-                    // not a valid date
-                }
-            } else if (tokenLength == ISO_8601_TIMESTAMP_LENGTH) {
-                try {
-                    return new SchemaAndValue(Timestamp.SCHEMA, new 
SimpleDateFormat(ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(token));
-                } catch (ParseException e) {
-                    // not a valid date
-                }
-            }
-        }
         if (embedded) {
             throw new DataException("Failed to parse embedded value");
         }
-        // At this point, the only thing this can be is a string. Embedded 
strings were processed above,
-        // so this is not embedded and we can use the original string...
+        // At this point, the only thing this non-embedded value can be is a 
string.
         return new SchemaAndValue(Schema.STRING_SCHEMA, parser.original());
     }
 
+    private static SchemaAndValue parseAsTemporal(String token) {
+        if (token == null) {
+            return null;
+        }
+        // If the colons were escaped, we'll see the escape chars and need to 
remove them
+        token = token.replace("\\:", ":");
+        int tokenLength = token.length();
+        if (tokenLength == ISO_8601_TIME_LENGTH) {
+            try {
+                return new SchemaAndValue(Time.SCHEMA, new 
SimpleDateFormat(ISO_8601_TIME_FORMAT_PATTERN).parse(token));
+            } catch (ParseException e) {
+              // not a valid date
+            }
+        } else if (tokenLength == ISO_8601_TIMESTAMP_LENGTH) {
+            try {
+                return new SchemaAndValue(Timestamp.SCHEMA, new 
SimpleDateFormat(ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(token));
+            } catch (ParseException e) {
+              // not a valid date
+            }
+        } else if (tokenLength == ISO_8601_DATE_LENGTH) {
+            try {
+                return new SchemaAndValue(Date.SCHEMA, new 
SimpleDateFormat(ISO_8601_DATE_FORMAT_PATTERN).parse(token));
+            } catch (ParseException e) {
+                // not a valid date
+            }
+        }
+        return null;
+    }
+
     protected static Schema commonSchemaFor(Schema previous, SchemaAndValue 
latest) {
         if (latest == null) {
             return previous;
@@ -1088,6 +1145,7 @@ public class Values {
         public void rewindTo(int position) {
             iter.setIndex(position);
             nextToken = null;
+            previousToken = null;
         }
 
         public String original() {
@@ -1112,6 +1170,19 @@ public class Values {
             return previousToken;
         }
 
+        public String next(int n) {
+            int current = mark();
+            int start = mark();
+            for (int i = 0; i != n; ++i) {
+                if (!hasNext()) {
+                    rewindTo(start);
+                    return null;
+                }
+                next();
+            }
+            return original.substring(current, position());
+        }
+
         private String consumeNextToken() throws NoSuchElementException {
             boolean escaped = false;
             int start = iter.getIndex();
diff --git 
a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java 
b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
index a5909f3..c437e46 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.connect.data.Values.Parser;
 import org.apache.kafka.connect.errors.DataException;
 import org.junit.Test;
 
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -383,6 +384,146 @@ public class ValuesTest {
         assertEquals(str, result.value());
     }
 
+    @Test
+    public void shouldParseTimestampStringAsTimestamp() throws Exception {
+        String str = "2019-08-23T14:34:54.346Z";
+        SchemaAndValue result = Values.parseString(str);
+        assertEquals(Type.INT64, result.schema().type());
+        assertEquals(Timestamp.LOGICAL_NAME, result.schema().name());
+        java.util.Date expected = new 
SimpleDateFormat(Values.ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(str);
+        assertEquals(expected, result.value());
+    }
+
+    @Test
+    public void shouldParseDateStringAsDate() throws Exception {
+        String str = "2019-08-23";
+        SchemaAndValue result = Values.parseString(str);
+        assertEquals(Type.INT32, result.schema().type());
+        assertEquals(Date.LOGICAL_NAME, result.schema().name());
+        java.util.Date expected = new 
SimpleDateFormat(Values.ISO_8601_DATE_FORMAT_PATTERN).parse(str);
+        assertEquals(expected, result.value());
+    }
+
+    @Test
+    public void shouldParseTimeStringAsDate() throws Exception {
+        String str = "14:34:54.346Z";
+        SchemaAndValue result = Values.parseString(str);
+        assertEquals(Type.INT32, result.schema().type());
+        assertEquals(Time.LOGICAL_NAME, result.schema().name());
+        java.util.Date expected = new 
SimpleDateFormat(Values.ISO_8601_TIME_FORMAT_PATTERN).parse(str);
+        assertEquals(expected, result.value());
+    }
+
+    @Test
+    public void shouldParseTimestampStringWithEscapedColonsAsTimestamp() 
throws Exception {
+        String str = "2019-08-23T14\\:34\\:54.346Z";
+        SchemaAndValue result = Values.parseString(str);
+        assertEquals(Type.INT64, result.schema().type());
+        assertEquals(Timestamp.LOGICAL_NAME, result.schema().name());
+        String expectedStr = "2019-08-23T14:34:54.346Z";
+        java.util.Date expected = new 
SimpleDateFormat(Values.ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(expectedStr);
+        assertEquals(expected, result.value());
+    }
+
+    @Test
+    public void shouldParseTimeStringWithEscapedColonsAsDate() throws 
Exception {
+        String str = "14\\:34\\:54.346Z";
+        SchemaAndValue result = Values.parseString(str);
+        assertEquals(Type.INT32, result.schema().type());
+        assertEquals(Time.LOGICAL_NAME, result.schema().name());
+        String expectedStr = "14:34:54.346Z";
+        java.util.Date expected = new 
SimpleDateFormat(Values.ISO_8601_TIME_FORMAT_PATTERN).parse(expectedStr);
+        assertEquals(expected, result.value());
+    }
+
+    @Test
+    public void shouldParseDateStringAsDateInArray() throws Exception {
+        String dateStr = "2019-08-23";
+        String arrayStr = "[" + dateStr + "]";
+        SchemaAndValue result = Values.parseString(arrayStr);
+        assertEquals(Type.ARRAY, result.schema().type());
+        Schema elementSchema = result.schema().valueSchema();
+        assertEquals(Type.INT32, elementSchema.type());
+        assertEquals(Date.LOGICAL_NAME, elementSchema.name());
+        java.util.Date expected = new 
SimpleDateFormat(Values.ISO_8601_DATE_FORMAT_PATTERN).parse(dateStr);
+        assertEquals(Collections.singletonList(expected), result.value());
+    }
+
+    @Test
+    public void shouldParseTimeStringAsTimeInArray() throws Exception {
+        String timeStr = "14:34:54.346Z";
+        String arrayStr = "[" + timeStr + "]";
+        SchemaAndValue result = Values.parseString(arrayStr);
+        assertEquals(Type.ARRAY, result.schema().type());
+        Schema elementSchema = result.schema().valueSchema();
+        assertEquals(Type.INT32, elementSchema.type());
+        assertEquals(Time.LOGICAL_NAME, elementSchema.name());
+        java.util.Date expected = new 
SimpleDateFormat(Values.ISO_8601_TIME_FORMAT_PATTERN).parse(timeStr);
+        assertEquals(Collections.singletonList(expected), result.value());
+    }
+
+    @Test
+    public void shouldParseTimestampStringAsTimestampInArray() throws 
Exception {
+        String tsStr = "2019-08-23T14:34:54.346Z";
+        String arrayStr = "[" + tsStr + "]";
+        SchemaAndValue result = Values.parseString(arrayStr);
+        assertEquals(Type.ARRAY, result.schema().type());
+        Schema elementSchema = result.schema().valueSchema();
+        assertEquals(Type.INT64, elementSchema.type());
+        assertEquals(Timestamp.LOGICAL_NAME, elementSchema.name());
+        java.util.Date expected = new 
SimpleDateFormat(Values.ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(tsStr);
+        assertEquals(Collections.singletonList(expected), result.value());
+    }
+
+    @Test
+    public void shouldParseMultipleTimestampStringAsTimestampInArray() throws 
Exception {
+        String tsStr1 = "2019-08-23T14:34:54.346Z";
+        String tsStr2 = "2019-01-23T15:12:34.567Z";
+        String tsStr3 = "2019-04-23T19:12:34.567Z";
+        String arrayStr = "[" + tsStr1 + "," + tsStr2 + ",   " + tsStr3 + "]";
+        SchemaAndValue result = Values.parseString(arrayStr);
+        assertEquals(Type.ARRAY, result.schema().type());
+        Schema elementSchema = result.schema().valueSchema();
+        assertEquals(Type.INT64, elementSchema.type());
+        assertEquals(Timestamp.LOGICAL_NAME, elementSchema.name());
+        java.util.Date expected1 = new 
SimpleDateFormat(Values.ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(tsStr1);
+        java.util.Date expected2 = new 
SimpleDateFormat(Values.ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(tsStr2);
+        java.util.Date expected3 = new 
SimpleDateFormat(Values.ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(tsStr3);
+        assertEquals(Arrays.asList(expected1, expected2, expected3), 
result.value());
+    }
+
+    @Test
+    public void shouldParseQuotedTimeStringAsTimeInMap() throws Exception {
+        String keyStr = "k1";
+        String timeStr = "14:34:54.346Z";
+        String mapStr = "{\"" + keyStr + "\":\"" + timeStr + "\"}";
+        SchemaAndValue result = Values.parseString(mapStr);
+        assertEquals(Type.MAP, result.schema().type());
+        Schema keySchema = result.schema().keySchema();
+        Schema valueSchema = result.schema().valueSchema();
+        assertEquals(Type.STRING, keySchema.type());
+        assertEquals(Type.INT32, valueSchema.type());
+        assertEquals(Time.LOGICAL_NAME, valueSchema.name());
+        java.util.Date expected = new 
SimpleDateFormat(Values.ISO_8601_TIME_FORMAT_PATTERN).parse(timeStr);
+        assertEquals(Collections.singletonMap(keyStr, expected), 
result.value());
+    }
+
+    @Test
+    public void shouldParseTimeStringAsTimeInMap() throws Exception {
+        String keyStr = "k1";
+        String timeStr = "14:34:54.346Z";
+        String mapStr = "{\"" + keyStr + "\":" + timeStr + "}";
+        SchemaAndValue result = Values.parseString(mapStr);
+        assertEquals(Type.MAP, result.schema().type());
+        Schema keySchema = result.schema().keySchema();
+        Schema valueSchema = result.schema().valueSchema();
+        assertEquals(Type.STRING, keySchema.type());
+        assertEquals(Type.INT32, valueSchema.type());
+        assertEquals(Time.LOGICAL_NAME, valueSchema.name());
+        java.util.Date expected = new 
SimpleDateFormat(Values.ISO_8601_TIME_FORMAT_PATTERN).parse(timeStr);
+        assertEquals(Collections.singletonMap(keyStr, expected), 
result.value());
+    }
+
     /**
      * This is technically invalid JSON, and we don't want to simply ignore 
the blank elements.
      */
@@ -433,6 +574,20 @@ public class ValuesTest {
     }
 
     @Test
+    public void shouldConsumeMultipleTokens() {
+        String value = "a:b:c:d:e:f:g:h";
+        Parser parser = new Parser(value);
+        String firstFive = parser.next(5);
+        assertEquals("a:b:c", firstFive);
+        assertEquals(":", parser.next());
+        assertEquals("d", parser.next());
+        assertEquals(":", parser.next());
+        String lastEight = parser.next(8); // only 7 remain
+        assertNull(lastEight);
+        assertEquals("e", parser.next());
+    }
+
+    @Test
     public void shouldParseStringsWithoutDelimiters() {
         //assertParsed("");
         assertParsed("  ");

Reply via email to