Repository: flink
Updated Branches:
  refs/heads/master 32130160b -> 2f9a28ae1


[FLINK-3901] [table] Convert Java implementation to Scala and fix bugs

This closes #2283.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2f9a28ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2f9a28ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2f9a28ae

Branch: refs/heads/master
Commit: 2f9a28ae113f0e941b8bf3676ab84e1492fbf26f
Parents: c5d1d12
Author: twalthr <twal...@apache.org>
Authored: Fri Jul 22 11:26:43 2016 +0200
Committer: twalthr <twal...@apache.org>
Committed: Mon Jul 25 15:23:38 2016 +0200

----------------------------------------------------------------------
 .../flink/api/java/io/RowCsvInputFormat.java    |  143 ---
 .../plan/nodes/dataset/DataSetValues.scala      |    6 +-
 .../nodes/datastream/DataStreamValues.scala     |    2 +-
 .../api/table/runtime/ValuesInputFormat.scala   |   43 -
 .../table/runtime/io/RowCsvInputFormat.scala    |  177 +++
 .../table/runtime/io/ValuesInputFormat.scala    |   43 +
 .../api/table/sources/CsvTableSource.scala      |   30 +-
 .../api/java/io/RowCsvInputFormatTest.java      | 1086 ------------------
 .../runtime/io/RowCsvInputFormatTest.scala      |  841 ++++++++++++++
 9 files changed, 1080 insertions(+), 1291 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2f9a28ae/flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
 
b/flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
deleted file mode 100644
index a826a06..0000000
--- 
a/flink-libraries/flink-table/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.io;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.io.ParseException;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.types.parser.FieldParser.ParseErrorState;
-
-/**
- * Reads CSV data into {@link Row} records whose field types are taken from a {@link RowTypeInfo}.
- *
- * <p>NOTE(review): this Java class is deleted by this commit and replaced by the Scala class
- * org.apache.flink.api.table.runtime.io.RowCsvInputFormat.
- */
-@Internal
-public class RowCsvInputFormat extends CsvInputFormat<Row> {
-
-       private static final long serialVersionUID = 1L;
-
-       // number of fields of the produced rows, taken from rowTypeInfo.getArity()
-       private int arity;
-
-       // The convenience constructors below delegate (directly or indirectly) to the
-       // boolean[]-mask constructor; a missing or null mask becomes
-       // createDefaultMask(rowTypeInfo.getArity()).
-       public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo) {
-               this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo);
-       }
-
-       public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo) {
-               this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
createDefaultMask(rowTypeInfo.getArity()));
-       }
-
-       public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, int[] 
includedFieldsMask) {
-               this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
-       }
-
-       public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
-                       int[] includedFieldsMask) {
-               this(filePath, lineDelimiter, fieldDelimiter, rowTypeInfo, 
(includedFieldsMask == null) ? createDefaultMask(rowTypeInfo.getArity())
-                               : toBooleanMask(includedFieldsMask));
-       }
-
-       public RowCsvInputFormat(Path filePath, RowTypeInfo rowTypeInfo, 
boolean[] includedFieldsMask) {
-               this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
rowTypeInfo, includedFieldsMask);
-       }
-
-       // Main constructor: rejects an arity of 0, sets the delimiters and passes the
-       // field mask plus the type class of every row field to setFieldsGeneric.
-       public RowCsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, RowTypeInfo rowTypeInfo,
-                       boolean[] includedFieldsMask) {
-               super(filePath);
-               if (rowTypeInfo.getArity() == 0) {
-                       throw new IllegalArgumentException("Row arity must be 
greater than 0.");
-               }
-
-               if (includedFieldsMask == null) {
-                       includedFieldsMask = 
createDefaultMask(rowTypeInfo.getArity());
-               }
-
-               this.arity = rowTypeInfo.getArity();
-
-               setDelimiter(lineDelimiter);
-               setFieldDelimiter(fieldDelimiter);
-
-               Class<?>[] classes = new Class<?>[rowTypeInfo.getArity()];
-
-               for (int i = 0; i < rowTypeInfo.getArity(); i++) {
-                       classes[i] = rowTypeInfo.getTypeAt(i).getTypeClass();
-               }
-
-               setFieldsGeneric(includedFieldsMask, classes);
-       }
-
-
-       // Copies parsedValues into reuse, allocating a Row of the configured arity when
-       // reuse is null, and returns the filled row.
-       @Override
-       public Row fillRecord(Row reuse, Object[] parsedValues) {
-               if (reuse == null) {
-                       reuse = new Row(arity);
-               }
-               for (int i = 0; i < parsedValues.length; i++) {
-                       reuse.setField(i, parsedValues[i]);
-               }
-               return reuse;
-       }
-
-       // Parses one line into holders (one slot per included field). EMPTY_STRING parse
-       // errors are tolerated because Row can hold nulls; other parse errors throw unless
-       // the format is lenient. Returns false for a too-short line in lenient mode.
-       @Override
-       protected boolean parseRecord(Object[] holders, byte[] bytes, int 
offset, int numBytes) throws ParseException {
-               boolean[] fieldIncluded = this.fieldIncluded;
-
-               int startPos = offset;
-               final int limit = offset + numBytes;
-
-               for (int field = 0, output = 0; field < fieldIncluded.length; 
field++) {
-
-                       // check valid start position
-                       if (startPos >= limit) {
-                               if (isLenient()) {
-                                       return false;
-                               } else {
-                                       throw new ParseException("Row too 
short: " + new String(bytes, offset, numBytes));
-                               }
-                       }
-
-                       if (fieldIncluded[field]) {
-                               // parse field
-                               @SuppressWarnings("unchecked")
-                               FieldParser<Object> parser = 
(FieldParser<Object>) this.getFieldParsers()[output];
-                               int latestValidPos = startPos;
-                               startPos = 
parser.resetErrorStateAndParse(bytes, startPos, limit, 
this.getFieldDelimiter(), holders[output]);
-                               if (!isLenient() && parser.getErrorState() != 
ParseErrorState.NONE) {
-                                       // Row is able to handle null values
-                                       if (parser.getErrorState() != 
ParseErrorState.EMPTY_STRING) {
-                                               throw new ParseException(
-                                                               
String.format("Parsing error for column %s of row '%s' originated by %s: %s.", 
field,
-                                                                               
new String(bytes, offset, numBytes),
-                                                                               
parser.getClass().getSimpleName(), parser.getErrorState()));
-                                       }
-                               }
-                               holders[output] = parser.getLastResult();
-
-                               // check parse result: on a negative position store null and
-                               // re-skip the field from where it started
-                               if (startPos < 0) {
-                                       holders[output] = null;
-                                       startPos = skipFields(bytes, 
latestValidPos, limit, this.getFieldDelimiter());
-                               }
-                               output++;
-                       } else {
-                               // skip field
-                               // NOTE(review): a negative position returned by skipFields is not
-                               // checked here; the Scala replacement throws a ParseException instead.
-                               startPos = skipFields(bytes, startPos, limit, 
this.getFieldDelimiter());
-                       }
-               }
-               return true;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f9a28ae/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
index fbdc7d5..a31f199 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetValues.scala
@@ -19,14 +19,14 @@
 package org.apache.flink.api.table.plan.nodes.dataset
 
 import com.google.common.collect.ImmutableList
-import org.apache.calcite.plan.{RelTraitSet, RelOptCluster}
-import org.apache.calcite.rel.{RelWriter, RelNode}
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.Values
 import org.apache.calcite.rex.RexLiteral
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.table.runtime.ValuesInputFormat
+import org.apache.flink.api.table.runtime.io.ValuesInputFormat
 import org.apache.flink.api.table.typeutils.RowTypeInfo
 import org.apache.flink.api.table.typeutils.TypeConverter._
 import org.apache.flink.api.table.{BatchTableEnvironment, Row}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f9a28ae/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
index 4a3a704..3ae19ac 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamValues.scala
@@ -25,8 +25,8 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.Values
 import org.apache.calcite.rex.RexLiteral
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.runtime.io.ValuesInputFormat
 import org.apache.flink.api.table.{Row, StreamTableEnvironment}
-import org.apache.flink.api.table.runtime.ValuesInputFormat
 import org.apache.flink.api.table.typeutils.RowTypeInfo
 import org.apache.flink.api.table.typeutils.TypeConverter._
 import org.apache.flink.streaming.api.datastream.DataStream

http://git-wip-us.apache.org/repos/asf/flink/blob/2f9a28ae/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ValuesInputFormat.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ValuesInputFormat.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ValuesInputFormat.scala
deleted file mode 100644
index 13b7fa0..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ValuesInputFormat.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.io.{NonParallelInput, GenericInputFormat}
-import org.apache.flink.api.table.Row
-
-/**
-  * Non-parallel input format that emits the given rows in order.
-  *
-  * NOTE(review): removed by this commit; an identical class is added in
-  * org.apache.flink.api.table.runtime.io.
-  */
-class ValuesInputFormat(val rows: Seq[Row])
-  extends GenericInputFormat[Row]
-    with NonParallelInput {
-
-  // index of the next row to emit
-  var readIdx = 0
-
-  override def reachedEnd(): Boolean = readIdx == rows.size
-
-  // Returns the next row, or null once all rows were emitted; `reuse` is not used.
-  override def nextRecord(reuse: Row): Row = {
-
-    if (readIdx == rows.size) {
-      return null
-    }
-
-    val outRow = rows(readIdx)
-    readIdx += 1
-
-    outRow
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f9a28ae/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala
new file mode 100644
index 0000000..1eb056c
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormat.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.io
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.io.ParseException
+import org.apache.flink.api.java.io.CsvInputFormat
+import org.apache.flink.api.java.io.CsvInputFormat.{DEFAULT_FIELD_DELIMITER, 
DEFAULT_LINE_DELIMITER, createDefaultMask, toBooleanMask}
+import org.apache.flink.api.table.Row
+import 
org.apache.flink.api.table.runtime.io.RowCsvInputFormat.extractTypeClasses
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.core.fs.Path
+import org.apache.flink.types.parser.FieldParser
+import org.apache.flink.types.parser.FieldParser.ParseErrorState
+
+/**
+  * Input format that reads delimited text (CSV) into [[Row]]s whose field types are given by a
+  * [[RowTypeInfo]].
+  *
+  * A field whose parser reports the `EMPTY_STRING` error state does not fail the record, because a
+  * [[Row]] can hold null values; any other parse error throws a [[ParseException]] unless the
+  * format is lenient.
+  *
+  * @param filePath path of the file(s) to read
+  * @param rowTypeInfo types of the row fields; its arity must be greater than 0
+  * @param lineDelimiter delimiter between records
+  * @param fieldDelimiter delimiter between the fields of a record
+  * @param includedFieldsMask which fields of a line are read; `null` selects
+  *                           `createDefaultMask(arity)`
+  * @throws IllegalArgumentException if `rowTypeInfo` has arity 0
+  */
+@Internal
+@SerialVersionUID(1L)
+class RowCsvInputFormat(
+    filePath: Path,
+    rowTypeInfo: RowTypeInfo,
+    lineDelimiter: String = DEFAULT_LINE_DELIMITER,
+    fieldDelimiter: String = DEFAULT_FIELD_DELIMITER,
+    includedFieldsMask: Array[Boolean] = null)
+  extends CsvInputFormat[Row](filePath) {
+
+  if (rowTypeInfo.getArity == 0) {
+    throw new IllegalArgumentException("Row arity must be greater than 0.")
+  }
+  private val arity = rowTypeInfo.getArity
+  // getOrElse is by-name, so the default mask is only built when no mask was given
+  private val fieldsMask = Option(includedFieldsMask).getOrElse(createDefaultMask(arity))
+
+  // prepare CsvInputFormat
+  setDelimiter(lineDelimiter)
+  setFieldDelimiter(fieldDelimiter)
+  setFieldsGeneric(fieldsMask, extractTypeClasses(rowTypeInfo))
+
+  /**
+    * Creates a format whose field selection is given as an int array, converted with
+    * `toBooleanMask`.
+    *
+    * @param includedFieldsMask fields to read; `null` selects the default mask
+    */
+  def this(
+      filePath: Path,
+      rowTypeInfo: RowTypeInfo,
+      lineDelimiter: String,
+      fieldDelimiter: String,
+      includedFieldsMask: Array[Int]) {
+    this(
+      filePath,
+      rowTypeInfo,
+      lineDelimiter,
+      fieldDelimiter,
+      if (includedFieldsMask == null) null else toBooleanMask(includedFieldsMask))
+  }
+
+  /**
+    * Creates a format with the default line and field delimiters and an int-array field
+    * selection.
+    */
+  def this(
+      filePath: Path,
+      rowTypeInfo: RowTypeInfo,
+      includedFieldsMask: Array[Int]) {
+    this(
+      filePath,
+      rowTypeInfo,
+      DEFAULT_LINE_DELIMITER,
+      DEFAULT_FIELD_DELIMITER,
+      includedFieldsMask)
+  }
+
+  /**
+    * Copies the parsed values into a row.
+    *
+    * @param reuse row to fill; a new row of this format's arity is created when it is null
+    * @param parsedValues values produced by [[parseRecord]], one per included field
+    * @return the filled row (`reuse` itself when it was non-null)
+    */
+  def fillRecord(reuse: Row, parsedValues: Array[AnyRef]): Row = {
+    val reuseRow = if (reuse == null) new Row(arity) else reuse
+    var i = 0
+    while (i < parsedValues.length) {
+      // write into reuseRow, never `reuse`, which may be null
+      reuseRow.setField(i, parsedValues(i))
+      i += 1
+    }
+    reuseRow
+  }
+
+  /**
+    * Parses the fields of one record into `holders`, one slot per included field.
+    *
+    * @return `false` if the record is too short and the format is lenient, `true` otherwise
+    * @throws ParseException if the record is too short or a field fails to parse while the
+    *                        format is not lenient, or if the parser position becomes invalid
+    */
+  @throws[ParseException]
+  override protected def parseRecord(
+      holders: Array[AnyRef],
+      bytes: Array[Byte],
+      offset: Int,
+      numBytes: Int)
+    : Boolean = {
+    // named so it does not shadow the `fieldDelimiter: String` constructor parameter
+    val delimiterBytes = this.getFieldDelimiter
+    val fieldIncluded: Array[Boolean] = this.fieldIncluded
+
+    var startPos = offset
+    val limit = offset + numBytes
+
+    var field = 0
+    var output = 0
+    while (field < fieldIncluded.length) {
+
+      // check valid start position
+      if (startPos >= limit) {
+        if (isLenient) {
+          return false
+        } else {
+          throw new ParseException("Row too short: " + new String(bytes, offset, numBytes))
+        }
+      }
+
+      if (fieldIncluded(field)) {
+        // parse field
+        val parser: FieldParser[AnyRef] = this.getFieldParsers()(output)
+          .asInstanceOf[FieldParser[AnyRef]]
+        val latestValidPos = startPos
+        startPos = parser.resetErrorStateAndParse(
+          bytes,
+          startPos,
+          limit,
+          delimiterBytes,
+          holders(output))
+
+        if (!isLenient && (parser.getErrorState ne ParseErrorState.NONE)) {
+          // Row is able to handle null values
+          if (parser.getErrorState ne ParseErrorState.EMPTY_STRING) {
+            throw new ParseException(s"Parsing error for column $field of row '"
+              + new String(bytes, offset, numBytes)
+              + s"' originated by ${parser.getClass.getSimpleName}: ${parser.getErrorState}.")
+          }
+        }
+        holders(output) = parser.getLastResult
+
+        // on a negative position, store null and re-skip the field from where it started
+        if (startPos < 0) {
+          holders(output) = null
+          startPos = skipFields(bytes, latestValidPos, limit, delimiterBytes)
+        }
+        output += 1
+      } else {
+        // skip field
+        startPos = skipFields(bytes, startPos, limit, delimiterBytes)
+      }
+
+      // check if something went wrong
+      if (startPos < 0) {
+        throw new ParseException(s"Unexpected parser position for column $field of row '"
+          + new String(bytes, offset, numBytes) + "'")
+      }
+
+      field += 1
+    }
+    true
+  }
+}
+
+object RowCsvInputFormat {
+
+  /** Collects the type class of every field of `rowTypeInfo`, ordered by field position. */
+  private def extractTypeClasses(rowTypeInfo: RowTypeInfo): Array[Class[_]] =
+    Array.tabulate[Class[_]](rowTypeInfo.getArity) { pos =>
+      rowTypeInfo.getTypeAt(pos).getTypeClass
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f9a28ae/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
new file mode 100644
index 0000000..5e0a466
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/io/ValuesInputFormat.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.io
+
+import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput}
+import org.apache.flink.api.table.Row
+
+/**
+  * Non-parallel input format that emits the given rows one after another.
+  *
+  * @param rows the rows to emit, in order
+  */
+class ValuesInputFormat(val rows: Seq[Row])
+  extends GenericInputFormat[Row]
+  with NonParallelInput {
+
+  /** Index of the next row to emit. */
+  var readIdx: Int = 0
+
+  override def reachedEnd(): Boolean = rows.size == readIdx
+
+  /** Returns the next row, or null once every row was emitted; `reuse` is ignored. */
+  override def nextRecord(reuse: Row): Row =
+    if (readIdx == rows.size) {
+      null
+    } else {
+      val next = rows(readIdx)
+      readIdx += 1
+      next
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f9a28ae/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
index 44796b2..54d7718 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/sources/CsvTableSource.scala
@@ -19,14 +19,12 @@
 package org.apache.flink.api.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.io.TupleCsvInputFormat
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TupleTypeInfoBase}
+import org.apache.flink.api.java.io.CsvInputFormat
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
-import org.apache.flink.api.table.Row
-import org.apache.flink.core.fs.Path
+import org.apache.flink.api.table.{Row, TableException}
+import org.apache.flink.api.table.runtime.io.RowCsvInputFormat
 import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.java.io.RowCsvInputFormat
+import org.apache.flink.core.fs.Path
 
 /**
   * A [[TableSource]] for simple CSV files with a (logically) unlimited number 
of fields.
@@ -45,19 +43,23 @@ class CsvTableSource(
     path: String,
     fieldNames: Array[String],
     fieldTypes: Array[TypeInformation[_]],
-    fieldDelim: String = ",",
-    rowDelim: String = "\n",
+    fieldDelim: String = CsvInputFormat.DEFAULT_FIELD_DELIMITER,
+    rowDelim: String = CsvInputFormat.DEFAULT_LINE_DELIMITER,
     quoteCharacter: Character = null,
     ignoreFirstLine: Boolean = false,
     ignoreComments: String = null,
     lenient: Boolean = false)
   extends BatchTableSource[Row] {
 
+  if (fieldNames.length != fieldTypes.length) {
+    throw TableException("Number of field names and field types must be 
equal.")
+  }
+
+  private val returnType = new RowTypeInfo(fieldTypes)
+
   /** Returns the data of the table as a [[DataSet]] of [[Row]]. */
   override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
-
-    val typeInfo = getReturnType.asInstanceOf[RowTypeInfo]
-    val inputFormat = new RowCsvInputFormat(new Path(path), rowDelim, 
fieldDelim, typeInfo)
+    val inputFormat = new RowCsvInputFormat(new Path(path), returnType, 
rowDelim, fieldDelim)
 
     inputFormat.setSkipFirstLineAsHeader(ignoreFirstLine)
     inputFormat.setLenient(lenient)
@@ -68,7 +70,7 @@ class CsvTableSource(
       inputFormat.setCommentPrefix(ignoreComments)
     }
 
-    execEnv.createInput(inputFormat, typeInfo)
+    execEnv.createInput(inputFormat, returnType)
   }
 
   /** Returns the types of the table fields. */
@@ -81,7 +83,5 @@ class CsvTableSource(
   override def getNumberOfFields: Int = fieldNames.length
 
   /** Returns the [[RowTypeInfo]] for the return type of the 
[[CsvTableSource]]. */
-  override def getReturnType: RowTypeInfo = {
-    new RowTypeInfo(fieldTypes)
-  }
+  override def getReturnType: RowTypeInfo = returnType
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2f9a28ae/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
deleted file mode 100644
index 0ab9453..0000000
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
+++ /dev/null
@@ -1,1086 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.io;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.io.ParseException;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.typeutils.RowTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.types.parser.StringParser;
-import org.junit.Test;
-
-public class RowCsvInputFormatTest {
-
-       private static final Path PATH = new Path("an/ignored/file/");
-
-       //Static variables for testing the removal of \r\n to \n
-       private static final String FIRST_PART = "That is the first part";
-
-       private static final String SECOND_PART = "That is the second part";
-
-       @Test
-       public void ignoreInvalidLines() {
-               try {
-                       String fileContent = "#description of the data\n" +
-                                       "header1|header2|header3|\n"+
-                                       "this is|1|2.0|\n"+
-                                       "//a comment\n" +
-                                       "a test|3|4.0|\n" +
-                                       "#next|5|6.0|\n";
-
-                       FileInputSplit split = createTempFile(fileContent);
-
-                       RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation<?>[] {
-                               BasicTypeInfo.STRING_TYPE_INFO,
-                               BasicTypeInfo.INT_TYPE_INFO,
-                               BasicTypeInfo.DOUBLE_TYPE_INFO
-                       });
-                       CsvInputFormat<Row> format = new 
RowCsvInputFormat(PATH, "\n", "|", typeInfo);
-                       format.setLenient(false);
-
-                       Configuration parameters = new Configuration();
-                       format.configure(parameters);
-                       format.open(split);
-
-                       Row result = new Row(3);
-
-                       try {
-                               result = format.nextRecord(result);
-                               fail("Parse Exception was not thrown! (Row too 
short)");
-                       } catch (ParseException ex) {
-                       }
-
-                       try {
-                               result = format.nextRecord(result);
-                               fail("Parse Exception was not thrown! (Invalid 
int value)");
-                       } catch (ParseException ex) {
-                       }
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals("this is", result.productElement(0));
-                       assertEquals(new Integer(1), result.productElement(1));
-                       assertEquals(new Double(2.0), result.productElement(2));
-
-                       try {
-                               result = format.nextRecord(result);
-                               fail("Parse Exception was not thrown! (Row too 
short)");
-                       } catch (ParseException ex) {
-                       }
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals("a test", result.productElement(0));
-                       assertEquals(new Integer(3), result.productElement(1));
-                       assertEquals(new Double(4.0), result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals("#next", result.productElement(0));
-                       assertEquals(new Integer(5), result.productElement(1));
-                       assertEquals(new Double(6.0), result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNull(result);
-
-                       //re-open with lenient = true
-                       format.setLenient(true);
-                       format.configure(parameters);
-                       format.open(split);
-
-                       result = new Row(3);
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals("header1", result.productElement(0));
-                       assertNull(result.productElement(1));
-                       assertNull(result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals("this is", result.productElement(0));
-                       assertEquals(new Integer(1), result.productElement(1));
-                       assertEquals(new Double(2.0), result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals("a test", result.productElement(0));
-                       assertEquals(new Integer(3), result.productElement(1));
-                       assertEquals(new Double(4.0), result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals("#next", result.productElement(0));
-                       assertEquals(new Integer(5), result.productElement(1));
-                       assertEquals(new Double(6.0), result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNull(result);
-               }
-               catch (Exception ex) {
-                       ex.printStackTrace();
-                       fail("Test failed due to a " + ex.getClass().getName() 
+ ": " + ex.getMessage());
-               }
-       }
-
-       @Test
-       public void ignoreSingleCharPrefixComments() {
-               try {
-                       final String fileContent =
-                                       "#description of the data\n" +
-                                       "#successive commented line\n" +
-                                       "this is|1|2.0|\n" +
-                                       "a test|3|4.0|\n" +
-                                       "#next|5|6.0|\n";
-
-                       FileInputSplit split = createTempFile(fileContent);
-
-                       RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation<?>[] {
-                               BasicTypeInfo.STRING_TYPE_INFO,
-                               BasicTypeInfo.INT_TYPE_INFO,
-                               BasicTypeInfo.DOUBLE_TYPE_INFO });
-                       CsvInputFormat<Row> format = new 
RowCsvInputFormat(PATH, "\n", "|", typeInfo);
-                       format.setCommentPrefix("#");
-
-                       Configuration parameters = new Configuration();
-                       format.configure(parameters);
-                       format.open(split);
-
-                       Row result = new Row(3);
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals("this is", result.productElement(0));
-                       assertEquals(new Integer(1), result.productElement(1));
-                       assertEquals(new Double(2.0), result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals("a test", result.productElement(0));
-                       assertEquals(new Integer(3), result.productElement(1));
-                       assertEquals(new Double(4.0), result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNull(result);
-               }
-               catch (Exception ex) {
-                       ex.printStackTrace();
-                       fail("Test failed due to a " + ex.getClass().getName() 
+ ": " + ex.getMessage());
-               }
-       }
-
-       @Test
-       public void ignoreMultiCharPrefixComments() {
-               try {
-
-
-                       final String fileContent = "//description of the 
data\n" +
-                                       "//successive commented line\n" +
-                                       "this is|1|2.0|\n"+
-                                       "a test|3|4.0|\n" +
-                                       "//next|5|6.0|\n";
-
-                       final FileInputSplit split = 
createTempFile(fileContent);
-
-                       final RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation<?>[] {
-                               BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO });
-                       final CsvInputFormat<Row> format = new 
RowCsvInputFormat(PATH, "\n", "|", typeInfo);
-                       format.setCommentPrefix("//");
-
-                       final Configuration parameters = new Configuration();
-                       format.configure(parameters);
-                       format.open(split);
-
-                       Row result = new Row(3);
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals("this is", result.productElement(0));
-                       assertEquals(new Integer(1), result.productElement(1));
-                       assertEquals(new Double(2.0), result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals("a test", result.productElement(0));
-                       assertEquals(new Integer(3), result.productElement(1));
-                       assertEquals(new Double(4.0), result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNull(result);
-               }
-               catch (Exception ex) {
-                       ex.printStackTrace();
-                       fail("Test failed due to a " + ex.getClass().getName() 
+ ": " + ex.getMessage());
-               }
-       }
-
-       @Test
-       public void readStringFields() {
-               try {
-                       String fileContent = "abc|def|ghijk\nabc||hhg\n|||";
-                       FileInputSplit split = createTempFile(fileContent);
-
-                       RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation<?>[] {
-                               BasicTypeInfo.STRING_TYPE_INFO,
-                               BasicTypeInfo.STRING_TYPE_INFO,
-                               BasicTypeInfo.STRING_TYPE_INFO
-                       });
-                       CsvInputFormat<Row> format = new 
RowCsvInputFormat(PATH, "\n", "|", typeInfo);
-
-                       final Configuration parameters = new Configuration();
-                       format.configure(parameters);
-                       format.open(split);
-
-                       Row result = new Row(3);
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals("abc", result.productElement(0));
-                       assertEquals("def", result.productElement(1));
-                       assertEquals("ghijk", result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals("abc", result.productElement(0));
-                       assertEquals("", result.productElement(1));
-                       assertEquals("hhg", result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals("", result.productElement(0));
-                       assertEquals("", result.productElement(1));
-                       assertEquals("", result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNull(result);
-                       assertTrue(format.reachedEnd());
-               }
-               catch (Exception ex) {
-                       ex.printStackTrace();
-                       fail("Test failed due to a " + ex.getClass().getName() 
+ ": " + ex.getMessage());
-               }
-       }
-
-       @Test
-       public void readMixedQuotedStringFields() {
-               try {
-                       String fileContent = 
"@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||";
-                       FileInputSplit split = createTempFile(fileContent);
-
-                       RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation<?>[] {
-                               BasicTypeInfo.STRING_TYPE_INFO,
-                               BasicTypeInfo.STRING_TYPE_INFO,
-                               BasicTypeInfo.STRING_TYPE_INFO
-                       });
-                       CsvInputFormat<Row> format = new 
RowCsvInputFormat(PATH, "\n", "|", typeInfo);
-
-                       Configuration parameters = new Configuration();
-                       format.configure(parameters);
-                       format.enableQuotedStringParsing('@');
-                       format.open(split);
-
-                       Row result = new Row(3);
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals("a|b|c", result.productElement(0));
-                       assertEquals("def", result.productElement(1));
-                       assertEquals("ghijk", result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals("abc", result.productElement(0));
-                       assertEquals("", result.productElement(1));
-                       assertEquals("|hhg", result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals("", result.productElement(0));
-                       assertEquals("", result.productElement(1));
-                       assertEquals("", result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNull(result);
-                       assertTrue(format.reachedEnd());
-               }
-               catch (Exception ex) {
-                       ex.printStackTrace();
-                       fail("Test failed due to a " + ex.getClass().getName() 
+ ": " + ex.getMessage());
-               }
-       }
-
-       @Test
-       public void readStringFieldsWithTrailingDelimiters() {
-               try {
-                       String fileContent = 
"abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n";
-                       FileInputSplit split = createTempFile(fileContent);
-
-                       RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation<?>[] {
-                               BasicTypeInfo.STRING_TYPE_INFO,
-                               BasicTypeInfo.STRING_TYPE_INFO,
-                               BasicTypeInfo.STRING_TYPE_INFO
-                       });
-                       CsvInputFormat<Row> format = new 
RowCsvInputFormat(PATH, "\n", "|", typeInfo);
-
-                       format.setFieldDelimiter("|-");
-
-                       format.configure(new Configuration());
-                       format.open(split);
-
-                       Row result = new Row(3);
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals("abc", result.productElement(0));
-                       assertEquals("def", result.productElement(1));
-                       assertEquals("ghijk", result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals("abc", result.productElement(0));
-                       assertEquals("", result.productElement(1));
-                       assertEquals("hhg", result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals("", result.productElement(0));
-                       assertEquals("", result.productElement(1));
-                       assertEquals("", result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNull(result);
-                       assertTrue(format.reachedEnd());
-               }
-               catch (Exception ex) {
-                       fail("Test failed due to a " + ex.getClass().getName() 
+ ": " + ex.getMessage());
-               }
-       }
-
-       @Test
-       public void testIntegerFields() throws IOException {
-               try {
-                       String fileContent = 
"111|222|333|444|555\n666|777|888|999|000|\n";
-                       FileInputSplit split = createTempFile(fileContent);
-
-                       RowTypeInfo typeInfo = new RowTypeInfo(
-                                       new TypeInformation<?>[] {
-                                               BasicTypeInfo.INT_TYPE_INFO,
-                                               BasicTypeInfo.INT_TYPE_INFO,
-                                               BasicTypeInfo.INT_TYPE_INFO,
-                                               BasicTypeInfo.INT_TYPE_INFO,
-                                               BasicTypeInfo.INT_TYPE_INFO
-                                       });
-                       CsvInputFormat<Row> format = new 
RowCsvInputFormat(PATH, "\n", "|", typeInfo);
-
-                       format.setFieldDelimiter("|");
-
-                       format.configure(new Configuration());
-                       format.open(split);
-
-                       Row result = new Row(5);
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals(Integer.valueOf(111), 
result.productElement(0));
-                       assertEquals(Integer.valueOf(222), 
result.productElement(1));
-                       assertEquals(Integer.valueOf(333), 
result.productElement(2));
-                       assertEquals(Integer.valueOf(444), 
result.productElement(3));
-                       assertEquals(Integer.valueOf(555), 
result.productElement(4));
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals(Integer.valueOf(666), 
result.productElement(0));
-                       assertEquals(Integer.valueOf(777), 
result.productElement(1));
-                       assertEquals(Integer.valueOf(888), 
result.productElement(2));
-                       assertEquals(Integer.valueOf(999), 
result.productElement(3));
-                       assertEquals(Integer.valueOf(000), 
result.productElement(4));
-
-                       result = format.nextRecord(result);
-                       assertNull(result);
-                       assertTrue(format.reachedEnd());
-               }
-               catch (Exception ex) {
-                       fail("Test failed due to a " + ex.getClass().getName() 
+ ": " + ex.getMessage());
-               }
-       }
-
-       @Test
-       public void testEmptyFields() throws IOException {
-               try{
-                       String fileContent =
-                                       "|0|0|0|0|0|\n" +
-                                       "1||1|1|1|1|\n" +
-                                       "2|2||2|2|2|\n" +
-                                       "3|3|3||3|3|\n" +
-                                       "4|4|4|4||4|\n" +
-                                       "5|5|5|5|5||\n";
-
-                       FileInputSplit split = createTempFile(fileContent);
-
-                       //TODO: FLOAT_TYPE_INFO and DOUBLE_TYPE_INFO don't 
handle correctly null values
-                       RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation<?>[] {
-                               BasicTypeInfo.SHORT_TYPE_INFO,
-                               BasicTypeInfo.INT_TYPE_INFO,
-                               BasicTypeInfo.LONG_TYPE_INFO,
-                               BasicTypeInfo.INT_TYPE_INFO,
-//                             BasicTypeInfo.FLOAT_TYPE_INFO,
-                               BasicTypeInfo.INT_TYPE_INFO,
-//                             BasicTypeInfo.DOUBLE_TYPE_INFO,
-                               BasicTypeInfo.BYTE_TYPE_INFO
-                       });
-                       CsvInputFormat<Row> format = new 
RowCsvInputFormat(PATH, typeInfo);
-
-                       format.setFieldDelimiter("|");
-
-                       format.configure(new Configuration());
-                       format.open(split);
-
-                       Row result = new Row(6);
-                       int linesCnt = fileContent.split("\n").length;
-
-                       for (int i = 0; i < linesCnt; i++) {
-                               result = format.nextRecord(result);
-                               assertNull(result.productElement(i));
-                       }
-
-                       //ensure no more rows
-                       assertNull(format.nextRecord(result));
-                       assertTrue(format.reachedEnd());
-               } catch (Exception ex) {
-                       fail("Test failed due to a " + ex.getClass().getName() 
+ ": " + ex.getMessage());
-               }
-       }
-
-       @Test
-       public void testDoubleFields() throws IOException {
-               try {
-                       String fileContent = 
"11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n";
-                       FileInputSplit split = createTempFile(fileContent);
-
-                       RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation<?>[] {
-                               BasicTypeInfo.DOUBLE_TYPE_INFO,
-                               BasicTypeInfo.DOUBLE_TYPE_INFO,
-                               BasicTypeInfo.DOUBLE_TYPE_INFO,
-                               BasicTypeInfo.DOUBLE_TYPE_INFO,
-                               BasicTypeInfo.DOUBLE_TYPE_INFO
-                       });
-                       CsvInputFormat<Row> format = new 
RowCsvInputFormat(PATH, typeInfo);
-
-                       format.setFieldDelimiter("|");
-
-                       format.configure(new Configuration());
-                       format.open(split);
-
-                       Row result = new Row(5);
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals(Double.valueOf(11.1), 
result.productElement(0));
-                       assertEquals(Double.valueOf(22.2), 
result.productElement(1));
-                       assertEquals(Double.valueOf(33.3), 
result.productElement(2));
-                       assertEquals(Double.valueOf(44.4), 
result.productElement(3));
-                       assertEquals(Double.valueOf(55.5), 
result.productElement(4));
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals(Double.valueOf(66.6), 
result.productElement(0));
-                       assertEquals(Double.valueOf(77.7), 
result.productElement(1));
-                       assertEquals(Double.valueOf(88.8), 
result.productElement(2));
-                       assertEquals(Double.valueOf(99.9), 
result.productElement(3));
-                       assertEquals(Double.valueOf(00.0), 
result.productElement(4));
-
-                       result = format.nextRecord(result);
-                       assertNull(result);
-                       assertTrue(format.reachedEnd());
-               }
-               catch (Exception ex) {
-                       fail("Test failed due to a " + ex.getClass().getName() 
+ ": " + ex.getMessage());
-               }
-       }
-
-       @Test
-       public void testReadFirstN() throws IOException {
-               try {
-                       final String fileContent = 
"111|222|333|444|555|\n666|777|888|999|000|\n";
-                       final FileInputSplit split = 
createTempFile(fileContent);
-
-                       final RowTypeInfo typeInfo = new RowTypeInfo(
-                                       new TypeInformation<?>[] {
-                                               BasicTypeInfo.INT_TYPE_INFO,
-                                               BasicTypeInfo.INT_TYPE_INFO
-                                       });
-                       CsvInputFormat<Row> format = new 
RowCsvInputFormat(PATH, typeInfo);
-
-                       format.setFieldDelimiter("|");
-
-                       format.configure(new Configuration());
-                       format.open(split);
-
-                       Row result = new Row(2);
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals(Integer.valueOf(111), 
result.productElement(0));
-                       assertEquals(Integer.valueOf(222), 
result.productElement(1));
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals(Integer.valueOf(666), 
result.productElement(0));
-                       assertEquals(Integer.valueOf(777), 
result.productElement(1));
-
-                       result = format.nextRecord(result);
-                       assertNull(result);
-                       assertTrue(format.reachedEnd());
-               }
-               catch (Exception ex) {
-                       fail("Test failed due to a " + ex.getClass().getName() 
+ ": " + ex.getMessage());
-               }
-
-       }
-
-       @Test
-       public void testReadSparseWithNullFieldsForTypes() throws IOException {
-               try {
-                       String fileContent = 
"111|x|222|x|333|x|444|x|555|x|666|x|777|x|888|x|999|x|000|x|\n" +
-                                       
"000|x|999|x|888|x|777|x|666|x|555|x|444|x|333|x|222|x|111|x|";
-                       FileInputSplit split = createTempFile(fileContent);
-
-                       RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation<?>[] {
-                               BasicTypeInfo.INT_TYPE_INFO,
-                               BasicTypeInfo.INT_TYPE_INFO,
-                               BasicTypeInfo.INT_TYPE_INFO
-                       });
-                       CsvInputFormat<Row> format = new 
RowCsvInputFormat(PATH, typeInfo,
-                                       new boolean[] { true, false, false, 
true, false, false, false, true });
-
-                       format.setFieldDelimiter("|x|");
-
-                       format.setFieldDelimiter("|x|");
-
-                       format.configure(new Configuration());
-                       format.open(split);
-
-                       Row result = new Row(3);
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals(Integer.valueOf(111), 
result.productElement(0));
-                       assertEquals(Integer.valueOf(444), 
result.productElement(1));
-                       assertEquals(Integer.valueOf(888), 
result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals(Integer.valueOf(000), 
result.productElement(0));
-                       assertEquals(Integer.valueOf(777), 
result.productElement(1));
-                       assertEquals(Integer.valueOf(333), 
result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNull(result);
-                       assertTrue(format.reachedEnd());
-               }
-               catch (Exception ex) {
-                       fail("Test failed due to a " + ex.getClass().getName() 
+ ": " + ex.getMessage());
-               }
-       }
-
-       @Test
-       public void testReadSparseWithPositionSetter() throws IOException {
-               try {
-                       String fileContent = 
"111|222|333|444|555|666|777|888|999|000|\n000|999|888|777|666|555|444|333|222|111|";
-                       FileInputSplit split = createTempFile(fileContent);
-
-                       RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation<?>[] {
-                               BasicTypeInfo.INT_TYPE_INFO,
-                               BasicTypeInfo.INT_TYPE_INFO,
-                               BasicTypeInfo.INT_TYPE_INFO
-                       });
-                       CsvInputFormat<Row> format = new 
RowCsvInputFormat(PATH, typeInfo, new int[] { 0, 3, 7 });
-
-                       format.setFieldDelimiter("|");
-
-                       format.configure(new Configuration());
-                       format.open(split);
-
-                       Row result = new Row(3);
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals(Integer.valueOf(111), 
result.productElement(0));
-                       assertEquals(Integer.valueOf(444), 
result.productElement(1));
-                       assertEquals(Integer.valueOf(888), 
result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals(Integer.valueOf(000), 
result.productElement(0));
-                       assertEquals(Integer.valueOf(777), 
result.productElement(1));
-                       assertEquals(Integer.valueOf(333), 
result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNull(result);
-                       assertTrue(format.reachedEnd());
-               }
-               catch (Exception ex) {
-                       fail("Test failed due to a " + ex.getClass().getName() 
+ ": " + ex.getMessage());
-               }
-       }
-
-       @Test
-       public void testReadSparseWithMask() throws IOException {
-               try {
-                       String fileContent =
-                                       
"111&&222&&333&&444&&555&&666&&777&&888&&999&&000&&\n" +
-                                       
"000&&999&&888&&777&&666&&555&&444&&333&&222&&111&&";
-                       FileInputSplit split = createTempFile(fileContent);
-
-                       RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation<?>[] {
-                               BasicTypeInfo.INT_TYPE_INFO,
-                               BasicTypeInfo.INT_TYPE_INFO,
-                               BasicTypeInfo.INT_TYPE_INFO
-                       });
-                       CsvInputFormat<Row> format = new 
RowCsvInputFormat(PATH, typeInfo,
-                                       new boolean[] { true, false, false, 
true, false, false, false, true });
-
-                       format.setFieldDelimiter("&&");
-
-                       format.configure(new Configuration());
-                       format.open(split);
-
-                       Row result = new Row(3);
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals(Integer.valueOf(111), 
result.productElement(0));
-                       assertEquals(Integer.valueOf(444), 
result.productElement(1));
-                       assertEquals(Integer.valueOf(888), 
result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNotNull(result);
-                       assertEquals(Integer.valueOf(000), 
result.productElement(0));
-                       assertEquals(Integer.valueOf(777), 
result.productElement(1));
-                       assertEquals(Integer.valueOf(333), 
result.productElement(2));
-
-                       result = format.nextRecord(result);
-                       assertNull(result);
-                       assertTrue(format.reachedEnd());
-               }
-               catch (Exception ex) {
-                       fail("Test failed due to a " + ex.getClass().getName() 
+ ": " + ex.getMessage());
-               }
-       }
-
-       @Test
-       public void testParseStringErrors() throws Exception {
-               StringParser stringParser = new StringParser();
-               stringParser.enableQuotedStringParsing((byte)'"');
-
-               Object[][] failures = {
-                               {"\"string\" trailing", 
FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING},
-                               {"\"unterminated ", 
FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING}
-               };
-
-               for (Object[] failure : failures) {
-                       String input = (String) failure[0];
-
-                       int result = stringParser.parseField(input.getBytes(), 
0, input.length(), new byte[]{'|'}, null);
-
-                       assertThat(result, is(-1));
-                       assertThat(stringParser.getErrorState(), 
is(failure[1]));
-               }
-
-
-       }
-
-       // Test disabled becase we do not support double-quote escaped quotes 
right now.
-       // @Test
-       public void testParserCorrectness() throws Exception {
-               // RFC 4180 Compliance Test content
-               // Taken from 
http://en.wikipedia.org/wiki/Comma-separated_values#Example
-               String fileContent =
-                       "Year,Make,Model,Description,Price\n" +
-                       "1997,Ford,E350,\"ac, abs, moon\",3000.00\n" +
-                       "1999,Chevy,\"Venture \"\"Extended 
Edition\"\"\",\"\",4900.00\n" +
-                       "1996,Jeep,Grand Cherokee,\"MUST SELL! air, moon roof, 
loaded\",4799.00\n" +
-                       "1999,Chevy,\"Venture \"\"Extended Edition, Very 
Large\"\"\",,5000.00\n" +
-                       ",,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00";
-
-               FileInputSplit split = createTempFile(fileContent);
-
-               RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] 
{
-                       BasicTypeInfo.INT_TYPE_INFO,
-                       BasicTypeInfo.STRING_TYPE_INFO,
-                       BasicTypeInfo.STRING_TYPE_INFO,
-                       BasicTypeInfo.STRING_TYPE_INFO,
-                       BasicTypeInfo.DOUBLE_TYPE_INFO
-               });
-               CsvInputFormat<Row> format = new RowCsvInputFormat(PATH, 
typeInfo);
-
-               format.setSkipFirstLineAsHeader(true);
-               format.setFieldDelimiter(',');
-
-               format.configure(new Configuration());
-               format.open(split);
-
-               Row result = new Row(5);
-
-               Row r1 = new Row(5);
-               r1.setField(0, 1997);
-               r1.setField(1, "Ford");
-               r1.setField(2, "E350");
-               r1.setField(3, "ac, abs, moon");
-               r1.setField(4, 3000.0);
-               Row r2 = new Row(5);
-               r2.setField(0, 1999);
-               r2.setField(1, "Chevy");
-               r2.setField(2, "Venture \"Extended Edition\"");
-               r2.setField(3, "");
-               r2.setField(4, 4900.0);
-               Row r3 = new Row(5);
-               r3.setField(0, 1996);
-               r3.setField(1, "Jeep");
-               r3.setField(2, "Grand Cherokee");
-               r3.setField(3, "MUST SELL! air, moon roof, loaded");
-               r3.setField(4, 4799.0);
-               Row r4 = new Row(5);
-               r4.setField(0, 1999);
-               r4.setField(1, "Chevy");
-               r4.setField(2, "Venture \"Extended Edition, Very Large\"");
-               r4.setField(3, "");
-               r4.setField(4, 5000.0);
-               Row r5 = new Row(5);
-               r5.setField(0, 0);
-               r5.setField(1, "");
-               r5.setField(2, "Venture \"Extended Edition\"");
-               r5.setField(3, "");
-               r5.setField(4, 4900.0);
-
-               Row[] expectedLines = new Row[] { r1, r2, r3, r4, r5 };
-               try {
-                       for (Row expected : expectedLines) {
-                               result = format.nextRecord(result);
-                               assertEquals(expected, result);
-                       }
-
-                       assertNull(format.nextRecord(result));
-                       assertTrue(format.reachedEnd());
-
-               } catch (Exception ex) {
-                       fail("Test failed due to a " + ex.getClass().getName() 
+ ": " + ex.getMessage());
-               }
-
-       }
-
-       private FileInputSplit createTempFile(String content) throws 
IOException {
-               File tempFile = File.createTempFile("test_contents", "tmp");
-               tempFile.deleteOnExit();
-
-               OutputStreamWriter wrt = new OutputStreamWriter(new 
FileOutputStream(tempFile), StandardCharsets.UTF_8);
-               wrt.write(content);
-               wrt.close();
-
-               return new FileInputSplit(0, new 
Path(tempFile.toURI().toString()), 0, tempFile.length(), new String[] 
{"localhost"});
-       }
-
-       @Test
-       public void testWindowsLineEndRemoval() {
-
-               //Check typical use case -- linux file is correct and it is set 
up to linuc(\n)
-               this.testRemovingTrailingCR("\n", "\n");
-
-               //Check typical windows case -- windows file endings and file 
has windows file endings set up
-               this.testRemovingTrailingCR("\r\n", "\r\n");
-
-               //Check problematic case windows file -- windows file 
endings(\r\n) but linux line endings (\n) set up
-               this.testRemovingTrailingCR("\r\n", "\n");
-
-               //Check problematic case linux file -- linux file endings (\n) 
but windows file endings set up (\r\n)
-               //Specific setup for windows line endings will expect \r\n 
because it has to be set up and is not standard.
-       }
-
-       private void testRemovingTrailingCR(String lineBreakerInFile, String 
lineBreakerSetup) {
-               File tempFile=null;
-
-               String fileContent = FIRST_PART + lineBreakerInFile + 
SECOND_PART + lineBreakerInFile;
-
-               try {
-                       // create input file
-                       tempFile = File.createTempFile("CsvInputFormatTest", 
"tmp");
-                       tempFile.deleteOnExit();
-                       tempFile.setWritable(true);
-
-                       OutputStreamWriter wrt = new OutputStreamWriter(new 
FileOutputStream(tempFile));
-                       wrt.write(fileContent);
-                       wrt.close();
-
-                       RowTypeInfo typeInfo = new RowTypeInfo(new 
TypeInformation<?>[] {BasicTypeInfo.STRING_TYPE_INFO });
-                       CsvInputFormat<Row> inputFormat = new 
RowCsvInputFormat(new Path(tempFile.toURI().toString()), typeInfo);
-
-                       Configuration parameters = new Configuration();
-                       inputFormat.configure(parameters);
-
-                       inputFormat.setDelimiter(lineBreakerSetup);
-
-                       FileInputSplit[] splits = 
inputFormat.createInputSplits(1);
-
-                       inputFormat.open(splits[0]);
-
-                       Row result = inputFormat.nextRecord(new Row(1));
-
-                       assertNotNull("Expecting to not return null", result);
-
-                       assertEquals(FIRST_PART, result.productElement(0));
-
-                       result = inputFormat.nextRecord(result);
-
-                       assertNotNull("Expecting to not return null", result);
-                       assertEquals(SECOND_PART, result.productElement(0));
-
-               }
-               catch (Throwable t) {
-                       System.err.println("test failed with exception: " + 
t.getMessage());
-                       t.printStackTrace(System.err);
-                       fail("Test erroneous");
-               }
-       }
-
-       @Test
-       public void testQuotedStringParsingWithIncludeFields() throws Exception 
{
-               final String fileContent = "\"20:41:52-1-3-2015\"|\"Re: 
Taskmanager memory error in Eclipse\"|" +
-                               "\"Blahblah 
<b...@blahblah.org>\"|\"blaaa|\"blubb\"";
-
-               final File tempFile = 
File.createTempFile("CsvReaderQuotedString", "tmp");
-               tempFile.deleteOnExit();
-               tempFile.setWritable(true);
-
-               OutputStreamWriter writer = new OutputStreamWriter(new 
FileOutputStream(tempFile));
-               writer.write(fileContent);
-               writer.close();
-
-               final RowTypeInfo typeInfo = new RowTypeInfo(
-                               new TypeInformation<?>[] { 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO });
-               CsvInputFormat<Row> inputFormat = new RowCsvInputFormat(
-                               new Path(tempFile.toURI().toString()), 
typeInfo, new boolean[] { true, false, true });
-
-               inputFormat.enableQuotedStringParsing('"');
-               inputFormat.setFieldDelimiter('|');
-               inputFormat.setDelimiter('\n');
-
-               inputFormat.configure(new Configuration());
-               FileInputSplit[] splits = inputFormat.createInputSplits(1);
-
-               inputFormat.open(splits[0]);
-
-               Row record = inputFormat.nextRecord(new Row(2));
-
-               assertEquals("20:41:52-1-3-2015", record.productElement(0));
-               assertEquals("Blahblah <b...@blahblah.org>", 
record.productElement(1));
-       }
-
-       @Test
-       public void testQuotedStringParsingWithEscapedQuotes() throws Exception 
{
-               final String fileContent = "\"\\\"Hello\\\" World\"|\"We 
are\\\" young\"";
-
-               final File tempFile = 
File.createTempFile("CsvReaderQuotedString", "tmp");
-               tempFile.deleteOnExit();
-               tempFile.setWritable(true);
-
-               OutputStreamWriter writer = new OutputStreamWriter(new 
FileOutputStream(tempFile));
-               writer.write(fileContent);
-               writer.close();
-
-               RowTypeInfo typeInfo = new RowTypeInfo(new TypeInformation<?>[] 
{ BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO });
-               CsvInputFormat<Row> inputFormat = new RowCsvInputFormat(new 
Path(tempFile.toURI().toString()), typeInfo);
-
-               inputFormat.enableQuotedStringParsing('"');
-               inputFormat.setFieldDelimiter('|');
-               inputFormat.setDelimiter('\n');
-
-               inputFormat.configure(new Configuration());
-               FileInputSplit[] splits = inputFormat.createInputSplits(1);
-
-               inputFormat.open(splits[0]);
-
-               Row record = inputFormat.nextRecord(new Row(2));
-
-               assertEquals("\\\"Hello\\\" World", record.productElement(0));
-               assertEquals("We are\\\" young", record.productElement(1));
-       }
-
-       /**
-        * Tests that the CSV input format can deal with POJOs which are 
subclasses.
-        *
-        * @throws Exception
-        */
-       @Test
-       public void testPojoSubclassType() throws Exception {
-               final String fileContent = "t1,foobar,tweet2\nt2,barfoo,tweet2";
-
-               final File tempFile = 
File.createTempFile("CsvReaderPOJOSubclass", "tmp");
-               tempFile.deleteOnExit();
-
-               OutputStreamWriter writer = new OutputStreamWriter(new 
FileOutputStream(tempFile));
-               writer.write(fileContent);
-               writer.close();
-
-               PojoTypeInfo<TwitterPOJO> typeInfo = 
(PojoTypeInfo<TwitterPOJO>)TypeExtractor.createTypeInfo(TwitterPOJO.class);
-               CsvInputFormat<TwitterPOJO> inputFormat = new 
PojoCsvInputFormat<>(new Path(tempFile.toURI().toString()), typeInfo);
-
-               inputFormat.configure(new Configuration());
-               FileInputSplit[] splits = inputFormat.createInputSplits(1);
-
-               inputFormat.open(splits[0]);
-
-               List<TwitterPOJO> expected = new ArrayList<>();
-
-               for (String line: fileContent.split("\n")) {
-                       String[] elements = line.split(",");
-                       expected.add(new TwitterPOJO(elements[0], elements[1], 
elements[2]));
-               }
-
-               List<TwitterPOJO> actual = new ArrayList<>();
-
-               TwitterPOJO pojo;
-
-               while((pojo = inputFormat.nextRecord(new TwitterPOJO())) != 
null) {
-                       actual.add(pojo);
-               }
-
-               assertEquals(expected, actual);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // Custom types for testing
-       // 
--------------------------------------------------------------------------------------------
-
-       public static class PojoItem {
-               public int field1;
-               public String field2;
-               public Double field3;
-               public String field4;
-       }
-
-       public static class PrivatePojoItem {
-               private int field1;
-               private String field2;
-               private Double field3;
-               private String field4;
-
-               public int getField1() {
-                       return field1;
-               }
-
-               public void setField1(int field1) {
-                       this.field1 = field1;
-               }
-
-               public String getField2() {
-                       return field2;
-               }
-
-               public void setField2(String field2) {
-                       this.field2 = field2;
-               }
-
-               public Double getField3() {
-                       return field3;
-               }
-
-               public void setField3(Double field3) {
-                       this.field3 = field3;
-               }
-
-               public String getField4() {
-                       return field4;
-               }
-
-               public void setField4(String field4) {
-                       this.field4 = field4;
-               }
-       }
-
-       public static class POJO {
-               public String table;
-               public String time;
-
-               public POJO() {
-                       this("", "");
-               }
-
-               public POJO(String table, String time) {
-                       this.table = table;
-                       this.time = time;
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj instanceof POJO) {
-                               POJO other = (POJO) obj;
-                               return table.equals(other.table) && 
time.equals(other.time);
-                       } else {
-                               return false;
-                       }
-               }
-       }
-
-       public static class TwitterPOJO extends POJO {
-               public String tweet;
-
-               public TwitterPOJO() {
-                       this("", "", "");
-               }
-
-               public TwitterPOJO(String table, String time, String tweet) {
-                       super(table, time);
-                       this.tweet = tweet;
-               }
-
-               @Override
-               public boolean equals(Object obj) {
-                       if (obj instanceof TwitterPOJO) {
-                               TwitterPOJO other = (TwitterPOJO) obj;
-                               return super.equals(other) && 
tweet.equals(other.tweet);
-                       } else {
-                               return false;
-                       }
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f9a28ae/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
new file mode 100644
index 0000000..540776d
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/runtime/io/RowCsvInputFormatTest.scala
@@ -0,0 +1,841 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.io
+
+import java.io.{File, FileOutputStream, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+
+import org.apache.flink.api.common.io.ParseException
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.runtime.io.RowCsvInputFormatTest.{PATH, 
createTempFile, testRemovingTrailingCR}
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.core.fs.{FileInputSplit, Path}
+import org.apache.flink.types.parser.{FieldParser, StringParser}
+import org.junit.Assert._
+import org.junit.{Ignore, Test}
+
+class RowCsvInputFormatTest {
+
+  /**
+   * Exercises strict vs. lenient parsing on a file that mixes a description line, a header,
+   * a comment line and valid rows.
+   *
+   * With `lenient = false` each malformed line surfaces as a [[ParseException]] while the
+   * valid rows are still returned. After re-opening with `lenient = true` nothing throws:
+   * the header comes back with `null` for its non-string fields and the other malformed
+   * lines are skipped.
+   */
+  @Test
+  def ignoreInvalidLines() {
+    val fileContent =
+      "#description of the data\n" +
+        "header1|header2|header3|\n" +
+        "this is|1|2.0|\n" +
+        "//a comment\n" +
+        "a test|3|4.0|\n" +
+        "#next|5|6.0|\n"
+
+    val split = createTempFile(fileContent)
+
+    val typeInfo = new RowTypeInfo(Seq(
+      BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.DOUBLE_TYPE_INFO))
+
+    val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|")
+    format.setLenient(false)
+    val parameters = new Configuration
+    format.configure(parameters)
+    format.open(split)
+
+    var result = new Row(3)
+
+    // Reads the next line and requires the strict parser to reject it.
+    def expectParseException(reason: String): Unit = {
+      try {
+        result = format.nextRecord(result)
+        fail(s"Parse Exception was not thrown! ($reason)")
+      }
+      catch {
+        case _: ParseException => // expected
+      }
+    }
+
+    // Asserts a fully parsed (string, int, double) row.
+    def assertRow(row: Row, name: String, count: Int, value: Double): Unit = {
+      assertNotNull(row)
+      assertEquals(name, row.productElement(0))
+      assertEquals(count, row.productElement(1))
+      assertEquals(value, row.productElement(2))
+    }
+
+    expectParseException("Row too short")     // "#description of the data"
+    expectParseException("Invalid int value") // header line
+
+    result = format.nextRecord(result)
+    assertRow(result, "this is", 1, 2.0)
+
+    expectParseException("Row too short")     // "//a comment"
+
+    result = format.nextRecord(result)
+    assertRow(result, "a test", 3, 4.0)
+
+    result = format.nextRecord(result)
+    assertRow(result, "#next", 5, 6.0)
+
+    result = format.nextRecord(result)
+    assertNull(result)
+
+    // The split is opened a second time below; release the input of the strict pass first
+    // instead of leaving it open.
+    format.close()
+
+    // re-open with lenient = true
+    format.setLenient(true)
+    format.configure(parameters)
+    format.open(split)
+
+    result = new Row(3)
+
+    result = format.nextRecord(result)
+    assertNotNull(result)
+    assertEquals("header1", result.productElement(0))
+    assertNull(result.productElement(1))
+    assertNull(result.productElement(2))
+
+    result = format.nextRecord(result)
+    assertRow(result, "this is", 1, 2.0)
+
+    result = format.nextRecord(result)
+    assertRow(result, "a test", 3, 4.0)
+
+    result = format.nextRecord(result)
+    assertRow(result, "#next", 5, 6.0)
+
+    result = format.nextRecord(result)
+    assertNull(result)
+
+    format.close()
+  }
+
+  /**
+   * Lines starting with the single-character comment prefix `#` are skipped. This covers
+   * successive comment lines and a line that would otherwise parse as a valid row.
+   */
+  @Test
+  def ignoreSingleCharPrefixComments() {
+    val fileContent =
+      "#description of the data\n" +
+        "#successive commented line\n" +
+        "this is|1|2.0|\n" +
+        "a test|3|4.0|\n" +
+        "#next|5|6.0|\n"
+
+    val split = createTempFile(fileContent)
+
+    val typeInfo = new RowTypeInfo(Seq(
+      BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.DOUBLE_TYPE_INFO))
+
+    val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|")
+    format.setCommentPrefix("#")
+    format.configure(new Configuration)
+    format.open(split)
+
+    var result = new Row(3)
+
+    result = format.nextRecord(result)
+    assertNotNull(result)
+    assertEquals("this is", result.productElement(0))
+    assertEquals(1, result.productElement(1))
+    assertEquals(2.0, result.productElement(2))
+
+    result = format.nextRecord(result)
+    assertNotNull(result)
+    assertEquals("a test", result.productElement(0))
+    assertEquals(3, result.productElement(1))
+    assertEquals(4.0, result.productElement(2))
+
+    // "#next|5|6.0|" is a comment as well, so the input is exhausted here
+    result = format.nextRecord(result)
+    assertNull(result)
+    assertTrue(format.reachedEnd)
+
+    format.close()
+  }
+
+  /**
+   * Lines starting with the multi-character comment prefix `//` are skipped. This covers
+   * successive comment lines and a line that would otherwise parse as a valid row.
+   */
+  @Test
+  def ignoreMultiCharPrefixComments() {
+    val fileContent =
+      "//description of the data\n" +
+        "//successive commented line\n" +
+        "this is|1|2.0|\n" +
+        "a test|3|4.0|\n" +
+        "//next|5|6.0|\n"
+
+    val split = createTempFile(fileContent)
+
+    val typeInfo = new RowTypeInfo(Seq(
+      BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.DOUBLE_TYPE_INFO))
+
+    val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|")
+    format.setCommentPrefix("//")
+    format.configure(new Configuration)
+    format.open(split)
+
+    var result = new Row(3)
+
+    result = format.nextRecord(result)
+    assertNotNull(result)
+    assertEquals("this is", result.productElement(0))
+    assertEquals(1, result.productElement(1))
+    assertEquals(2.0, result.productElement(2))
+
+    result = format.nextRecord(result)
+    assertNotNull(result)
+    assertEquals("a test", result.productElement(0))
+    assertEquals(3, result.productElement(1))
+    assertEquals(4.0, result.productElement(2))
+
+    // "//next|5|6.0|" is a comment as well, so the input is exhausted here
+    result = format.nextRecord(result)
+    assertNull(result)
+    assertTrue(format.reachedEnd)
+
+    format.close()
+  }
+
+  /**
+   * Reads three all-string lines separated by '|': a fully populated one, one with an empty
+   * middle field, and one whose fields are all empty.
+   */
+  @Test
+  def readStringFields() {
+    val split = createTempFile("abc|def|ghijk\nabc||hhg\n|||")
+
+    val stringType = BasicTypeInfo.STRING_TYPE_INFO
+    val format = new RowCsvInputFormat(
+      PATH,
+      new RowTypeInfo(Seq(stringType, stringType, stringType)),
+      "\n",
+      "|")
+    format.configure(new Configuration)
+    format.open(split)
+
+    val expectedRows = Seq(
+      Seq("abc", "def", "ghijk"),
+      Seq("abc", "", "hhg"),
+      Seq("", "", ""))
+
+    var row = new Row(3)
+    for (expected <- expectedRows) {
+      row = format.nextRecord(row)
+      assertNotNull(row)
+      for ((value, pos) <- expected.zipWithIndex) {
+        assertEquals(value, row.productElement(pos))
+      }
+    }
+
+    row = format.nextRecord(row)
+    assertNull(row)
+    assertTrue(format.reachedEnd)
+  }
+
+  /**
+   * With '@' enabled as the quote character, '|' inside a quoted field stays part of the
+   * value and the surrounding quotes are removed; unquoted fields are read as usual.
+   */
+  @Test
+  def readMixedQuotedStringFields() {
+    val split = createTempFile("@a|b|c@|def|@ghijk@\nabc||@|hhg@\n|||")
+
+    val str = BasicTypeInfo.STRING_TYPE_INFO
+    val format = new RowCsvInputFormat(PATH, new RowTypeInfo(Seq(str, str, str)), "\n", "|")
+    format.configure(new Configuration)
+    format.enableQuotedStringParsing('@')
+    format.open(split)
+
+    val expectedRows = Seq(
+      Seq("a|b|c", "def", "ghijk"),
+      Seq("abc", "", "|hhg"),
+      Seq("", "", ""))
+
+    // each read reuses the row returned by the previous one
+    val last = expectedRows.foldLeft(new Row(3)) { (reuse, expected) =>
+      val row = format.nextRecord(reuse)
+      assertNotNull(row)
+      for ((value, idx) <- expected.zipWithIndex) {
+        assertEquals(value, row.productElement(idx))
+      }
+      row
+    }
+
+    assertNull(format.nextRecord(last))
+    assertTrue(format.reachedEnd)
+  }
+
+  /**
+   * Reads string fields separated by the two-character delimiter "|-". That delimiter is set
+   * after construction, and the last line ends with a trailing delimiter.
+   */
+  @Test
+  def readStringFieldsWithTrailingDelimiters() {
+    val split = createTempFile("abc|-def|-ghijk\nabc|-|-hhg\n|-|-|-\n")
+
+    val typeInfo = new RowTypeInfo(Seq.fill(3)(BasicTypeInfo.STRING_TYPE_INFO))
+    val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|")
+    // replaces the "|" given to the constructor with the two-character delimiter
+    format.setFieldDelimiter("|-")
+    format.configure(new Configuration)
+    format.open(split)
+
+    val expected = Array(
+      Array("abc", "def", "ghijk"),
+      Array("abc", "", "hhg"),
+      Array("", "", ""))
+
+    var row = new Row(3)
+    var line = 0
+    while (line < expected.length) {
+      row = format.nextRecord(row)
+      assertNotNull(row)
+      var col = 0
+      while (col < 3) {
+        assertEquals(expected(line)(col), row.productElement(col))
+        col += 1
+      }
+      line += 1
+    }
+
+    row = format.nextRecord(row)
+    assertNull(row)
+    assertTrue(format.reachedEnd)
+  }
+
+  /**
+   * Reads two lines of five integer fields. The second line ends with a trailing delimiter,
+   * and its last field "000" parses to 0.
+   */
+  @Test
+  def testIntegerFields() {
+    val split = createTempFile("111|222|333|444|555\n666|777|888|999|000|\n")
+
+    val typeInfo = new RowTypeInfo(Seq.fill(5)(BasicTypeInfo.INT_TYPE_INFO))
+    val format = new RowCsvInputFormat(PATH, typeInfo, "\n", "|")
+
+    format.setFieldDelimiter("|")
+    format.configure(new Configuration)
+    format.open(split)
+
+    var result = new Row(5)
+    for (expectedRow <- Seq(Seq(111, 222, 333, 444, 555), Seq(666, 777, 888, 999, 0))) {
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      expectedRow.indices.foreach(i => assertEquals(expectedRow(i), result.productElement(i)))
+    }
+
+    result = format.nextRecord(result)
+    assertNull(result)
+    assertTrue(format.reachedEnd)
+  }
+
+  /**
+   * Line i of the input leaves field i empty. That field must be read as `null` for each of
+   * the numeric types in the row.
+   */
+  @Test
+  def testEmptyFields() {
+    val fileContent =
+      "|0|0|0|0|0|\n" +
+        "1||1|1|1|1|\n" +
+        "2|2||2|2|2|\n" +
+        "3|3|3||3|3|\n" +
+        "4|4|4|4||4|\n" +
+        "5|5|5|5|5||\n"
+
+    val split = createTempFile(fileContent)
+
+    // TODO: FLOAT_TYPE_INFO and DOUBLE_TYPE_INFO don't handle null values correctly
+    val typeInfo = new RowTypeInfo(Seq(
+      BasicTypeInfo.SHORT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.LONG_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.BYTE_TYPE_INFO))
+
+    val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo)
+    format.setFieldDelimiter("|")
+    format.configure(new Configuration)
+    format.open(split)
+
+    var result = new Row(6)
+    val linesCnt = fileContent.split("\n").length
+
+    for (i <- 0 until linesCnt) {
+      result = format.nextRecord(result)
+      // a missing line must fail as an assertion, not as a NullPointerException below
+      assertNotNull(s"line $i was not read", result)
+      assertNull(result.productElement(i))
+    }
+
+    // ensure no more rows
+    assertNull(format.nextRecord(result))
+    assertTrue(format.reachedEnd)
+
+    format.close()
+  }
+
+  /**
+   * Reads two lines of five double fields. The second line ends with a trailing delimiter,
+   * and its last field "00.0" parses to 0.0.
+   */
+  @Test
+  def testDoubleFields() {
+    val split = createTempFile("11.1|22.2|33.3|44.4|55.5\n66.6|77.7|88.8|99.9|00.0|\n")
+
+    val typeInfo = new RowTypeInfo(Seq.fill(5)(BasicTypeInfo.DOUBLE_TYPE_INFO))
+    val format = new RowCsvInputFormat(PATH, rowTypeInfo = typeInfo)
+    format.setFieldDelimiter("|")
+    format.configure(new Configuration)
+    format.open(split)
+
+    val rows = Seq(
+      Seq(11.1, 22.2, 33.3, 44.4, 55.5),
+      Seq(66.6, 77.7, 88.8, 99.9, 0.0))
+
+    var record = new Row(5)
+    rows.foreach { values =>
+      record = format.nextRecord(record)
+      assertNotNull(record)
+      values.zipWithIndex.foreach { case (v, pos) => assertEquals(v, record.productElement(pos)) }
+    }
+
+    record = format.nextRecord(record)
+    assertNull(record)
+    assertTrue(format.reachedEnd)
+  }
+
+  /**
+   * The row type declares only two fields, so only the first two of the five fields in each
+   * line are read.
+   */
+  @Test
+  def testReadFirstN() {
+    val split = createTempFile("111|222|333|444|555|\n666|777|888|999|000|\n")
+
+    val intType = BasicTypeInfo.INT_TYPE_INFO
+    val format = new RowCsvInputFormat(
+      PATH,
+      rowTypeInfo = new RowTypeInfo(Seq(intType, intType)))
+    format.setFieldDelimiter("|")
+    format.configure(new Configuration)
+    format.open(split)
+
+    var row = new Row(2)
+    for ((first, second) <- Seq((111, 222), (666, 777))) {
+      row = format.nextRecord(row)
+      assertNotNull(row)
+      assertEquals(first, row.productElement(0))
+      assertEquals(second, row.productElement(1))
+    }
+
+    row = format.nextRecord(row)
+    assertNull(row)
+    assertTrue(format.reachedEnd)
+  }
+
+  /**
+   * Projects fields 0, 3 and 7 through `includedFieldsMask` from lines that use the
+   * multi-character delimiter "|x|". All other fields are skipped.
+   */
+  @Test
+  def testReadSparseWithNullFieldsForTypes() {
+    val fileContent =
+      "111|x|222|x|333|x|444|x|555|x|666|x|777|x|888|x|999|x|000|x|\n" +
+      "000|x|999|x|888|x|777|x|666|x|555|x|444|x|333|x|222|x|111|x|"
+
+    val split = createTempFile(fileContent)
+
+    val intType = BasicTypeInfo.INT_TYPE_INFO
+    val mask = Array(true, false, false, true, false, false, false, true)
+    val format = new RowCsvInputFormat(
+      PATH,
+      rowTypeInfo = new RowTypeInfo(Seq(intType, intType, intType)),
+      includedFieldsMask = mask)
+    format.setFieldDelimiter("|x|")
+    format.configure(new Configuration)
+    format.open(split)
+
+    var result = new Row(3)
+    for (expected <- Seq(Seq(111, 444, 888), Seq(0, 777, 333))) {
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      for (i <- expected.indices) {
+        assertEquals(expected(i), result.productElement(i))
+      }
+    }
+
+    result = format.nextRecord(result)
+    assertNull(result)
+    assertTrue(format.reachedEnd)
+  }
+
+  /**
+   * Selects fields 0, 3 and 7 by position, using the `Array(0, 3, 7)` constructor argument
+   * instead of a boolean mask.
+   */
+  @Test
+  def testReadSparseWithPositionSetter() {
+    val fileContent = "111|222|333|444|555|666|777|888|999|000|\n" +
+      "000|999|888|777|666|555|444|333|222|111|"
+    val split = createTempFile(fileContent)
+
+    val typeInfo = new RowTypeInfo(Seq.fill(3)(BasicTypeInfo.INT_TYPE_INFO))
+    val selectedFields = Array(0, 3, 7)
+    val format = new RowCsvInputFormat(PATH, typeInfo, selectedFields)
+    format.setFieldDelimiter("|")
+    format.configure(new Configuration)
+    format.open(split)
+
+    val expected = Seq(Seq(111, 444, 888), Seq(0, 777, 333))
+    // each read reuses the row returned by the previous one
+    val lastRow = expected.foldLeft(new Row(3)) { (previous, values) =>
+      val current = format.nextRecord(previous)
+      assertNotNull(current)
+      values.zipWithIndex.foreach { case (v, i) => assertEquals(v, current.productElement(i)) }
+      current
+    }
+
+    val afterLast = format.nextRecord(lastRow)
+    assertNull(afterLast)
+    assertTrue(format.reachedEnd)
+  }
+
+  /**
+   * Reads only the fields selected by `includedFieldsMask` (positions 0, 3 and 7)
+   * from lines that use the two-character field delimiter "&&".
+   */
+  @Test
+  def testReadSparseWithMask() {
+    val fileContent = "111&&222&&333&&444&&555&&666&&777&&888&&999&&000&&\n" +
+      "000&&999&&888&&777&&666&&555&&444&&333&&222&&111&&"
+
+    // use the imported helper like the sibling tests do
+    val split = createTempFile(fileContent)
+
+    val typeInfo = new RowTypeInfo(Seq(
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO))
+
+    val format = new RowCsvInputFormat(
+      PATH,
+      rowTypeInfo = typeInfo,
+      includedFieldsMask = Array(true, false, false, true, false, false, false, true))
+    format.setFieldDelimiter("&&")
+    format.configure(new Configuration)
+    format.open(split)
+
+    // release whatever open() acquired, even when an assertion below fails
+    try {
+      var result = new Row(3)
+
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      assertEquals(111, result.productElement(0))
+      assertEquals(444, result.productElement(1))
+      assertEquals(888, result.productElement(2))
+
+      result = format.nextRecord(result)
+      assertNotNull(result)
+      // "000" is read back as the integer 0
+      assertEquals(0, result.productElement(0))
+      assertEquals(777, result.productElement(1))
+      assertEquals(333, result.productElement(2))
+
+      result = format.nextRecord(result)
+      assertNull(result)
+      assertTrue(format.reachedEnd)
+    } finally {
+      format.close()
+    }
+  }
+
+  /**
+   * Checks that malformed quoted strings are rejected by [[StringParser]]: parseField
+   * returns -1 and the parser reports the matching [[FieldParser.ParseErrorState]].
+   */
+  @Test
+  def testParseStringErrors() {
+    val stringParser = new StringParser
+    stringParser.enableQuotedStringParsing('"'.toByte)
+
+    val failures = Seq(
+      ("\"string\" trailing", FieldParser.ParseErrorState.UNQUOTED_CHARS_AFTER_QUOTED_STRING),
+      ("\"unterminated ", FieldParser.ParseErrorState.UNTERMINATED_QUOTED_STRING)
+    )
+
+    for ((input, expectedError) <- failures) {
+      // Encode explicitly and use the byte count as the limit: String.length counts
+      // chars, which only equals the encoded length for ASCII input.
+      val bytes = input.getBytes(StandardCharsets.UTF_8)
+      val result = stringParser.parseField(
+        bytes,
+        0,
+        bytes.length,
+        Array[Byte]('|'),
+        null)
+
+      assertEquals(-1, result)
+      assertEquals(expectedError, stringParser.getErrorState)
+    }
+  }
+
+  // Test disabled because we do not support double-quote escaped quotes right now.
+  /**
+   * RFC 4180 compliance check: quoted fields may contain the field delimiter and
+   * doubled quotes ("") stand for a literal quote character.
+   */
+  @Test
+  @Ignore
+  def testParserCorrectness() {
+    // RFC 4180 Compliance Test content
+    // Taken from http://en.wikipedia.org/wiki/Comma-separated_values#Example
+    val fileContent = "Year,Make,Model,Description,Price\n" +
+      "1997,Ford,E350,\"ac, abs, moon\",3000.00\n" +
+      "1999,Chevy,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00\n" +
+      "1996,Jeep,Grand Cherokee,\"MUST SELL! air, moon roof, loaded\",4799.00\n" +
+      "1999,Chevy,\"Venture \"\"Extended Edition, Very Large\"\"\",,5000.00\n" +
+      ",,\"Venture \"\"Extended Edition\"\"\",\"\",4900.00"
+
+    val split = createTempFile(fileContent)
+
+    val typeInfo = new RowTypeInfo(Seq(
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.DOUBLE_TYPE_INFO))
+
+    val format = new RowCsvInputFormat(PATH, typeInfo)
+    format.setSkipFirstLineAsHeader(true)
+    format.setFieldDelimiter(',')
+    // Without quoted string parsing, fields such as "ac, abs, moon" would be split at
+    // their embedded commas and the test could never pass, even with escaping support.
+    format.enableQuotedStringParsing('"')
+    format.configure(new Configuration)
+    format.open(split)
+
+    // builds a Row whose fields are `values` in order
+    def row(values: Any*): Row = {
+      val r = new Row(values.length)
+      values.zipWithIndex.foreach { case (value, pos) => r.setField(pos, value) }
+      r
+    }
+
+    val expectedLines = Seq(
+      row(1997, "Ford", "E350", "ac, abs, moon", 3000.0),
+      row(1999, "Chevy", "Venture \"Extended Edition\"", "", 4900.0),
+      row(1996, "Jeep", "Grand Cherokee", "MUST SELL! air, moon roof, loaded", 4799.0),
+      row(1999, "Chevy", "Venture \"Extended Edition, Very Large\"", "", 5000.0),
+      row(0, "", "Venture \"Extended Edition\"", "", 4900.0))
+
+    // release whatever open() acquired, even when an assertion below fails
+    try {
+      var result = new Row(5)
+      for (expected <- expectedLines) {
+        result = format.nextRecord(result)
+        assertEquals(expected, result)
+      }
+      assertNull(format.nextRecord(result))
+      assertTrue(format.reachedEnd)
+    } finally {
+      format.close()
+    }
+  }
+
+  /**
+   * Verifies that records come back without a trailing '\r', both when the configured
+   * line delimiter matches the file's line endings and when a Windows file ("\r\n")
+   * is read with the Linux delimiter ("\n").
+   */
+  @Test
+  def testWindowsLineEndRemoval() {
+    // (line ending written to the file, line delimiter configured on the format)
+    val cases = Seq(
+      ("\n", "\n"),     // linux file, linux delimiter: the typical case
+      ("\r\n", "\r\n"), // windows file, windows delimiter
+      ("\r\n", "\n"))   // windows file, linux delimiter: the trailing \r must be removed
+
+    // A linux file ("\n") read with the windows delimiter ("\r\n") is intentionally not
+    // covered: a "\r\n" delimiter has to be configured explicitly, and is then expected
+    // to be matched as-is.
+    cases.foreach { case (lineBreakerInFile, lineBreakerSetup) =>
+      testRemovingTrailingCR(lineBreakerInFile, lineBreakerSetup)
+    }
+  }
+
+  /**
+   * Reads quoted, '|'-separated strings while selecting only fields 0 and 2 through
+   * `includedFieldsMask`; the surrounding quotes are stripped from the results.
+   */
+  @Test
+  def testQuotedStringParsingWithIncludeFields() {
+    val fileContent =
+      "\"20:41:52-1-3-2015\"|\"Re: Taskmanager memory error in Eclipse\"|" +
+      "\"Blahblah <b...@blahblah.org>\"|\"blaaa|\"blubb\""
+    val tempFile = File.createTempFile("CsvReaderQuotedString", "tmp")
+    tempFile.deleteOnExit()
+    tempFile.setWritable(true)
+
+    // write with an explicit charset (like createTempFile) and close even on failure
+    val writer = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8)
+    try writer.write(fileContent) finally writer.close()
+
+    val typeInfo = new RowTypeInfo(Seq(
+      BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO))
+
+    val inputFormat = new RowCsvInputFormat(
+      new Path(tempFile.toURI.toString),
+      rowTypeInfo = typeInfo,
+      includedFieldsMask = Array(true, false, true))
+    inputFormat.enableQuotedStringParsing('"')
+    inputFormat.setFieldDelimiter('|')
+    inputFormat.setDelimiter('\n')
+    inputFormat.configure(new Configuration)
+
+    val splits = inputFormat.createInputSplits(1)
+    inputFormat.open(splits(0))
+
+    // release whatever open() acquired, even when an assertion below fails
+    try {
+      val record = inputFormat.nextRecord(new Row(2))
+      assertNotNull(record)
+      assertEquals("20:41:52-1-3-2015", record.productElement(0))
+      assertEquals("Blahblah <b...@blahblah.org>", record.productElement(1))
+    } finally {
+      inputFormat.close()
+    }
+  }
+
+  /**
+   * Quoted fields containing backslash-escaped quotes are read back with the
+   * backslashes still present; only the enclosing quotes are removed.
+   */
+  @Test
+  def testQuotedStringParsingWithEscapedQuotes() {
+    val fileContent = "\"\\\"Hello\\\" World\"|\"We are\\\" young\""
+    val tempFile = File.createTempFile("CsvReaderQuotedString", "tmp")
+    tempFile.deleteOnExit()
+    tempFile.setWritable(true)
+
+    // write with an explicit charset (like createTempFile) and close even on failure
+    val writer = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8)
+    try writer.write(fileContent) finally writer.close()
+
+    val typeInfo = new RowTypeInfo(Seq(
+      BasicTypeInfo.STRING_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO))
+
+    val inputFormat = new RowCsvInputFormat(
+      new Path(tempFile.toURI.toString),
+      rowTypeInfo = typeInfo)
+    inputFormat.enableQuotedStringParsing('"')
+    inputFormat.setFieldDelimiter('|')
+    inputFormat.setDelimiter('\n')
+    inputFormat.configure(new Configuration)
+
+    val splits = inputFormat.createInputSplits(1)
+    inputFormat.open(splits(0))
+
+    // release whatever open() acquired, even when an assertion below fails
+    try {
+      val record = inputFormat.nextRecord(new Row(2))
+      assertNotNull(record)
+      assertEquals("\\\"Hello\\\" World", record.productElement(0))
+      assertEquals("We are\\\" young", record.productElement(1))
+    } finally {
+      inputFormat.close()
+    }
+  }
+}
+
+object RowCsvInputFormatTest {
+
+  private val PATH = new Path("an/ignored/file/")
+
+  // static variables for testing the removal of \r\n to \n
+  private val FIRST_PART = "That is the first part"
+  private val SECOND_PART = "That is the second part"
+
+  /**
+   * Writes `content` as UTF-8 into a new temporary file (named with `prefix`) that is
+   * deleted on JVM exit. The writer is closed even if writing fails.
+   */
+  private def writeTempFile(prefix: String, content: String): File = {
+    val tempFile = File.createTempFile(prefix, "tmp")
+    tempFile.deleteOnExit()
+    val writer = new OutputStreamWriter(new FileOutputStream(tempFile), StandardCharsets.UTF_8)
+    try writer.write(content) finally writer.close()
+    tempFile
+  }
+
+  /**
+   * Writes `content` to a temporary file and returns one split covering the whole file.
+   */
+  private def createTempFile(content: String): FileInputSplit = {
+    val tempFile = writeTempFile("test_contents", content)
+    new FileInputSplit(
+      0,
+      new Path(tempFile.toURI.toString),
+      0,
+      tempFile.length,
+      Array("localhost"))
+  }
+
+  /**
+   * Writes FIRST_PART and SECOND_PART, each terminated by `lineBreakerInFile`, reads them
+   * back with `lineBreakerSetup` as the record delimiter, and asserts that both records
+   * equal the original text (i.e. no trailing '\r' is left over).
+   */
+  private def testRemovingTrailingCR(lineBreakerInFile: String, lineBreakerSetup: String) {
+    val fileContent = FIRST_PART + lineBreakerInFile + SECOND_PART + lineBreakerInFile
+
+    // create input file (UTF-8, consistent with createTempFile)
+    val tempFile = writeTempFile("CsvInputFormatTest", fileContent)
+
+    val typeInfo = new RowTypeInfo(Seq(BasicTypeInfo.STRING_TYPE_INFO))
+
+    val inputFormat = new RowCsvInputFormat(new Path(tempFile.toURI.toString), typeInfo)
+    inputFormat.configure(new Configuration)
+    inputFormat.setDelimiter(lineBreakerSetup)
+
+    val splits = inputFormat.createInputSplits(1)
+    inputFormat.open(splits(0))
+
+    // release whatever open() acquired, even when an assertion below fails
+    try {
+      var result = inputFormat.nextRecord(new Row(1))
+      assertNotNull("Expecting to not return null", result)
+      assertEquals(FIRST_PART, result.productElement(0))
+
+      result = inputFormat.nextRecord(result)
+      assertNotNull("Expecting to not return null", result)
+      assertEquals(SECOND_PART, result.productElement(0))
+    } finally {
+      inputFormat.close()
+    }
+  }
+}

Reply via email to