Repository: flink Updated Branches: refs/heads/master d7b59d761 -> 4b1a9c72e
[FLINK-4081] [core] [table] FieldParsers should support empty strings This closes #2297. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b1a9c72 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b1a9c72 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b1a9c72 Branch: refs/heads/master Commit: 4b1a9c72e99125680035e5dadc148b187d9d4124 Parents: d7b59d7 Author: twalthr <twal...@apache.org> Authored: Tue Jul 26 16:24:24 2016 +0200 Committer: twalthr <twal...@apache.org> Committed: Mon Sep 19 14:35:59 2016 +0200 ---------------------------------------------------------------------- .../flink/types/parser/BooleanParser.java | 4 +++ .../apache/flink/types/parser/ByteParser.java | 2 +- .../flink/types/parser/ByteValueParser.java | 2 +- .../apache/flink/types/parser/DoubleParser.java | 4 +++ .../flink/types/parser/DoubleValueParser.java | 4 +++ .../apache/flink/types/parser/FieldParser.java | 4 +-- .../apache/flink/types/parser/FloatParser.java | 4 +++ .../flink/types/parser/FloatValueParser.java | 4 +++ .../apache/flink/types/parser/IntParser.java | 2 +- .../flink/types/parser/IntValueParser.java | 2 +- .../apache/flink/types/parser/LongParser.java | 2 +- .../flink/types/parser/LongValueParser.java | 2 +- .../apache/flink/types/parser/ShortParser.java | 2 +- .../flink/types/parser/ShortValueParser.java | 2 +- .../apache/flink/types/parser/StringParser.java | 8 ++++- .../flink/types/parser/StringValueParser.java | 6 ++++ .../types/parser/BooleanValueParserTest.java | 2 +- .../flink/types/parser/ByteParserTest.java | 3 -- .../flink/types/parser/ByteValueParserTest.java | 2 -- .../flink/types/parser/DoubleParserTest.java | 3 -- .../types/parser/DoubleValueParserTest.java | 2 -- .../flink/types/parser/FloatParserTest.java | 3 -- .../types/parser/FloatValueParserTest.java | 2 -- .../flink/types/parser/IntParserTest.java | 3 -- .../flink/types/parser/IntValueParserTest.java | 2 -- .../flink/types/parser/LongParserTest.java | 3 -- .../flink/types/parser/LongValueParserTest.java | 2 -- .../flink/types/parser/ParserTestBase.java | 2 ++ .../types/parser/QuotedStringParserTest.java | 2 +- .../parser/QuotedStringValueParserTest.java | 4 +-- .../flink/types/parser/ShortParserTest.java | 3 -- .../types/parser/ShortValueParserTest.java | 2 -- .../types/parser/UnquotedStringParserTest.java | 5 +-- .../types/parser/VarLengthStringParserTest.java | 1 - .../table/runtime/io/RowCsvInputFormat.scala | 14 +++++--- .../runtime/io/RowCsvInputFormatTest.scala | 37 ++++++++++---------- 36 files changed, 77 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java index 90fa41e..f8b890a 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java @@ -44,6 +44,10 @@ public class BooleanParser extends FieldParser<Boolean> { while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delim)) { + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_COLUMN); + return -1; + } break; } i++; http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java index a521ac1..7ee257e 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java @@ -48,7 +48,7 @@ public class ByteParser extends FieldParser<Byte> { for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } this.result = (byte) (neg ? -val : val); http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java index 4cda98c..c79f5d4 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java @@ -55,7 +55,7 @@ public class ByteValueParser extends FieldParser<ByteValue> { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } reusable.setValue((byte) (neg ? -val : val)); http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java index 0b2f5a2..8af496d 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java @@ -39,6 +39,10 @@ public class DoubleParser extends FieldParser<Double> { while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_COLUMN); + return -1; + } break; } i++; http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java index 333e6c9..5c657be 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java @@ -39,6 +39,10 @@ public class DoubleValueParser extends FieldParser<DoubleValue> { while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_COLUMN); + return -1; + } break; } i++; http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java index 67c1bd7..5f17840 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java @@ -69,8 +69,8 @@ public abstract class FieldParser<T> { /** The parser found characters between the end of the quoted string and the delimiter. */ UNQUOTED_CHARS_AFTER_QUOTED_STRING, - /** The string is empty. */ - EMPTY_STRING, + /** The column is empty. */ + EMPTY_COLUMN, /** Invalid Boolean value **/ BOOLEAN_INVALID http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java index a47877e..3304f24 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java @@ -39,6 +39,10 @@ public class FloatParser extends FieldParser<Float> { while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_COLUMN); + return -1; + } break; } i++; http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java index b6da3d3..26ee47b 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java @@ -39,6 +39,10 @@ public class FloatValueParser extends FieldParser<FloatValue> { while (i < limit) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_COLUMN); + return -1; + } break; } i++; http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java index 4d2ae7c..4e5d43f 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java @@ -57,7 +57,7 @@ public class IntParser extends FieldParser<Integer> { for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } this.result = (int) (neg ? -val : val); http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java index d487c66..0229bc7 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java @@ -57,7 +57,7 @@ public class IntValueParser extends FieldParser<IntValue> { for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } reusable.setValue((int) (neg ? -val : val)); http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java index c7b76d2..79eb080 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java @@ -51,7 +51,7 @@ public class LongParser extends FieldParser<Long> { for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } this.result = neg ? -val : val; http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java index 597abc0..5ddd40c 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java @@ -54,7 +54,7 @@ public class LongValueParser extends FieldParser<LongValue> { for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } reusable.setValue(neg ? -val : val); http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java index 3afa761..c458a3f 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java @@ -55,7 +55,7 @@ public class ShortParser extends FieldParser<Short> { for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } this.result = (short) (neg ? -val : val); http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java index 880af25..47471a3 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java @@ -57,7 +57,7 @@ public class ShortValueParser extends FieldParser<ShortValue> { for (int i = startPos; i < limit; i++) { if (i < delimLimit && delimiterNext(bytes, i, delimiter)) { if (i == startPos) { - setErrorState(ParseErrorState.EMPTY_STRING); + setErrorState(ParseErrorState.EMPTY_COLUMN); return -1; } reusable.setValue((short) (neg ? -val : val)); http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java index 9cee990..1a2c7e3 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java @@ -47,7 +47,7 @@ public class StringParser extends FieldParser<String> { final int delimLimit = limit - delimiter.length + 1; if(quotedStringParsing && bytes[i] == quoteCharacter) { - // quoted string parsing enabled and first character Vis a quote + // quoted string parsing enabled and first character is a quote i++; // search for ending quote character, continue when it is escaped @@ -84,10 +84,16 @@ public class StringParser extends FieldParser<String> { if (i >= delimLimit) { // no delimiter found. Take the full string + if (limit == startPos) { + setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column + } this.result = new String(bytes, startPos, limit - startPos); return limit; } else { // delimiter found. + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column + } this.result = new String(bytes, startPos, i - startPos); return i + delimiter.length; } http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java index b136576..c72b029 100644 --- a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java +++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java @@ -90,10 +90,16 @@ public class StringValueParser extends FieldParser<StringValue> { if (i >= delimLimit) { // no delimiter found. Take the full string + if (limit == startPos) { + setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column + } reusable.setValueAscii(bytes, startPos, limit - startPos); return limit; } else { // delimiter found. + if (i == startPos) { + setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column + } reusable.setValueAscii(bytes, startPos, i - startPos); return i + delimiter.length; } http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java index ff1885d..3b120e2 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java @@ -19,9 +19,9 @@ package org.apache.flink.types.parser; - import org.apache.flink.types.BooleanValue; + public class BooleanValueParserTest extends ParserTestBase<BooleanValue> { http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java index dac5144..579f003 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java @@ -19,9 +19,6 @@ package org.apache.flink.types.parser; -import org.apache.flink.types.parser.ByteParser; -import org.apache.flink.types.parser.FieldParser; - public class ByteParserTest extends ParserTestBase<Byte> { http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java index 31b60d4..f5abe05 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java @@ -20,8 +20,6 @@ package org.apache.flink.types.parser; import org.apache.flink.types.ByteValue; -import org.apache.flink.types.parser.ByteValueParser; -import org.apache.flink.types.parser.FieldParser; public class ByteValueParserTest extends ParserTestBase<ByteValue> { http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java index bda8252..98655d1 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java @@ -19,9 +19,6 @@ package org.apache.flink.types.parser; -import org.apache.flink.types.parser.DoubleParser; -import org.apache.flink.types.parser.FieldParser; - public class DoubleParserTest extends ParserTestBase<Double> { http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java index fbbb5f2..dfe8936 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java @@ -20,8 +20,6 @@ package org.apache.flink.types.parser; import org.apache.flink.types.DoubleValue; -import org.apache.flink.types.parser.DoubleValueParser; -import org.apache.flink.types.parser.FieldParser; public class DoubleValueParserTest extends ParserTestBase<DoubleValue> { http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java index d05557f..480f1fb 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java @@ -19,9 +19,6 @@ package org.apache.flink.types.parser; -import org.apache.flink.types.parser.FieldParser; -import org.apache.flink.types.parser.FloatParser; - public class FloatParserTest extends ParserTestBase<Float> { http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java index 5c6e1c3..d71b20a 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java @@ -20,8 +20,6 @@ package org.apache.flink.types.parser; import org.apache.flink.types.FloatValue; -import org.apache.flink.types.parser.FloatValueParser; -import org.apache.flink.types.parser.FieldParser; public class FloatValueParserTest extends ParserTestBase<FloatValue> { http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java index 1d33b51..f587086 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java @@ -19,9 +19,6 @@ package org.apache.flink.types.parser; -import org.apache.flink.types.parser.FieldParser; -import org.apache.flink.types.parser.IntParser; - public class IntParserTest extends ParserTestBase<Integer> { http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java index eb0403e..a70d65c 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java @@ -20,8 +20,6 @@ package org.apache.flink.types.parser; import org.apache.flink.types.IntValue; -import org.apache.flink.types.parser.IntValueParser; -import org.apache.flink.types.parser.FieldParser; public class IntValueParserTest extends ParserTestBase<IntValue> { http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java index b17de9d..d32eef1 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java @@ -19,9 +19,6 @@ package org.apache.flink.types.parser; -import org.apache.flink.types.parser.FieldParser; -import org.apache.flink.types.parser.LongParser; - public class LongParserTest extends ParserTestBase<Long> { http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java index f4d82a0..b9c5eec 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java @@ -20,8 +20,6 @@ package org.apache.flink.types.parser; import org.apache.flink.types.LongValue; -import org.apache.flink.types.parser.LongValueParser; -import org.apache.flink.types.parser.FieldParser; public class LongValueParserTest extends ParserTestBase<LongValue> { http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java index b979a38..9b02147 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java @@ -414,6 +414,8 @@ public abstract class ParserTestBase<T> extends TestLogger { byte[] bytes = emptyString.getBytes(); int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue()); + assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState()); + if(this.allowsEmptyField()) { assertTrue("Parser declared the empty string as invalid.", numRead != -1); assertEquals("Invalid number of bytes read returned.", bytes.length, numRead); http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java index d4b7e1f..6fda78a 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java @@ -65,4 +65,4 @@ public class QuotedStringParserTest extends ParserTestBase<String> { public Class<String> getTypeClass() { return String.class; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java index 2801582..2cf901c 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java @@ -20,8 +20,6 @@ package org.apache.flink.types.parser; import org.apache.flink.types.StringValue; -import org.apache.flink.types.parser.FieldParser; -import org.apache.flink.types.parser.StringValueParser; public class QuotedStringValueParserTest extends ParserTestBase<StringValue> { @@ -69,4 +67,4 @@ public class QuotedStringValueParserTest extends ParserTestBase<StringValue> { public Class<StringValue> getTypeClass() { return StringValue.class; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java index 201714b..822d871 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java @@ -19,9 +19,6 @@ package org.apache.flink.types.parser; -import org.apache.flink.types.parser.FieldParser; -import org.apache.flink.types.parser.ShortParser; - public class ShortParserTest extends ParserTestBase<Short> { http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java index 59e9c52..c4b5f02 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java @@ -20,8 +20,6 @@ package org.apache.flink.types.parser; import org.apache.flink.types.ShortValue; -import org.apache.flink.types.parser.ShortValueParser; -import org.apache.flink.types.parser.FieldParser; public class ShortValueParserTest extends ParserTestBase<ShortValue> { http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java index 8e75192..739bd76 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java @@ -19,9 +19,6 @@ package org.apache.flink.types.parser; -import org.apache.flink.types.parser.StringParser; -import org.apache.flink.types.parser.FieldParser; - public class UnquotedStringParserTest extends ParserTestBase<String> { @@ -58,4 +55,4 @@ public class UnquotedStringParserTest extends ParserTestBase<String> { public Class<String> getTypeClass() { return String.class; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java index 4f9069e..1fe8850 100644 --- a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue; import org.apache.flink.types.StringValue; import org.apache.flink.types.Value; -import org.apache.flink.types.parser.StringValueParser; import org.junit.Test; public class VarLengthStringParserTest { http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala index 1eb056c..b0ab801 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala @@ -36,7 +36,8 @@ class RowCsvInputFormat( rowTypeInfo: RowTypeInfo, lineDelimiter: String = DEFAULT_LINE_DELIMITER, fieldDelimiter: String = DEFAULT_FIELD_DELIMITER, - includedFieldsMask: Array[Boolean] = null) + includedFieldsMask: Array[Boolean] = null, + emptyColumnAsNull: Boolean = false) extends CsvInputFormat[Row](filePath) { if (rowTypeInfo.getArity == 0) { @@ -134,8 +135,8 @@ class RowCsvInputFormat( holders(output)) if (!isLenient && (parser.getErrorState ne ParseErrorState.NONE)) { - // Row is able to handle null values - if (parser.getErrorState ne ParseErrorState.EMPTY_STRING) { + // the error state EMPTY_COLUMN is ignored + if (parser.getErrorState ne ParseErrorState.EMPTY_COLUMN) { throw new ParseException(s"Parsing error for column $field of row '" + new String(bytes, offset, numBytes) + s"' originated by ${parser.getClass.getSimpleName}: ${parser.getErrorState}.") @@ -143,8 +144,11 @@ class RowCsvInputFormat( } holders(output) = parser.getLastResult - // check parse result - if (startPos < 0) { + // check parse result: + // the result is null if it is invalid + // or empty with emptyColumnAsNull enabled + if (startPos < 0 || + (emptyColumnAsNull && (parser.getErrorState eq ParseErrorState.EMPTY_COLUMN))) { holders(output) = null startPos = skipFields(bytes, latestValidPos, limit, fieldDelimiter) } http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala index 540776d..db01b69 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala @@ -379,39 +379,40 @@ class RowCsvInputFormatTest { @Test def testEmptyFields() { val fileContent = - "|0|0|0|0|0|\n" + - "1||1|1|1|1|\n" + - "2|2||2|2|2|\n" + - "3|3|3||3|3|\n" + - "4|4|4|4||4|\n" + - "5|5|5|5|5||\n" + ",,,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,,\n" + + ",,,,,,,,\n" val split = createTempFile(fileContent) - // TODO: FLOAT_TYPE_INFO and DOUBLE_TYPE_INFO don't handle correctly null values val typeInfo = new RowTypeInfo(Seq( - BasicTypeInfo.SHORT_TYPE_INFO, + BasicTypeInfo.BOOLEAN_TYPE_INFO, + BasicTypeInfo.BYTE_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.FLOAT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.BYTE_TYPE_INFO)) + BasicTypeInfo.SHORT_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO)) - val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo) - format.setFieldDelimiter("|") + val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo, emptyColumnAsNull = true) + format.setFieldDelimiter(",") format.configure(new Configuration) format.open(split) - var result = new Row(6) + var result = new Row(8) val linesCnt = fileContent.split("\n").length - var i = 0 - while (i < linesCnt) { + for (i <- 0 until linesCnt) yield { result = format.nextRecord(result) assertNull(result.productElement(i)) - i += 1 } - + // ensure no more rows assertNull(format.nextRecord(result)) assertTrue(format.reachedEnd)