Hi Stefan,

The problem is that the CSV input format only knows how to parse a fixed
set of supported field types, and java.util.Date is not one of them. It
would be nice if it supported a custom parser, either specified manually
or included in the POJO class itself.

You can either change your POJO fields to supported types (like you
already did), or first read your data into a Tuple<String, Double, Double, ...>
and then convert the Tuples into POJOs in a map operation. In the map
operation you can specify your own parsing logic.
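
For the second option, here is a rough, untested sketch of what that could
look like (the input path is a placeholder, you would still have to configure
the field delimiter for your file, and the setters for the remaining fields of
DataPoint are hypothetical):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Read the raw fields into Tuples first; String and Double are supported by the CSV format.
DataSet<Tuple3<String, Double, Double>> rawInput = env
        .readCsvFile("data.csv") // placeholder path; set the field delimiter to match your file
        .types(String.class, Double.class, Double.class);

// Convert each Tuple into a DataPoint, doing the date conversion in user code.
DataSet<DataPoint> points = rawInput.map(new MapFunction<Tuple3<String, Double, Double>, DataPoint>() {
    @Override
    public DataPoint map(Tuple3<String, Double, Double> value) {
        DataPoint point = new DataPoint();
        point.setDateStr(value.f0);   // your setter already parses the String into a Date
        // point.setValue1(value.f1); // hypothetical setters for the remaining fields
        // point.setValue2(value.f2);
        return point;
    }
});
// ...continue working with points as a DataSet<DataPoint>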

Best,
Max

On Thu, Jul 30, 2015 at 11:40 AM, Stefan Winterstein <stefan.winterst...@dfki.de> wrote:

> Hi,
>
> I'm new to Flink and just taking the first steps...
>
> I want to parse a CSV file that contains a date and time as the first
> field, then some values:
>
> > 07.02.2015    49.9871 234.677 ...
>
> So I’d like to use this POJO:
>
> > import java.text.ParseException;
> > import java.text.SimpleDateFormat;
> > import java.util.Date;
> >
> > public class DataPoint
> > {
> >     private String dateStr; // String value of date
> >     private Date date;      // the actual date
> >       ...
> >
> >     private static SimpleDateFormat dateFormat = new SimpleDateFormat("dd.MM.yyyy");
> >
> >     public DataPoint() {}
> >
> >     // String setter, converts to Date
> >     public void setDateStr(String value) {
> >         this.dateStr = value;
> >         try {
> >             this.date = dateFormat.parse(dateStr); // parse string and store date
> >         } catch (ParseException e) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     public String getDateStr() {
> >         return this.dateStr;
> >     }
> >
> >     public Date getDate() {
> >         return this.date;
> >     }
> >     …
> > }
>
> ...and pass it to the CSVReader:
>
> > DataSet<DataPoint> csvInput = env.readCsvFile(filename)
> >                                 .pojoType(DataPoint.class, "dateStr", ...);
>
> However, this fails with an exception:
>
> > Exception in thread "main" java.lang.IllegalArgumentException: The type 'java.util.Date' is not supported for the CSV input format.
> >       at org.apache.flink.api.common.io.GenericCsvInputFormat.setFieldTypesGeneric(GenericCsvInputFormat.java:236)
> >       at org.apache.flink.api.java.io.CsvInputFormat.setFieldTypes(CsvInputFormat.java:115)
> >       at org.apache.flink.api.java.io.CsvInputFormat.<init>(CsvInputFormat.java:77)
> >       at org.apache.flink.api.java.io.CsvInputFormat.<init>(CsvInputFormat.java:61)
> >       at org.apache.flink.api.java.io.CsvReader.pojoType(CsvReader.java:295)
> >       at de.dfki.iui.MyJob.main(MyJob.java:60)
>
> I managed to work around this by storing the long value of
> Date.getTime() instead of Date, but:
>
> Do the POJO semantics really need to be that strict? Wouldn't it be
> sufficient if there were appropriate getters/setters for the member
> names given to pojoType()?
>
>
> Best regards,
>
> -Stefan
>
