[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-25 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-85866221
  
Cool, thanks. Will have a look shortly.

Did you rebase onto the latest master? We had a few build issues with the
master branch a few days ago.

It is a good sign if it builds on your machine.
On Mar 25, 2015 5:47 AM, Chiwan Park notificati...@github.com wrote:

 Hi, I updated this PR.

- Remove the pojoType(Class<?> targetType) method in CsvReader to force
the user to explicitly specify the field order.
- Add a routine to the readCsvFile method that checks that the field
order is specified.
- Add two integration tests for the two modifications above.

 By the way, I cannot figure out why Travis fails. On my machine, mvn clean
 install -DskipTests and mvn verify succeed. From the Travis log
 https://travis-ci.org/chiwanpark/flink/jobs/55747686#L7074, it seems
 that the problem is related to Gelly. Although I read some of the Gelly
 code, I cannot find what exactly the problem is.

 Could anyone help me with this problem?





---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-25 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-86011099
  
Oops, I pushed an intermediate commit a8a5c37. I will fix it.




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-25 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-86011531
  
Sure, no problem :-) 
Can I check it now or do you need a bit more time?




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-25 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-86011782
  
@fhueske You can check it now :)




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-25 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-86079595
  
@chiwanpark excellent job, thanks!
Will merge it after a final round of Travis tests has passed.






[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/426




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r27018880
  
--- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala ---
@@ -223,8 +224,11 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
* @param lenient Whether the parser should silently ignore malformed lines.
* @param includedFields The fields in the file that should be read. Per default all fields
*   are read.
+   * @param fieldsOrder The order information between CSV data and POJO fields. Without order
--- End diff --

change parameter name to `pojoFieldOrder`?




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r27018277
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -52,103 +50,86 @@

public static final String DEFAULT_FIELD_DELIMITER = ",";

-
private transient Object[] parsedValues;
-
-   private byte[] commentPrefix = null;
-
-   // To speed up readRecord processing. Used to find windows line endings.
-   // It is set when open so that readRecord does not have to evaluate it
-   private boolean lineDelimiterIsLinebreak = false;
-
-   private transient int commentCount;

-   private transient int invalidLineCount;
-
-
-   public CsvInputFormat(Path filePath) {
-   super(filePath);
-   }
-
-   public CsvInputFormat(Path filePath, Class<?> ... types) {
-   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, types);
-   }
+   private Class<OUT> pojoTypeClass = null;
+   private String[] pojoFieldsName = null;
+   private transient Field[] pojoFields = null;
+   private transient PojoTypeInfo<OUT> pojoTypeInfo = null;
+
+   public CsvInputFormat(Path filePath, TypeInformation<OUT> typeInformation) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
+   }

-   public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, Class<?>... types) {
+   public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TypeInformation<OUT> typeInformation) {
super(filePath);

+   Preconditions.checkArgument(typeInformation instanceof CompositeType);
+   CompositeType<OUT> compositeType = (CompositeType<OUT>) typeInformation;
+
setDelimiter(lineDelimiter);
setFieldDelimiter(fieldDelimiter);

-   setFieldTypes(types);
-   }
-
-
-   public byte[] getCommentPrefix() {
-   return commentPrefix;
-   }
-
-   public void setCommentPrefix(byte[] commentPrefix) {
-   this.commentPrefix = commentPrefix;
-   }
-
-   public void setCommentPrefix(char commentPrefix) {
-   setCommentPrefix(String.valueOf(commentPrefix));
-   }
-
-   public void setCommentPrefix(String commentPrefix) {
-   setCommentPrefix(commentPrefix, Charsets.UTF_8);
-   }
-
-   public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
-   if (charsetName == null) {
-   throw new IllegalArgumentException("Charset name must not be null");
+   Class<?>[] classes = new Class<?>[typeInformation.getArity()];
+   for (int i = 0, arity = typeInformation.getArity(); i < arity; i++) {
+   classes[i] = compositeType.getTypeAt(i).getTypeClass();
}
-
-   if (commentPrefix != null) {
-   Charset charset = Charset.forName(charsetName);
-   setCommentPrefix(commentPrefix, charset);
-   } else {
-   this.commentPrefix = null;
+   setFieldTypes(classes);
+
+   if (typeInformation instanceof PojoTypeInfo) {
+   pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+   pojoTypeClass = typeInformation.getTypeClass();
+   pojoFieldsName = compositeType.getFieldNames();
+   setFieldsOrder(pojoFieldsName);
}
}
-
-   public void setCommentPrefix(String commentPrefix, Charset charset) {
-   if (charset == null) {
-   throw new IllegalArgumentException("Charset must not be null");
+
+   public void setFieldsOrder(String[] fieldsOrder) {
+   Preconditions.checkNotNull(pojoTypeClass, "Field ordering feature can be used only with POJO fields.");
--- End diff --

How about `Field order can only be specified if output type is a POJO`




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r27018316
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -52,103 +50,86 @@

public static final String DEFAULT_FIELD_DELIMITER = ",";

-
private transient Object[] parsedValues;
-
-   private byte[] commentPrefix = null;
-
-   // To speed up readRecord processing. Used to find windows line endings.
-   // It is set when open so that readRecord does not have to evaluate it
-   private boolean lineDelimiterIsLinebreak = false;
-
-   private transient int commentCount;

-   private transient int invalidLineCount;
-
-
-   public CsvInputFormat(Path filePath) {
-   super(filePath);
-   }
-
-   public CsvInputFormat(Path filePath, Class<?> ... types) {
-   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, types);
-   }
+   private Class<OUT> pojoTypeClass = null;
+   private String[] pojoFieldsName = null;
+   private transient Field[] pojoFields = null;
+   private transient PojoTypeInfo<OUT> pojoTypeInfo = null;
+
+   public CsvInputFormat(Path filePath, TypeInformation<OUT> typeInformation) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
+   }

-   public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, Class<?>... types) {
+   public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TypeInformation<OUT> typeInformation) {
super(filePath);

+   Preconditions.checkArgument(typeInformation instanceof CompositeType);
+   CompositeType<OUT> compositeType = (CompositeType<OUT>) typeInformation;
+
setDelimiter(lineDelimiter);
setFieldDelimiter(fieldDelimiter);

-   setFieldTypes(types);
-   }
-
-
-   public byte[] getCommentPrefix() {
-   return commentPrefix;
-   }
-
-   public void setCommentPrefix(byte[] commentPrefix) {
-   this.commentPrefix = commentPrefix;
-   }
-
-   public void setCommentPrefix(char commentPrefix) {
-   setCommentPrefix(String.valueOf(commentPrefix));
-   }
-
-   public void setCommentPrefix(String commentPrefix) {
-   setCommentPrefix(commentPrefix, Charsets.UTF_8);
-   }
-
-   public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
-   if (charsetName == null) {
-   throw new IllegalArgumentException("Charset name must not be null");
+   Class<?>[] classes = new Class<?>[typeInformation.getArity()];
+   for (int i = 0, arity = typeInformation.getArity(); i < arity; i++) {
+   classes[i] = compositeType.getTypeAt(i).getTypeClass();
}
-
-   if (commentPrefix != null) {
-   Charset charset = Charset.forName(charsetName);
-   setCommentPrefix(commentPrefix, charset);
-   } else {
-   this.commentPrefix = null;
+   setFieldTypes(classes);
+
+   if (typeInformation instanceof PojoTypeInfo) {
+   pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+   pojoTypeClass = typeInformation.getTypeClass();
+   pojoFieldsName = compositeType.getFieldNames();
+   setFieldsOrder(pojoFieldsName);
}
}
-
-   public void setCommentPrefix(String commentPrefix, Charset charset) {
-   if (charset == null) {
-   throw new IllegalArgumentException("Charset must not be null");
+
+   public void setFieldsOrder(String[] fieldsOrder) {
--- End diff --

Change method name to `setOrderOfPOJOFields`?




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-24 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-85455383
  
Very good! Let me know when you want me to have a look again :-)




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r27018151
  
--- Diff: flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java ---
@@ -98,98 +123,66 @@ public void setFields(int[] sourceFieldIndices, Class<?>[] fieldTypes) {
setFieldsGeneric(sourceFieldIndices, fieldTypes);
}

-   public byte[] getCommentPrefix() {
-   return commentPrefix;
-   }
-
-   public void setCommentPrefix(byte[] commentPrefix) {
-   this.commentPrefix = commentPrefix;
-   }
-
-   public void setCommentPrefix(char commentPrefix) {
-   setCommentPrefix(String.valueOf(commentPrefix));
-   }
+   public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes) {
+   Preconditions.checkNotNull(sourceFieldMask);
+   Preconditions.checkNotNull(fieldTypes);

-   public void setCommentPrefix(String commentPrefix) {
-   setCommentPrefix(commentPrefix, Charsets.UTF_8);
+   setFieldsGeneric(sourceFieldMask, fieldTypes);
}

-   public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
-   if (charsetName == null) {
-   throw new IllegalArgumentException("Charset name must not be null");
-   }
-
-   if (commentPrefix != null) {
-   Charset charset = Charset.forName(charsetName);
-   setCommentPrefix(commentPrefix, charset);
-   } else {
-   this.commentPrefix = null;
-   }
+   public Class<?>[] getFieldTypes() {
+   return super.getGenericFieldTypes();
}

-   public void setCommentPrefix(String commentPrefix, Charset charset) {
-   if (charset == null) {
-   throw new IllegalArgumentException("Charset must not be null");
-   }
-   if (commentPrefix != null) {
-   this.commentPrefix = commentPrefix.getBytes(charset);
-   } else {
-   this.commentPrefix = null;
-   }
-   }
-
-   @Override
-   public void close() throws IOException {
--- End diff --

Did the `close()` method get lost in the refactoring?




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-24 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-85447697
  
I see the issue with the non-deterministic field order and FLINK-1665 as 
follows. Both FLINK-1665 and Option 3 solve the problem of non-deterministic 
field order:

- FLINK-1665 by specifying the order in the POJO using annotations.
- Option 3 by forcing the user to explicitly specify the field order.

As long as FLINK-1665 is not implemented, I would go with Option 3. Once 
we have FLINK-1665, we could relax it for POJOs with `@Position` annotations.
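
To illustrate the explicit-order approach (Option 3), here is a minimal, self-contained Java sketch. It does not use Flink, and `Person`, `ExplicitFieldOrder`, and `populate` are hypothetical names made up for this example; the code in the PR differs. The sketch binds each CSV column to a POJO field through a caller-supplied list of names, so the result does not depend on the order in which reflection enumerates fields (the Javadoc of `Class.getDeclaredFields()` states that the returned elements are in no particular order).

```java
import java.lang.reflect.Field;

// Hypothetical POJO, used only for this illustration.
class Person {
    public String name;
    public int age;
}

final class ExplicitFieldOrder {

    /**
     * Writes values[i] into the field named fieldOrder[i]. The mapping depends only on
     * the order the caller passes in, never on the order reflection reports the fields.
     */
    static <T> T populate(T target, String[] fieldOrder, Object[] values) {
        if (fieldOrder.length != values.length) {
            throw new IllegalArgumentException(
                "Expected " + fieldOrder.length + " values but got " + values.length);
        }
        for (int i = 0; i < fieldOrder.length; i++) {
            try {
                Field field = target.getClass().getDeclaredField(fieldOrder[i]);
                field.set(target, values[i]);
            } catch (NoSuchFieldException | IllegalAccessException e) {
                throw new IllegalArgumentException("Cannot set field '" + fieldOrder[i] + "'", e);
            }
        }
        return target;
    }

    public static void main(String[] args) {
        // CSV line "42,Alice": the first column is the age, the second the name.
        Person p = populate(new Person(),
            new String[] {"age", "name"}, new Object[] {42, "Alice"});
        System.out.println(p.name + " is " + p.age); // prints "Alice is 42"
    }
}
```

Swapping the two names in the list swaps the column binding, which is the point of making the order explicit: the user, not the JVM, decides which column feeds which field.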




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r27018812
  
--- Diff: flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java ---
@@ -19,66 +19,91 @@
 package org.apache.flink.api.scala.operators;


-import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.GenericCsvInputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.util.StringUtils;

+import org.apache.flink.types.parser.FieldParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.charset.IllegalCharsetNameException;
-import java.nio.charset.UnsupportedCharsetException;
-import java.util.Map;
-import java.util.TreeMap;
+import java.lang.reflect.Field;
+import java.util.Arrays;

-import scala.Product;
-
-public class ScalaCsvInputFormat<OUT extends Product> extends GenericCsvInputFormat<OUT> {
+public class ScalaCsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {

private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(ScalaCsvInputFormat.class);
-
-   private transient Object[] parsedValues;
-
-   // To speed up readRecord processing. Used to find windows line endings.
-   // It is set when open so that readRecord does not have to evaluate it
-   private boolean lineDelimiterIsLinebreak = false;

-   private final TupleSerializerBase<OUT> serializer;
+   private transient Object[] parsedValues;

-   private byte[] commentPrefix = null;
+   private final TupleSerializerBase<OUT> tupleSerializer;

-   private transient int commentCount;
-   private transient int invalidLineCount;
+   private Class<OUT> pojoTypeClass = null;
+   private String[] pojoFieldsName = null;
+   private transient Field[] pojoFields = null;
+   private transient PojoTypeInfo<OUT> pojoTypeInfo = null;

public ScalaCsvInputFormat(Path filePath, TypeInformation<OUT> typeInfo) {
super(filePath);

-   if (!(typeInfo.isTupleType())) {
-   throw new UnsupportedOperationException("This only works on tuple types.");
+   Class<?>[] classes = new Class[typeInfo.getArity()];
+
+   if (typeInfo instanceof TupleTypeInfoBase) {
+   TupleTypeInfoBase<OUT> tupleType = (TupleTypeInfoBase<OUT>) typeInfo;
+   // We can use an empty config here, since we only use the serializer to create
+   // the top-level case class
+   tupleSerializer = (TupleSerializerBase<OUT>) tupleType.createSerializer(new ExecutionConfig());
+
+   for (int i = 0; i < tupleType.getArity(); i++) {
+   classes[i] = tupleType.getTypeAt(i).getTypeClass();
+   }
+
+   setFieldTypes(classes);
+   } else {
+   tupleSerializer = null;
+   pojoTypeInfo = (PojoTypeInfo<OUT>) typeInfo;
+   pojoTypeClass = typeInfo.getTypeClass();
+   pojoFieldsName = pojoTypeInfo.getFieldNames();
+
+   for (int i = 0, arity = pojoTypeInfo.getArity(); i < arity; i++) {
+   classes[i] = pojoTypeInfo.getTypeAt(i).getTypeClass();
+   }
+
+   setFieldTypes(classes);
+   setFieldsOrder(pojoFieldsName);
+   }
+   }
+
+   public void setFieldsOrder(String[] fieldsOrder) {
--- End diff --

Rename method to `setOrderOfPOJOFields`




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r27018473
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -52,103 +50,86 @@

public static final String DEFAULT_FIELD_DELIMITER = ",";

-
private transient Object[] parsedValues;
-
-   private byte[] commentPrefix = null;
-
-   // To speed up readRecord processing. Used to find windows line endings.
-   // It is set when open so that readRecord does not have to evaluate it
-   private boolean lineDelimiterIsLinebreak = false;
-
-   private transient int commentCount;

-   private transient int invalidLineCount;
-
-
-   public CsvInputFormat(Path filePath) {
-   super(filePath);
-   }
-
-   public CsvInputFormat(Path filePath, Class<?> ... types) {
-   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, types);
-   }
+   private Class<OUT> pojoTypeClass = null;
+   private String[] pojoFieldsName = null;
+   private transient Field[] pojoFields = null;
+   private transient PojoTypeInfo<OUT> pojoTypeInfo = null;
+
+   public CsvInputFormat(Path filePath, TypeInformation<OUT> typeInformation) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
+   }

-   public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, Class<?>... types) {
+   public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TypeInformation<OUT> typeInformation) {
super(filePath);

+   Preconditions.checkArgument(typeInformation instanceof CompositeType);
+   CompositeType<OUT> compositeType = (CompositeType<OUT>) typeInformation;
+
setDelimiter(lineDelimiter);
setFieldDelimiter(fieldDelimiter);

-   setFieldTypes(types);
-   }
-
-
-   public byte[] getCommentPrefix() {
-   return commentPrefix;
-   }
-
-   public void setCommentPrefix(byte[] commentPrefix) {
-   this.commentPrefix = commentPrefix;
-   }
-
-   public void setCommentPrefix(char commentPrefix) {
-   setCommentPrefix(String.valueOf(commentPrefix));
-   }
-
-   public void setCommentPrefix(String commentPrefix) {
-   setCommentPrefix(commentPrefix, Charsets.UTF_8);
-   }
-
-   public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
-   if (charsetName == null) {
-   throw new IllegalArgumentException("Charset name must not be null");
+   Class<?>[] classes = new Class<?>[typeInformation.getArity()];
+   for (int i = 0, arity = typeInformation.getArity(); i < arity; i++) {
+   classes[i] = compositeType.getTypeAt(i).getTypeClass();
}
-
-   if (commentPrefix != null) {
-   Charset charset = Charset.forName(charsetName);
-   setCommentPrefix(commentPrefix, charset);
-   } else {
-   this.commentPrefix = null;
+   setFieldTypes(classes);
+
+   if (typeInformation instanceof PojoTypeInfo) {
+   pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+   pojoTypeClass = typeInformation.getTypeClass();
+   pojoFieldsName = compositeType.getFieldNames();
+   setFieldsOrder(pojoFieldsName);
}
}
-
-   public void setCommentPrefix(String commentPrefix, Charset charset) {
-   if (charset == null) {
-   throw new IllegalArgumentException("Charset must not be null");
+
+   public void setFieldsOrder(String[] fieldsOrder) {
+   Preconditions.checkNotNull(pojoTypeClass, "Field ordering feature can be used only with POJO fields.");
+   Preconditions.checkNotNull(fieldsOrder);
+
+   int includedCount = 0;
+   for (boolean isIncluded : fieldIncluded) {
+   if (isIncluded) {
+   includedCount++;
+   }
}
-   if (commentPrefix != null) {
-   this.commentPrefix = commentPrefix.getBytes(charset);
-   } else {
-   this.commentPrefix = null;
+
+   Preconditions.checkArgument(includedCount == fieldsOrder.length,
+   "The number of selected POJO fields should be the same as that of CSV fields.");
--- End diff --

How about 

[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r27018597
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -52,103 +50,86 @@

public static final String DEFAULT_FIELD_DELIMITER = ",";

-
private transient Object[] parsedValues;
-
-   private byte[] commentPrefix = null;
-
-   // To speed up readRecord processing. Used to find windows line endings.
-   // It is set when open so that readRecord does not have to evaluate it
-   private boolean lineDelimiterIsLinebreak = false;
-
-   private transient int commentCount;

-   private transient int invalidLineCount;
-
-
-   public CsvInputFormat(Path filePath) {
-   super(filePath);
-   }
-
-   public CsvInputFormat(Path filePath, Class<?> ... types) {
-   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, types);
-   }
+   private Class<OUT> pojoTypeClass = null;
+   private String[] pojoFieldsName = null;
+   private transient Field[] pojoFields = null;
+   private transient PojoTypeInfo<OUT> pojoTypeInfo = null;
+
+   public CsvInputFormat(Path filePath, TypeInformation<OUT> typeInformation) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
+   }

-   public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, Class<?>... types) {
+   public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TypeInformation<OUT> typeInformation) {
super(filePath);

+   Preconditions.checkArgument(typeInformation instanceof CompositeType);
+   CompositeType<OUT> compositeType = (CompositeType<OUT>) typeInformation;
+
setDelimiter(lineDelimiter);
setFieldDelimiter(fieldDelimiter);

-   setFieldTypes(types);
-   }
-
-
-   public byte[] getCommentPrefix() {
-   return commentPrefix;
-   }
-
-   public void setCommentPrefix(byte[] commentPrefix) {
-   this.commentPrefix = commentPrefix;
-   }
-
-   public void setCommentPrefix(char commentPrefix) {
-   setCommentPrefix(String.valueOf(commentPrefix));
-   }
-
-   public void setCommentPrefix(String commentPrefix) {
-   setCommentPrefix(commentPrefix, Charsets.UTF_8);
-   }
-
-   public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException {
-   if (charsetName == null) {
-   throw new IllegalArgumentException("Charset name must not be null");
+   Class<?>[] classes = new Class<?>[typeInformation.getArity()];
+   for (int i = 0, arity = typeInformation.getArity(); i < arity; i++) {
+   classes[i] = compositeType.getTypeAt(i).getTypeClass();
}
-
-   if (commentPrefix != null) {
-   Charset charset = Charset.forName(charsetName);
-   setCommentPrefix(commentPrefix, charset);
-   } else {
-   this.commentPrefix = null;
+   setFieldTypes(classes);
+
+   if (typeInformation instanceof PojoTypeInfo) {
+   pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+   pojoTypeClass = typeInformation.getTypeClass();
+   pojoFieldsName = compositeType.getFieldNames();
+   setFieldsOrder(pojoFieldsName);
}
}
-
-   public void setCommentPrefix(String commentPrefix, Charset charset) {
-   if (charset == null) {
-   throw new IllegalArgumentException("Charset must not be null");
+
+   public void setFieldsOrder(String[] fieldsOrder) {
+   Preconditions.checkNotNull(pojoTypeClass, "Field ordering feature can be used only with POJO fields.");
+   Preconditions.checkNotNull(fieldsOrder);
+
+   int includedCount = 0;
+   for (boolean isIncluded : fieldIncluded) {
+   if (isIncluded) {
+   includedCount++;
+   }
}
-   if (commentPrefix != null) {
-   this.commentPrefix = commentPrefix.getBytes(charset);
-   } else {
-   this.commentPrefix = null;
+
+   Preconditions.checkArgument(includedCount == fieldsOrder.length,
+   "The number of selected POJO fields should be the same as that of CSV fields.");
+
+   for (String field : 

[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-24 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-85830275
  
Hi, I updated this PR.

* Remove the `pojoType(Class<?> targetType)` method in `CsvReader` to force the 
user to explicitly specify the field order.
* Add a routine to the `readCsvFile` method that checks that the field order is specified.
* Add two integration tests for the two modifications above.

By the way, I cannot figure out why Travis fails. On my machine, 
`mvn clean install -DskipTests` and `mvn verify` succeed. From the 
[Travis log](https://travis-ci.org/chiwanpark/flink/jobs/55747686#L7074), it seems 
that the problem is related to Gelly. Although I read some of the Gelly code, I 
cannot find what exactly the problem is.

Could anyone help me with this problem?
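
For readers following the thread, the kind of field-order check described in the list above could look roughly like this plain-Java sketch. `PojoFieldOrderCheck`, its methods, and the error messages are assumptions made for illustration; they are not the code in this PR and do not use Flink classes. The sketch only shows the idea: reject a missing field order, and require one POJO field name per selected CSV column.

```java
final class PojoFieldOrderCheck {

    /** Counts the CSV columns selected by the include mask. */
    static int countIncluded(boolean[] includedMask) {
        int included = 0;
        for (boolean isIncluded : includedMask) {
            if (isIncluded) {
                included++;
            }
        }
        return included;
    }

    /**
     * Fails fast unless an explicit field order is given and it names exactly
     * one POJO field per selected CSV column.
     */
    static void check(boolean[] includedMask, String[] pojoFields) {
        if (pojoFields == null || pojoFields.length == 0) {
            throw new IllegalArgumentException("The order of POJO fields must be specified.");
        }
        int included = countIncluded(includedMask);
        if (included != pojoFields.length) {
            throw new IllegalArgumentException(
                "The number of selected CSV fields (" + included
                    + ") must match the number of POJO fields (" + pojoFields.length + ").");
        }
    }
}
```

For example, `check(new boolean[] {true, false, true}, new String[] {"a", "b"})` passes, while passing `null` or a single name for two selected columns throws an `IllegalArgumentException`.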




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-23 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-84977748
  
Hi @chiwanpark 
Thanks for updating the PR! :-)
I was gone for a few days. Will have a look at your PR shortly.




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26991431
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -234,9 +301,23 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws
}

if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-   // valid parse, map values into pact record
-   for (int i = 0; i < parsedValues.length; i++) {
-   reuse.setField(parsedValues[i], i);
+   if (pojoTypeClass == null) {
+   // result type is tuple
+   Tuple result = (Tuple) reuse;
+   for (int i = 0; i < parsedValues.length; i++) {
+   result.setField(parsedValues[i], i);
+   }
+   } else {
+   // result type is POJO
+   for (int i = 0; i < parsedValues.length; i++) {
+   try {
+   pojoFields[i].set(reuse, parsedValues[i]);
+   } catch (IllegalAccessException e) {
+   LOG.error("Cannot set value to given type object!", e);
--- End diff --

This error will not be caused by invalid data.
You should rethrow this exception to kill the job. If the field cannot be 
set, something is seriously wrong and the job should not continue. 
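
A minimal sketch of the rethrow suggested above. The `PojoFieldSetter` helper and the `Item` class are hypothetical and not part of the PR; the point is only that the reflective `Field.set` call is wrapped so that an `IllegalAccessException` is propagated as an unchecked exception instead of being logged and ignored.

```java
import java.lang.reflect.Field;

// Hypothetical POJO with a private field, used only for this illustration.
class Item {
    private int field1;

    public int getField1() {
        return field1;
    }
}

public class PojoFieldSetter {
    /**
     * Sets a parsed value on the target object. A failure here indicates a
     * setup problem rather than bad input data, so it is rethrown.
     */
    public static void setField(Field field, Object target, Object value) {
        try {
            field.set(target, value);
        } catch (IllegalAccessException e) {
            // Rethrow as unchecked so the caller (the job) fails fast.
            throw new RuntimeException(
                "Cannot set field '" + field.getName() + "' on "
                    + target.getClass().getName(), e);
        }
    }

    public static void main(String[] args) throws Exception {
        Field f = Item.class.getDeclaredField("field1");
        Item item = new Item();
        f.setAccessible(true); // without this, setField throws for the private field
        setField(f, item, 123);
        System.out.println(item.getField1());
    }
}
```

Whether the exception is wrapped in a `RuntimeException` or declared on `readRecord` is a design choice; either way the job stops instead of silently producing half-filled objects.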





[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26992871
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java 
---
@@ -684,4 +693,249 @@ private void testRemovingTrailingCR(String lineBreakerInFile, String lineBreaker
         }
     }
 
+    private void validatePojoItem(CsvInputFormat<PojoItem> format) throws Exception {
+        PojoItem item = new PojoItem();
+
+        format.nextRecord(item);
+
+        assertEquals(123, item.field1);
+        assertEquals("AAA", item.field2);
+        assertEquals(Double.valueOf(3.123), item.field3);
+        assertEquals("BBB", item.field4);
+
+        format.nextRecord(item);
+
+        assertEquals(456, item.field1);
+        assertEquals("BBB", item.field2);
+        assertEquals(Double.valueOf(1.123), item.field3);
+        assertEquals("AAA", item.field4);
+    }
+
+    @Test
+    public void testPojoType() throws Exception {
+        File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+        tempFile.deleteOnExit();
+        tempFile.setWritable(true);
+
+        OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+        wrt.write("123,AAA,3.123,BBB\n");
+        wrt.write("456,BBB,1.123,AAA\n");
+        wrt.close();
+
+        @SuppressWarnings("unchecked")
+        TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+        CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+        inputFormat.configure(new Configuration());
+        FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+        inputFormat.open(splits[0]);
+
+        validatePojoItem(inputFormat);
+    }
+
+    @Test
+    public void testPojoTypeWithPrivateField() throws Exception {
+        File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+        tempFile.deleteOnExit();
+        tempFile.setWritable(true);
+
+        OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+        wrt.write("123,AAA,3.123,BBB\n");
+        wrt.write("456,BBB,1.123,AAA\n");
+        wrt.close();
+
+        @SuppressWarnings("unchecked")
+        TypeInformation<PrivatePojoItem> typeInfo = (TypeInformation<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class);
+        CsvInputFormat<PrivatePojoItem> inputFormat = new CsvInputFormat<PrivatePojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+        inputFormat.configure(new Configuration());
+
+        FileInputSplit[] splits = inputFormat.createInputSplits(1);
+        inputFormat.open(splits[0]);
+
+        PrivatePojoItem item = new PrivatePojoItem();
+        inputFormat.nextRecord(item);
--- End diff --

you can use the `validatePojoItem()` method here as well.




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26993897
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
 ---
@@ -247,16 +252,27 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   inputFormat.enableQuotedStringParsing(quoteCharacter);
 }
 
-val classes: Array[Class[_]] = new Array[Class[_]](typeInfo.getArity)
+val classesBuf: ArrayBuffer[Class[_]] = new ArrayBuffer[Class[_]]
 for (i <- 0 until typeInfo.getArity) {
-  classes(i) = typeInfo.getTypeAt(i).getTypeClass
+  typeInfo match {
+case info: TupleTypeInfoBase[T] => classesBuf += info.getTypeAt(i).getTypeClass()
+case info: PojoTypeInfo[T] =>
+  if (includedFields == null || includedFields.indexOf(i) != -1) {
--- End diff --

Just add all POJO fields without checking against includedFields. 
IncludedFields refer to the fields of the CSV file not to the fields of the 
POJO.
The later check ensures that the number of read fields and the number of 
POJO/Tuple fields is equal.
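
The count check mentioned above could look roughly like the following sketch. The class and method names are hypothetical, and the rule that a `null` selection means "read the leading columns" is an assumption made for this illustration, not something taken from the PR.

```java
public class IncludedFieldsCheck {
    /**
     * includedCsvColumns holds positions of columns in the CSV file, not
     * positions of POJO fields. Assumption: null means the leading columns
     * are read, one per target field, so no count check is needed.
     */
    public static void checkArity(int[] includedCsvColumns, String[] targetFieldNames) {
        if (includedCsvColumns == null) {
            return;
        }
        if (includedCsvColumns.length != targetFieldNames.length) {
            throw new IllegalArgumentException(
                "Number of CSV columns read (" + includedCsvColumns.length
                    + ") must equal the number of POJO/Tuple fields ("
                    + targetFieldNames.length + ").");
        }
    }

    public static void main(String[] args) {
        // Two CSV columns (0 and 2) feed two POJO fields: accepted.
        checkArity(new int[]{0, 2}, new String[]{"name", "birthday"});
        System.out.println("ok");
    }
}
```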




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-23 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-85261367
  
Hi @chiwanpark, the PR improved a lot! Besides a few inline comments, the 
logic and test coverage looks pretty good. Unfortunately, the PR does not seem 
to build correctly (Check 
[Travis](https://travis-ci.org/apache/flink/builds/54853098)). I'll do another 
quick pass tomorrow and have a closer look at the error messages. 

IMO, there are just two things left to be discussed before we can test it 
on a cluster:

**Order of POJO fields**
This is actually a crucial point. We need to make sure that the field 
order is deterministic for all setups. I think (not sure!) right now, the order 
of the fields depends on the order in which the fields are returned by Java's 
reflection tools. This order needs to be standardized to ensure that all JVMs 
that obey the standard are compatible. However, even a standardized order 
(lexicographic order) might not be very helpful.

There are several options here:
1. If the order of fields is standardized, just use that by default.
2. If not, we can deterministically sort the fields ourselves in the 
PojoTypeInfo.
3. We make the definition of a field order mandatory until we can define 
the order of POJO fields (e.g., via the proposed `@Position` annotation).
I am leaning towards option 3. 
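
For illustration, option 2 could be sketched as below. `Class.getDeclaredFields()` is documented to return fields in no particular order, so the sketch sorts the names itself. The `SortedPojoFields` class and its `Item` POJO are hypothetical, and this is not how `PojoTypeInfo` is actually implemented.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SortedPojoFields {
    // Hypothetical POJO used only for this illustration.
    public static class Item {
        public int field1;
        public String zeta;
        public double alpha;
    }

    /** Returns the instance field names of a class in lexicographic order. */
    public static List<String> sortedFieldNames(Class<?> clazz) {
        List<String> names = new ArrayList<String>();
        for (Field f : clazz.getDeclaredFields()) {
            // Skip static and compiler-generated fields.
            if (Modifier.isStatic(f.getModifiers()) || f.isSynthetic()) {
                continue;
            }
            names.add(f.getName());
        }
        // Sorting removes any dependence on the JVM's reflection order.
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) {
        System.out.println(sortedFieldNames(Item.class));
    }
}
```

As noted above, a lexicographic order is deterministic but rarely matches the column order of a CSV file, which is the argument for requiring an explicit field order (option 3).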

**Changed Inheritance of ScalaCsvInputFormat**
This is a very good observation, however I would keep the original 
inheritance for now. At the moment, GenericCsvInputFormat is extended by 
CsvInputFormat (for Java API), ScalaCsvInputFormat (for Scala API), 
CsvInputFormat (for Record API), and a test class. We will soon remove the 
RecordAPI such that only the Java and the Scala API will remain. I think we can 
then move all common operations to GenericCsvInputFormat (maybe we can do that 
already...). @chiwanpark does that make sense to you or do you disagree?

@chiwanpark What do you think? Any other opinions?




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26993011
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java 
---
@@ -684,4 +693,249 @@ private void testRemovingTrailingCR(String lineBreakerInFile, String lineBreaker
         }
     }
 
+    private void validatePojoItem(CsvInputFormat<PojoItem> format) throws Exception {
+        PojoItem item = new PojoItem();
+
+        format.nextRecord(item);
+
+        assertEquals(123, item.field1);
+        assertEquals("AAA", item.field2);
+        assertEquals(Double.valueOf(3.123), item.field3);
+        assertEquals("BBB", item.field4);
+
+        format.nextRecord(item);
+
+        assertEquals(456, item.field1);
+        assertEquals("BBB", item.field2);
+        assertEquals(Double.valueOf(1.123), item.field3);
+        assertEquals("AAA", item.field4);
+    }
+
+    @Test
+    public void testPojoType() throws Exception {
+        File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+        tempFile.deleteOnExit();
+        tempFile.setWritable(true);
+
+        OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+        wrt.write("123,AAA,3.123,BBB\n");
+        wrt.write("456,BBB,1.123,AAA\n");
+        wrt.close();
+
+        @SuppressWarnings("unchecked")
+        TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+        CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+        inputFormat.configure(new Configuration());
+        FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+        inputFormat.open(splits[0]);
+
+        validatePojoItem(inputFormat);
+    }
+
+    @Test
+    public void testPojoTypeWithPrivateField() throws Exception {
+        File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+        tempFile.deleteOnExit();
+        tempFile.setWritable(true);
+
+        OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+        wrt.write("123,AAA,3.123,BBB\n");
+        wrt.write("456,BBB,1.123,AAA\n");
+        wrt.close();
+
+        @SuppressWarnings("unchecked")
+        TypeInformation<PrivatePojoItem> typeInfo = (TypeInformation<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class);
+        CsvInputFormat<PrivatePojoItem> inputFormat = new CsvInputFormat<PrivatePojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+        inputFormat.configure(new Configuration());
+
+        FileInputSplit[] splits = inputFormat.createInputSplits(1);
+        inputFormat.open(splits[0]);
+
+        PrivatePojoItem item = new PrivatePojoItem();
+        inputFormat.nextRecord(item);
+
+        assertEquals(123, item.field1);
+        assertEquals("AAA", item.field2);
+        assertEquals(Double.valueOf(3.123), item.field3);
+        assertEquals("BBB", item.field4);
+
+        inputFormat.nextRecord(item);
+
+        assertEquals(456, item.field1);
+        assertEquals("BBB", item.field2);
+        assertEquals(Double.valueOf(1.123), item.field3);
+        assertEquals("AAA", item.field4);
+    }
+
+    @Test
+    public void testPojoTypeWithMappingInformation() throws Exception {
+        File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+        tempFile.deleteOnExit();
+        tempFile.setWritable(true);
+
+        OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+        wrt.write("123,3.123,AAA,BBB\n");
+        wrt.write("456,1.123,BBB,AAA\n");
+        wrt.close();
+
+        @SuppressWarnings("unchecked")
+        TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+        CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+        inputFormat.setFieldsOrder(new String[]{"field1", "field3", "field2", "field4"});
+
+        inputFormat.configure(new Configuration());
+        FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+        inputFormat.open(splits[0]);
+
+        validatePojoItem(inputFormat);
+    }
+
+   @Test
+   public void testPojoTypeWithPartialFieldInCSV() throws Exception {
+   File tempFile = 

[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26994167
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderWithPOJOITCase.java 
---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.io;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class CsvReaderWithPOJOITCase extends MultipleProgramsTestBase {
+    private String resultPath;
+    private String expected;
+
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    public CsvReaderWithPOJOITCase(ExecutionMode mode) {
+        super(mode);
+    }
+
+    @Before
+    public void before() throws Exception {
+        resultPath = tempFolder.newFile("result").toURI().toString();
+    }
+
+    @After
+    public void after() throws Exception {
+        compareResultsByLinesInMemory(expected, resultPath);
+    }
+
+    private String createInputData(String data) throws Exception {
+        File file = tempFolder.newFile("input");
+        Files.write(data, file, Charsets.UTF_8);
+
+        return file.toURI().toString();
+    }
+
+    @Test
+    public void testPOJOType() throws Exception {
+        final String dataPath = createInputData("ABC,2.20,3\nDEF,5.1,5\nDEF,3.30,1\nGHI,3.30,10");
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSet<POJOItem> data = env.readCsvFile(dataPath).pojoType(POJOItem.class, new String[]{"f1", "f3", "f2"});
+        DataSet<Tuple2<String, Integer>> result = data.map(new POJOToTupleMapper()).groupBy(0).sum(1);
--- End diff --

I would remove the `POJOToTupleMapper` and the aggregation and instead 
directly emit the POJO data with `writeAsText()`. This will use the 
`toString()` method of the POJO to write the data.
Simplifies the test and restricts it to the essential part.
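
To illustrate the suggestion without a Flink dependency, here is a small sketch of a POJO whose `toString()` produces one comparable text line per record. The `POJOItem` field types and the comma-separated format are assumptions; the real test class in the PR may differ. Per the comment above, `writeAsText()` would write whatever `toString()` returns, which the `asTextLines` helper merely imitates.

```java
import java.util.ArrayList;
import java.util.List;

public class PojoToStringSketch {
    // Hypothetical POJO; field names follow the test, types are assumed.
    public static class POJOItem {
        public String f1;
        public int f2;
        public double f3;

        public POJOItem() {}

        public POJOItem(String f1, int f2, double f3) {
            this.f1 = f1;
            this.f2 = f2;
            this.f3 = f3;
        }

        @Override
        public String toString() {
            // One line per record, so the expected result is easy to write down.
            return f1 + "," + f2 + "," + f3;
        }
    }

    /** Imitates a text sink: one toString() line per element. */
    public static List<String> asTextLines(List<POJOItem> items) {
        List<String> lines = new ArrayList<String>();
        for (POJOItem item : items) {
            lines.add(String.valueOf(item));
        }
        return lines;
    }

    public static void main(String[] args) {
        List<POJOItem> items = new ArrayList<POJOItem>();
        items.add(new POJOItem("ABC", 3, 2.2));
        System.out.println(asTextLines(items));
    }
}
```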




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26993339
  
--- Diff: 
flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
 ---
@@ -219,73 +92,24 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) {
         }
 
         if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-            OUT result = serializer.createInstance(parsedValues);
-            return result;
+            if (tupleSerializer != null) {
+                return tupleSerializer.createInstance(parsedValues);
+            } else {
+                try {
+                    for (int i = 0; i < pojoFields.length; i++) {
+                        pojoFields[i].set(reuse, parsedValues[i]);
+                    }
+                } catch (IllegalAccessException e) {
+                    LOG.error("Cannot set value to given type object!", e);
--- End diff --

same as in the `CsvInputFormat`. Forward the exception to kill the job.




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-23 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-85285193
  
@fhueske Hi, thanks for your kind advice! I will fix them soon.

About the order of POJO fields, I also think that option 3 is good. 
However, [FLINK-1665](https://issues.apache.org/jira/browse/FLINK-1665) is not 
implemented yet. I would implement options 1 and 2 now. Once FLINK-1665 is 
completed, we can implement option 3.

About the inheritance of `ScalaCsvInputFormat`, I didn't think about the Record 
API. Your suggestion looks good. I will revert the changes and refactor 
`GenericCsvInputFormat` to contain the duplicated methods.




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-18 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-82885499
  
I updated this PR.

* Change how the `Field` objects are obtained: save the field names instead of 
using `PojoTypeInfo`. (Thanks @fhueske for the advice!)
* `ScalaCsvInputFormat` extends `CsvInputFormat` because there is a lot of 
duplicated code between the two classes.
* Add integration tests for `CsvReader` (Java API) and 
`ExecutionEnvironment.readCsvFile` (Scala API).

Any feedback is welcome! (Especially on the error messages, because of my poor 
English.)




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-13 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-78876571
  
Hello. I have a question about object reuse in the `readRecord` method of 
`ScalaCsvInputFormat`. In the Java implementation, `CsvInputFormat` reuses the 
result object. But `ScalaCsvInputFormat` does not reuse the object and creates a 
new instance for each record. Why doesn't `ScalaCsvInputFormat` reuse the object?




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-13 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-79066240
  
@aljoscha Thanks! I understand about it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-12 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26215940
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java 
---
@@ -684,4 +693,178 @@ private void testRemovingTrailingCR(String lineBreakerInFile, String lineBreaker
         }
     }
 
+    @Test
+    public void testPojoType() throws Exception {
+        File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+        tempFile.deleteOnExit();
+        tempFile.setWritable(true);
+
+        OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+        wrt.write("123,AAA,3.123,BBB\n");
+        wrt.write("456,BBB,1.123,AAA\n");
+        wrt.close();
+
+        @SuppressWarnings("unchecked")
+        TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+        CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+        Configuration parameters = new Configuration();
+        inputFormat.configure(parameters);
+
+        inputFormat.setDelimiter('\n');
+        inputFormat.setFieldDelimiter(',');
+
+        FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+        inputFormat.open(splits[0]);
+
+        PojoItem item = new PojoItem();
+        inputFormat.nextRecord(item);
+
+        assertEquals(123, item.field1);
+        assertEquals("AAA", item.field2);
+        assertEquals(Double.valueOf(3.123), item.field3);
+        assertEquals("BBB", item.field4);
+
+        inputFormat.nextRecord(item);
+
+        assertEquals(456, item.field1);
+        assertEquals("BBB", item.field2);
+        assertEquals(Double.valueOf(1.123), item.field3);
+        assertEquals("AAA", item.field4);
+    }
+
+    @Test
+    public void testPojoTypeWithPrivateField() throws Exception {
+        File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+        tempFile.deleteOnExit();
+        tempFile.setWritable(true);
+
+        OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+        wrt.write("123,AAA,3.123,BBB\n");
+        wrt.write("456,BBB,1.123,AAA\n");
+        wrt.close();
+
+        @SuppressWarnings("unchecked")
+        TypeInformation<PrivatePojoItem> typeInfo = (TypeInformation<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class);
+        CsvInputFormat<PrivatePojoItem> inputFormat = new CsvInputFormat<PrivatePojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+        Configuration parameters = new Configuration();
+        inputFormat.configure(parameters);
+
+        inputFormat.setDelimiter('\n');
+        inputFormat.setFieldDelimiter(',');
+
+        FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+        inputFormat.open(splits[0]);
+
+        PrivatePojoItem item = new PrivatePojoItem();
+        inputFormat.nextRecord(item);
+
+        assertEquals(123, item.field1);
+        assertEquals("AAA", item.field2);
+        assertEquals(Double.valueOf(3.123), item.field3);
+        assertEquals("BBB", item.field4);
+
+        inputFormat.nextRecord(item);
+
+        assertEquals(456, item.field1);
+        assertEquals("BBB", item.field2);
+        assertEquals(Double.valueOf(1.123), item.field3);
+        assertEquals("AAA", item.field4);
+    }
+
+    @Test
+    public void testPojoTypeWithMappingInformation() throws Exception {
+        File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+        tempFile.deleteOnExit();
+        tempFile.setWritable(true);
+
+        OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+        wrt.write("123,3.123,AAA,BBB\n");
+        wrt.write("456,1.123,BBB,AAA\n");
+        wrt.close();
+
+        @SuppressWarnings("unchecked")
+        TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+        CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+        inputFormat.setFieldsMap(new String[]{"field1", "field3", "field2", "field4"});
+
+        Configuration parameters = new Configuration();
+        inputFormat.configure(parameters);
+
+        inputFormat.setDelimiter('\n');
+        inputFormat.setFieldDelimiter(',');
+
  

[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26203612
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -64,26 +70,45 @@
     private transient int commentCount;
 
     private transient int invalidLineCount;
+
+    private CompositeType<OUT> typeInformation = null;
+
+    private String[] fieldsMap = null;
 
+    public CsvInputFormat(Path filePath, TypeInformation<OUT> typeInformation) {
+        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
+    }
 
-    public CsvInputFormat(Path filePath) {
-        super(filePath);
-    }
-
-    public CsvInputFormat(Path filePath, Class<?> ... types) {
-        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, types);
-    }
-
-    public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, Class<?>... types) {
+    public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, TypeInformation<OUT> typeInformation) {
         super(filePath);
 
+        Preconditions.checkArgument(typeInformation instanceof CompositeType);
+        this.typeInformation = (CompositeType<OUT>) typeInformation;
+
         setDelimiter(lineDelimiter);
         setFieldDelimiter(fieldDelimiter);
 
-        setFieldTypes(types);
+        Class<?>[] classes = new Class<?>[typeInformation.getArity()];
+        for (int i = 0, arity = typeInformation.getArity(); i < arity; i++) {
+            classes[i] = this.typeInformation.getTypeAt(i).getTypeClass();
+        }
+        setFieldTypes(classes);
+
+        if (typeInformation instanceof PojoTypeInfo) {
+            setFieldsMap(this.typeInformation.getFieldNames());
+            setAccessibleToField();
+        }
     }
-
-
+
+    public void setAccessibleToField() {
--- End diff --

Can we make this method private?




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26205234
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -152,6 +177,38 @@ public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes) {
     public Class<?>[] getFieldTypes() {
         return super.getGenericFieldTypes();
     }
+
+    public void setFieldsMap(String[] fieldsMap) {
+        Preconditions.checkNotNull(fieldsMap);
+        Preconditions.checkState(typeInformation instanceof PojoTypeInfo);
+
+        PojoTypeInfo<OUT> pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+
+        String[] fields = pojoTypeInfo.getFieldNames();
+        Class<?>[] fieldTypes = getFieldTypes();
+        this.fieldsMap = Arrays.copyOfRange(fieldsMap, 0, fieldsMap.length);
+
+        boolean[] includeMask = new boolean[fieldsMap.length];
--- End diff --

The ``includeMask`` refers to the fields in the CSV file and allows skipping 
fields of the file.
For example, if a line in your file looks like 
``Sam,Smith,09-15-1963,123.123`` and you only want to read the first name 
and the date field, you would set the ``includeMask`` to 
``[true, false, true]`` (missing fields are treated as ``false``). 
So the ``includeMask`` should not depend on the ``fieldsMap``, but the 
number of ``true`` entries in the ``includeMask`` must be equal to the number 
of fields in the ``fieldsMap``.
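
As a rough illustration of the example line in this comment, the sketch below applies an include mask to a split CSV line. `IncludeMaskSketch` and its methods are hypothetical names, and the naive `split(",")` ignores quoting; it is not the parser used by `CsvInputFormat`.

```java
import java.util.ArrayList;
import java.util.List;

public class IncludeMaskSketch {
    /** Returns the columns of a CSV line whose mask entry is true. */
    public static List<String> selectColumns(String line, boolean[] includeMask) {
        String[] columns = line.split(",");
        List<String> selected = new ArrayList<String>();
        for (int i = 0; i < columns.length; i++) {
            // Columns beyond the end of the mask are treated as false.
            if (i < includeMask.length && includeMask[i]) {
                selected.add(columns[i]);
            }
        }
        return selected;
    }

    /** Counts the true entries; this must match the number of mapped fields. */
    public static int countIncluded(boolean[] includeMask) {
        int count = 0;
        for (boolean included : includeMask) {
            if (included) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        boolean[] mask = {true, false, true};
        // Selects the first name and the date, skips the last name and the number.
        System.out.println(selectColumns("Sam,Smith,09-15-1963,123.123", mask));
        System.out.println(countIncluded(mask));
    }
}
```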




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26206307
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -234,9 +291,29 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws
         }
 
         if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-            // valid parse, map values into pact record
-            for (int i = 0; i < parsedValues.length; i++) {
-                reuse.setField(parsedValues[i], i);
+            if (typeInformation instanceof TupleTypeInfoBase) {
+                // result type is tuple
+                Tuple result = (Tuple) reuse;
+                for (int i = 0; i < parsedValues.length; i++) {
+                    result.setField(parsedValues[i], i);
+                }
+            } else {
+                // result type is POJO
+                PojoTypeInfo<OUT> pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+                for (int i = 0; i < parsedValues.length; i++) {
+                    if (fieldsMap[i] == null) {
--- End diff --

May not be null.




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26206381
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -234,9 +291,29 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws
         }
 
         if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-            // valid parse, map values into pact record
-            for (int i = 0; i < parsedValues.length; i++) {
-                reuse.setField(parsedValues[i], i);
+            if (typeInformation instanceof TupleTypeInfoBase) {
+                // result type is tuple
+                Tuple result = (Tuple) reuse;
+                for (int i = 0; i < parsedValues.length; i++) {
+                    result.setField(parsedValues[i], i);
+                }
+            } else {
+                // result type is POJO
+                PojoTypeInfo<OUT> pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+                for (int i = 0; i < parsedValues.length; i++) {
+                    if (fieldsMap[i] == null) {
+                        continue;
+                    }
+
+                    try {
+                        int fieldIndex = typeInformation.getFieldIndex(fieldsMap[i]);
+                        pojoTypeInfo.getPojoFieldAt(fieldIndex).field.set(reuse, parsedValues[i]);
--- End diff --

Same for the ``field``.




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26205969
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -152,6 +177,38 @@ public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes) {
     public Class<?>[] getFieldTypes() {
         return super.getGenericFieldTypes();
     }
+
+    public void setFieldsMap(String[] fieldsMap) {
+        Preconditions.checkNotNull(fieldsMap);
+        Preconditions.checkState(typeInformation instanceof PojoTypeInfo);
+
+        PojoTypeInfo<OUT> pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+
+        String[] fields = pojoTypeInfo.getFieldNames();
+        Class<?>[] fieldTypes = getFieldTypes();
+        this.fieldsMap = Arrays.copyOfRange(fieldsMap, 0, fieldsMap.length);
+
+        boolean[] includeMask = new boolean[fieldsMap.length];
+        Class<?>[] newFieldTypes = new Class<?>[fieldsMap.length];
+
+        for (int i = 0; i < fieldsMap.length; i++) {
+            if (fieldsMap[i] == null) {
+                includeMask[i] = false;
+                newFieldTypes[i] = null;
+                continue;
+            }
+
+            for (int j = 0; j < fields.length; j++) {
+                if (fields[j].equals(fieldsMap[i])) {
+                    includeMask[i] = true;
+                    newFieldTypes[i] = fieldTypes[j];
+                    break;
+                }
+            }
--- End diff --

Can you throw an exception if the provided field name was not found in the 
POJO type information?
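
A minimal sketch of the check requested above, assuming the POJO field names are available as a plain `String[]`. `FieldNameValidator` and `resolve` are hypothetical names used for illustration only; they are not part of Flink or of this pull request:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: resolves each requested name
// against the known POJO field names and fails fast on an unknown name.
final class FieldNameValidator {

    // Returns, for each entry of fieldsMap, its position in pojoFields.
    static int[] resolve(String[] pojoFields, String[] fieldsMap) {
        List<String> known = Arrays.asList(pojoFields);
        int[] positions = new int[fieldsMap.length];
        for (int i = 0; i < fieldsMap.length; i++) {
            int pos = known.indexOf(fieldsMap[i]);
            if (pos < 0) {
                // The inner loop in the diff silently skips this case.
                throw new IllegalArgumentException(
                    "Field '" + fieldsMap[i] + "' not found in POJO fields " + known);
            }
            positions[i] = pos;
        }
        return positions;
    }
}
```

Failing at configuration time keeps a misspelled field name from surfacing later as a silently skipped column.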




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26206133
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -152,6 +177,38 @@ public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes) {
     public Class<?>[] getFieldTypes() {
         return super.getGenericFieldTypes();
     }
+
+    public void setFieldsMap(String[] fieldsMap) {
+        Preconditions.checkNotNull(fieldsMap);
+        Preconditions.checkState(typeInformation instanceof PojoTypeInfo);
+
+        PojoTypeInfo<OUT> pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+
+        String[] fields = pojoTypeInfo.getFieldNames();
+        Class<?>[] fieldTypes = getFieldTypes();
+        this.fieldsMap = Arrays.copyOfRange(fieldsMap, 0, fieldsMap.length);
+
+        boolean[] includeMask = new boolean[fieldsMap.length];
+        Class<?>[] newFieldTypes = new Class<?>[fieldsMap.length];
+
+        for (int i = 0; i < fieldsMap.length; i++) {
+            if (fieldsMap[i] == null) {
--- End diff --

IMO, ``null`` values should not be allowed in the ``fieldsMap``. Can you 
throw an exception in that case?
The ``fieldsMap`` should be a list of fields that are mapped to columns in 
the CSV file. As said before, the columns that are read by the format are 
defined by the ``includeMask``.




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26206329
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -234,9 +291,29 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws
         }

         if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-            // valid parse, map values into pact record
-            for (int i = 0; i < parsedValues.length; i++) {
-                reuse.setField(parsedValues[i], i);
+            if (typeInformation instanceof TupleTypeInfoBase) {
+                // result type is tuple
+                Tuple result = (Tuple) reuse;
+                for (int i = 0; i < parsedValues.length; i++) {
+                    result.setField(parsedValues[i], i);
+                }
+            } else {
+                // result type is POJO
+                PojoTypeInfo<OUT> pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+                for (int i = 0; i < parsedValues.length; i++) {
+                    if (fieldsMap[i] == null) {
+                        continue;
+                    }
+
+                    try {
+                        int fieldIndex = typeInformation.getFieldIndex(fieldsMap[i]);
--- End diff --

We could compute the index upfront and avoid the String look-up.
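
One way to read this suggestion, sketched outside Flink: resolve each mapped name once when the mapping is set, then use the cached result for every record. The class `CachedFieldSetter` and the `PersonRow` POJO are hypothetical, and plain `java.lang.reflect.Field` objects stand in here for the precomputed index into the POJO type information:

```java
import java.lang.reflect.Field;

// Hypothetical POJO used only for this illustration.
final class PersonRow {
    String name;
    int age;
}

// Hypothetical helper: resolves field names once, then sets values by column position.
final class CachedFieldSetter<T> {
    private final Field[] columnFields; // columnFields[i] receives CSV column i

    CachedFieldSetter(Class<T> type, String[] fieldsMap) {
        columnFields = new Field[fieldsMap.length];
        for (int i = 0; i < fieldsMap.length; i++) {
            try {
                // The name look-up happens here, once per column.
                Field field = type.getDeclaredField(fieldsMap[i]);
                field.setAccessible(true);
                columnFields[i] = field;
            } catch (NoSuchFieldException e) {
                throw new IllegalArgumentException("Unknown field: " + fieldsMap[i], e);
            }
        }
    }

    // Per-record path: array access only, no String look-up.
    T fill(T reuse, Object[] parsedValues) {
        for (int i = 0; i < parsedValues.length; i++) {
            try {
                columnFields[i].set(reuse, parsedValues[i]);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
        return reuse;
    }
}
```

For example, `new CachedFieldSetter<>(PersonRow.class, new String[] {"name", "age"})` would be built once per input format, and `fill` would then be called for each parsed line.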




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-11 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-78401359
  
@fhueske Thanks for your kind advice. I will fix it as soon as possible.




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-09 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-77912526
  
@teabot: We've created this JIRA for the feature you've suggested: 
https://issues.apache.org/jira/browse/FLINK-1665




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r25846058
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -64,15 +66,25 @@
     private transient int commentCount;

     private transient int invalidLineCount;
+
+    private PojoTypeInfo<OUT> pojoTypeInfo = null;


     public CsvInputFormat(Path filePath) {
         super(filePath);
-    }   
+    }
+
+    public CsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo) {
+        super(filePath);
+
+        Preconditions.checkNotNull(pojoTypeInfo, "The TypeInformation is required for getting the POJO fields.");
+        this.pojoTypeInfo = pojoTypeInfo;
+        setAccessibleToField();
+    }

     public CsvInputFormat(Path filePath, Class<?> ... types) {
--- End diff --

Yes, if we go for the more general TypeInformation we need to check whether 
it is a TupleTypeInfo or PojoTypeInfo and can get rid of one pair of 
constructors. Actually, I think both solutions are fine. 




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-05 Thread teabot
Github user teabot commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-77347164
  
I like that this request adds what I would consider to be a very useful 
feature. Data is often stored in text delimited formats and one of the nice 
features of Flink is the ability to work with POJOs. Therefore some mechanism 
to map from delimited files to POJOs is useful.

However, I have found that such mappings are rarely restricted to the 
simple cases. Might I therefore suggest that some other factors be considered 
as part of this request, so as not to limit the usefulness of this API change 
and to provide some extensibility:

* The position of fields within the delimited file and how they are mapped to 
POJO properties could indeed be modelled with a `String[]` as suggested. 
However, this will be arduous to maintain and prone to human error for types 
with a large number of fields. An alternative could be to have a `@Position` 
annotation on the POJO object fields to indicate the CSV column index.
```
public class MyPojo {
    @Position(0)
    public int id;

    @Position(1)
    public String name;
    ...
```
* Flink + POJOs bring the benefit of richer types to data processing 
pipelines. But it seems to me that this implementation imposes a restriction on 
the range of types that can be used within the target POJO type, negating this 
benefit somewhat. If I want to use a POJO with a `DateTime` field then I must 
still create my own mapping function to do so. Therefore it would be useful to 
provide some hook into the CSV-POJO mapping process to allow the specification 
of user declared/defined type converters. This can then enable users to easily 
map to field types of their choosing. Again, this could be modelled as an 
annotation:
```
public class MyPojo {
    // DateTimeConverter implemented by the user
    @Converter(type=DateTimeConverter.class, 
        properties="format=/MM/dd:HH;timezone=Europe/London")
    public DateTime id;
    ...
```
You might consider that these additional features are better served by some 
separate type-mapping component or API (and I'd probably agree). But in that 
case is it then wise to also add a simpler, less flexible form to the core 
Flink API?

Thank you for your time.
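
To make the first suggestion concrete, here is a rough sketch of how a `@Position` annotation could be turned into the column-ordered `String[]` discussed in this thread. The annotation, the `AnnotatedPojo` class and the `PositionMapper` helper are all hypothetical (none of them exists in Flink), and the sketch assumes the positions form a contiguous range starting at 0:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

// Hypothetical annotation: marks the CSV column index of a POJO field.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Position {
    int value();
}

// Hypothetical POJO; declaration order deliberately differs from column order.
final class AnnotatedPojo {
    @Position(1)
    public String name;

    @Position(0)
    public int id;
}

// Hypothetical helper: derives the column-ordered field names from the annotations.
final class PositionMapper {
    static String[] fieldOrder(Class<?> type) {
        Field[] declared = type.getDeclaredFields();
        int count = 0;
        for (Field field : declared) {
            if (field.isAnnotationPresent(Position.class)) {
                count++;
            }
        }
        String[] order = new String[count];
        for (Field field : declared) {
            Position position = field.getAnnotation(Position.class);
            if (position == null) {
                continue; // unannotated fields are not mapped
            }
            int column = position.value();
            // Assumption of this sketch: positions are unique and cover 0..count-1.
            if (column < 0 || column >= count || order[column] != null) {
                throw new IllegalArgumentException(
                    "Invalid or duplicate @Position(" + column + ") on field " + field.getName());
            }
            order[column] = field.getName();
        }
        return order;
    }
}
```

With this, `PositionMapper.fieldOrder(AnnotatedPojo.class)` yields the names in column order regardless of declaration order, so the annotation approach and the `String[]` approach could share the same downstream mapping.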




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-04 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-77293135
  
@fhueske Oh, you are right. Currently, users cannot decide the order of fields. 
I will add a parameter to set the order of fields.




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-04 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-77272091
  
How are fields in the CSV file mapped to POJO fields? I assume it is the 
order of fields in the POJO type information, right? Is that order the same as 
in the POJO definition, or some other order such as alphanumeric ordering of the 
field names? This might not be obvious for users.
Would it make sense to add a `String[]` to map POJO fields to field 
positions in the CSV file? For example, `String[] {"name", "age", "zip"}` would 
map the POJO field `name` to the first CSV field, and so on.




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-03-04 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r25760453
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -235,8 +252,21 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws

         if (parseRecord(parsedValues, bytes, offset, numBytes)) {
             // valid parse, map values into pact record
-            for (int i = 0; i < parsedValues.length; i++) {
-                reuse.setField(parsedValues[i], i);
+            if (pojoTypeInfo == null) {
+                Tuple result = (Tuple) reuse;
+                for (int i = 0; i < parsedValues.length; i++) {
+                    result.setField(parsedValues[i], i);
+                }
+            } else {
+                for (int i = 0; i < parsedValues.length; i++) {
+                    try {
+                        pojoTypeInfo.getPojoFieldAt(i).field.set(reuse, parsedValues[i]);
--- End diff --

@rmetzger Thanks! I modified my implementation to set the fields accessible 
in `CsvInputFormat` and `ScalaCsvInputFormat` and added a test case with private 
fields.




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-02-24 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r25237152
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -82,6 +92,13 @@ public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter

         setFieldTypes(types);
     }
+
+    public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo) {
+        this(filePath, lineDelimiter, fieldDelimiter);
+
+        Preconditions.checkNotNull(pojoTypeInfo, "Type information must be required to set values to pojo");
--- End diff --

same here.




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-02-24 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r25237861
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -235,8 +252,21 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws

         if (parseRecord(parsedValues, bytes, offset, numBytes)) {
             // valid parse, map values into pact record
-            for (int i = 0; i < parsedValues.length; i++) {
-                reuse.setField(parsedValues[i], i);
+            if (pojoTypeInfo == null) {
+                Tuple result = (Tuple) reuse;
+                for (int i = 0; i < parsedValues.length; i++) {
+                    result.setField(parsedValues[i], i);
+                }
+            } else {
+                for (int i = 0; i < parsedValues.length; i++) {
+                    try {
+                        pojoTypeInfo.getPojoFieldAt(i).field.set(reuse, parsedValues[i]);
--- End diff --

I'm not 100% sure if this also works for POJOs with private fields.
In your test case, all the fields are public. I'm not sure where exactly we 
are setting the fields accessible, but it might be the case that they are not 
set accessible here. 
Can you add a private field with a getter/setter to the test POJO to 
validate this?
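
For reference, a small standalone sketch of the concern: `Field.set` on a private field throws `IllegalAccessException` when called from a class that cannot access the field, unless `setAccessible(true)` has been called on the `Field` first. `PrivatePojo` and `PrivateFieldWriter` are hypothetical names; this is not the code from the pull request:

```java
import java.lang.reflect.Field;

// Hypothetical POJO with a private field plus getter/setter.
final class PrivatePojo {
    private String name;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

// Hypothetical helper that writes a declared field by name via reflection.
final class PrivateFieldWriter {
    static void write(Object target, String fieldName, Object value) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            // Without this call, set() below fails for fields the caller cannot access.
            field.setAccessible(true);
            field.set(target, value);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot set field " + fieldName, e);
        }
    }
}
```

A test along these lines, with a private field read back through its getter, is the kind of check the comment asks for.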




[GitHub] flink pull request: [FLINK-1512] Add CsvReader for reading into PO...

2015-02-19 Thread chiwanpark
GitHub user chiwanpark opened a pull request:

https://github.com/apache/flink/pull/426

[FLINK-1512] Add CsvReader for reading into POJOs.

This PR contains the following changes.

* `CsvInputFormat` and `ScalaCsvInputFormat` can receive a POJO type as 
generic parameter
* Add `pojoType(Class<T> targetType)` into `CsvReader` (Java API)
* Modify `readCsvFile` method in `ExecutionEnvironment` (Scala API)
* Add unit tests for `CsvInputFormat` and `ScalaCsvInputFormat`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chiwanpark/flink FLINK-1512

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/426.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #426


commit 2463d6b7d244528e1625288e1b780335769f14ee
Author: Chiwan Park chiwanp...@icloud.com
Date:   2015-02-18T18:27:59Z

[FLINK-1512] [java api] Add CsvReader for reading into POJOs

commit 8fe5f8d1bd402382e6fa93014c5b2fec8e22cbd0
Author: Chiwan Park chiwanp...@icloud.com
Date:   2015-02-19T17:23:56Z

[FLINK-1512] [scala api] Add CsvReader for reading into POJOs



