Hi Chris,

First thing, FxRate is not POJO, a POJO should have a constructor without
arguments. In this way, you can read from a POJO DataStream directly.

Second, if you want get field from POJO, please use get function
like: fx.get('currency'), if you have a POJO field, you can use this way to
get nested field from POJO.

Best,
Jingsong Lee


On Wed, Dec 4, 2019 at 12:33 AM Chris Miller <chris...@gmail.com> wrote:

> I'm having trouble dealing with a DataStream of POJOs. In particular, when
> I perform SQL operations on it I can't figure out the syntax for referring
> to individual fields within the POJO.
>
> Below is an example that illustrates the problem and the various
> approaches I've tried. Can anyone please point me in the right direction?
>
> import java.util.Arrays;
> import java.util.List;
>
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
>
> public class PojoTest {
>   public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment streamEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>     StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(streamEnv);
>
>     // Using tuples, this works as expected
>     List<Tuple2<String, Double>> tupleData = Arrays.asList(
>         new Tuple2<>("USD", 1.0),
>         new Tuple2<>("GBP", 1.3),
>         new Tuple2<>("EUR", 1.11));
>     DataStreamSource<Tuple2<String, Double>> tupleStream = 
> streamEnv.fromCollection(tupleData);
>     tableEnv.fromDataStream(tupleStream, "currency, rate").printSchema();
>
>     // Using a DataStream of POJOs, how do I obtain an equivalent table to 
> the above?
>     List<FxRate> pojoData = Arrays.asList(
>         new FxRate("USD", 1.0),
>         new FxRate("GBP", 1.3),
>         new FxRate("EUR", 1.11));
>     DataStreamSource<FxRate> pojoStream = streamEnv.fromCollection(pojoData);
>
>     Table pojoTable = tableEnv.fromDataStream(pojoStream, "fx");
>     pojoTable.printSchema();
>
>     // This fails with "ValidationException: Cannot resolve field [currency], 
> input field list:[fx]"
>     pojoTable.select("currency, rate").printSchema();
>
>     // This fails with "ValidationException: Undefined function: currency"
>     pojoTable.select("fx.currency AS currency, fx.rate AS 
> rate").printSchema();
>
>     // This fails with "ValidationException: Too many fields referenced from 
> an atomic type"
>     tableEnv.fromDataStream(pojoStream, "currency, rate").printSchema();
>
>     // This fails with "ValidationException: Field reference expression 
> expected"
>     tableEnv.fromDataStream(pojoStream, "fx.currency, fx.rate").printSchema();
>
>     streamEnv.execute();
>   }
>
>   public static class FxRate {
>     public String currency;
>     public double rate;
>
>     public FxRate(String currency, double rate) {
>       this.currency = currency;
>       this.rate = rate;
>     }
>
>     @Override
>     public String toString() {
>       return "FxRate{currency='" + currency + '\'' + ", rate=" + rate + '}';
>     }
>   }
> }
>
>

-- 
Best, Jingsong Lee

Reply via email to