[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2018-01-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5218


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2018-01-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159691205
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java 
---
@@ -45,6 +45,12 @@ public void enableQuotedStringParsing(byte 
quoteCharacter) {
@Override
public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, StringValue reusable) {
 
+   if (startPos == limit) {
+   setErrorState(ParseErrorState.EMPTY_COLUMN);
+   reusable.setValueAscii(bytes, startPos, limit - 
startPos);
--- End diff --

we can also remove the condition in line 99.


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2018-01-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159691114
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java ---
@@ -42,6 +42,12 @@ public void enableQuotedStringParsing(byte 
quoteCharacter) {
@Override
public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, String reusable) {
 
+   if (startPos == limit) {
+   setErrorState(ParseErrorState.EMPTY_COLUMN);
+   this.result = new String(bytes, startPos, limit - 
startPos, getCharset());
--- End diff --

We can also remove the condition in line 93.


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2018-01-04 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159683964
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java ---
@@ -86,6 +92,11 @@ public int parseField(byte[] bytes, int startPos, int 
limit, byte[] delimiter, L
}
}
 
+   if (startPos == limit) {
--- End diff --

Good catch!


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2018-01-04 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159682488
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java ---
@@ -42,6 +42,12 @@ public void enableQuotedStringParsing(byte 
quoteCharacter) {
@Override
public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, String reusable) {
 
+   if (startPos == limit) {
+   setErrorState(ParseErrorState.EMPTY_COLUMN);
+   this.result = new String(bytes, startPos, limit - 
startPos, getCharset());
--- End diff --

I found that `new String(...)` performs some checks, but changing it to 
`this.result = ""` also makes sense.


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2018-01-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159670504
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/types/parser/StringValueParser.java 
---
@@ -45,6 +45,12 @@ public void enableQuotedStringParsing(byte 
quoteCharacter) {
@Override
public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, StringValue reusable) {
 
+   if (startPos == limit) {
+   setErrorState(ParseErrorState.EMPTY_COLUMN);
+   reusable.setValueAscii(bytes, startPos, limit - 
startPos);
--- End diff --

Change to `reusable.setValueAscii(bytes, startPos, 0);`

Also, this early-out means that the later check `if (limit == startPos)` in 
line 99 will never be true and can be removed.


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2018-01-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159670229
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/types/parser/StringParser.java ---
@@ -42,6 +42,12 @@ public void enableQuotedStringParsing(byte 
quoteCharacter) {
@Override
public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, String reusable) {
 
+   if (startPos == limit) {
+   setErrorState(ParseErrorState.EMPTY_COLUMN);
+   this.result = new String(bytes, startPos, limit - 
startPos, getCharset());
--- End diff --

Change to `this.result = "";`

Also, this early-out means that the later check `if (limit == startPos)` in 
line 93 will never be true and can be removed.
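
A minimal sketch of the early-out pattern discussed here, not Flink's actual `StringParser`. The class `EarlyOutSketch`, its `ErrorState` enum, and the single-byte delimiter are hypothetical simplifications. Returning `limit` from the early-out is an assumption, based on the `ParserTestBase` expectation that the number of bytes read equals `bytes.length`.

```java
import java.nio.charset.StandardCharsets;

public class EarlyOutSketch {

    // Hypothetical stand-in for FieldParser.ParseErrorState.
    enum ErrorState { NONE, EMPTY_COLUMN }

    static ErrorState errorState = ErrorState.NONE;
    static String result;

    // Parses one string field and returns the position after the field
    // (and after its delimiter, if one follows).
    static int parseField(byte[] bytes, int startPos, int limit, byte delimiter) {
        errorState = ErrorState.NONE;

        // Early-out: nothing is left to read, so the field is empty.
        if (startPos == limit) {
            errorState = ErrorState.EMPTY_COLUMN;
            result = "";   // same value as new String(bytes, startPos, 0, charset)
            return limit;  // assumption: an empty trailing field consumes no bytes
        }

        // From here on startPos < limit holds, so a second
        // "limit == startPos" check could never be true.
        int i = startPos;
        while (i < limit && bytes[i] != delimiter) {
            i++;
        }
        if (i == startPos) {
            // Empty field that is followed by a delimiter.
            errorState = ErrorState.EMPTY_COLUMN;
        }
        result = new String(bytes, startPos, i - startPos, StandardCharsets.UTF_8);
        return (i == limit) ? limit : i + 1;
    }

    public static void main(String[] args) {
        byte[] bytes = "a|".getBytes(StandardCharsets.UTF_8);
        int next = parseField(bytes, 0, bytes.length, (byte) '|');
        System.out.println(next + " '" + result + "' " + errorState);
        next = parseField(bytes, next, bytes.length, (byte) '|');
        System.out.println(next + " '" + result + "' " + errorState);
    }
}
```

The second call in `main` starts at `limit`, so it takes the early-out and never reaches the scanning loop.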
  


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2018-01-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159667871
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/types/parser/LongValueParser.java ---
@@ -89,6 +95,11 @@ public int parseField(byte[] bytes, int startPos, int 
limit, byte[] delimiter, L
}
}
 
+   if (startPos == limit) {
--- End diff --

Why do we need this additional check? 
This should never evaluate to `true` because the condition is checked 
before.
  


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2018-01-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159667815
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/types/parser/LongParser.java ---
@@ -86,6 +92,11 @@ public int parseField(byte[] bytes, int startPos, int 
limit, byte[] delimiter, L
}
}
 
+   if (startPos == limit) {
--- End diff --

Why do we need this additional check? 
This should never evaluate to `true` because the condition is checked 
before.
  
  


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2018-01-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159678062
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java ---
@@ -43,4 +46,100 @@ public void testEndsWithDelimiter() throws Exception {
assertFalse(FieldParser.endsWithDelimiter(bytes, 3, delim));
}
 
-}
\ No newline at end of file
+   @Test
+   public void testNextStringEndPos() throws Exception {
+
+   FieldParser parser = new TestFieldParser();
+   // single-char delimiter
+   byte[] singleCharDelim = 
"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+   byte[] bytes1 = "a|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+   assertEquals(1, parser.nextStringEndPos(bytes1, 0, 
bytes1.length, singleCharDelim));
+   assertEquals(-1, parser.nextStringEndPos(bytes1, 1, 
bytes1.length, singleCharDelim));
+   assertEquals(ParseErrorState.EMPTY_COLUMN, 
parser.getErrorState());
+
+   parser.resetParserState();
--- End diff --

please add 

```
parser.resetParserState();
assertEquals(-1, parser.nextStringEndPos(bytes1, 2, bytes1.length, 
singleCharDelim));
assertEquals(ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
```


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-31 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159139198
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java ---
@@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() {
@Test
public void testEmptyFieldInIsolation() {
try {
-   String [] emptyStrings = new String[] {"|"};
+   FieldParser parser = getParser();
+
+   byte[] bytes = 
"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
--- End diff --

Yes, we can keep `testEmptyFieldInIsolation`.


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-31 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159139167
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -362,61 +365,95 @@ public void readStringFieldsWithTrailingDelimiters() 
throws Exception {
 
@Test
public void testTailingEmptyFields() throws Exception {
-   String fileContent = "abc|-def|-ghijk\n" +
-   "abc|-def|-\n" +
-   "abc|-|-\n" +
-   "|-|-|-\n" +
-   "|-|-\n" +
-   "abc|-def\n";
 
-   FileInputSplit split = createTempFile(fileContent);
-
-   TypeInformation[] fieldTypes = new TypeInformation[]{
-   BasicTypeInfo.STRING_TYPE_INFO,
-   BasicTypeInfo.STRING_TYPE_INFO,
-   BasicTypeInfo.STRING_TYPE_INFO};
-
-   RowCsvInputFormat format = new RowCsvInputFormat(PATH, 
fieldTypes, "\n", "|");
-   format.setFieldDelimiter("|-");
-   format.configure(new Configuration());
-   format.open(split);
-
-   Row result = new Row(3);
-
-   result = format.nextRecord(result);
-   assertNotNull(result);
-   assertEquals("abc", result.getField(0));
-   assertEquals("def", result.getField(1));
-   assertEquals("ghijk", result.getField(2));
+   List> dataList 
= new java.util.ArrayList<>();
+
+   // test String
+   dataList.add(new Tuple4<>(BasicTypeInfo.STRING_TYPE_INFO, 
"bdc", "bdc", ""));
+   // test BigInt
+   dataList.add(new Tuple4<>(BasicTypeInfo.BIG_INT_TYPE_INFO,
--- End diff --

Ooh, you are right. Makes sense to me. :)


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-31 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159137345
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java ---
@@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() {
@Test
public void testEmptyFieldInIsolation() {
try {
-   String [] emptyStrings = new String[] {"|"};
+   FieldParser parser = getParser();
+
+   byte[] bytes = 
"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+   int numRead = parser.parseField(bytes, 0, bytes.length, 
new byte[]{'|'}, parser.createValue());
+
+   assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, 
parser.getErrorState());
+
+   if (this.allowsEmptyField()) {
+   assertTrue("Parser declared the empty string as 
invalid.", numRead != -1);
+   assertEquals("Invalid number of bytes read 
returned.", bytes.length, numRead);
+   } else {
+   assertTrue("Parser accepted the empty string.", 
numRead == -1);
+   }
+   } catch (Exception e) {
+   System.err.println(e.getMessage());
+   e.printStackTrace();
+   fail("Test erroneous: " + e.getMessage());
+   }
+   }
+
+   @Test
+   public void testTailingEmptyField() {
+   try {
+   String[] tailingEmptyFieldStrings = new String[]{"a|", 
"a||"};
 
FieldParser parser = getParser();
 
-   for (String emptyString : emptyStrings) {
-   byte[] bytes = 
emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET);
-   int numRead = parser.parseField(bytes, 0, 
bytes.length, new byte[]{'|'}, parser.createValue());
+   for (String tailingEmptyFieldString : 
tailingEmptyFieldStrings) {
+   byte[] bytes = 
tailingEmptyFieldString.getBytes(ConfigConstants.DEFAULT_CHARSET);
+   int numRead = parser.parseField(bytes, 1, 
bytes.length, new byte[]{'|'}, parser.createValue());
 

assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
 
-   if(this.allowsEmptyField()) {
+   if (this.allowsEmptyField()) {
--- End diff --

Yes, you are right. I misinterpreted the `allowsEmptyField()` method. We 
need to keep it.


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-31 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159136839
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java ---
@@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() {
@Test
public void testEmptyFieldInIsolation() {
try {
-   String [] emptyStrings = new String[] {"|"};
+   FieldParser parser = getParser();
+
+   byte[] bytes = 
"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+   int numRead = parser.parseField(bytes, 0, bytes.length, 
new byte[]{'|'}, parser.createValue());
+
+   assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, 
parser.getErrorState());
+
+   if (this.allowsEmptyField()) {
+   assertTrue("Parser declared the empty string as 
invalid.", numRead != -1);
+   assertEquals("Invalid number of bytes read 
returned.", bytes.length, numRead);
+   } else {
+   assertTrue("Parser accepted the empty string.", 
numRead == -1);
+   }
+   } catch (Exception e) {
+   System.err.println(e.getMessage());
+   e.printStackTrace();
+   fail("Test erroneous: " + e.getMessage());
+   }
+   }
+
+   @Test
+   public void testTailingEmptyField() {
+   try {
+   String[] tailingEmptyFieldStrings = new String[]{"a|", 
"a||"};
 
FieldParser parser = getParser();
 
-   for (String emptyString : emptyStrings) {
-   byte[] bytes = 
emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET);
-   int numRead = parser.parseField(bytes, 0, 
bytes.length, new byte[]{'|'}, parser.createValue());
+   for (String tailingEmptyFieldString : 
tailingEmptyFieldStrings) {
+   byte[] bytes = 
tailingEmptyFieldString.getBytes(ConfigConstants.DEFAULT_CHARSET);
+   int numRead = parser.parseField(bytes, 1, 
bytes.length, new byte[]{'|'}, parser.createValue());
 

assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
 
-   if(this.allowsEmptyField()) {
+   if (this.allowsEmptyField()) {
--- End diff --

Oh, I found that in the four test files you added (`QuotedStringParserTest`, 
`QuotedStringValueParserTest`, `UnquotedStringParserTest`, 
`UnquotedStringValueParserTest`), `allowsEmptyField` is true, 
so I am keeping `allowsEmptyField` in `ParserTestBase`.


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-31 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159135944
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java ---
@@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() {
@Test
public void testEmptyFieldInIsolation() {
try {
-   String [] emptyStrings = new String[] {"|"};
+   FieldParser parser = getParser();
+
+   byte[] bytes = 
"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
--- End diff --

This is the same case as `"a|"` in `testTailingEmptyField()`. We only need 
one of these.


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-31 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159135908
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/types/parser/ParserTestBase.java ---
@@ -407,26 +407,47 @@ public void testStaticParseMethodWithInvalidValues() {
@Test
public void testEmptyFieldInIsolation() {
try {
-   String [] emptyStrings = new String[] {"|"};
+   FieldParser parser = getParser();
+
+   byte[] bytes = 
"|".getBytes(ConfigConstants.DEFAULT_CHARSET);
+   int numRead = parser.parseField(bytes, 0, bytes.length, 
new byte[]{'|'}, parser.createValue());
+
+   assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, 
parser.getErrorState());
+
+   if (this.allowsEmptyField()) {
+   assertTrue("Parser declared the empty string as 
invalid.", numRead != -1);
+   assertEquals("Invalid number of bytes read 
returned.", bytes.length, numRead);
+   } else {
+   assertTrue("Parser accepted the empty string.", 
numRead == -1);
+   }
+   } catch (Exception e) {
+   System.err.println(e.getMessage());
+   e.printStackTrace();
+   fail("Test erroneous: " + e.getMessage());
+   }
+   }
+
+   @Test
+   public void testTailingEmptyField() {
+   try {
+   String[] tailingEmptyFieldStrings = new String[]{"a|", 
"a||"};
 
FieldParser parser = getParser();
 
-   for (String emptyString : emptyStrings) {
-   byte[] bytes = 
emptyString.getBytes(ConfigConstants.DEFAULT_CHARSET);
-   int numRead = parser.parseField(bytes, 0, 
bytes.length, new byte[]{'|'}, parser.createValue());
+   for (String tailingEmptyFieldString : 
tailingEmptyFieldStrings) {
+   byte[] bytes = 
tailingEmptyFieldString.getBytes(ConfigConstants.DEFAULT_CHARSET);
+   int numRead = parser.parseField(bytes, 1, 
bytes.length, new byte[]{'|'}, parser.createValue());
 

assertEquals(FieldParser.ParseErrorState.EMPTY_COLUMN, parser.getErrorState());
 
-   if(this.allowsEmptyField()) {
+   if (this.allowsEmptyField()) {
--- End diff --

We should remove the `allowsEmptyField()` method from `ParserTestBase`. All 
parsers have to support empty fields.


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-31 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159136001
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -362,61 +365,95 @@ public void readStringFieldsWithTrailingDelimiters() 
throws Exception {
 
@Test
public void testTailingEmptyFields() throws Exception {
-   String fileContent = "abc|-def|-ghijk\n" +
-   "abc|-def|-\n" +
-   "abc|-|-\n" +
-   "|-|-|-\n" +
-   "|-|-\n" +
-   "abc|-def\n";
 
-   FileInputSplit split = createTempFile(fileContent);
-
-   TypeInformation[] fieldTypes = new TypeInformation[]{
-   BasicTypeInfo.STRING_TYPE_INFO,
-   BasicTypeInfo.STRING_TYPE_INFO,
-   BasicTypeInfo.STRING_TYPE_INFO};
-
-   RowCsvInputFormat format = new RowCsvInputFormat(PATH, 
fieldTypes, "\n", "|");
-   format.setFieldDelimiter("|-");
-   format.configure(new Configuration());
-   format.open(split);
-
-   Row result = new Row(3);
-
-   result = format.nextRecord(result);
-   assertNotNull(result);
-   assertEquals("abc", result.getField(0));
-   assertEquals("def", result.getField(1));
-   assertEquals("ghijk", result.getField(2));
+   List> dataList 
= new java.util.ArrayList<>();
+
+   // test String
+   dataList.add(new Tuple4<>(BasicTypeInfo.STRING_TYPE_INFO, 
"bdc", "bdc", ""));
+   // test BigInt
+   dataList.add(new Tuple4<>(BasicTypeInfo.BIG_INT_TYPE_INFO,
--- End diff --

I think we can extend this test but should not exhaustively test all types. 
It's the responsibility of the different `FieldParser` to implement the empty 
field logic correctly. Hence, this should be tested in the field parser tests 
and not here.


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-30 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159134590
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -61,6 +62,84 @@
private static final String FIRST_PART = "That is the first part";
private static final String SECOND_PART = "That is the second part";
 
+   @Test
+   public void testNullValueWithoutTrailingDelimiter() throws Exception {
--- End diff --

`RowCsvInputFormatTest.testEmptyFields()` does not check the trailing 
empty field, and `RowCsvInputFormatTest.testTailingEmptyFields` only tests the 
trailing empty field for the STRING type. I think we should add 
DOUBLE/FLOAT/BIGINT/BIGDEC/SQL_DATE/SQL_TIME/SQL_TIMESTAMP test cases as 
well.
What do you think?


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-30 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159132545
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -61,6 +62,84 @@
private static final String FIRST_PART = "That is the first part";
private static final String SECOND_PART = "That is the second part";
 
+   @Test
+   public void testNullValueWithoutTrailingDelimiter() throws Exception {
--- End diff --

+1


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159127981
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java ---
@@ -43,4 +43,38 @@ public void testEndsWithDelimiter() throws Exception {
assertFalse(FieldParser.endsWithDelimiter(bytes, 3, delim));
}
 
+   @Test
+   public void testNextStringEndPos() throws Exception {
+   byte[] bytes = "a|c".getBytes();
+   byte[] delim = "|".getBytes();
+   FieldParser parser = new TestFieldParser();
+
+   assertEquals(1, parser.nextStringEndPos(bytes, 0, 3, delim));
+   assertEquals(-1, parser.nextStringEndPos(bytes, 3, 3, delim));
--- End diff --

add an assertion that the error state was correctly set


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159128212
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java ---
@@ -43,4 +43,38 @@ public void testEndsWithDelimiter() throws Exception {
assertFalse(FieldParser.endsWithDelimiter(bytes, 3, delim));
}
 
+   @Test
+   public void testNextStringEndPos() throws Exception {
--- End diff --

It would be good if the test covered more corner cases for 
`nextStringEndPos()`, such as

- multi-char delimiter
- field with trailing delimiter
- field without trailing delimiter
- empty field

For example, with test data like this

```
byte[] bytes1 = "field1|#|field2".getBytes();
byte[] bytes2 = "field3|#|".getBytes();

byte[] delim = "|#|".getBytes();
```
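
A minimal sketch of how these corner cases could be exercised against a simplified, standalone end-position search. It is not Flink's `FieldParser.nextStringEndPos()`: the class `NextEndPosSketch`, the `ErrorState` enum, and the helper `delimiterAt` are hypothetical names. The convention of returning -1 and setting EMPTY_COLUMN for an empty field follows the assertions in the `FieldParserTest` diff quoted elsewhere in this thread.

```java
import java.nio.charset.StandardCharsets;

public class NextEndPosSketch {

    // Hypothetical stand-in for FieldParser.ParseErrorState.
    enum ErrorState { NONE, EMPTY_COLUMN }

    static ErrorState errorState = ErrorState.NONE;

    // True if the complete delimiter starts at pos and fits before limit.
    static boolean delimiterAt(byte[] bytes, int pos, int limit, byte[] delim) {
        if (pos + delim.length > limit) {
            return false;
        }
        for (int i = 0; i < delim.length; i++) {
            if (bytes[pos + i] != delim[i]) {
                return false;
            }
        }
        return true;
    }

    // Returns the exclusive end position of the field starting at startPos,
    // or -1 with EMPTY_COLUMN set if the field is empty.
    static int nextEndPos(byte[] bytes, int startPos, int limit, byte[] delim) {
        errorState = ErrorState.NONE;
        int endPos = startPos;
        while (endPos < limit && !delimiterAt(bytes, endPos, limit, delim)) {
            endPos++;
        }
        if (endPos == startPos) {
            // Covers both "startPos == limit" and "delimiter right at startPos".
            errorState = ErrorState.EMPTY_COLUMN;
            return -1;
        }
        return endPos;
    }

    public static void main(String[] args) {
        byte[] bytes1 = "field1|#|field2".getBytes(StandardCharsets.UTF_8);
        byte[] bytes2 = "field3|#|".getBytes(StandardCharsets.UTF_8);
        byte[] delim = "|#|".getBytes(StandardCharsets.UTF_8);

        // Field with a following multi-char delimiter.
        System.out.println(nextEndPos(bytes1, 0, bytes1.length, delim));
        // Field without a trailing delimiter.
        System.out.println(nextEndPos(bytes1, 9, bytes1.length, delim));
        // Empty field after the trailing delimiter.
        System.out.println(nextEndPos(bytes2, 9, bytes2.length, delim) + " " + errorState);
    }
}
```

Folding both empty-field situations into the single `endPos == startPos` check keeps the error state consistent whether or not a delimiter follows.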


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159127955
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/types/parser/FieldParserTest.java ---
@@ -43,4 +43,38 @@ public void testEndsWithDelimiter() throws Exception {
assertFalse(FieldParser.endsWithDelimiter(bytes, 3, delim));
}
 
+   @Test
+   public void testNextStringEndPos() throws Exception {
+   byte[] bytes = "a|c".getBytes();
+   byte[] delim = "|".getBytes();
+   FieldParser parser = new TestFieldParser();
+
+   assertEquals(1, parser.nextStringEndPos(bytes, 0, 3, delim));
+   assertEquals(-1, parser.nextStringEndPos(bytes, 3, 3, delim));
+   }
+
--- End diff --

remove line break


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-30 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5218#discussion_r159128322
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 ---
@@ -61,6 +62,84 @@
private static final String FIRST_PART = "That is the first part";
private static final String SECOND_PART = "That is the second part";
 
+   @Test
+   public void testNullValueWithoutTrailingDelimiter() throws Exception {
--- End diff --

We should test the individual parsers by extending `ParserTestBase` with 
tests that check for correct empty field behavior (EMPTY_COLUMN error state) 
with and without a following delimiter.

IMO, the null field behavior of `RowCsvInputFormat` is sufficiently tested 
by `RowCsvInputFormatTest.testEmptyFields()` such that this test does not need 
to be extended.


---


[GitHub] flink pull request #5218: [FLINK-8331][core] FieldParser do not correctly se...

2017-12-30 Thread sunjincheng121
GitHub user sunjincheng121 opened a pull request:

https://github.com/apache/flink/pull/5218

[FLINK-8331][core] FieldParser does not correctly set EMPTY_COLUMN error state.

## What is the purpose of the change

*(This PR fixes the bug where `FieldParser` does not correctly set the 
EMPTY_COLUMN error state, i.e., all `FieldParser` implementations now correctly 
return the EMPTY_COLUMN error state for an empty field.)*


## Brief change log
  - *Change the logic by which the `nextStringEndPos` method sets the EMPTY_COLUMN error state.*

## Verifying this change
This change added tests and can be verified as follows:

   - *Add a `testNextStringEndPos` test case in `FieldParserTest`.*
   - *Add a `testNullValueWithoutTrailingDelimiter` test case in 
`RowCsvInputFormatTest`.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sunjincheng121/flink FLINK-8331-PR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5218.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5218


commit 32d22ab681c5292d23818012242a16db0a2d1a8e
Author: 金竹 
Date:   2017-12-30T03:10:18Z

[FLINK-8331][core] fix FieldParser do not correctly set EMPT_COLUMN error 
state bug




---