I'm using Flink 1.4.0.
I'm trying to save the results of a Table API query to a CSV file, but I'm
getting an error.
Here are the details:
My input file looks like this:
id,species,color,weight,name
311,canine,golden,75,dog1
312,canine,brown,22,dog2
313,feline,gray,8,cat1
I run a query on this input that counts the rows per species and keeps only
the canines, and I want to save the result to a CSV file:
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    String inputPath = "location-of-source-file";
    CsvTableSource petsTableSource = CsvTableSource.builder()
        .path(inputPath)
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .field("id", Types.INT())
        .field("species", Types.STRING())
        .field("color", Types.STRING())
        .field("weight", Types.DOUBLE())
        .field("name", Types.STRING())
        .build();

    // Register our table source
    tableEnv.registerTableSource("pets", petsTableSource);
    Table pets = tableEnv.scan("pets");

    Table counts = pets
        .groupBy("species")
        .select("species, species.count as count")
        .filter("species === 'canine'");

    DataSet<Row> result = tableEnv.toDataSet(counts, Row.class);
    result.print();

    // Write Results to File
    TableSink<Row> sink = new CsvTableSink("/home/hadoop/output/pets", ",");
    counts.writeToSink(sink);
When I run this, the result.print() call produces this output:
canine,2
but I do not see any results written to the file, and I see the log
messages below.
How can I save the results I'm seeing in stdout to a CSV file?
Thanks!
2018-05-27 12:49:44,411 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1873) - class org.apache.flink.types.Row does not contain a getter for field fields
2018-05-27 12:49:44,411 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1876) - class org.apache.flink.types.Row does not contain a setter for field fields
2018-05-27 12:49:44,411 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1911) - class org.apache.flink.types.Row is not a valid POJO type because not all fields are valid POJO fields.
2018-05-27 12:49:44,411 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1873) - class org.apache.flink.types.Row does not contain a getter for field fields
2018-05-27 12:49:44,412 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1876) - class org.apache.flink.types.Row does not contain a setter for field fields
2018-05-27 12:49:44,412 INFO [main] typeutils.TypeExtractor (TypeExtractor.java:1911) - class org.apache.flink.types.Row is not a valid POJO type because not all fields are valid POJO fields.
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/