[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/426#issuecomment-85866221

Cool, thanks. Will have a look shortly. Did you rebase to the latest master? We had a few build issues with the master branch a few days ago. It is a good sign if it builds on your machine.

On Mar 25, 2015 5:47 AM, Chiwan Park notificati...@github.com wrote:

Hi, I updated this PR.
- Removed the `pojoType(Class<?> targetType)` method in `CsvReader` to force the user to specify the field order explicitly.
- Added a routine to the `readCsvFile` method that checks that a field order exists.
- Added two integration tests for the above two modifications.

By the way, I cannot figure out why Travis fails. On my machine, `mvn clean install -DskipTests` and `mvn verify` both succeed. From the travis log https://travis-ci.org/chiwanpark/flink/jobs/55747686#L7074, the problem seems to be related to Gelly. Although I read some of the Gelly code, I cannot find what exactly the problem is. Could anyone help me with this?

-- Reply to this email directly or view it on GitHub https://github.com/apache/flink/pull/426#issuecomment-85830275.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/426#issuecomment-86011099

Oops, I pushed an intermediate commit a8a5c37. I will fix it.
[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/426#issuecomment-86011531

Sure, no problem :-) Can I check it now or do you need a bit more time?
[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/426#issuecomment-86011782

@fhueske You can check it now :)
[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/426#issuecomment-86079595

@chiwanpark excellent job, thanks! Will merge it after a final round of Travis tests has passed.
[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/426
[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r27018880

--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala ---
@@ -223,8 +224,11 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * @param lenient Whether the parser should silently ignore malformed lines.
    * @param includedFields The fields in the file that should be read. Per default all fields
    *                       are read.
+   * @param fieldsOrder The order information between CSV data and POJO fields. Without order
--- End diff --

change parameter name to `pojoFieldOrder`?
[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r27018277

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -52,103 +50,86 @@

 	public static final String DEFAULT_FIELD_DELIMITER = ",";

-	private transient Object[] parsedValues;
-
-	private byte[] commentPrefix = null;
-
-	// To speed up readRecord processing. Used to find windows line endings.
-	// It is set when open so that readRecord does not have to evaluate it
-	private boolean lineDelimiterIsLinebreak = false;
-
-	private transient int commentCount;
-	private transient int invalidLineCount;
-
-
-	public CsvInputFormat(Path filePath) {
-		super(filePath);
-	}
-
-	public CsvInputFormat(Path filePath, Class<?> ... types) {
-		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, types);
-	}
+	private Class<OUT> pojoTypeClass = null;
+	private String[] pojoFieldsName = null;
+	private transient Field[] pojoFields = null;
+	private transient PojoTypeInfo<OUT> pojoTypeInfo = null;
+
+	public CsvInputFormat(Path filePath, TypeInformation<OUT> typeInformation) {
+		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
+	}

-	public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, Class<?>... types) {
+	public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TypeInformation<OUT> typeInformation) {
 		super(filePath);
+		Preconditions.checkArgument(typeInformation instanceof CompositeType);
+		CompositeType<OUT> compositeType = (CompositeType<OUT>) typeInformation;
+
 		setDelimiter(lineDelimiter);
 		setFieldDelimiter(fieldDelimiter);
-		setFieldTypes(types);
-	}
-
-
-	public byte[] getCommentPrefix() {
-		return commentPrefix;
-	}
-
-	public void setCommentPrefix(byte[] commentPrefix) {
-		this.commentPrefix = commentPrefix;
-	}
-
-	public void setCommentPrefix(char commentPrefix) {
-		setCommentPrefix(String.valueOf(commentPrefix));
-	}
-
-	public void setCommentPrefix(String commentPrefix) {
-		setCommentPrefix(commentPrefix, Charsets.UTF_8);
-	}
-
-	public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
-		if (charsetName == null) {
-			throw new IllegalArgumentException("Charset name must not be null");
+		Class<?>[] classes = new Class<?>[typeInformation.getArity()];
+		for (int i = 0, arity = typeInformation.getArity(); i < arity; i++) {
+			classes[i] = compositeType.getTypeAt(i).getTypeClass();
 		}
-
-		if (commentPrefix != null) {
-			Charset charset = Charset.forName(charsetName);
-			setCommentPrefix(commentPrefix, charset);
-		} else {
-			this.commentPrefix = null;
+		setFieldTypes(classes);
+
+		if (typeInformation instanceof PojoTypeInfo) {
+			pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+			pojoTypeClass = typeInformation.getTypeClass();
+			pojoFieldsName = compositeType.getFieldNames();
+			setFieldsOrder(pojoFieldsName);
 		}
 	}
-
-	public void setCommentPrefix(String commentPrefix, Charset charset) {
-		if (charset == null) {
-			throw new IllegalArgumentException("Charset must not be null");
+
+	public void setFieldsOrder(String[] fieldsOrder) {
+		Preconditions.checkNotNull(pojoTypeClass, "Field ordering feature can be used only with POJO fields.");
--- End diff --

How about `Field order can only be specified if output type is a POJO`
[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r27018316

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -52,103 +50,86 @@

 	public static final String DEFAULT_FIELD_DELIMITER = ",";

-	private transient Object[] parsedValues;
-
-	private byte[] commentPrefix = null;
-
-	// To speed up readRecord processing. Used to find windows line endings.
-	// It is set when open so that readRecord does not have to evaluate it
-	private boolean lineDelimiterIsLinebreak = false;
-
-	private transient int commentCount;
-	private transient int invalidLineCount;
-
-
-	public CsvInputFormat(Path filePath) {
-		super(filePath);
-	}
-
-	public CsvInputFormat(Path filePath, Class<?> ... types) {
-		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, types);
-	}
+	private Class<OUT> pojoTypeClass = null;
+	private String[] pojoFieldsName = null;
+	private transient Field[] pojoFields = null;
+	private transient PojoTypeInfo<OUT> pojoTypeInfo = null;
+
+	public CsvInputFormat(Path filePath, TypeInformation<OUT> typeInformation) {
+		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
+	}

-	public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, Class<?>... types) {
+	public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TypeInformation<OUT> typeInformation) {
 		super(filePath);
+		Preconditions.checkArgument(typeInformation instanceof CompositeType);
+		CompositeType<OUT> compositeType = (CompositeType<OUT>) typeInformation;
+
 		setDelimiter(lineDelimiter);
 		setFieldDelimiter(fieldDelimiter);
-		setFieldTypes(types);
-	}
-
-
-	public byte[] getCommentPrefix() {
-		return commentPrefix;
-	}
-
-	public void setCommentPrefix(byte[] commentPrefix) {
-		this.commentPrefix = commentPrefix;
-	}
-
-	public void setCommentPrefix(char commentPrefix) {
-		setCommentPrefix(String.valueOf(commentPrefix));
-	}
-
-	public void setCommentPrefix(String commentPrefix) {
-		setCommentPrefix(commentPrefix, Charsets.UTF_8);
-	}
-
-	public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
-		if (charsetName == null) {
-			throw new IllegalArgumentException("Charset name must not be null");
+		Class<?>[] classes = new Class<?>[typeInformation.getArity()];
+		for (int i = 0, arity = typeInformation.getArity(); i < arity; i++) {
+			classes[i] = compositeType.getTypeAt(i).getTypeClass();
 		}
-
-		if (commentPrefix != null) {
-			Charset charset = Charset.forName(charsetName);
-			setCommentPrefix(commentPrefix, charset);
-		} else {
-			this.commentPrefix = null;
+		setFieldTypes(classes);
+
+		if (typeInformation instanceof PojoTypeInfo) {
+			pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+			pojoTypeClass = typeInformation.getTypeClass();
+			pojoFieldsName = compositeType.getFieldNames();
+			setFieldsOrder(pojoFieldsName);
 		}
 	}
-
-	public void setCommentPrefix(String commentPrefix, Charset charset) {
-		if (charset == null) {
-			throw new IllegalArgumentException("Charset must not be null");
+
+	public void setFieldsOrder(String[] fieldsOrder) {
--- End diff --

Change method name to `setOrderOfPOJOFields`?
[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/426#issuecomment-85455383

Very good! Let me know when you want me to have a look again :-)
[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r27018151

--- Diff: flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java ---
@@ -98,98 +123,66 @@ public void setFields(int[] sourceFieldIndices, Class<?>[] fieldTypes) {
 		setFieldsGeneric(sourceFieldIndices, fieldTypes);
 	}

-	public byte[] getCommentPrefix() {
-		return commentPrefix;
-	}
-
-	public void setCommentPrefix(byte[] commentPrefix) {
-		this.commentPrefix = commentPrefix;
-	}
-
-	public void setCommentPrefix(char commentPrefix) {
-		setCommentPrefix(String.valueOf(commentPrefix));
-	}
+	public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes) {
+		Preconditions.checkNotNull(sourceFieldMask);
+		Preconditions.checkNotNull(fieldTypes);

-	public void setCommentPrefix(String commentPrefix) {
-		setCommentPrefix(commentPrefix, Charsets.UTF_8);
+		setFieldsGeneric(sourceFieldMask, fieldTypes);
 	}

-	public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
-		if (charsetName == null) {
-			throw new IllegalArgumentException("Charset name must not be null");
-		}
-
-		if (commentPrefix != null) {
-			Charset charset = Charset.forName(charsetName);
-			setCommentPrefix(commentPrefix, charset);
-		} else {
-			this.commentPrefix = null;
-		}
+	public Class<?>[] getFieldTypes() {
+		return super.getGenericFieldTypes();
 	}

-	public void setCommentPrefix(String commentPrefix, Charset charset) {
-		if (charset == null) {
-			throw new IllegalArgumentException("Charset must not be null");
-		}
-		if (commentPrefix != null) {
-			this.commentPrefix = commentPrefix.getBytes(charset);
-		} else {
-			this.commentPrefix = null;
-		}
-	}
-
-	@Override
-	public void close() throws IOException {
--- End diff --

Did the `close()` method get lost in the refactoring?
[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/426#issuecomment-85447697

I see the issue with the non-deterministic field order and FLINK-1665 as follows. Both FLINK-1665 and Option 3 solve the problem of non-deterministic field order:
- FLINK-1665 by specifying the order in the POJO using annotations.
- Option 3 by forcing the user to explicitly specify the field order.

As long as we don't have FLINK-1665 implemented, I would go with Option 3. Once we have FLINK-1665, we could relax it for POJOs with `@Position` annotations.
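Option 3 above can be sketched outside of Flink as follows. This is a minimal, hedged illustration (class and method names here are invented for the example, not Flink API): the caller passes the CSV-to-POJO field order explicitly, so the mapping never depends on the unspecified order in which reflection enumerates a class's fields.

```java
import java.lang.reflect.Field;

public class ExplicitOrderDemo {

    // A tiny POJO with public fields, standing in for a user type.
    public static class Item {
        public int id;
        public String name;
    }

    // Map one CSV line onto a POJO using the caller-supplied field order.
    // fieldOrder[i] names the POJO field that CSV column i is written into.
    static <T> T fromCsvLine(String line, T reuse, String[] fieldOrder) throws Exception {
        String[] tokens = line.split(",");
        for (int i = 0; i < fieldOrder.length; i++) {
            Field f = reuse.getClass().getField(fieldOrder[i]);
            // Parse the token according to the declared field type (ints and Strings only here).
            Object value = (f.getType() == int.class) ? Integer.parseInt(tokens[i]) : tokens[i];
            f.set(reuse, value);
        }
        return reuse;
    }

    public static void main(String[] args) throws Exception {
        Item item = fromCsvLine("123,AAA", new Item(), new String[]{"id", "name"});
        System.out.println(item.id + " " + item.name); // prints "123 AAA"
    }
}
```

Because the order is an explicit argument, swapping `{"id", "name"}` for `{"name", "id"}` changes the mapping deterministically, which is exactly what an annotation-based order (FLINK-1665) would later make implicit.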
[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r27018812

--- Diff: flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java ---
@@ -19,66 +19,91 @@

 package org.apache.flink.api.scala.operators;

-import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.GenericCsvInputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.util.StringUtils;
+import org.apache.flink.types.parser.FieldParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.charset.IllegalCharsetNameException;
-import java.nio.charset.UnsupportedCharsetException;
-import java.util.Map;
-import java.util.TreeMap;
+import java.lang.reflect.Field;
+import java.util.Arrays;

-import scala.Product;
-
-public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFormat<OUT> {
+public class ScalaCsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {

 	private static final long serialVersionUID = 1L;

 	private static final Logger LOG = LoggerFactory.getLogger(ScalaCsvInputFormat.class);
-
-	private transient Object[] parsedValues;
-
-	// To speed up readRecord processing. Used to find windows line endings.
-	// It is set when open so that readRecord does not have to evaluate it
-	private boolean lineDelimiterIsLinebreak = false;

-	private final TupleSerializerBase<OUT> serializer;
+	private transient Object[] parsedValues;

-	private byte[] commentPrefix = null;
+	private final TupleSerializerBase<OUT> tupleSerializer;

-	private transient int commentCount;
-	private transient int invalidLineCount;
+	private Class<OUT> pojoTypeClass = null;
+	private String[] pojoFieldsName = null;
+	private transient Field[] pojoFields = null;
+	private transient PojoTypeInfo<OUT> pojoTypeInfo = null;

 	public ScalaCsvInputFormat(Path filePath, TypeInformation<OUT> typeInfo) {
 		super(filePath);

-		if (!(typeInfo.isTupleType())) {
-			throw new UnsupportedOperationException("This only works on tuple types.");
+		Class<?>[] classes = new Class[typeInfo.getArity()];
+
+		if (typeInfo instanceof TupleTypeInfoBase) {
+			TupleTypeInfoBase<OUT> tupleType = (TupleTypeInfoBase<OUT>) typeInfo;
+			// We can use an empty config here, since we only use the serializer to create
+			// the top-level case class
+			tupleSerializer = (TupleSerializerBase<OUT>) tupleType.createSerializer(new ExecutionConfig());
+
+			for (int i = 0; i < tupleType.getArity(); i++) {
+				classes[i] = tupleType.getTypeAt(i).getTypeClass();
+			}
+
+			setFieldTypes(classes);
+		} else {
+			tupleSerializer = null;
+			pojoTypeInfo = (PojoTypeInfo<OUT>) typeInfo;
+			pojoTypeClass = typeInfo.getTypeClass();
+			pojoFieldsName = pojoTypeInfo.getFieldNames();
+
+			for (int i = 0, arity = pojoTypeInfo.getArity(); i < arity; i++) {
+				classes[i] = pojoTypeInfo.getTypeAt(i).getTypeClass();
+			}
+
+			setFieldTypes(classes);
+			setFieldsOrder(pojoFieldsName);
+		}
 	}
+
+	public void setFieldsOrder(String[] fieldsOrder) {
--- End diff --

Rename method to `setOrderOfPOJOFields`
[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r27018473

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -52,103 +50,86 @@

 	public static final String DEFAULT_FIELD_DELIMITER = ",";

-	private transient Object[] parsedValues;
-
-	private byte[] commentPrefix = null;
-
-	// To speed up readRecord processing. Used to find windows line endings.
-	// It is set when open so that readRecord does not have to evaluate it
-	private boolean lineDelimiterIsLinebreak = false;
-
-	private transient int commentCount;
-	private transient int invalidLineCount;
-
-
-	public CsvInputFormat(Path filePath) {
-		super(filePath);
-	}
-
-	public CsvInputFormat(Path filePath, Class<?> ... types) {
-		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, types);
-	}
+	private Class<OUT> pojoTypeClass = null;
+	private String[] pojoFieldsName = null;
+	private transient Field[] pojoFields = null;
+	private transient PojoTypeInfo<OUT> pojoTypeInfo = null;
+
+	public CsvInputFormat(Path filePath, TypeInformation<OUT> typeInformation) {
+		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
+	}

-	public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, Class<?>... types) {
+	public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TypeInformation<OUT> typeInformation) {
 		super(filePath);
+		Preconditions.checkArgument(typeInformation instanceof CompositeType);
+		CompositeType<OUT> compositeType = (CompositeType<OUT>) typeInformation;
+
 		setDelimiter(lineDelimiter);
 		setFieldDelimiter(fieldDelimiter);
-		setFieldTypes(types);
-	}
-
-
-	public byte[] getCommentPrefix() {
-		return commentPrefix;
-	}
-
-	public void setCommentPrefix(byte[] commentPrefix) {
-		this.commentPrefix = commentPrefix;
-	}
-
-	public void setCommentPrefix(char commentPrefix) {
-		setCommentPrefix(String.valueOf(commentPrefix));
-	}
-
-	public void setCommentPrefix(String commentPrefix) {
-		setCommentPrefix(commentPrefix, Charsets.UTF_8);
-	}
-
-	public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
-		if (charsetName == null) {
-			throw new IllegalArgumentException("Charset name must not be null");
+		Class<?>[] classes = new Class<?>[typeInformation.getArity()];
+		for (int i = 0, arity = typeInformation.getArity(); i < arity; i++) {
+			classes[i] = compositeType.getTypeAt(i).getTypeClass();
 		}
-
-		if (commentPrefix != null) {
-			Charset charset = Charset.forName(charsetName);
-			setCommentPrefix(commentPrefix, charset);
-		} else {
-			this.commentPrefix = null;
+		setFieldTypes(classes);
+
+		if (typeInformation instanceof PojoTypeInfo) {
+			pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+			pojoTypeClass = typeInformation.getTypeClass();
+			pojoFieldsName = compositeType.getFieldNames();
+			setFieldsOrder(pojoFieldsName);
 		}
 	}
-
-	public void setCommentPrefix(String commentPrefix, Charset charset) {
-		if (charset == null) {
-			throw new IllegalArgumentException("Charset must not be null");
+
+	public void setFieldsOrder(String[] fieldsOrder) {
+		Preconditions.checkNotNull(pojoTypeClass, "Field ordering feature can be used only with POJO fields.");
+		Preconditions.checkNotNull(fieldsOrder);
+
+		int includedCount = 0;
+		for (boolean isIncluded : fieldIncluded) {
+			if (isIncluded) {
+				includedCount++;
+			}
 		}
-		if (commentPrefix != null) {
-			this.commentPrefix = commentPrefix.getBytes(charset);
-		} else {
-			this.commentPrefix = null;
+
+		Preconditions.checkArgument(includedCount == fieldsOrder.length,
+			"The number of selected POJO fields should be the same as that of CSV fields.");
--- End diff --

How about
[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r27018597

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -52,103 +50,86 @@

 	public static final String DEFAULT_FIELD_DELIMITER = ",";

-	private transient Object[] parsedValues;
-
-	private byte[] commentPrefix = null;
-
-	// To speed up readRecord processing. Used to find windows line endings.
-	// It is set when open so that readRecord does not have to evaluate it
-	private boolean lineDelimiterIsLinebreak = false;
-
-	private transient int commentCount;
-	private transient int invalidLineCount;
-
-
-	public CsvInputFormat(Path filePath) {
-		super(filePath);
-	}
-
-	public CsvInputFormat(Path filePath, Class<?> ... types) {
-		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, types);
-	}
+	private Class<OUT> pojoTypeClass = null;
+	private String[] pojoFieldsName = null;
+	private transient Field[] pojoFields = null;
+	private transient PojoTypeInfo<OUT> pojoTypeInfo = null;
+
+	public CsvInputFormat(Path filePath, TypeInformation<OUT> typeInformation) {
+		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
+	}

-	public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, Class<?>... types) {
+	public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TypeInformation<OUT> typeInformation) {
 		super(filePath);
+		Preconditions.checkArgument(typeInformation instanceof CompositeType);
+		CompositeType<OUT> compositeType = (CompositeType<OUT>) typeInformation;
+
 		setDelimiter(lineDelimiter);
 		setFieldDelimiter(fieldDelimiter);
-		setFieldTypes(types);
-	}
-
-
-	public byte[] getCommentPrefix() {
-		return commentPrefix;
-	}
-
-	public void setCommentPrefix(byte[] commentPrefix) {
-		this.commentPrefix = commentPrefix;
-	}
-
-	public void setCommentPrefix(char commentPrefix) {
-		setCommentPrefix(String.valueOf(commentPrefix));
-	}
-
-	public void setCommentPrefix(String commentPrefix) {
-		setCommentPrefix(commentPrefix, Charsets.UTF_8);
-	}
-
-	public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
-		if (charsetName == null) {
-			throw new IllegalArgumentException("Charset name must not be null");
+		Class<?>[] classes = new Class<?>[typeInformation.getArity()];
+		for (int i = 0, arity = typeInformation.getArity(); i < arity; i++) {
+			classes[i] = compositeType.getTypeAt(i).getTypeClass();
 		}
-
-		if (commentPrefix != null) {
-			Charset charset = Charset.forName(charsetName);
-			setCommentPrefix(commentPrefix, charset);
-		} else {
-			this.commentPrefix = null;
+		setFieldTypes(classes);
+
+		if (typeInformation instanceof PojoTypeInfo) {
+			pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+			pojoTypeClass = typeInformation.getTypeClass();
+			pojoFieldsName = compositeType.getFieldNames();
+			setFieldsOrder(pojoFieldsName);
 		}
 	}
-
-	public void setCommentPrefix(String commentPrefix, Charset charset) {
-		if (charset == null) {
-			throw new IllegalArgumentException("Charset must not be null");
+
+	public void setFieldsOrder(String[] fieldsOrder) {
+		Preconditions.checkNotNull(pojoTypeClass, "Field ordering feature can be used only with POJO fields.");
+		Preconditions.checkNotNull(fieldsOrder);
+
+		int includedCount = 0;
+		for (boolean isIncluded : fieldIncluded) {
+			if (isIncluded) {
+				includedCount++;
+			}
 		}
-		if (commentPrefix != null) {
-			this.commentPrefix = commentPrefix.getBytes(charset);
-		} else {
-			this.commentPrefix = null;
+
+		Preconditions.checkArgument(includedCount == fieldsOrder.length,
+			"The number of selected POJO fields should be the same as that of CSV fields.");
+
+		for (String field :
[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/426#issuecomment-85830275

Hi, I updated this PR.
* Removed the `pojoType(Class<?> targetType)` method in `CsvReader` to force the user to specify the field order explicitly.
* Added a routine to the `readCsvFile` method that checks that a field order exists.
* Added two integration tests for the above two modifications.

By the way, I cannot figure out why Travis fails. On my machine, `mvn clean install -DskipTests` and `mvn verify` both succeed. From the [travis log](https://travis-ci.org/chiwanpark/flink/jobs/55747686#L7074), the problem seems to be related to Gelly. Although I read some of the Gelly code, I cannot find what exactly the problem is. Could anyone help me with this?
[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/426#issuecomment-84977748

Hi @chiwanpark, thanks for updating the PR! :-) I was gone for a few days. Will have a look at your PR shortly.
[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r26991431

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -234,9 +301,23 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws
 		}

 		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-			// valid parse, map values into pact record
-			for (int i = 0; i < parsedValues.length; i++) {
-				reuse.setField(parsedValues[i], i);
+			if (pojoTypeClass == null) {
+				// result type is tuple
+				Tuple result = (Tuple) reuse;
+				for (int i = 0; i < parsedValues.length; i++) {
+					result.setField(parsedValues[i], i);
+				}
+			} else {
+				// result type is POJO
+				for (int i = 0; i < parsedValues.length; i++) {
+					try {
+						pojoFields[i].set(reuse, parsedValues[i]);
+					} catch (IllegalAccessException e) {
+						LOG.error("Cannot set value to given type object!", e);
--- End diff --

This error will not be caused by invalid data. You should rethrow this exception to kill the job. If the field cannot be set, something is seriously wrong and the job should not continue.
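The fail-fast pattern the reviewer asks for can be sketched in isolation as follows. This is a hedged, self-contained illustration (the class and helper names are invented for this example, not Flink code): an `IllegalAccessException` while setting a POJO field signals a programming error rather than bad input, so it is wrapped and rethrown instead of merely logged.

```java
import java.lang.reflect.Field;

public class FailFastDemo {

    // A stand-in for a user POJO with a public field.
    public static class Pojo {
        public int x;
    }

    // Set a field reflectively; on reflection failure, rethrow instead of logging,
    // so the surrounding job terminates rather than continuing with unset fields.
    static void setField(Object target, String name, Object value) {
        try {
            Field f = target.getClass().getField(name);
            f.set(target, value);
        } catch (ReflectiveOperationException e) {
            // Wrap and rethrow: invalid data never reaches this point, so any
            // failure here means the job setup itself is broken.
            throw new RuntimeException("Parsed value could not be set in POJO field \"" + name + "\"", e);
        }
    }

    public static void main(String[] args) {
        Pojo p = new Pojo();
        setField(p, "x", 42);
        System.out.println(p.x); // prints 42
    }
}
```

The design choice is that data errors (malformed lines) are handled leniently elsewhere, while structural errors (inaccessible fields) abort immediately, which keeps the two failure classes from being conflated.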
[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r26992871

--- Diff: flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java ---
@@ -684,4 +693,249 @@ private void testRemovingTrailingCR(String lineBreakerInFile, String lineBreaker
 		}
 	}

+	private void validatePojoItem(CsvInputFormat<PojoItem> format) throws Exception {
+		PojoItem item = new PojoItem();
+
+		format.nextRecord(item);
+
+		assertEquals(123, item.field1);
+		assertEquals("AAA", item.field2);
+		assertEquals(Double.valueOf(3.123), item.field3);
+		assertEquals("BBB", item.field4);
+
+		format.nextRecord(item);
+
+		assertEquals(456, item.field1);
+		assertEquals("BBB", item.field2);
+		assertEquals(Double.valueOf(1.123), item.field3);
+		assertEquals("AAA", item.field4);
+	}
+
+	@Test
+	public void testPojoType() throws Exception {
+		File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+		tempFile.deleteOnExit();
+		tempFile.setWritable(true);
+
+		OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+		wrt.write("123,AAA,3.123,BBB\n");
+		wrt.write("456,BBB,1.123,AAA\n");
+		wrt.close();
+
+		@SuppressWarnings("unchecked")
+		TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+		inputFormat.configure(new Configuration());
+		FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+		inputFormat.open(splits[0]);
+
+		validatePojoItem(inputFormat);
+	}
+
+	@Test
+	public void testPojoTypeWithPrivateField() throws Exception {
+		File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+		tempFile.deleteOnExit();
+		tempFile.setWritable(true);
+
+		OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+		wrt.write("123,AAA,3.123,BBB\n");
+		wrt.write("456,BBB,1.123,AAA\n");
+		wrt.close();
+
+		@SuppressWarnings("unchecked")
+		TypeInformation<PrivatePojoItem> typeInfo = (TypeInformation<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class);
+		CsvInputFormat<PrivatePojoItem> inputFormat = new CsvInputFormat<PrivatePojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+		inputFormat.configure(new Configuration());
+
+		FileInputSplit[] splits = inputFormat.createInputSplits(1);
+		inputFormat.open(splits[0]);
+
+		PrivatePojoItem item = new PrivatePojoItem();
+		inputFormat.nextRecord(item);
--- End diff --

You can use the `validatePojoItem()` method here as well.
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r26993897

--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala ---
@@ -247,16 +252,27 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
       inputFormat.enableQuotedStringParsing(quoteCharacter);
     }

-    val classes: Array[Class[_]] = new Array[Class[_]](typeInfo.getArity)
+    val classesBuf: ArrayBuffer[Class[_]] = new ArrayBuffer[Class[_]]
     for (i <- 0 until typeInfo.getArity) {
-      classes(i) = typeInfo.getTypeAt(i).getTypeClass
+      typeInfo match {
+        case info: TupleTypeInfoBase[T] => classesBuf += info.getTypeAt(i).getTypeClass()
+        case info: PojoTypeInfo[T] =>
+          if (includedFields == null || includedFields.indexOf(i) != -1) {

--- End diff --

Just add all POJO fields without checking against `includedFields`. `includedFields` refers to the fields of the CSV file, not to the fields of the POJO. The later check ensures that the number of read fields and the number of POJO/Tuple fields are equal.
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/426#issuecomment-85261367

Hi @chiwanpark, the PR improved a lot! Besides a few inline comments, the logic and test coverage look pretty good. Unfortunately, the PR does not seem to build correctly (check [Travis](https://travis-ci.org/apache/flink/builds/54853098)). I'll do another quick pass tomorrow and have a closer look at the error messages. IMO, there are just two things left to discuss before we can test it on a cluster:

**Order of POJO fields**

This is actually a crucial point. We need to make sure that the field order is deterministic for all setups. I think (not sure!) that right now the order of the fields depends on the order in which the fields are returned by Java's reflection tools. This order needs to be standardized to ensure that all JVMs that obey the standard are compatible. However, even a standardized order (lexicographic order) might not be very helpful. There are several options here:

1. If the order of fields is standardized, just use that by default.
2. If not, we can deterministically sort the fields ourselves in the PojoTypeInfo.
3. We make the definition of a field order mandatory until we can define the order of POJO fields (e.g., via the proposed `@Position` annotation).

I am leaning towards option 3.

**Changed inheritance of ScalaCsvInputFormat**

This is a very good observation; however, I would keep the original inheritance for now. At the moment, GenericCsvInputFormat is extended by CsvInputFormat (for the Java API), ScalaCsvInputFormat (for the Scala API), CsvInputFormat (for the Record API), and a test class. We will soon remove the Record API such that only the Java and the Scala API remain. I think we can then move all common operations to GenericCsvInputFormat (maybe we can do that already...). @chiwanpark does that make sense to you or do you disagree?

@chiwanpark What do you think? Any other opinions?
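The reflection-order concern behind option 2 can be made concrete with a small, self-contained sketch (class and method names are hypothetical, not Flink code): `Class.getDeclaredFields()` returns fields in no particular order, so a deterministic lexicographic sort would have to be imposed explicitly.

```java
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Comparator;

public class FieldOrderSketch {

    // Hypothetical POJO, used only for this illustration.
    public static class Item {
        public int field1;
        public String field2;
    }

    // Option 2 in a nutshell: the JVM gives no ordering guarantee for
    // getDeclaredFields(), so impose a lexicographic order explicitly.
    public static String[] sortedFieldNames(Class<?> clazz) {
        Field[] fields = clazz.getDeclaredFields();
        Arrays.sort(fields, Comparator.comparing(Field::getName));
        String[] names = new String[fields.length];
        for (int i = 0; i < fields.length; i++) {
            names[i] = fields[i].getName();
        }
        return names;
    }

    public static void main(String[] args) {
        // Deterministic regardless of the reflection order the JVM happens to use.
        System.out.println(Arrays.toString(sortedFieldNames(Item.class)));  // prints [field1, field2]
    }
}
```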
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r26993011

--- Diff: flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java ---
@@ -684,4 +693,249 @@ private void testRemovingTrailingCR(String lineBreakerInFile, String lineBreaker
 		}
 	}
+
+	private void validatePojoItem(CsvInputFormat<PojoItem> format) throws Exception {
+		PojoItem item = new PojoItem();
+
+		format.nextRecord(item);
+
+		assertEquals(123, item.field1);
+		assertEquals("AAA", item.field2);
+		assertEquals(Double.valueOf(3.123), item.field3);
+		assertEquals("BBB", item.field4);
+
+		format.nextRecord(item);
+
+		assertEquals(456, item.field1);
+		assertEquals("BBB", item.field2);
+		assertEquals(Double.valueOf(1.123), item.field3);
+		assertEquals("AAA", item.field4);
+	}
+
+	@Test
+	public void testPojoType() throws Exception {
+		File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+		tempFile.deleteOnExit();
+		tempFile.setWritable(true);
+
+		OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+		wrt.write("123,AAA,3.123,BBB\n");
+		wrt.write("456,BBB,1.123,AAA\n");
+		wrt.close();
+
+		@SuppressWarnings("unchecked")
+		TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+		inputFormat.configure(new Configuration());
+		FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+		inputFormat.open(splits[0]);
+
+		validatePojoItem(inputFormat);
+	}
+
+	@Test
+	public void testPojoTypeWithPrivateField() throws Exception {
+		File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+		tempFile.deleteOnExit();
+		tempFile.setWritable(true);
+
+		OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+		wrt.write("123,AAA,3.123,BBB\n");
+		wrt.write("456,BBB,1.123,AAA\n");
+		wrt.close();
+
+		@SuppressWarnings("unchecked")
+		TypeInformation<PrivatePojoItem> typeInfo = (TypeInformation<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class);
+		CsvInputFormat<PrivatePojoItem> inputFormat = new CsvInputFormat<PrivatePojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+		inputFormat.configure(new Configuration());
+
+		FileInputSplit[] splits = inputFormat.createInputSplits(1);
+		inputFormat.open(splits[0]);
+
+		PrivatePojoItem item = new PrivatePojoItem();
+		inputFormat.nextRecord(item);
+
+		assertEquals(123, item.field1);
+		assertEquals("AAA", item.field2);
+		assertEquals(Double.valueOf(3.123), item.field3);
+		assertEquals("BBB", item.field4);
+
+		inputFormat.nextRecord(item);
+
+		assertEquals(456, item.field1);
+		assertEquals("BBB", item.field2);
+		assertEquals(Double.valueOf(1.123), item.field3);
+		assertEquals("AAA", item.field4);
+	}
+
+	@Test
+	public void testPojoTypeWithMappingInformation() throws Exception {
+		File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+		tempFile.deleteOnExit();
+		tempFile.setWritable(true);
+
+		OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+		wrt.write("123,3.123,AAA,BBB\n");
+		wrt.write("456,1.123,BBB,AAA\n");
+		wrt.close();
+
+		@SuppressWarnings("unchecked")
+		TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+		inputFormat.setFieldsOrder(new String[]{"field1", "field3", "field2", "field4"});
+
+		inputFormat.configure(new Configuration());
+		FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+		inputFormat.open(splits[0]);
+
+		validatePojoItem(inputFormat);
+	}
+
+	@Test
+	public void testPojoTypeWithPartialFieldInCSV() throws Exception {
+		File tempFile =
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r26994167

--- Diff: flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderWithPOJOITCase.java ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.io;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class CsvReaderWithPOJOITCase extends MultipleProgramsTestBase {
+	private String resultPath;
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public CsvReaderWithPOJOITCase(ExecutionMode mode) {
+		super(mode);
+	}
+
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile("result").toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+
+	private String createInputData(String data) throws Exception {
+		File file = tempFolder.newFile("input");
+		Files.write(data, file, Charsets.UTF_8);
+
+		return file.toURI().toString();
+	}
+
+	@Test
+	public void testPOJOType() throws Exception {
+		final String dataPath = createInputData("ABC,2.20,3\nDEF,5.1,5\nDEF,3.30,1\nGHI,3.30,10");
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<POJOItem> data = env.readCsvFile(dataPath).pojoType(POJOItem.class, new String[]{"f1", "f3", "f2"});
+		DataSet<Tuple2<String, Integer>> result = data.map(new POJOToTupleMapper()).groupBy(0).sum(1);

--- End diff --

I would remove the `POJOToTupleMapper` and the aggregation and instead directly emit the POJO data with `writeAsText()`. This will use the `toString()` method of the POJO to write the data. Simplifies the test and restricts it to the essential part.
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r26993339

--- Diff: flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java ---
@@ -219,73 +92,24 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
 		}

 		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-			OUT result = serializer.createInstance(parsedValues);
-			return result;
+			if (tupleSerializer != null) {
+				return tupleSerializer.createInstance(parsedValues);
+			} else {
+				try {
+					for (int i = 0; i < pojoFields.length; i++) {
+						pojoFields[i].set(reuse, parsedValues[i]);
+					}
+				} catch (IllegalAccessException e) {
+					LOG.error("Cannot set value to given type object!", e);

--- End diff --

Same as in the `CsvInputFormat`: forward the exception to kill the job.
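For illustration, a minimal runnable sketch of what "forwarding the exception" could look like (the helper and class names are made up, not the actual Flink code): the checked `IllegalAccessException` is wrapped in a `RuntimeException` and rethrown, so the failure propagates out of the record-reading call instead of being swallowed by a log statement.

```java
import java.lang.reflect.Field;

public class ForwardExceptionSketch {

    // Hypothetical POJO used only for this demo.
    public static class Pojo {
        public int field1;
    }

    // Sketch of the suggested fix: rethrow instead of logging, so the job fails fast.
    public static void fillPojo(Object reuse, Field[] pojoFields, Object[] parsedValues) {
        try {
            for (int i = 0; i < pojoFields.length; i++) {
                pojoFields[i].set(reuse, parsedValues[i]);
            }
        } catch (IllegalAccessException e) {
            throw new RuntimeException("Parsed value could not be set in POJO field", e);
        }
    }

    public static int demo() {
        try {
            Pojo p = new Pojo();
            fillPojo(p, new Field[] { Pojo.class.getField("field1") }, new Object[] { 42 });
            return p.field1;
        } catch (NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());  // prints 42
    }
}
```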
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/426#issuecomment-85285193

@fhueske Hi, thanks for your kind advice! I will fix them soon.

About the order of POJO fields: I also think that option 3 is good. However, [FLINK-1665](https://issues.apache.org/jira/browse/FLINK-1665) is not implemented yet, so I will implement options 1 and 2 for now. After FLINK-1665 is completed, we can implement option 3.

About the inheritance of `ScalaCsvInputFormat`: I didn't think about the Record API. Your suggestion looks good. I will revert the changes and refactor `GenericCsvInputFormat` to contain the duplicated methods.
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/426#issuecomment-82885499

I updated this PR.

* Change the method of obtaining `Field` objects from using `PojoTypeInfo` to saving field names. (Thanks @fhueske for the advice!)
* `ScalaCsvInputFormat` now extends `CsvInputFormat`, because there was a lot of duplicated code between the two classes.
* Add integration tests for `CsvReader` (Java API) and `ExecutionEnvironment.readCsvFile` (Scala API).

Any feedback is welcome! (Especially on the error messages, because of my poor English.)
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/426#issuecomment-78876571

Hello. I have a question about object reuse in the `readRecord` method of `ScalaCsvInputFormat`. In the Java implementation, `CsvInputFormat` reuses the result object. But in `ScalaCsvInputFormat`, we don't reuse the object and create a new instance for each record. Why doesn't `ScalaCsvInputFormat` reuse the object?
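To make the question concrete, here is a hypothetical sketch (not Flink code) of the reuse pattern the Java `CsvInputFormat` follows: one mutable instance is filled again for every record, so there is no per-record allocation, but callers must copy the data out if they keep a reference.

```java
import java.util.ArrayList;
import java.util.List;

public class ReuseSketch {

    // Stand-in for a record type; illustrative only.
    public static class Record {
        public int value;
    }

    // Mutable-record style: the same instance is refilled for every input record.
    public static Record nextRecord(Record reuse, int parsedValue) {
        reuse.value = parsedValue;
        return reuse;
    }

    public static List<Integer> demo() {
        Record reuse = new Record();  // allocated exactly once
        List<Integer> seen = new ArrayList<>();
        for (int v : new int[] { 1, 2, 3 }) {
            // Copy the value out, because the next call overwrites the same object.
            seen.add(nextRecord(reuse, v).value);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());  // prints [1, 2, 3]
    }
}
```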
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/426#issuecomment-79066240

@aljoscha Thanks! I understand it now.
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r26215940

--- Diff: flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java ---
@@ -684,4 +693,178 @@ private void testRemovingTrailingCR(String lineBreakerInFile, String lineBreaker
 		}
 	}
+
+	@Test
+	public void testPojoType() throws Exception {
+		File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+		tempFile.deleteOnExit();
+		tempFile.setWritable(true);
+
+		OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+		wrt.write("123,AAA,3.123,BBB\n");
+		wrt.write("456,BBB,1.123,AAA\n");
+		wrt.close();
+
+		@SuppressWarnings("unchecked")
+		TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+		Configuration parameters = new Configuration();
+		inputFormat.configure(parameters);
+
+		inputFormat.setDelimiter('\n');
+		inputFormat.setFieldDelimiter(',');
+
+		FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+		inputFormat.open(splits[0]);
+
+		PojoItem item = new PojoItem();
+		inputFormat.nextRecord(item);
+
+		assertEquals(123, item.field1);
+		assertEquals("AAA", item.field2);
+		assertEquals(Double.valueOf(3.123), item.field3);
+		assertEquals("BBB", item.field4);
+
+		inputFormat.nextRecord(item);
+
+		assertEquals(456, item.field1);
+		assertEquals("BBB", item.field2);
+		assertEquals(Double.valueOf(1.123), item.field3);
+		assertEquals("AAA", item.field4);
+	}
+
+	@Test
+	public void testPojoTypeWithPrivateField() throws Exception {
+		File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+		tempFile.deleteOnExit();
+		tempFile.setWritable(true);
+
+		OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+		wrt.write("123,AAA,3.123,BBB\n");
+		wrt.write("456,BBB,1.123,AAA\n");
+		wrt.close();
+
+		@SuppressWarnings("unchecked")
+		TypeInformation<PrivatePojoItem> typeInfo = (TypeInformation<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class);
+		CsvInputFormat<PrivatePojoItem> inputFormat = new CsvInputFormat<PrivatePojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+		Configuration parameters = new Configuration();
+		inputFormat.configure(parameters);
+
+		inputFormat.setDelimiter('\n');
+		inputFormat.setFieldDelimiter(',');
+
+		FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+		inputFormat.open(splits[0]);
+
+		PrivatePojoItem item = new PrivatePojoItem();
+		inputFormat.nextRecord(item);
+
+		assertEquals(123, item.field1);
+		assertEquals("AAA", item.field2);
+		assertEquals(Double.valueOf(3.123), item.field3);
+		assertEquals("BBB", item.field4);
+
+		inputFormat.nextRecord(item);
+
+		assertEquals(456, item.field1);
+		assertEquals("BBB", item.field2);
+		assertEquals(Double.valueOf(1.123), item.field3);
+		assertEquals("AAA", item.field4);
+	}
+
+	@Test
+	public void testPojoTypeWithMappingInformation() throws Exception {
+		File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+		tempFile.deleteOnExit();
+		tempFile.setWritable(true);
+
+		OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+		wrt.write("123,3.123,AAA,BBB\n");
+		wrt.write("456,1.123,BBB,AAA\n");
+		wrt.close();
+
+		@SuppressWarnings("unchecked")
+		TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+		CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+		inputFormat.setFieldsMap(new String[]{"field1", "field3", "field2", "field4"});
+
+		Configuration parameters = new Configuration();
+		inputFormat.configure(parameters);
+
+		inputFormat.setDelimiter('\n');
+		inputFormat.setFieldDelimiter(',');
+
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r26203612

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -64,26 +70,45 @@
 	private transient int commentCount;
 	private transient int invalidLineCount;
+
+	private CompositeType<OUT> typeInformation = null;
+
+	private String[] fieldsMap = null;
+
+	public CsvInputFormat(Path filePath, TypeInformation<OUT> typeInformation) {
+		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
+	}

-	public CsvInputFormat(Path filePath) {
-		super(filePath);
-	}
-
-	public CsvInputFormat(Path filePath, Class<?> ... types) {
-		this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, types);
-	}
-
-	public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, Class<?>... types) {
+	public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TypeInformation<OUT> typeInformation) {
 		super(filePath);

+		Preconditions.checkArgument(typeInformation instanceof CompositeType);
+		this.typeInformation = (CompositeType<OUT>) typeInformation;
+
 		setDelimiter(lineDelimiter);
 		setFieldDelimiter(fieldDelimiter);

-		setFieldTypes(types);
+		Class<?>[] classes = new Class<?>[typeInformation.getArity()];
+		for (int i = 0, arity = typeInformation.getArity(); i < arity; i++) {
+			classes[i] = this.typeInformation.getTypeAt(i).getTypeClass();
+		}
+		setFieldTypes(classes);
+
+		if (typeInformation instanceof PojoTypeInfo) {
+			setFieldsMap(this.typeInformation.getFieldNames());
+			setAccessibleToField();
+		}
 	}
+
+	public void setAccessibleToField() {

--- End diff --

Can we make this method private?
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r26205234

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -152,6 +177,38 @@ public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes)
 	public Class<?>[] getFieldTypes() {
 		return super.getGenericFieldTypes();
 	}
+
+	public void setFieldsMap(String[] fieldsMap) {
+		Preconditions.checkNotNull(fieldsMap);
+		Preconditions.checkState(typeInformation instanceof PojoTypeInfo);
+
+		PojoTypeInfo<OUT> pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+
+		String[] fields = pojoTypeInfo.getFieldNames();
+		Class<?>[] fieldTypes = getFieldTypes();
+		this.fieldsMap = Arrays.copyOfRange(fieldsMap, 0, fieldsMap.length);
+
+		boolean[] includeMask = new boolean[fieldsMap.length];

--- End diff --

The `includeMask` refers to the fields in the CSV file and allows skipping fields of the file. For example, if a line in your file looks like `Sam,Smith,09-15-1963,123.123` and you only want to read the first name and the date field, you would set the `includeMask` to `[true, false, true]` (missing fields are treated as `false`). So the `includeMask` should not depend on the `fieldsMap`, but the number of `true` entries in the `includeMask` must be equal to the number of fields in the `fieldsMap`.
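The reviewer's `includeMask` semantics can be sketched in a few lines of standalone Java (names and structure are illustrative, not the Flink implementation): the mask indexes the columns of the file, and columns beyond the mask's length count as excluded.

```java
import java.util.ArrayList;
import java.util.List;

public class IncludeMaskSketch {

    // Applies an includeMask to one CSV line the way the review describes:
    // the mask refers to the *columns of the file*, and columns past the
    // end of the mask are treated as false (excluded).
    public static List<String> project(String line, boolean[] includeMask) {
        String[] columns = line.split(",");
        List<String> kept = new ArrayList<>();
        for (int i = 0; i < columns.length; i++) {
            if (i < includeMask.length && includeMask[i]) {
                kept.add(columns[i]);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Keep the first name and the date field; the last name is masked out
        // and the trailing number is dropped because the mask ends before it.
        boolean[] mask = { true, false, true };
        System.out.println(project("Sam,Smith,09-15-1963,123.123", mask));  // prints [Sam, 09-15-1963]
    }
}
```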
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r26206307

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -234,9 +291,29 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws
 		}

 		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-			// valid parse, map values into pact record
-			for (int i = 0; i < parsedValues.length; i++) {
-				reuse.setField(parsedValues[i], i);
+			if (typeInformation instanceof TupleTypeInfoBase) {
+				// result type is tuple
+				Tuple result = (Tuple) reuse;
+				for (int i = 0; i < parsedValues.length; i++) {
+					result.setField(parsedValues[i], i);
+				}
+			} else {
+				// result type is POJO
+				PojoTypeInfo<OUT> pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+				for (int i = 0; i < parsedValues.length; i++) {
+					if (fieldsMap[i] == null) {

--- End diff --

May not be null.
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r26206381

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -234,9 +291,29 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws
 		}

 		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-			// valid parse, map values into pact record
-			for (int i = 0; i < parsedValues.length; i++) {
-				reuse.setField(parsedValues[i], i);
+			if (typeInformation instanceof TupleTypeInfoBase) {
+				// result type is tuple
+				Tuple result = (Tuple) reuse;
+				for (int i = 0; i < parsedValues.length; i++) {
+					result.setField(parsedValues[i], i);
+				}
+			} else {
+				// result type is POJO
+				PojoTypeInfo<OUT> pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+				for (int i = 0; i < parsedValues.length; i++) {
+					if (fieldsMap[i] == null) {
+						continue;
+					}
+
+					try {
+						int fieldIndex = typeInformation.getFieldIndex(fieldsMap[i]);
+						pojoTypeInfo.getPojoFieldAt(fieldIndex).field.set(reuse, parsedValues[i]);

--- End diff --

Same for the `field`.
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r26205969

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -152,6 +177,38 @@ public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes)
 	public Class<?>[] getFieldTypes() {
 		return super.getGenericFieldTypes();
 	}
+
+	public void setFieldsMap(String[] fieldsMap) {
+		Preconditions.checkNotNull(fieldsMap);
+		Preconditions.checkState(typeInformation instanceof PojoTypeInfo);
+
+		PojoTypeInfo<OUT> pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+
+		String[] fields = pojoTypeInfo.getFieldNames();
+		Class<?>[] fieldTypes = getFieldTypes();
+		this.fieldsMap = Arrays.copyOfRange(fieldsMap, 0, fieldsMap.length);
+
+		boolean[] includeMask = new boolean[fieldsMap.length];
+		Class<?>[] newFieldTypes = new Class<?>[fieldsMap.length];
+
+		for (int i = 0; i < fieldsMap.length; i++) {
+			if (fieldsMap[i] == null) {
+				includeMask[i] = false;
+				newFieldTypes[i] = null;
+				continue;
+			}
+
+			for (int j = 0; j < fields.length; j++) {
+				if (fields[j].equals(fieldsMap[i])) {
+					includeMask[i] = true;
+					newFieldTypes[i] = fieldTypes[j];
+					break;
+				}
+			}

--- End diff --

Can you throw an exception if the provided field name was not found in the POJO type information?
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r26206133

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -152,6 +177,38 @@ public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes)
 	public Class<?>[] getFieldTypes() {
 		return super.getGenericFieldTypes();
 	}
+
+	public void setFieldsMap(String[] fieldsMap) {
+		Preconditions.checkNotNull(fieldsMap);
+		Preconditions.checkState(typeInformation instanceof PojoTypeInfo);
+
+		PojoTypeInfo<OUT> pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+
+		String[] fields = pojoTypeInfo.getFieldNames();
+		Class<?>[] fieldTypes = getFieldTypes();
+		this.fieldsMap = Arrays.copyOfRange(fieldsMap, 0, fieldsMap.length);
+
+		boolean[] includeMask = new boolean[fieldsMap.length];
+		Class<?>[] newFieldTypes = new Class<?>[fieldsMap.length];
+
+		for (int i = 0; i < fieldsMap.length; i++) {
+			if (fieldsMap[i] == null) {

--- End diff --

IMO, `null` values should not be allowed in the `fieldsMap`. Can you throw an exception in that case? The `fieldsMap` should be a list of fields that are mapped to columns in the CSV file. As said before, the columns that are read by the format are defined by the `includeMask`.
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r26206329

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -234,9 +291,29 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws
 		}

 		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-			// valid parse, map values into pact record
-			for (int i = 0; i < parsedValues.length; i++) {
-				reuse.setField(parsedValues[i], i);
+			if (typeInformation instanceof TupleTypeInfoBase) {
+				// result type is tuple
+				Tuple result = (Tuple) reuse;
+				for (int i = 0; i < parsedValues.length; i++) {
+					result.setField(parsedValues[i], i);
+				}
+			} else {
+				// result type is POJO
+				PojoTypeInfo<OUT> pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+				for (int i = 0; i < parsedValues.length; i++) {
+					if (fieldsMap[i] == null) {
+						continue;
+					}
+
+					try {
+						int fieldIndex = typeInformation.getFieldIndex(fieldsMap[i]);

--- End diff --

We could compute the index upfront and avoid the String look-up.
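The "compute the index upfront" suggestion can be sketched as follows (names and shapes are illustrative, not the Flink code): resolve each mapped field name to its positional index once, so the per-record hot loop does integer array lookups instead of repeated String comparisons.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class PrecomputedIndexSketch {

    // Resolve each mapped field name to its positional index once, up front.
    // Unknown names fail immediately rather than during record parsing.
    public static int[] precomputeIndexes(String[] fieldsMap, String[] pojoFieldNames) {
        Map<String, Integer> byName = new HashMap<>();
        for (int i = 0; i < pojoFieldNames.length; i++) {
            byName.put(pojoFieldNames[i], i);
        }
        int[] indexes = new int[fieldsMap.length];
        for (int i = 0; i < fieldsMap.length; i++) {
            Integer idx = byName.get(fieldsMap[i]);
            if (idx == null) {
                throw new IllegalArgumentException("Field not found in POJO: " + fieldsMap[i]);
            }
            indexes[i] = idx;
        }
        return indexes;
    }

    public static void main(String[] args) {
        int[] idx = precomputeIndexes(
                new String[] { "field1", "field3", "field2" },
                new String[] { "field1", "field2", "field3" });
        // The hot loop would now use idx[i] directly, with no String look-up.
        System.out.println(Arrays.toString(idx));  // prints [0, 2, 1]
    }
}
```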
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/426#issuecomment-78401359

@fhueske Thanks for your kind advice. I will fix it as soon as possible.
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/426#issuecomment-77912526

@teabot: We've created this JIRA for the feature you've suggested: https://issues.apache.org/jira/browse/FLINK-1665
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r25846058

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -64,15 +66,25 @@
 	private transient int commentCount;
 	private transient int invalidLineCount;
+
+	private PojoTypeInfo<OUT> pojoTypeInfo = null;
 
 	public CsvInputFormat(Path filePath) {
 		super(filePath);
-	}
+	}
+
+	public CsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo) {
+		super(filePath);
+
+		Preconditions.checkNotNull(pojoTypeInfo, "The TypeInformation is required for getting the POJO fields.");
+		this.pojoTypeInfo = pojoTypeInfo;
+		setAccessibleToField();
+	}
 
 	public CsvInputFormat(Path filePath, Class<?> ... types) {
--- End diff --

Yes, if we go for the more general TypeInformation we need to check whether it is a TupleTypeInfo or PojoTypeInfo and can get rid of one pair of constructors. Actually, I think both solutions are fine.
Github user teabot commented on the pull request: https://github.com/apache/flink/pull/426#issuecomment-77347164

I like that this request adds what I would consider to be a very useful feature. Data is often stored in text-delimited formats, and one of the nice features of Flink is the ability to work with POJOs. Therefore some mechanism to map from delimited files to POJOs is useful. However, I have found that such mappings are rarely restricted to the simple cases. Therefore might I suggest that some other factors be considered as part of this request, so as not to limit the usefulness of this API change and to provide some extensibility:

* The positions of fields within the delimited file and how they are mapped to POJO properties could indeed be modelled with a `String[]` as suggested. However, this will be arduous to maintain and prone to human error for types with a large number of fields. An alternative could be to have a `@Position` annotation on the POJO object fields to indicate the CSV column index.

```
public class MyPojo {
  @Position(0)
  public int id;

  @Position(1)
  public String name;
  ...
```

* Flink + POJOs bring the benefit of richer types to data processing pipelines. But it seems to me that this implementation imposes a restriction on the range of types that can be used within the target POJO type, negating this benefit somewhat. If I want to use a POJO with a `DateTime` field then I must still create my own mapping function to do so. Therefore it would be useful to provide some hook into the CSV-POJO mapping process to allow the specification of user declared/defined type converters. This would enable users to easily map to field types of their choosing. Again, this could be modelled as an annotation:

```
public class MyPojo {
  // DateTimeConverter implemented by the user
  @Converter(type = DateTimeConverter.class, properties = "format=/MM/dd:HH;timezone=Europe/London")
  public DateTime id;
  ...
```

You might consider that these additional features are better served by some separate type-mapping component or API (and I'd probably agree). But in that case, is it then wise to also add a simpler, less flexible form to the core Flink API? Thank you for your time.
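The `@Position` annotation proposed above is not part of Flink; a self-contained sketch of how such an annotation could be declared and read back via reflection might look like this (all names are hypothetical):

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

public class PositionDemo {

    // Hypothetical annotation as proposed in the comment; not a Flink API.
    // RUNTIME retention is required so it is visible to reflection.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface Position {
        int value();
    }

    static class MyPojo {
        @Position(0) public int id;
        @Position(1) public String name;
    }

    // Derive the CSV-column -> field-name mapping from the annotations,
    // so the user never writes a String[] by hand.
    static String[] fieldOrderFromAnnotations(Class<?> pojoClass) {
        Field[] fields = pojoClass.getDeclaredFields();
        String[] order = new String[fields.length];
        for (Field f : fields) {
            Position p = f.getAnnotation(Position.class);
            if (p != null) {
                order[p.value()] = f.getName();
            }
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(String.join(",", fieldOrderFromAnnotations(MyPojo.class)));
    }
}
```

Because the ordering comes from the annotation values rather than from `getDeclaredFields` (whose order the JVM does not guarantee), the derived mapping is stable.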
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/426#issuecomment-77293135

@fhueske Oh, you are right. Currently, users cannot decide the order of fields. I will add a parameter to set the field order.
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/426#issuecomment-77272091

How are fields in the CSV file mapped to POJO fields? I assume it is the order of fields in the POJO type information, right? Is that order the same as in the POJO definition, or some other order such as alphanumeric ordering of the field names? This might not be obvious for users. Would it make sense to add a `String[]` to map POJO fields to field positions in the CSV file, i.e., `String[] {"name", "age", "zip"}` would map the POJO field `name` to the first CSV field, and so on.
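To illustrate the suggested `String[]` mapping, here is a toy, Flink-independent sketch that assigns CSV columns to POJO fields by name. All class and method names are illustrative, and only `String` and `int` public fields are handled; Flink's real input format does typed field parsing instead:

```java
import java.lang.reflect.Field;

public class CsvToPojo {

    public static class Person {
        public String name;
        public int age;
        public String zip;
    }

    // Map one CSV line to a POJO using a user-supplied field order:
    // fieldOrder[i] names the POJO field that receives CSV column i.
    static <T> T mapLine(String line, String[] fieldOrder, Class<T> type) throws Exception {
        String[] tokens = line.split(",");
        T pojo = type.getDeclaredConstructor().newInstance();
        for (int i = 0; i < fieldOrder.length; i++) {
            Field f = type.getField(fieldOrder[i]);
            if (f.getType() == int.class) {
                f.setInt(pojo, Integer.parseInt(tokens[i]));
            } else {
                f.set(pojo, tokens[i]);
            }
        }
        return pojo;
    }

    public static void main(String[] args) throws Exception {
        Person p = mapLine("Alice,30,12345", new String[]{"name", "age", "zip"}, Person.class);
        System.out.println(p.name + " " + p.age + " " + p.zip);
    }
}
```

The explicit array removes any ambiguity about whether POJO declaration order or alphabetical order wins, which is exactly the concern raised in the comment.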
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r25760453

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -235,8 +252,21 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws
 
 		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
 			// valid parse, map values into pact record
-			for (int i = 0; i < parsedValues.length; i++) {
-				reuse.setField(parsedValues[i], i);
+			if (pojoTypeInfo == null) {
+				Tuple result = (Tuple) reuse;
+				for (int i = 0; i < parsedValues.length; i++) {
+					result.setField(parsedValues[i], i);
+				}
+			} else {
+				for (int i = 0; i < parsedValues.length; i++) {
+					try {
+						pojoTypeInfo.getPojoFieldAt(i).field.set(reuse, parsedValues[i]);
--- End diff --

@rmetzger Thanks! I modified my implementation to set the fields accessible in `CsvInputFormat` and `ScalaCsvInputFormat` and added a test case with private fields.
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r25237152

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -82,6 +92,13 @@ public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter
 		setFieldTypes(types);
 	}
+
+	public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo) {
+		this(filePath, lineDelimiter, fieldDelimiter);
+
+		Preconditions.checkNotNull(pojoTypeInfo, "Type information must be required to set values to pojo");
--- End diff --

same here.
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r25237861

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -235,8 +252,21 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws
 
 		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
 			// valid parse, map values into pact record
-			for (int i = 0; i < parsedValues.length; i++) {
-				reuse.setField(parsedValues[i], i);
+			if (pojoTypeInfo == null) {
+				Tuple result = (Tuple) reuse;
+				for (int i = 0; i < parsedValues.length; i++) {
+					result.setField(parsedValues[i], i);
+				}
+			} else {
+				for (int i = 0; i < parsedValues.length; i++) {
+					try {
+						pojoTypeInfo.getPojoFieldAt(i).field.set(reuse, parsedValues[i]);
--- End diff --

I'm not 100% sure if this also works for POJOs with private fields. In your test case, all the fields are public. I'm not sure where exactly we are setting the fields accessible, but it might be the case that they are not set accessible here. Can you add a private field with a getter/setter to the test POJO to validate this?
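The concern above is standard Java reflection behavior: writing to a private field via `Field.set` throws `IllegalAccessException` unless the field is first made accessible. A minimal standalone demonstration (not Flink code; names are illustrative):

```java
import java.lang.reflect.Field;

public class PrivateFieldAccess {

    static class Pojo {
        private int value; // private: reflection needs setAccessible(true) to write it

        public int getValue() { return value; }
        public void setValue(int v) { this.value = v; }
    }

    public static void main(String[] args) throws Exception {
        Field f = Pojo.class.getDeclaredField("value");
        f.setAccessible(true); // without this line, f.set(...) throws IllegalAccessException
        Pojo p = new Pojo();
        f.set(p, 42);
        System.out.println(p.getValue());
    }
}
```

This is presumably what the PR's `setAccessibleToField()` does for every POJO field once at construction time, so `readRecord` can write private fields safely; adding a private field to the test POJO, as requested, would confirm it.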
GitHub user chiwanpark opened a pull request: https://github.com/apache/flink/pull/426

[FLINK-1512] Add CsvReader for reading into POJOs.

This PR contains the following changes.

* `CsvInputFormat` and `ScalaCsvInputFormat` can receive a POJO type as generic parameter
* Add `pojoType(Class<T> targetType)` into `CsvReader` (Java API)
* Modify `readCsvFile` method in `ExecutionEnvironment` (Scala API)
* Add unit tests for `CsvInputFormat` and `ScalaCsvInputFormat`

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chiwanpark/flink FLINK-1512

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/426.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #426

commit 2463d6b7d244528e1625288e1b780335769f14ee
Author: Chiwan Park chiwanp...@icloud.com
Date: 2015-02-18T18:27:59Z

    [FLINK-1512] [java api] Add CsvReader for reading into POJOs

commit 8fe5f8d1bd402382e6fa93014c5b2fec8e22cbd0
Author: Chiwan Park chiwanp...@icloud.com
Date: 2015-02-19T17:23:56Z

    [FLINK-1512] [scala api] Add CsvReader for reading into POJOs