[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15011916#comment-15011916
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1266


> Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat
>
> Key: FLINK-2692
> URL: https://issues.apache.org/jira/browse/FLINK-2692
> Project: Flink
> Issue Type: Improvement
> Reporter: Till Rohrmann
> Assignee: Chesnay Schepler
> Priority: Minor
>
> The {{CsvInputFormat}} currently allows values to be returned as either a
> {{Tuple}} or a {{Pojo}} type. As a consequence, the processing logic, which
> has to work for both types, is overly complex. For example, the
> {{CsvInputFormat}} contains fields which are only used when a Pojo is
> returned. Moreover, the pojo field information is constructed by calling
> setter methods which have to be called in a very specific order, otherwise
> they fail. E.g. one has to call {{setFieldTypes}} before calling
> {{setOrderOfPOJOFields}}, otherwise the number of fields might be different.
> Furthermore, some of the methods can only be called if the return type is a
> {{Pojo}} type, because they expect that a {{PojoTypeInfo}} is present.
> I think the {{CsvInputFormat}} should be refactored to make the code easier
> to maintain. I propose to split it up into a {{PojoTypeCsvInputFormat}} and
> a {{TupleTypeCsvInputFormat}} which take all the required information via
> their constructors instead of using the {{setFields}} and
> {{setOrderOfPOJOFields}} approach.
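
The constructor-based approach proposed in the issue can be sketched as follows. This is a minimal illustration only: the class names SketchCsvFormat, SketchTupleCsvFormat and SketchPojoCsvFormat, and the use of a plain String path, are hypothetical stand-ins and not the Flink classes. The point it shows is that when all field information arrives through the constructor, the consistency checks run once and no call order can be violated.

```java
// Hypothetical, simplified stand-ins; these are NOT the Flink classes.
abstract class SketchCsvFormat {
    protected final String path;
    protected final Class<?>[] fieldTypes;

    protected SketchCsvFormat(String path, Class<?>[] fieldTypes) {
        // Validation happens exactly once, at construction time.
        if (fieldTypes == null || fieldTypes.length == 0) {
            throw new IllegalArgumentException("at least one field type is required");
        }
        this.path = path;
        this.fieldTypes = fieldTypes.clone();
    }

    int fieldCount() {
        return fieldTypes.length;
    }
}

// Tuple variant: only needs the field types.
final class SketchTupleCsvFormat extends SketchCsvFormat {
    SketchTupleCsvFormat(String path, Class<?>... fieldTypes) {
        super(path, fieldTypes);
    }
}

// POJO variant: additionally needs the field names, checked against the types.
final class SketchPojoCsvFormat extends SketchCsvFormat {
    private final String[] fieldNames;

    SketchPojoCsvFormat(String path, Class<?>[] fieldTypes, String[] fieldNames) {
        super(path, fieldTypes);
        if (fieldNames == null || fieldNames.length != fieldTypes.length) {
            throw new IllegalArgumentException("field names and field types must have the same length");
        }
        this.fieldNames = fieldNames.clone();
    }

    String[] fieldNames() {
        return fieldNames.clone();
    }
}
```

In this sketch a mismatch between field names and field types is rejected when the object is created, which is the failure mode the issue attributes to calling the setters in the wrong order.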



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-18 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15011917#comment-15011917
 ] 

Chesnay Schepler commented on FLINK-2692:
-

Implemented in bd61f2dbdf1a0215363ffa8416329e1dbf277593



[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15010918#comment-15010918
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1266#issuecomment-157702242
  
Merging this.




[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15010732#comment-15010732
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1266#issuecomment-157675819
  
I think it's good :+1: 




[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15010656#comment-15010656
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1266#issuecomment-157662815
  
Any other comments? Otherwise, I'll merge it later on.




[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15000309#comment-15000309
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1266#issuecomment-155764846
  
@fhueske I've addressed your comments.




[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998926#comment-14998926
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44434469
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 
 package org.apache.flink.api.java.io;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
 
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-public class CsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
+public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
 
     private static final long serialVersionUID = 1L;
+
+    public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+    public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+    protected transient Object[] parsedValues;
 
-    public CsvInputFormat(Path filePath, CompositeType<OUT> typeInformation) {
-        super(filePath, typeInformation);
+    protected CsvInputFormat(Path filePath) {
+        super(filePath);
     }
-
-    public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, CompositeType<OUT> typeInformation) {
-        super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+    @Override
+    public void open(FileInputSplit split) throws IOException {
+        super.open(split);
+
+        @SuppressWarnings("unchecked")
+        FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
+
+        //throw exception if no field parsers are available
+        if (fieldParsers.length == 0) {
+            throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
+        }
+
+        // create the value holders
+        this.parsedValues = new Object[fieldParsers.length];
+        for (int i = 0; i < fieldParsers.length; i++) {
+            this.parsedValues[i] = fieldParsers[i].createValue();
+        }
+
+        // left to right evaluation makes access [0] okay
+        // this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
+        if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
+            this.lineDelimiterIsLinebreak = true;
+        }
+
+        this.commentCount = 0;
+        this.invalidLineCount = 0;
     }
 
     @Override
-    protected OUT createTuple(OUT reuse) {
-        Tuple result = (Tuple) reuse;
-        for (int i = 0; i < parsedValues.length; i++) {
-            result.setField(parsedValues[i], i);
+    public OUT nextRecord(OUT record) throws IOException {
+        OUT returnRecord = null;
+        do {
+            returnRecord = super.nextRecord(record);
+        } while (returnRecord == null && !reachedEnd());
+
+        return returnRecord;
+    }
+
+    public Class<?>[] getFieldTypes() {
+        return super.getGenericFieldTypes();
+    }
+
+    protected static boolean[] createDefaultMask(int size) {
+        boolean[] includedMask = new boolean[size];
+        for (int x=0; x

[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998999#comment-14998999
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44439588
  

[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998985#comment-14998985
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44438028
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private Class<OUT> pojoTypeClass;
+
+    private String[] pojoFieldNames;
+
+    private transient PojoTypeInfo<OUT> pojoTypeInfo;
+    private transient Field[] pojoFields;
+
+    public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo);
+    }
+
+    public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, fieldNames, createDefaultMask(pojoTypeInfo.getArity()));
+    }
+
+    public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo) {
+        this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, pojoTypeInfo.getFieldNames(), createDefaultMask(pojoTypeInfo.getArity()));
+    }
+
+    public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames) {
+        this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, fieldNames, createDefaultMask(fieldNames.length));
+    }
+
+    public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, int[] includedFieldsMask) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, pojoTypeInfo.getFieldNames(), toBooleanMask(includedFieldsMask));
+    }
+
+    public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, int[] includedFieldsMask) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, fieldNames, includedFieldsMask);
+    }
+
+    public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, int[] includedFieldsMask) {
+        this(filePath, lineDelimiter, fieldDelimiter, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+    }
+
+    public PojoCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, int[] includedFieldsMask) {
+        super(filePath);
+        boolean[] mask = (includedFieldsMask == null)
+            ? createDefaultMask(fieldNames.length)
+            : toBooleanMask(includedFieldsMask);
+        configure(lineDelimiter, fieldDelimiter, pojoTypeInfo, fieldNames, mask);
+    }
+
+    public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, boolean[] includedFieldsMask) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, pojoTypeInfo, pojoTypeInfo.getFieldNames(), includedFieldsMask);
+    }
+
+    public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo, String[] fieldNames, boolean[] includedFieldsMask) {
+        this(filePath, DEFAULT_LINE_DELIMITER,

[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14999015#comment-14999015
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1266#issuecomment-155515305
  
I agree with @aljoscha and you on the `readRecord()` code. It would be nice 
to have the common parts of `readRecord()` in the `CsvInputFormat` and specific 
`fillRecord` in the tuple and POJO formats.

Otherwise, the PR looks really good.
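
The split suggested in this comment (shared `readRecord()` logic in the base class, a type-specific `fillRecord` in each subclass) can be sketched as below. This is an illustration under stated assumptions, not the code from the pull request: SketchRecordReader, SketchTupleReader and SketchPojoReader are hypothetical names, the line is split naively on commas, and the POJO is stood in for by a Map keyed by field name instead of reflective field access.

```java
import java.util.Map;

// Hypothetical stand-in for the shared base class; NOT the Flink API.
abstract class SketchRecordReader<OUT> {

    // Common part: skip empty and comment lines, split the rest into values,
    // then delegate to the subclass to populate the record.
    final OUT readRecord(OUT reuse, String line) {
        if (line.isEmpty() || line.startsWith("#")) {
            return null;
        }
        Object[] parsedValues = line.split(",", -1);
        return fillRecord(reuse, parsedValues);
    }

    // Type-specific part: copy the parsed values into the output record.
    protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
}

// Tuple-like variant: values are stored by position.
// The reuse array is assumed to be at least as long as the parsed values.
final class SketchTupleReader extends SketchRecordReader<Object[]> {
    @Override
    protected Object[] fillRecord(Object[] reuse, Object[] parsedValues) {
        System.arraycopy(parsedValues, 0, reuse, 0, parsedValues.length);
        return reuse;
    }
}

// POJO-like variant: values are stored by field name.
// The line is assumed to contain at least as many values as there are field names.
final class SketchPojoReader extends SketchRecordReader<Map<String, Object>> {
    private final String[] fieldNames;

    SketchPojoReader(String... fieldNames) {
        this.fieldNames = fieldNames.clone();
    }

    @Override
    protected Map<String, Object> fillRecord(Map<String, Object> reuse, Object[] parsedValues) {
        for (int i = 0; i < fieldNames.length; i++) {
            reuse.put(fieldNames[i], parsedValues[i]);
        }
        return reuse;
    }
}
```

With this layout the parsing and line-skipping logic exists once, and each format only decides how parsed values land in its own record type.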




[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14999027#comment-14999027
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r1239
  

[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998914#comment-14998914
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44434150
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 
 package org.apache.flink.api.java.io;
 
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
+import org.apache.flink.api.common.io.GenericCsvInputFormat;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.types.parser.FieldParser;
 
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.tuple.Tuple;
+import java.io.IOException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.StringUtils;
 
-public class CsvInputFormat<OUT> extends CommonCsvInputFormat<OUT> {
+public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
 
     private static final long serialVersionUID = 1L;
+
+    public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+    public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+    protected transient Object[] parsedValues;
 
-    public CsvInputFormat(Path filePath, CompositeType<OUT> typeInformation) {
-        super(filePath, typeInformation);
+    protected CsvInputFormat(Path filePath) {
+        super(filePath);
     }
-
-    public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, CompositeType<OUT> typeInformation) {
-        super(filePath, lineDelimiter, fieldDelimiter, typeInformation);
+
+    @Override
+    public void open(FileInputSplit split) throws IOException {
+        super.open(split);
+
+        @SuppressWarnings("unchecked")
+        FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
+
+        //throw exception if no field parsers are available
+        if (fieldParsers.length == 0) {
+            throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input");
+        }
+
+        // create the value holders
+        this.parsedValues = new Object[fieldParsers.length];
+        for (int i = 0; i < fieldParsers.length; i++) {
+            this.parsedValues[i] = fieldParsers[i].createValue();
+        }
+
+        // left to right evaluation makes access [0] okay
+        // this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default
+        if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) {
+            this.lineDelimiterIsLinebreak = true;
+        }
+
+        this.commentCount = 0;
+        this.invalidLineCount = 0;
     }
 
     @Override
-    protected OUT createTuple(OUT reuse) {
-        Tuple result = (Tuple) reuse;
-        for (int i = 0; i < parsedValues.length; i++) {
-            result.setField(parsedValues[i], i);
+    public OUT nextRecord(OUT record) throws IOException {
+        OUT returnRecord = null;
+        do {
+            returnRecord = super.nextRecord(record);
+        } while (returnRecord == null && !reachedEnd());
+
+        return returnRecord;
+    }
+
+    public Class<?>[] getFieldTypes() {
+        return super.getGenericFieldTypes();
+    }
+
+    protected static boolean[] createDefaultMask(int size) {
--- End diff --

Isn't the default that fields are read one after the other from the start 
of a line?
Why do we need this method then?


[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998947#comment-14998947
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44435810
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 
+   protected static boolean[] createDefaultMask(int size) {
--- End diff --

*cover it in an obvious manner


[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14998946#comment-14998946
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1266#discussion_r44435787
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -18,32 +18,97 @@
 
+   protected static boolean[] createDefaultMask(int size) {
--- End diff --

I wanted to cover that case directly in the InputFormat instead of 
*somewhere* else. This method is used to create a mask for exactly that case, 
when we can infer the mask from the number of field types.
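
For illustration only (this is not the code from the pull request): a minimal
sketch of what such a default mask could look like, assuming the mask is a
boolean array in which entry i says whether field i of a line is read. The
class name DefaultMaskSketch is made up for this example.

```java
import java.util.Arrays;

final class DefaultMaskSketch {

    // Hypothetical stand-in for createDefaultMask(int size): when only the
    // number of field types is known, mark the first `size` fields as included.
    static boolean[] createDefaultMask(int size) {
        boolean[] mask = new boolean[size];
        Arrays.fill(mask, true);
        return mask;
    }

    public static void main(String[] args) {
        // Three field types and no explicit mask: read fields 0, 1 and 2.
        System.out.println(Arrays.toString(createDefaultMask(3)));
    }
}
```

With a helper like this, a caller that only passes field types does not have to
build the "read everything in order" mask itself.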


[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14985255#comment-14985255
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1266#issuecomment-153024659
  
I think you can go ahead and merge it then.


> Untangle CsvInputFormat into PojoTypeCsvInputFormat and 
> TupleTypeCsvInputFormat 
> 
>
> Key: FLINK-2692
> URL: https://issues.apache.org/jira/browse/FLINK-2692
> Project: Flink
>  Issue Type: Improvement
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Minor
>
> The {{CsvInputFormat}} currently allows to return values as a {{Tuple}} or a 
> {{Pojo}} type. As a consequence, the processing logic, which has to work for 
> both types, is overly complex. For example, the {{CsvInputFormat}} contains 
> fields which are only used when a Pojo is returned. Moreover, the pojo field 
> information are constructed by calling setter methods which have to be called 
> in a very specific order, otherwise they fail. E.g. one first has to call 
> {{setFieldTypes}} before calling {{setOrderOfPOJOFields}}, otherwise the 
> number of fields might be different. Furthermore, some of the methods can 
> only be called if the return type is a {{Pojo}} type, because they expect 
> that a {{PojoTypeInfo}} is present.
> I think the {{CsvInputFormat}} should be refactored to make the code more 
> easily maintainable. I propose to split it up into a 
> {{PojoTypeCsvInputFormat}} and a {{TupleTypeCsvInputFormat}} which take all 
> the required information via their constructors instead of using the 
> {{setFields}} and {{setOrderOfPOJOFields}} approach.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14982647#comment-14982647
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1266#issuecomment-152544715
  
Looks good to me. Is there maybe a way to also make `readRecord()` common 
to tuples and POJOs? If I'm not mistaken, we duplicate all the code there, and 
only the call that creates (or fills) the tuple or POJO is different.

Thanks for the work you're putting in there, I know it's not glorious but 
some parts need cleanup/refactoring. :smile: 




[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14982859#comment-14982859
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/1266#issuecomment-152584371
  
We could move the differing lines (Csv:L114 Pojo:L218->L225) into a separate 
method that is called from a generic readRecord() method, something like 
fillRecord(OUT reuse, Object[] parsedValues).
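
A rough sketch of that idea, under simplifying assumptions: parsing is reduced
to splitting a line on commas, and all class names below are hypothetical, not
the classes in the pull request. Only fillRecord(OUT reuse, Object[]
parsedValues) is taken from the suggestion above.

```java
// Hypothetical base class: the shared parsing logic lives here exactly once.
abstract class RecordReaderSketch<OUT> {

    // Stand-in for the parsed field values of the current line.
    protected final Object[] parsedValues;

    protected RecordReaderSketch(int numFields) {
        this.parsedValues = new Object[numFields];
    }

    // Shared part: parse the fields, then delegate the type-specific step.
    public OUT readRecord(OUT reuse, String line) {
        String[] tokens = line.split(",", -1);
        if (tokens.length != parsedValues.length) {
            return null; // sketch only: treat a malformed line as "no record"
        }
        for (int i = 0; i < tokens.length; i++) {
            parsedValues[i] = tokens[i];
        }
        return fillRecord(reuse, parsedValues);
    }

    // The only part that differs between the tuple and the POJO variant.
    protected abstract OUT fillRecord(OUT reuse, Object[] parsedValues);
}

// Tuple-like variant: copy the values by position into the reuse object.
final class ArrayReaderSketch extends RecordReaderSketch<Object[]> {
    ArrayReaderSketch(int numFields) {
        super(numFields);
    }

    @Override
    protected Object[] fillRecord(Object[] reuse, Object[] parsedValues) {
        System.arraycopy(parsedValues, 0, reuse, 0, parsedValues.length);
        return reuse;
    }
}

// Minimal POJO used by the POJO-like variant.
final class PersonSketch {
    String name;
    String city;
}

// POJO-like variant: assign the values to named fields.
final class PersonReaderSketch extends RecordReaderSketch<PersonSketch> {
    PersonReaderSketch() {
        super(2);
    }

    @Override
    protected PersonSketch fillRecord(PersonSketch reuse, Object[] parsedValues) {
        reuse.name = (String) parsedValues[0];
        reuse.city = (String) parsedValues[1];
        return reuse;
    }
}

final class FillRecordDemo {
    public static void main(String[] args) {
        PersonSketch p = new PersonReaderSketch().readRecord(new PersonSketch(), "Ada,London");
        System.out.println(p.name + " lives in " + p.city);
    }
}
```

The subclasses then contain nothing but the code that fills the record, which
is the part that currently differs between the two formats.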




[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-10-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14982871#comment-14982871
 ] 

ASF GitHub Bot commented on FLINK-2692:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1266#issuecomment-152585852
  
We could, yes, but I'm fine with either; it is already cleaner than what we 
had before.

The user facing API is not changed, right? So I think this is good to merge.




[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-10-17 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14961847#comment-14961847
 ] 

Chesnay Schepler commented on FLINK-2692:
-

[~aljoscha] Great, that will work.

Another question: the CommonCsvInputFormat contains a method setFields(int[] 
sourceFieldIndices, Class[] fieldTypes). This method is not exposed in the 
CsvReader and is only used in tests. Can it be removed (together with 
setFieldsGeneric(int[] ...) in GenericCsvInputFormat)?



[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-10-17 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14961848#comment-14961848
 ] 

Aljoscha Krettek commented on FLINK-2692:
-

If it is only used in tests, I would think so.



[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-10-17 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14961858#comment-14961858
 ] 

Chesnay Schepler commented on FLINK-2692:
-

Ah, my bad, it's available through the Scala API's ExecutionEnvironment, so it's 
staying then...



[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-10-16 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14960375#comment-14960375
 ] 

Aljoscha Krettek commented on FLINK-2692:
-

I think the problem is that the Tuple types are different. Maybe you could 
extend the createInstance method to also take a reuse object. Then the 
ScalaTupleSerializer would ignore the reuse object and work as before, and the 
Java TupleSerializer would take the reuse object and fill it. I think we could 
then make do with a single CsvInputFormat class.
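
To make the proposal concrete, here is a small sketch under stated
assumptions: the interface and classes below are invented for this example
and only mimic the roles of the Java and Scala tuple serializers. Only the
idea of a createInstance method that also receives a reuse object comes from
the comment above.

```java
// Hypothetical serializer-like interface with the extended createInstance.
interface InstanceFactorySketch<T> {
    T createInstance(Object[] fields, T reuse);
}

// Mutable holder, playing the role of a Java tuple.
final class MutablePairSketch {
    Object f0;
    Object f1;
}

// Immutable holder, playing the role of a Scala tuple.
final class ImmutablePairSketch {
    final Object first;
    final Object second;

    ImmutablePairSketch(Object first, Object second) {
        this.first = first;
        this.second = second;
    }
}

// Java-style behaviour: fill the reuse object and hand it back.
final class MutablePairFactorySketch implements InstanceFactorySketch<MutablePairSketch> {
    @Override
    public MutablePairSketch createInstance(Object[] fields, MutablePairSketch reuse) {
        reuse.f0 = fields[0];
        reuse.f1 = fields[1];
        return reuse;
    }
}

// Scala-style behaviour: ignore the reuse object and build a new instance.
final class ImmutablePairFactorySketch implements InstanceFactorySketch<ImmutablePairSketch> {
    @Override
    public ImmutablePairSketch createInstance(Object[] fields, ImmutablePairSketch reuse) {
        return new ImmutablePairSketch(fields[0], fields[1]);
    }
}

final class CreateInstanceDemo {
    public static void main(String[] args) {
        Object[] parsed = {"a", 1};
        MutablePairSketch reuse = new MutablePairSketch();
        // The same call shape works for both variants; only the result differs.
        System.out.println(new MutablePairFactorySketch().createInstance(parsed, reuse) == reuse);
        System.out.println(new ImmutablePairFactorySketch().createInstance(parsed, null).first);
    }
}
```

An input format written against such an interface would not need to know
whether the record type is mutable, which is what would allow a single class.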



[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-10-15 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14958724#comment-14958724
 ] 

Chesnay Schepler commented on FLINK-2692:
-

I'm gonna take a stab at this one.



[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-10-15 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14959022#comment-14959022
 ] 

Chesnay Schepler commented on FLINK-2692:
-

Is there any reason that prevents the Scala API from using the CsvInputFormat 
class? They only differ in the createTuple method:

Java:
{code}
@Override
protected OUT createTuple(OUT reuse) {
    Tuple result = (Tuple) reuse;
    for (int i = 0; i < parsedValues.length; i++) {
        result.setField(parsedValues[i], i);
    }
    return reuse;
}
{code}
Scala:
{code}
@Override
protected OUT createTuple(OUT reuse) {
    Preconditions.checkNotNull(tupleSerializer, "The tuple serializer must be initialised." +
        " It is not initialized if the given type was not a " +
        TupleTypeInfoBase.class.getName() + ".");

    return tupleSerializer.createInstance(parsedValues);
}
{code}



[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-10-01 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14939568#comment-14939568
 ] 

Till Rohrmann commented on FLINK-2692:
--

No, it does not refer to {{org.apache.flink.api.java.record.io.CsvInputFormat}}, 
which will hopefully soon be removed, but to 
{{org.apache.flink.api.java.io.CsvInputFormat}}.






[jira] [Commented] (FLINK-2692) Untangle CsvInputFormat into PojoTypeCsvInputFormat and TupleTypeCsvInputFormat

2015-09-30 Thread Martin Liesenberg (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14938734#comment-14938734
 ] 

Martin Liesenberg commented on FLINK-2692:
--

Does this refer to the deprecated class in flink-java?



