Hi Stefan,

The problem is that the CsvParser cannot parse types other than the ones it supports. It would be nice if it supported a custom parser that is either specified manually or included in the POJO class itself.

You can either change your POJO fields to a supported type (like you already did), or read your data into a Tuple<String, Double, Double,..> first and convert the Tuples to a POJO in a Map operation. In the map operation you can specify your own parsing logic.

Best,
Max

On Thu, Jul 30, 2015 at 11:40 AM, Stefan Winterstein <stefan.winterst...@dfki.de> wrote:

> Hi,
>
> I'm new to Flink and just taking the first steps...
>
> I want to parse a CSV file that contains a date and time as the first
> field, then some values:
>
> > 07.02.2015 49.9871 234.677 ...
>
> So I’d like to use this POJO:
>
> > import java.util.Date;
> >
> > public class DataPoint
> > {
> >     private String dateStr; // String value of date
> >     private Date date; // the actual date
> >     ...
> >
> >     private static SimpleDateFormat dateFormat = new SimpleDateFormat("dd.MM.yyyy");
> >
> >     public DataPoint() {}
> >
> >     // String setter, converts to Date
> >     public void setDateStr(String value) {
> >         this.dateStr = value;
> >         try {
> >             this.date = dateFormat.parse(dateStr); // parse string and store date
> >         } catch (ParseException e) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     public String getDateStr() {
> >         return this.dateStr;
> >     }
> >
> >     public Date getDate() {
> >         return this.date;
> >     }
> >     …
> > }
>
> ...and pass it to the CSVReader:
>
> > DataSet<DataPoint> csvInput = env.readCsvFile(filename)
> >     .pojoType(DataPoint.class, "dateStr", ...);
>
> However, this fails with an exception:
>
> > Exception in thread "main" java.lang.IllegalArgumentException: The type 'java.util.Date' is not supported for the CSV input format.
> >     at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldTypesGeneric(GenericCsvInputFormat.java:236)
> >     at org.apache.flink.api.java.io.CsvInputFormat.setFieldTypes(CsvInputFormat.java:115)
> >     at org.apache.flink.api.java.io.CsvInputFormat.<init>(CsvInputFormat.java:77)
> >     at org.apache.flink.api.java.io.CsvInputFormat.<init>(CsvInputFormat.java:61)
> >     at org.apache.flink.api.java.io.CsvReader.pojoType(CsvReader.java:295)
> >     at de.dfki.iui.MyJob.main(MyJob.java:60)
>
> I managed to work around this by storing the long value of
> Date.getTime() instead of Date, but:
>
> Does the POJO semantic really need to be that strict? Wouldn't it be
> sufficient if there was an appropriate getter/setter for the member
> names given to pojoType()?
>
> Best regards,
> -Stefan
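
The second option from the reply (read the raw columns first, then convert them with your own parsing logic in a map step) could be sketched roughly as follows. This is plain Java with no Flink dependency, and it only covers the per-row conversion. The reduced `DataPoint` (one date, one value) and the `DataPointParser` helper are hypothetical names chosen for illustration, not part of the original code or of Flink.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Reduced stand-in for the DataPoint POJO from the question (one date, one value).
// In a real job it would be a public class in its own file.
class DataPoint {
    public Date date;
    public double value;

    public DataPoint() {}
}

// Hypothetical helper: the custom parsing logic that a map step could call per row.
class DataPointParser {
    static DataPoint parse(String dateStr, double value) {
        // SimpleDateFormat is not thread-safe, so build one per call
        // rather than sharing a static instance.
        SimpleDateFormat format = new SimpleDateFormat("dd.MM.yyyy");
        format.setLenient(false); // reject impossible dates such as 31.02.2015
        try {
            DataPoint point = new DataPoint();
            point.date = format.parse(dateStr);
            point.value = value;
            return point;
        } catch (ParseException e) {
            // Fail loudly instead of leaving the date field null.
            throw new IllegalArgumentException("Unparseable date: " + dateStr, e);
        }
    }
}
```

In a Flink job, the file would presumably be read into tuples with `.types(String.class, Double.class, ...)`, and a `MapFunction` would call something like `DataPointParser.parse(t.f0, t.f1)` for each tuple. The exact tuple arity depends on the columns elided in the question.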