Hi Chris,

First, FxRate is not a POJO: a POJO must have a public constructor without arguments. Once FxRate has one, you can read from a POJO DataStream directly.
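To illustrate the constructor point, here is a minimal sketch in plain Java with no Flink dependency. The names `PojoCheck`, `FxRateOriginal` and `looksLikePojo` are hypothetical, and the check is only a rough approximation of part of the POJO rules (public class, public no-argument constructor). It is not Flink's own type extraction logic, and it does not look at the field rules at all.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

class PojoCheck {

    // The class as posted: its only constructor takes arguments.
    public static class FxRateOriginal {
        public String currency;
        public double rate;

        public FxRateOriginal(String currency, double rate) {
            this.currency = currency;
            this.rate = rate;
        }
    }

    // The same class with a public no-argument constructor added.
    public static class FxRate {
        public String currency;
        public double rate;

        public FxRate() {}

        public FxRate(String currency, double rate) {
            this.currency = currency;
            this.rate = rate;
        }
    }

    // Hypothetical helper (an assumption for illustration, not a Flink API).
    // Checks only: public class, not a non-static inner class, and a public
    // no-argument constructor. Field rules are deliberately not checked.
    static boolean looksLikePojo(Class<?> clazz) {
        int classModifiers = clazz.getModifiers();
        if (!Modifier.isPublic(classModifiers)) {
            return false;
        }
        if (clazz.isMemberClass() && !Modifier.isStatic(classModifiers)) {
            return false;
        }
        try {
            Constructor<?> noArg = clazz.getDeclaredConstructor();
            return Modifier.isPublic(noArg.getModifiers());
        } catch (NoSuchMethodException e) {
            // No constructor without arguments is declared.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(FxRateOriginal.class)); // prints "false"
        System.out.println(looksLikePojo(FxRate.class));         // prints "true"
    }
}
```

The only difference between the two classes is the added `public FxRate() {}` line, which is the change suggested for the posted code.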
Second, if you want to get a field from a POJO, use the get function, for example: fx.get('currency'). If a field is itself a POJO, you can use the same approach to get a nested field from it.

Best,
Jingsong Lee

On Wed, Dec 4, 2019 at 12:33 AM Chris Miller <chris...@gmail.com> wrote:

> I'm having trouble dealing with a DataStream of POJOs. In particular, when
> I perform SQL operations on it I can't figure out the syntax for referring
> to individual fields within the POJO.
>
> Below is an example that illustrates the problem and the various
> approaches I've tried. Can anyone please point me in the right direction?
>
> import java.util.Arrays;
> import java.util.List;
>
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
>
> public class PojoTest {
>   public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment streamEnv =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     StreamTableEnvironment tableEnv =
>         StreamTableEnvironment.create(streamEnv);
>
>     // Using tuples, this works as expected
>     List<Tuple2<String, Double>> tupleData = Arrays.asList(
>         new Tuple2<>("USD", 1.0),
>         new Tuple2<>("GBP", 1.3),
>         new Tuple2<>("EUR", 1.11));
>     DataStreamSource<Tuple2<String, Double>> tupleStream =
>         streamEnv.fromCollection(tupleData);
>     tableEnv.fromDataStream(tupleStream, "currency, rate").printSchema();
>
>     // Using a DataStream of POJOs, how do I obtain an equivalent table to the above?
>     List<FxRate> pojoData = Arrays.asList(
>         new FxRate("USD", 1.0),
>         new FxRate("GBP", 1.3),
>         new FxRate("EUR", 1.11));
>     DataStreamSource<FxRate> pojoStream = streamEnv.fromCollection(pojoData);
>
>     Table pojoTable = tableEnv.fromDataStream(pojoStream, "fx");
>     pojoTable.printSchema();
>
>     // This fails with "ValidationException: Cannot resolve field [currency], input field list:[fx]"
>     pojoTable.select("currency, rate").printSchema();
>
>     // This fails with "ValidationException: Undefined function: currency"
>     pojoTable.select("fx.currency AS currency, fx.rate AS rate").printSchema();
>
>     // This fails with "ValidationException: Too many fields referenced from an atomic type"
>     tableEnv.fromDataStream(pojoStream, "currency, rate").printSchema();
>
>     // This fails with "ValidationException: Field reference expression expected"
>     tableEnv.fromDataStream(pojoStream, "fx.currency, fx.rate").printSchema();
>
>     streamEnv.execute();
>   }
>
>   public static class FxRate {
>     public String currency;
>     public double rate;
>
>     public FxRate(String currency, double rate) {
>       this.currency = currency;
>       this.rate = rate;
>     }
>
>     @Override
>     public String toString() {
>       return "FxRate{currency='" + currency + '\'' + ", rate=" + rate + '}';
>     }
>   }
> }

--
Best, Jingsong Lee