[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15011916#comment-15011916 ] ASF GitHub Bot commented on FLINK-2692:
---
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/1266

> Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
>
> Key: FLINK-2692
> URL: https://issues.apache.org/jira/browse/FLINK-2692
> Project: Flink
> Issue Type: Improvement
> Reporter: Till Rohrmann
> Assignee: Chesnay Schepler
> Priority: Minor
>
> The {{CsvInputFormat}} can currently return values as either a {{Tuple}} or a
> {{Pojo}} type. As a consequence, the processing logic, which has to work for
> both types, is overly complex. For example, the {{CsvInputFormat}} contains
> fields which are only used when a Pojo is returned. Moreover, the POJO field
> information is constructed by calling setter methods that have to be called
> in a very specific order; otherwise they fail. For example, one first has to call
> {{setFieldTypes}} before calling {{setOrderOfPOJOFields}}; otherwise the
> number of fields might differ. Furthermore, some of the methods can
> only be called if the return type is a {{Pojo}} type, because they expect
> a {{PojoTypeInfo}} to be present.
> I think the {{CsvInputFormat}} should be refactored to make the code
> easier to maintain. I propose to split it up into a
> {{PojoTypeCsvInputFormat}} and a {{TupleTypeCsvInputFormat}}, which take all
> the required information via their constructors instead of using the
> {{setFields}} and {{setOrderOfPOJOFields}} approach.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
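The constructor-based design proposed in the issue can be illustrated with a minimal sketch. The classes below (SketchCsvFormat, SketchTupleCsvFormat, SketchPojoCsvFormat) are hypothetical and deliberately avoid Flink's real API; they only show how passing field types and field names together through a constructor lets the consistency check run once, up front, so there is no setter order to get wrong.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified illustration of the proposal (not Flink API):
// each format receives everything it needs through its constructor.
public class CsvFormatSketch {

    // Common base: only the state that both the tuple and the POJO variant need.
    abstract static class SketchCsvFormat {
        protected final Class<?>[] fieldTypes;

        protected SketchCsvFormat(Class<?>[] fieldTypes) {
            if (fieldTypes == null || fieldTypes.length == 0) {
                throw new IllegalArgumentException("at least one field type is required");
            }
            this.fieldTypes = fieldTypes.clone();
        }

        int numFields() {
            return fieldTypes.length;
        }
    }

    // Tuple variant: the field types alone describe the output.
    static final class SketchTupleCsvFormat extends SketchCsvFormat {
        SketchTupleCsvFormat(Class<?>... fieldTypes) {
            super(fieldTypes);
        }
    }

    // POJO variant: names and types arrive together, so their lengths are checked
    // at construction time instead of depending on the order of two setter calls.
    static final class SketchPojoCsvFormat extends SketchCsvFormat {
        private final String[] fieldNames;

        SketchPojoCsvFormat(Class<?>[] fieldTypes, String[] fieldNames) {
            super(fieldTypes);
            if (fieldNames == null || fieldNames.length != fieldTypes.length) {
                throw new IllegalArgumentException("expected " + fieldTypes.length
                        + " field name(s), got " + (fieldNames == null ? 0 : fieldNames.length));
            }
            this.fieldNames = fieldNames.clone();
        }

        List<String> fieldNames() {
            // Return a copy so callers cannot mutate the internal array.
            return Arrays.asList(fieldNames.clone());
        }
    }

    public static void main(String[] args) {
        SketchTupleCsvFormat tuple = new SketchTupleCsvFormat(String.class, Integer.class);
        System.out.println(tuple.numFields()); // 2

        SketchPojoCsvFormat pojo = new SketchPojoCsvFormat(
                new Class<?>[] {String.class, Integer.class}, new String[] {"name", "age"});
        System.out.println(pojo.fieldNames()); // [name, age]

        try {
            new SketchPojoCsvFormat(new Class<?>[] {String.class}, new String[] {"a", "b"});
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // expected 1 field name(s), got 2
        }
    }
}
```

Because each object is fully configured when it is constructed, an inconsistent combination fails immediately rather than later, when a setter happens to be called out of order.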
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15011917#comment-15011917 ] Chesnay Schepler commented on FLINK-2692:
-
Implemented in bd61f2dbdf1a0215363ffa8416329e1dbf277593
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15010918#comment-15010918 ] ASF GitHub Bot commented on FLINK-2692:
---
Github user zentol commented on the pull request:
https://github.com/apache/flink/pull/1266#issuecomment-157702242

Merging this.
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15010732#comment-15010732 ] ASF GitHub Bot commented on FLINK-2692:
---
Github user aljoscha commented on the pull request:
https://github.com/apache/flink/pull/1266#issuecomment-157675819

I think it's good :+1:
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15010656#comment-15010656 ] ASF GitHub Bot commented on FLINK-2692:
---
Github user zentol commented on the pull request:
https://github.com/apache/flink/pull/1266#issuecomment-157662815

Any other comments? Otherwise I'll merge it later on.
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15000309#comment-15000309 ] ASF GitHub Bot commented on FLINK-2692:
---
Github user zentol commented on the pull request:
https://github.com/apache/flink/pull/1266#issuecomment-155764846

@fhueske I've addressed your comments.
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998926#comment-14998926 ] ASF GitHub Bot commented on FLINK-2692:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1266#discussion_r44434469

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 package org.apache.flink.api.java.io;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
-public class CsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
+public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
     private static final long serialVersionUID = 1L;
+
+    public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+    public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+    protected transient Object[] parsedValues;
-    public CsvInputFormat(Path filePath, CompositeType<OUT> typeInformation) {
-        super(filePath, typeInformation);
+    protected CsvInputFormat(Path filePath) {
+        super(filePath);
     }
-
-    public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, CompositeType<OUT> typeInformation) {
-        super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+    @Override
+    public void open(FileInputSplit split) throws IOException {
+        super.open(split);
+
+        @SuppressWarnings("unchecked")
+        FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
+
+        //throw exception if no field parsers are available
+        if (fieldParsers.length == 0) {
+            throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
+        }
+
+        // create the value holders
+        this.parsedValues = new Object[fieldParsers.length];
+        for (int i = 0; i < fieldParsers.length; i++) {
+            this.parsedValues[i] = fieldParsers[i].createValue();
+        }
+
+        // left to right evaluation makes access [0] okay
+        // this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
+        if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
+            this.lineDelimiterIsLinebreak = true;
+        }
+
+        this.commentCount = 0;
+        this.invalidLineCount = 0;
     }
     @Override
-    protected OUT createTuple(OUT reuse) {
-        Tuple result = (Tuple) reuse;
-        for (int i = 0; i < parsedValues.length; i++) {
-            result.setField(parsedValues[i], i);
+    public OUT nextRecord(OUT record) throws IOException {
+        OUT returnRecord = null;
+        do {
+            returnRecord = super.nextRecord(record);
+        } while (returnRecord == null && !reachedEnd());
+
+        return returnRecord;
+    }
+
+    public Class<?>[] getFieldTypes() {
+        return super.getGenericFieldTypes();
+    }
+
+    protected static boolean[] createDefaultMask(int size) {
+        boolean[] includedMask = new boolean[size];
+        for (int x=0; x
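The do/while in the new nextRecord() keeps calling super.nextRecord() while it returns null and reachedEnd() is still false, so a null from a single read does not by itself end the input. A minimal stand-alone sketch of that pattern follows. It assumes, as the commentCount and invalidLineCount counters suggest, that null marks a line the reader skipped; here a skipped line is simply one starting with '#'. SkipLoopSketch and its methods are hypothetical, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the retry loop in nextRecord(): the per-line reader
// returns null for skipped lines, and the caller keeps reading until it gets a
// real record or the input is exhausted.
public class SkipLoopSketch {

    private final Iterator<String> lines;
    private boolean end = false;
    int skipped = 0;

    SkipLoopSketch(List<String> input) {
        this.lines = input.iterator();
    }

    boolean reachedEnd() {
        return end;
    }

    // Plays the role of super.nextRecord(): one line in, one record or null out.
    private String[] readOneLine() {
        if (!lines.hasNext()) {
            end = true;
            return null; // end of input
        }
        String line = lines.next();
        if (line.startsWith("#")) {
            skipped++;
            return null; // skipped line, but not the end of input
        }
        return line.split(",", -1);
    }

    // Mirrors the do/while in the diff: null only means "done" once reachedEnd() is true.
    String[] nextRecord() {
        String[] record;
        do {
            record = readOneLine();
        } while (record == null && !reachedEnd());
        return record;
    }

    public static void main(String[] args) {
        SkipLoopSketch reader = new SkipLoopSketch(Arrays.asList("# header", "a,1", "# note", "b,2"));
        List<String> firstFields = new ArrayList<>();
        String[] rec;
        while ((rec = reader.nextRecord()) != null) {
            firstFields.add(rec[0]);
        }
        System.out.println(firstFields + " skipped=" + reader.skipped); // [a, b] skipped=2
    }
}
```

Without the loop, a caller would receive null for the "# note" line and could mistake it for the end of the split.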
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998999#comment-14998999 ] ASF GitHub Bot commented on FLINK-2692:
---
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/1266#discussion_r44439588

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998985#comment-14998985 ] ASF GitHub Bot commented on FLINK-2692:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1266#discussion_r44438028

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private Class<OUT> pojoTypeClass;
+
+    private String[] pojoFieldNames;
+
+    private transient PojoTypeInfo<OUT> pojoTypeInfo;
+    private transient Field[] pojoFields;
+
+    public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo);
+    }
+
+    public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, fieldNames, createDefaultMask(pojoTypeInfo.getArity()));
+    }
+
+    public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo) {
+        this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, pojoTypeInfo.getFieldNames(), createDefaultMask(pojoTypeInfo.getArity()));
+    }
+
+    public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames) {
+        this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, fieldNames, createDefaultMask(fieldNames.length));
+    }
+
+    public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, int[] includedFieldsMask) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, pojoTypeInfo.getFieldNames(), toBooleanMask(includedFieldsMask));
+    }
+
+    public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, int[] includedFieldsMask) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, fieldNames, includedFieldsMask);
+    }
+
+    public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, int[] includedFieldsMask) {
+        this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+    }
+
+    public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, int[] includedFieldsMask) {
+        super(filePath);
+        boolean[] mask = (includedFieldsMask == null)
+                ? createDefaultMask(fieldNames.length)
+                : toBooleanMask(includedFieldsMask);
+        configure(lineDelimiter, fieldDelimiter, pojoTypeInfo, fieldNames, mask);
+    }
+
+    public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, boolean[] includedFieldsMask) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+    }
+
+    public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) {
+        this(filePath, DEFAULT_LINE_DELIMITER,
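The int[] constructors above route the mask through toBooleanMask(...), whose body is not part of the quoted diff. The sketch below shows one plausible conversion, consistent with the Preconditions and Ints imports added to CsvInputFormat but written without Guava so it is self-contained: each int is a source-column index, and entry i of the resulting boolean[] says whether CSV column i is parsed. The class name MaskSketch and the exact validation rules are assumptions.

```java
import java.util.Arrays;

// Hypothetical sketch (not the Flink source): turn an int[] of source-column
// indices into a boolean[] mask whose entry i says whether CSV column i is parsed.
public class MaskSketch {

    static boolean[] toBooleanMask(int[] sourceFieldIndices) {
        if (sourceFieldIndices == null || sourceFieldIndices.length == 0) {
            throw new IllegalArgumentException("at least one field index is required");
        }
        // Validate first and find the highest index, which fixes the mask length.
        int max = -1;
        for (int idx : sourceFieldIndices) {
            if (idx < 0) {
                throw new IllegalArgumentException("field indices must not be negative: " + idx);
            }
            max = Math.max(max, idx);
        }
        boolean[] mask = new boolean[max + 1];
        for (int idx : sourceFieldIndices) {
            mask[idx] = true;
        }
        return mask;
    }

    public static void main(String[] args) {
        // Parse CSV columns 0, 2 and 3; skip column 1.
        System.out.println(Arrays.toString(toBooleanMask(new int[] {0, 2, 3})));
        // prints [true, false, true, true]
    }
}
```

Note that in this sketch a boolean mask records only which columns are read, not the order in which the indices were listed: {3, 0} and {0, 3} produce the same mask.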
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14999015#comment-14999015 ] ASF GitHub Bot commented on FLINK-2692:
---
Github user fhueske commented on the pull request:
https://github.com/apache/flink/pull/1266#issuecomment-155515305

I agree with @aljoscha and you on the `readRecord()` code. It would be nice to have the common parts of `readRecord()` in the `CsvInputFormat` and specific `fillRecord` in the tuple and POJO formats. Otherwise, the PR looks really good.
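The split suggested here is essentially the template-method pattern: the base class keeps the shared parsing in readRecord() and delegates only the type-specific step to an abstract fillRecord(). The sketch below is a minimal illustration under that assumption; BaseCsvFormat, ArrayCsvFormat, PersonCsvFormat and the naive String.split parsing are hypothetical stand-ins, not Flink's classes or field parsers.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Hypothetical sketch of the suggested split: the base class owns the shared
// parsing in readRecord(), subclasses only implement fillRecord().
public class FillRecordSketch {

    abstract static class BaseCsvFormat<OUT> {
        private final String fieldDelimiter;

        protected BaseCsvFormat(String fieldDelimiter) {
            this.fieldDelimiter = fieldDelimiter;
        }

        // Shared logic: split the line, then hand the parsed values to the subclass.
        public final OUT readRecord(OUT reuse, String line) {
            Object[] parsedValues = line.split(Pattern.quote(fieldDelimiter), -1);
            return fillRecord(reuse, parsedValues);
        }

        // Type-specific logic: copy the parsed values into the output object.
        protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
    }

    // Tuple-like output: values are stored by position.
    static final class ArrayCsvFormat extends BaseCsvFormat<Object[]> {
        ArrayCsvFormat(String fieldDelimiter) {
            super(fieldDelimiter);
        }

        @Override
        protected Object[] fillRecord(Object[] reuse, Object[] parsedValues) {
            System.arraycopy(parsedValues, 0, reuse, 0, parsedValues.length);
            return reuse;
        }
    }

    // POJO-like output: values are stored in named fields.
    static final class Person {
        String name;
        String city;
    }

    static final class PersonCsvFormat extends BaseCsvFormat<Person> {
        PersonCsvFormat(String fieldDelimiter) {
            super(fieldDelimiter);
        }

        @Override
        protected Person fillRecord(Person reuse, Object[] parsedValues) {
            reuse.name = (String) parsedValues[0];
            reuse.city = (String) parsedValues[1];
            return reuse;
        }
    }

    public static void main(String[] args) {
        Object[] row = new ArrayCsvFormat(",").readRecord(new Object[2], "alice,berlin");
        System.out.println(Arrays.toString(row)); // [alice, berlin]

        Person p = new PersonCsvFormat(",").readRecord(new Person(), "bob,paris");
        System.out.println(p.name + " " + p.city); // bob paris
    }
}
```

Making readRecord() final keeps the shared code path in one place, so the tuple and POJO variants cannot drift apart in how they parse a line.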
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14999027#comment-14999027 ] ASF GitHub Bot commented on FLINK-2692:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1266#discussion_r1239

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998914#comment-14998914 ] ASF GitHub Bot commented on FLINK-2692:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1266#discussion_r44434150

--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
+    protected static boolean[] createDefaultMask(int size) {
--- End diff --

Isn't the default that fields are read one after the other from the start of a line? Why do we need this method then?
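The question can be made concrete with a small sketch. Assuming createDefaultMask(size) sets all size entries to true (the loop body is cut off in the quoted diff, so this is an assumption), a default mask of length n selects exactly the first n columns, read left to right from the start of the line, which is the default behavior described in the question. DefaultMaskSketch and its project helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: an all-true mask of length n, plus a helper that applies
// a mask to one CSV line and keeps only the columns marked true.
public class DefaultMaskSketch {

    // Presumed behavior of createDefaultMask: include the first `size` columns.
    static boolean[] createDefaultMask(int size) {
        boolean[] includedMask = new boolean[size];
        Arrays.fill(includedMask, true);
        return includedMask;
    }

    static List<String> project(String line, boolean[] includedMask) {
        String[] columns = line.split(",", -1);
        List<String> kept = new ArrayList<>();
        for (int i = 0; i < includedMask.length && i < columns.length; i++) {
            if (includedMask[i]) {
                kept.add(columns[i]);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        String line = "a,b,c,d";
        // Default mask: the first three fields, read left to right from the start of the line.
        System.out.println(project(line, createDefaultMask(3))); // [a, b, c]
        // Explicit mask: skip the second column.
        System.out.println(project(line, new boolean[] {true, false, true})); // [a, c]
    }
}
```

In the PojoCsvInputFormat diff, the convenience constructors pass createDefaultMask(...) into the boolean[]-mask constructor, so the helper presumably does not change the default behavior; it lets every constructor share a single mask-based code path.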
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998947#comment-14998947 ] ASF GitHub Bot commented on FLINK-2692: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1266#discussion_r44435810
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
...
+    protected static boolean[] createDefaultMask(int size) {
--- End diff --
*cover it in an obvious manner
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998946#comment-14998946 ] ASF GitHub Bot commented on FLINK-2692: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1266#discussion_r44435787
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
...
+    protected static boolean[] createDefaultMask(int size) {
--- End diff --
I wanted to cover that case directly in the InputFormat instead of *somewhere* else. This method is used to create a mask for exactly that case, when we can infer the mask from the number of field types.
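Below is a minimal sketch of what such a default mask could look like. It assumes the mask convention that `true` at position i means "parse column i" and `false` means "skip it". When no explicit mask is given, the first `size` columns are read in order, so every entry is `true`. The class name `DefaultMaskSketch` is hypothetical. The sketch illustrates the idea and does not reproduce the code from the pull request.

```java
import java.util.Arrays;

// Hypothetical illustration; assumes mask[i] == true means "parse column i".
class DefaultMaskSketch {

    // With no explicit mask, the first `size` columns are read one after
    // another, so every position in the mask is included.
    static boolean[] createDefaultMask(int size) {
        boolean[] includedMask = new boolean[size];
        Arrays.fill(includedMask, true);
        return includedMask;
    }

    public static void main(String[] args) {
        // Three field types given and no mask: read columns 0, 1 and 2.
        System.out.println(Arrays.toString(createDefaultMask(3))); // [true, true, true]
    }
}
```

Inferring the mask from the number of field types keeps the common case (read the leading columns) inside the input format. Callers then never have to build an all-`true` array themselves.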
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14985255#comment-14985255 ] ASF GitHub Bot commented on FLINK-2692: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1266#issuecomment-153024659 I think you can go ahead and merge it then.
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14982647#comment-14982647 ] ASF GitHub Bot commented on FLINK-2692: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1266#issuecomment-152544715 Looks good to me. Is there maybe a way to also make `readRecord()` common to tuples and POJOs? There we duplicate all the code, and only the call that creates (or fills) the tuple or POJO differs, if I'm not mistaken. Thanks for the work you're putting in there; I know it's not glorious, but some parts need cleanup/refactoring. :smile:
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14982859#comment-14982859 ] ASF GitHub Bot commented on FLINK-2692: --- Github user zentol commented on the pull request: https://github.com/apache/flink/pull/1266#issuecomment-152584371 We could move the differing lines (Csv:L114, Pojo:L218->L225) into a separate method that is called from a generic readRecord() method, something like fillRecord(OUT reuse, Object[] parsedValues).
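The refactoring suggested here is a template method. The shared base class owns `readRecord()` and delegates only the type-specific step to `fillRecord(OUT reuse, Object[] parsedValues)`. The sketch below uses heavily simplified, hypothetical classes (`CsvFormatSketch`, `Row`, `Person`, `TupleLikeFormat`, `PojoLikeFormat`). These are not Flink's types. A plain `String.split` also stands in for Flink's field parsers, masks and error handling.

```java
import java.lang.reflect.Field;
import java.util.regex.Pattern;

// Demo entry point, declared first so the single-file launcher runs it.
class ReadRecordSketch {
    public static void main(String[] args) {
        Row row = new TupleLikeFormat(",").readRecord(new Row(2), "alice,berlin");
        System.out.println(row.getField(0) + " " + row.getField(1)); // alice berlin

        Person p = new PojoLikeFormat(",", new String[] {"name", "city"})
                .readRecord(new Person(), "bob,paris");
        System.out.println(p.name + " " + p.city); // bob paris
    }
}

// Hypothetical base class: owns the shared readRecord() logic.
abstract class CsvFormatSketch<OUT> {
    private final String fieldDelimiter;

    protected CsvFormatSketch(String fieldDelimiter) {
        this.fieldDelimiter = fieldDelimiter;
    }

    // Shared for every output type: split the line into raw values once.
    // (Field parsers, masks and invalid-line handling are left out.)
    public final OUT readRecord(OUT reuse, String line) {
        Object[] parsedValues = line.split(Pattern.quote(fieldDelimiter), -1);
        return fillRecord(reuse, parsedValues);
    }

    // The only step that differs between tuples and POJOs.
    protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
}

// Stand-in for a tuple: positional fields, like Tuple.setField(value, pos).
class Row {
    private final Object[] fields;

    Row(int arity) { this.fields = new Object[arity]; }

    void setField(Object value, int pos) { fields[pos] = value; }

    Object getField(int pos) { return fields[pos]; }
}

class TupleLikeFormat extends CsvFormatSketch<Row> {
    TupleLikeFormat(String fieldDelimiter) { super(fieldDelimiter); }

    @Override
    protected Row fillRecord(Row reuse, Object[] parsedValues) {
        for (int i = 0; i < parsedValues.length; i++) {
            reuse.setField(parsedValues[i], i);
        }
        return reuse;
    }
}

// Stand-in for a POJO: fields are set by name rather than by position.
class Person {
    public String name;
    public String city;
}

class PojoLikeFormat extends CsvFormatSketch<Person> {
    private final String[] pojoFieldNames; // column i is written to pojoFieldNames[i]

    PojoLikeFormat(String fieldDelimiter, String[] pojoFieldNames) {
        super(fieldDelimiter);
        this.pojoFieldNames = pojoFieldNames;
    }

    @Override
    protected Person fillRecord(Person reuse, Object[] parsedValues) {
        try {
            for (int i = 0; i < parsedValues.length; i++) {
                Field field = Person.class.getField(pojoFieldNames[i]);
                field.set(reuse, parsedValues[i]);
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot set POJO field", e);
        }
        return reuse;
    }
}
```

Because `readRecord()` is `final` in the base class, the line handling cannot drift apart between the tuple and POJO variants. Only `fillRecord()` is duplicated per type.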
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14982871#comment-14982871 ] ASF GitHub Bot commented on FLINK-2692: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1266#issuecomment-152585852 Could, yes, but I'm fine with both; it is already cleaner than what we had before. The user-facing API is not changed, right? So I think this is good to merge.
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14961847#comment-14961847 ] Chesnay Schepler commented on FLINK-2692: - [~aljoscha] Great, that will work. Another question: the CommonCsvInputFormat contains a method setFields(int[] sourceFieldIndices, Class[] fieldTypes). This method is not exposed in the CsvReader and is only used in tests. Can it be removed (together with setFieldsGeneric(int[] ...) in GenericCsvInputFormat)?
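For context, a `sourceFieldIndices` argument like the one in `setFields(int[] sourceFieldIndices, Class[] fieldTypes)` can be converted into a boolean include mask of the kind `createDefaultMask` returns in the diff quoted above. Below is a minimal sketch with hypothetical names (`toIncludeMask` is not claimed to be a Flink method). It assumes the indices must be strictly increasing. A boolean mask can say which columns are read, but not in which order they map to fields, so duplicates or reordering cannot be expressed.

```java
import java.util.Arrays;

class FieldIndexMaskSketch {

    // Hypothetical helper: converts source column indices such as {0, 2, 5}
    // into an include mask of length (last index + 1), where mask[i] == true
    // means "parse column i" and false means "skip it".
    static boolean[] toIncludeMask(int[] sourceFieldIndices) {
        int previous = -1;
        for (int idx : sourceFieldIndices) {
            // A mask cannot express reordering or duplicates, so require
            // non-negative, strictly increasing indices.
            if (idx <= previous) {
                throw new IllegalArgumentException(
                        "Indices must be non-negative and strictly increasing: "
                                + Arrays.toString(sourceFieldIndices));
            }
            previous = idx;
        }
        boolean[] mask = new boolean[previous + 1];
        for (int idx : sourceFieldIndices) {
            mask[idx] = true;
        }
        return mask;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(toIncludeMask(new int[] {0, 2, 5})));
        // prints [true, false, true, false, false, true]
    }
}
```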
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14961848#comment-14961848 ] Aljoscha Krettek commented on FLINK-2692: - If it is only used in tests, I would think so.
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14961858#comment-14961858 ] Chesnay Schepler commented on FLINK-2692: - Ah, my bad, it's available through the Scala API's ExecutionEnvironment, so it's staying then...
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14960375#comment-14960375 ] Aljoscha Krettek commented on FLINK-2692: - I think the problem is that the Tuple types are different. Maybe you could extend the createInstance method to also take a reuse object. Then the ScalaTupleSerializer would ignore the reuse object and work as before, and the Java TupleSerializer would take the reuse object and fill it. I think then we could make do with one CsvInputFormat class.
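Below is a minimal sketch of the idea. All names are hypothetical: `RecordFactory`, `FillingFactory`, `CreatingFactory`, `MutableRow`, `ImmutablePair` and `SingleCsvFormat` are illustrations, not Flink's serializer API. The factory method receives the reuse object. A mutable, Java-style tuple fills it and returns it; an immutable, Scala-style tuple ignores it and builds a new instance. A single format class can then serve both cases.

```java
// Demo entry point, declared first so the single-file launcher runs it.
class ReuseFactorySketch {
    public static void main(String[] args) {
        MutableRow reuse = new MutableRow(2);
        MutableRow filled = new SingleCsvFormat<>(new FillingFactory()).readRecord(reuse, "1,2");
        System.out.println(filled == reuse); // true: the reuse object was filled in place

        ImmutablePair old = new ImmutablePair("x", "y");
        ImmutablePair created = new SingleCsvFormat<>(new CreatingFactory()).readRecord(old, "1,2");
        System.out.println(created == old); // false: a new instance was built
    }
}

// Hypothetical factory: like createInstance(Object[] fields), plus a reuse argument.
interface RecordFactory<T> {
    T createInstance(Object[] fields, T reuse);
}

// Mutable, Java-tuple-like record.
class MutableRow {
    final Object[] fields;

    MutableRow(int arity) { this.fields = new Object[arity]; }
}

// Java-style: the record is mutable, so fill and return the reuse object.
class FillingFactory implements RecordFactory<MutableRow> {
    @Override
    public MutableRow createInstance(Object[] fields, MutableRow reuse) {
        System.arraycopy(fields, 0, reuse.fields, 0, fields.length);
        return reuse;
    }
}

// Immutable, Scala-tuple-like record with two fields.
final class ImmutablePair {
    final Object first;
    final Object second;

    ImmutablePair(Object first, Object second) {
        this.first = first;
        this.second = second;
    }
}

// Scala-style: the record is immutable, so ignore reuse and build a new one.
class CreatingFactory implements RecordFactory<ImmutablePair> {
    @Override
    public ImmutablePair createInstance(Object[] fields, ImmutablePair reuse) {
        return new ImmutablePair(fields[0], fields[1]);
    }
}

// One format class for both cases; only record construction is delegated.
class SingleCsvFormat<T> {
    private final RecordFactory<T> factory;

    SingleCsvFormat(RecordFactory<T> factory) { this.factory = factory; }

    T readRecord(T reuse, String line) {
        Object[] parsedValues = line.split(",", -1); // simplified parsing
        return factory.createInstance(parsedValues, reuse);
    }
}
```

Callers must always use the returned object rather than assuming the reuse instance was modified. That contract is what lets the immutable variant work unchanged.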
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14958724#comment-14958724 ] Chesnay Schepler commented on FLINK-2692: - I'm gonna take a stab at this one.
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14959022#comment-14959022 ] Chesnay Schepler commented on FLINK-2692: - Is there any reason that prevents the Scala API from using the CsvInputFormat class? They only differ in the createTuple method:
Java:
{code}
@Override
protected OUT createTuple(OUT reuse) {
    Tuple result = (Tuple) reuse;
    for (int i = 0; i < parsedValues.length; i++) {
        result.setField(parsedValues[i], i);
    }
    return reuse;
}
{code}
Scala:
{code}
@Override
protected OUT createTuple(OUT reuse) {
    Preconditions.checkNotNull(tupleSerializer, "The tuple serializer must be initialised." +
        " It is not initialized if the given type was not a " +
        TupleTypeInfoBase.class.getName() + ".");
    return tupleSerializer.createInstance(parsedValues);
}
{code}
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14939568#comment-14939568 ] Till Rohrmann commented on FLINK-2692: -- No, it does not refer to {{org.apache.flink.api.java.record.io.CsvInputFormat}}, which will hopefully be removed soon, but to {{org.apache.flink.api.java.io.CsvInputFormat}}.
[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
[ https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14938734#comment-14938734 ] Martin Liesenberg commented on FLINK-2692: -- Does this refer to the deprecated class in flink-java?