I'm having trouble working with a Flink DataStream of POJOs. In particular,
when I perform SQL operations on it, I can't figure out the syntax for
referring to individual fields within the POJO.
Below is an example that illustrates the problem and the various
approaches I've tried. Can anyone please point me in the right
direction?
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
public class PojoTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment streamEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
// Using tuples, this works as expected
List<Tuple2<String, Double>> tupleData = Arrays.asList(
new Tuple2<>("USD", 1.0),
new Tuple2<>("GBP", 1.3),
new Tuple2<>("EUR", 1.11));
DataStreamSource<Tuple2<String, Double>> tupleStream =
streamEnv.fromCollection(tupleData);
tableEnv.fromDataStream(tupleStream, "currency, rate").printSchema();
// Using a DataStream of POJOs, how do I obtain an equivalent table to the
above?
List<FxRate> pojoData = Arrays.asList(
new FxRate("USD", 1.0),
new FxRate("GBP", 1.3),
new FxRate("EUR", 1.11));
DataStreamSource<FxRate> pojoStream = streamEnv.fromCollection(pojoData);
Table pojoTable = tableEnv.fromDataStream(pojoStream, "fx");
pojoTable.printSchema();
// This fails with "ValidationException: Cannot resolve field [currency], input
field list:[fx]"
pojoTable.select("currency, rate").printSchema();
// This fails with "ValidationException: Undefined function: currency"
pojoTable.select("fx.currency AS currency, fx.rate AS rate").printSchema();
// This fails with "ValidationException: Too many fields referenced from an atomic
type"
tableEnv.fromDataStream(pojoStream, "currency, rate").printSchema();
// This fails with "ValidationException: Field reference expression expected"
tableEnv.fromDataStream(pojoStream, "fx.currency, fx.rate").printSchema();
streamEnv.execute();
}
public static class FxRate {
public String currency;
public double rate;
public FxRate(String currency, double rate) {
this.currency = currency;
this.rate = rate;
}
@Override
public String toString() {
return "FxRate{currency='" + currency + '\'' + ", rate=" + rate + '}';
}
}
}