Repository: flink
Updated Branches:
  refs/heads/master d7b59d761 -> 4b1a9c72e


[FLINK-4081] [core] [table] FieldParsers should support empty strings

This closes #2297.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b1a9c72
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b1a9c72
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b1a9c72

Branch: refs/heads/master
Commit: 4b1a9c72e99125680035e5dadc148b187d9d4124
Parents: d7b59d7
Author: twalthr <twal...@apache.org>
Authored: Tue Jul 26 16:24:24 2016 +0200
Committer: twalthr <twal...@apache.org>
Committed: Mon Sep 19 14:35:59 2016 +0200

----------------------------------------------------------------------
 .../flink/types/parser/BooleanParser.java       |  4 +++
 .../apache/flink/types/parser/ByteParser.java   |  2 +-
 .../flink/types/parser/ByteValueParser.java     |  2 +-
 .../apache/flink/types/parser/DoubleParser.java |  4 +++
 .../flink/types/parser/DoubleValueParser.java   |  4 +++
 .../apache/flink/types/parser/FieldParser.java  |  4 +--
 .../apache/flink/types/parser/FloatParser.java  |  4 +++
 .../flink/types/parser/FloatValueParser.java    |  4 +++
 .../apache/flink/types/parser/IntParser.java    |  2 +-
 .../flink/types/parser/IntValueParser.java      |  2 +-
 .../apache/flink/types/parser/LongParser.java   |  2 +-
 .../flink/types/parser/LongValueParser.java     |  2 +-
 .../apache/flink/types/parser/ShortParser.java  |  2 +-
 .../flink/types/parser/ShortValueParser.java    |  2 +-
 .../apache/flink/types/parser/StringParser.java |  8 ++++-
 .../flink/types/parser/StringValueParser.java   |  6 ++++
 .../types/parser/BooleanValueParserTest.java    |  2 +-
 .../flink/types/parser/ByteParserTest.java      |  3 --
 .../flink/types/parser/ByteValueParserTest.java |  2 --
 .../flink/types/parser/DoubleParserTest.java    |  3 --
 .../types/parser/DoubleValueParserTest.java     |  2 --
 .../flink/types/parser/FloatParserTest.java     |  3 --
 .../types/parser/FloatValueParserTest.java      |  2 --
 .../flink/types/parser/IntParserTest.java       |  3 --
 .../flink/types/parser/IntValueParserTest.java  |  2 --
 .../flink/types/parser/LongParserTest.java      |  3 --
 .../flink/types/parser/LongValueParserTest.java |  2 --
 .../flink/types/parser/ParserTestBase.java      |  2 ++
 .../types/parser/QuotedStringParserTest.java    |  2 +-
 .../parser/QuotedStringValueParserTest.java     |  4 +--
 .../flink/types/parser/ShortParserTest.java     |  3 --
 .../types/parser/ShortValueParserTest.java      |  2 --
 .../types/parser/UnquotedStringParserTest.java  |  5 +--
 .../types/parser/VarLengthStringParserTest.java |  1 -
 .../table/runtime/io/RowCsvInputFormat.scala    | 14 +++++---
 .../runtime/io/RowCsvInputFormatTest.scala      | 37 ++++++++++----------
 36 files changed, 77 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
index 90fa41e..f8b890a 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BooleanParser.java
@@ -44,6 +44,10 @@ public class BooleanParser extends FieldParser<Boolean> {
 
                while (i < limit) {
                        if (i < delimLimit && delimiterNext(bytes, i, delim)) {
+                               if (i == startPos) {
+                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
+                                       return -1;
+                               }
                                break;
                        }
                        i++;

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
index a521ac1..7ee257e 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteParser.java
@@ -48,7 +48,7 @@ public class ByteParser extends FieldParser<Byte> {
                for (int i = startPos; i < limit; i++) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
                                if (i == startPos) {
-                                       
setErrorState(ParseErrorState.EMPTY_STRING);
+                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
                                        return -1;
                                }
                                this.result = (byte) (neg ? -val : val);

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
index 4cda98c..c79f5d4 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ByteValueParser.java
@@ -55,7 +55,7 @@ public class ByteValueParser extends FieldParser<ByteValue> {
 
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
                                if (i == startPos) {
-                                       
setErrorState(ParseErrorState.EMPTY_STRING);
+                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
                                        return -1;
                                }
                                reusable.setValue((byte) (neg ? -val : val));

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
index 0b2f5a2..8af496d 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleParser.java
@@ -39,6 +39,10 @@ public class DoubleParser extends FieldParser<Double> {
 
                while (i < limit) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
+                               if (i == startPos) {
+                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
+                                       return -1;
+                               }
                                break;
                        }
                        i++;

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
index 333e6c9..5c657be 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/DoubleValueParser.java
@@ -39,6 +39,10 @@ public class DoubleValueParser extends FieldParser<DoubleValue> {
 
                while (i < limit) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
+                               if (i == startPos) {
+                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
+                                       return -1;
+                               }
                                break;
                        }
                        i++;

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index 67c1bd7..5f17840 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -69,8 +69,8 @@ public abstract class FieldParser<T> {
                /** The parser found characters between the end of the quoted 
string and the delimiter. */
                UNQUOTED_CHARS_AFTER_QUOTED_STRING,
 
-               /** The string is empty. */
-               EMPTY_STRING,
+               /** The column is empty. */
+               EMPTY_COLUMN,
 
                /** Invalid Boolean value **/
                BOOLEAN_INVALID

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
index a47877e..3304f24 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatParser.java
@@ -39,6 +39,10 @@ public class FloatParser extends FieldParser<Float> {
 
                while (i < limit) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
+                               if (i == startPos) {
+                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
+                                       return -1;
+                               }
                                break;
                        }
                        i++;

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
index b6da3d3..26ee47b 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FloatValueParser.java
@@ -39,6 +39,10 @@ public class FloatValueParser extends FieldParser<FloatValue> {
 
                while (i < limit) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
+                               if (i == startPos) {
+                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
+                                       return -1;
+                               }
                                break;
                        }
                        i++;

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
index 4d2ae7c..4e5d43f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntParser.java
@@ -57,7 +57,7 @@ public class IntParser extends FieldParser<Integer> {
                for (int i = startPos; i < limit; i++) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
                                if (i == startPos) {
-                                       
setErrorState(ParseErrorState.EMPTY_STRING);
+                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
                                        return -1;
                                }
                                this.result = (int) (neg ? -val : val);

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
index d487c66..0229bc7 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/IntValueParser.java
@@ -57,7 +57,7 @@ public class IntValueParser extends FieldParser<IntValue> {
                for (int i = startPos; i < limit; i++) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
                                if (i == startPos) {
-                                       
setErrorState(ParseErrorState.EMPTY_STRING);
+                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
                                        return -1;
                                }
                                reusable.setValue((int) (neg ? -val : val));

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
index c7b76d2..79eb080 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java
@@ -51,7 +51,7 @@ public class LongParser extends FieldParser<Long> {
                for (int i = startPos; i < limit; i++) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
                                if (i == startPos) {
-                                       
setErrorState(ParseErrorState.EMPTY_STRING);
+                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
                                        return -1;
                                }
                                this.result = neg ? -val : val;

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
index 597abc0..5ddd40c 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java
@@ -54,7 +54,7 @@ public class LongValueParser extends FieldParser<LongValue> {
                for (int i = startPos; i < limit; i++) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
                                if (i == startPos) {
-                                       
setErrorState(ParseErrorState.EMPTY_STRING);
+                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
                                        return -1;
                                }
                                reusable.setValue(neg ? -val : val);

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
index 3afa761..c458a3f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortParser.java
@@ -55,7 +55,7 @@ public class ShortParser extends FieldParser<Short> {
                for (int i = startPos; i < limit; i++) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
                                if (i == startPos) {
-                                       
setErrorState(ParseErrorState.EMPTY_STRING);
+                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
                                        return -1;
                                }
                                this.result = (short) (neg ? -val : val);

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
index 880af25..47471a3 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/ShortValueParser.java
@@ -57,7 +57,7 @@ public class ShortValueParser extends FieldParser<ShortValue> {
                for (int i = startPos; i < limit; i++) {
                        if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
                                if (i == startPos) {
-                                       
setErrorState(ParseErrorState.EMPTY_STRING);
+                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
                                        return -1;
                                }
                                reusable.setValue((short) (neg ? -val : val));

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
index 9cee990..1a2c7e3 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java
@@ -47,7 +47,7 @@ public class StringParser extends FieldParser<String> {
                final int delimLimit = limit - delimiter.length + 1;
 
                if(quotedStringParsing && bytes[i] == quoteCharacter) {
-                       // quoted string parsing enabled and first character 
Vis a quote
+                       // quoted string parsing enabled and first character is 
a quote
                        i++;
 
                        // search for ending quote character, continue when it 
is escaped
@@ -84,10 +84,16 @@ public class StringParser extends FieldParser<String> {
 
                        if (i >= delimLimit) {
                                // no delimiter found. Take the full string
+                               if (limit == startPos) {
+                                       
setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
+                               }
                                this.result = new String(bytes, startPos, limit 
- startPos);
                                return limit;
                        } else {
                                // delimiter found.
+                               if (i == startPos) {
+                                       
setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
+                               }
                                this.result = new String(bytes, startPos, i - 
startPos);
                                return i + delimiter.length;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
index b136576..c72b029 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java
@@ -90,10 +90,16 @@ public class StringValueParser extends FieldParser<StringValue> {
 
                        if (i >= delimLimit) {
                                // no delimiter found. Take the full string
+                               if (limit == startPos) {
+                                       
setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
+                               }
                                reusable.setValueAscii(bytes, startPos, limit - 
startPos);
                                return limit;
                        } else {
                                // delimiter found.
+                               if (i == startPos) {
+                                       
setErrorState(ParseErrorState.EMPTY_COLUMN); // mark empty column
+                               }
                                reusable.setValueAscii(bytes, startPos, i - 
startPos);
                                return i + delimiter.length;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java
index ff1885d..3b120e2 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/BooleanValueParserTest.java
@@ -19,9 +19,9 @@
 
 package org.apache.flink.types.parser;
 
-
 import org.apache.flink.types.BooleanValue;
 
+
 public class BooleanValueParserTest extends ParserTestBase<BooleanValue> {
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
index dac5144..579f003 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteParserTest.java
@@ -19,9 +19,6 @@
 
 package org.apache.flink.types.parser;
 
-import org.apache.flink.types.parser.ByteParser;
-import org.apache.flink.types.parser.FieldParser;
-
 
 public class ByteParserTest extends ParserTestBase<Byte> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
index 31b60d4..f5abe05 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ByteValueParserTest.java
@@ -20,8 +20,6 @@
 package org.apache.flink.types.parser;
 
 import org.apache.flink.types.ByteValue;
-import org.apache.flink.types.parser.ByteValueParser;
-import org.apache.flink.types.parser.FieldParser;
 
 
 public class ByteValueParserTest extends ParserTestBase<ByteValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
index bda8252..98655d1 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleParserTest.java
@@ -19,9 +19,6 @@
 
 package org.apache.flink.types.parser;
 
-import org.apache.flink.types.parser.DoubleParser;
-import org.apache.flink.types.parser.FieldParser;
-
 
 public class DoubleParserTest extends ParserTestBase<Double> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
index fbbb5f2..dfe8936 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/DoubleValueParserTest.java
@@ -20,8 +20,6 @@
 package org.apache.flink.types.parser;
 
 import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.parser.DoubleValueParser;
-import org.apache.flink.types.parser.FieldParser;
 
 
 public class DoubleValueParserTest extends ParserTestBase<DoubleValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
index d05557f..480f1fb 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/FloatParserTest.java
@@ -19,9 +19,6 @@
 
 package org.apache.flink.types.parser;
 
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.types.parser.FloatParser;
-
 
 public class FloatParserTest extends ParserTestBase<Float> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
index 5c6e1c3..d71b20a 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/FloatValueParserTest.java
@@ -20,8 +20,6 @@
 package org.apache.flink.types.parser;
 
 import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.parser.FloatValueParser;
-import org.apache.flink.types.parser.FieldParser;
 
 
 public class FloatValueParserTest extends ParserTestBase<FloatValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
index 1d33b51..f587086 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/IntParserTest.java
@@ -19,9 +19,6 @@
 
 package org.apache.flink.types.parser;
 
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.types.parser.IntParser;
-
 
 public class IntParserTest extends ParserTestBase<Integer> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
index eb0403e..a70d65c 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/IntValueParserTest.java
@@ -20,8 +20,6 @@
 package org.apache.flink.types.parser;
 
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.parser.IntValueParser;
-import org.apache.flink.types.parser.FieldParser;
 
 
 public class IntValueParserTest extends ParserTestBase<IntValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
index b17de9d..d32eef1 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/LongParserTest.java
@@ -19,9 +19,6 @@
 
 package org.apache.flink.types.parser;
 
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.types.parser.LongParser;
-
 
 public class LongParserTest extends ParserTestBase<Long> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
index f4d82a0..b9c5eec 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/LongValueParserTest.java
@@ -20,8 +20,6 @@
 package org.apache.flink.types.parser;
 
 import org.apache.flink.types.LongValue;
-import org.apache.flink.types.parser.LongValueParser;
-import org.apache.flink.types.parser.FieldParser;
 
 
 public class LongValueParserTest extends ParserTestBase<LongValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
index b979a38..9b02147 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java
@@ -414,6 +414,8 @@ public abstract class ParserTestBase<T> extends TestLogger {
                                byte[] bytes = emptyString.getBytes();
                                int numRead = parser.parseField(bytes, 0, bytes.length, new byte[]{'|'}, parser.createValue());
 
+                               assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
+
                                if(this.allowsEmptyField()) {
                                        assertTrue("Parser declared the empty string as invalid.", numRead != -1);
                                        assertEquals("Invalid number of bytes read returned.", bytes.length, numRead);

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java
index d4b7e1f..6fda78a 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringParserTest.java
@@ -65,4 +65,4 @@ public class QuotedStringParserTest extends ParserTestBase<String> {
     public Class<String> getTypeClass() {
         return String.class;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java
index 2801582..2cf901c 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/QuotedStringValueParserTest.java
@@ -20,8 +20,6 @@
 package org.apache.flink.types.parser;
 
 import org.apache.flink.types.StringValue;
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.types.parser.StringValueParser;
 
 
 public class QuotedStringValueParserTest extends ParserTestBase<StringValue> {
@@ -69,4 +67,4 @@ public class QuotedStringValueParserTest extends ParserTestBase<StringValue> {
     public Class<StringValue> getTypeClass() {
         return StringValue.class;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
index 201714b..822d871 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ShortParserTest.java
@@ -19,9 +19,6 @@
 
 package org.apache.flink.types.parser;
 
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.types.parser.ShortParser;
-
 
 public class ShortParserTest extends ParserTestBase<Short> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
index 59e9c52..c4b5f02 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/ShortValueParserTest.java
@@ -20,8 +20,6 @@
 package org.apache.flink.types.parser;
 
 import org.apache.flink.types.ShortValue;
-import org.apache.flink.types.parser.ShortValueParser;
-import org.apache.flink.types.parser.FieldParser;
 
 
 public class ShortValueParserTest extends ParserTestBase<ShortValue> {

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java
index 8e75192..739bd76 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/UnquotedStringParserTest.java
@@ -19,9 +19,6 @@
 
 package org.apache.flink.types.parser;
 
-import org.apache.flink.types.parser.StringParser;
-import org.apache.flink.types.parser.FieldParser;
-
 
 public class UnquotedStringParserTest extends ParserTestBase<String> {
 
@@ -58,4 +55,4 @@ public class UnquotedStringParserTest extends ParserTestBase<String> {
     public Class<String> getTypeClass() {
         return String.class;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
index 4f9069e..1fe8850 100644
--- a/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/parser/VarLengthStringParserTest.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertTrue;
 
 import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
-import org.apache.flink.types.parser.StringValueParser;
 import org.junit.Test;
 
 public class VarLengthStringParserTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala
index 1eb056c..b0ab801 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala
@@ -36,7 +36,8 @@ class RowCsvInputFormat(
     rowTypeInfo: RowTypeInfo,
     lineDelimiter: String = DEFAULT_LINE_DELIMITER,
     fieldDelimiter: String = DEFAULT_FIELD_DELIMITER,
-    includedFieldsMask: Array[Boolean] = null)
+    includedFieldsMask: Array[Boolean] = null,
+    emptyColumnAsNull: Boolean = false)
   extends CsvInputFormat[Row](filePath) {
 
   if (rowTypeInfo.getArity == 0) {
@@ -134,8 +135,8 @@ class RowCsvInputFormat(
           holders(output))
 
         if (!isLenient && (parser.getErrorState ne ParseErrorState.NONE)) {
-          // Row is able to handle null values
-          if (parser.getErrorState ne ParseErrorState.EMPTY_STRING) {
+          // the error state EMPTY_COLUMN is ignored
+          if (parser.getErrorState ne ParseErrorState.EMPTY_COLUMN) {
             throw new ParseException(s"Parsing error for column $field of row '"
               + new String(bytes, offset, numBytes)
               + s"' originated by ${parser.getClass.getSimpleName}: ${parser.getErrorState}.")
@@ -143,8 +144,11 @@ class RowCsvInputFormat(
         }
         holders(output) = parser.getLastResult
 
-        // check parse result
-        if (startPos < 0) {
+        // check parse result:
+        // the result is null if it is invalid
+        // or empty with emptyColumnAsNull enabled
+        if (startPos < 0 ||
+            (emptyColumnAsNull && (parser.getErrorState eq ParseErrorState.EMPTY_COLUMN))) {
           holders(output) = null
           startPos = skipFields(bytes, latestValidPos, limit, fieldDelimiter)
         }

http://git-wip-us.apache.org/repos/asf/flink/blob/4b1a9c72/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
index 540776d..db01b69 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
@@ -379,39 +379,40 @@ class RowCsvInputFormatTest {
   @Test
   def testEmptyFields() {
     val fileContent =
-      "|0|0|0|0|0|\n" +
-        "1||1|1|1|1|\n" +
-        "2|2||2|2|2|\n" +
-        "3|3|3||3|3|\n" +
-        "4|4|4|4||4|\n" +
-        "5|5|5|5|5||\n"
+      ",,,,,,,,\n" +
+      ",,,,,,,,\n" +
+      ",,,,,,,,\n" +
+      ",,,,,,,,\n" +
+      ",,,,,,,,\n" +
+      ",,,,,,,,\n" +
+      ",,,,,,,,\n" +
+      ",,,,,,,,\n"
 
     val split = createTempFile(fileContent)
 
-    // TODO: FLOAT_TYPE_INFO and DOUBLE_TYPE_INFO don't handle correctly null values
     val typeInfo = new RowTypeInfo(Seq(
-      BasicTypeInfo.SHORT_TYPE_INFO,
+      BasicTypeInfo.BOOLEAN_TYPE_INFO,
+      BasicTypeInfo.BYTE_TYPE_INFO,
+      BasicTypeInfo.DOUBLE_TYPE_INFO,
+      BasicTypeInfo.FLOAT_TYPE_INFO,
       BasicTypeInfo.INT_TYPE_INFO,
       BasicTypeInfo.LONG_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.BYTE_TYPE_INFO))
+      BasicTypeInfo.SHORT_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO))
 
-    val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo)
-    format.setFieldDelimiter("|")
+    val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo, emptyColumnAsNull = true)
+    format.setFieldDelimiter(",")
     format.configure(new Configuration)
     format.open(split)
 
-    var result = new Row(6)
+    var result = new Row(8)
     val linesCnt = fileContent.split("\n").length
 
-    var i = 0
-    while (i < linesCnt) {
+    for (i <- 0 until linesCnt) yield {
       result = format.nextRecord(result)
       assertNull(result.productElement(i))
-      i += 1
     }
-    
+
     // ensure no more rows
     assertNull(format.nextRecord(result))
     assertTrue(format.reachedEnd)

Reply via email to