Hi,

the translate() method is an internal method. You can use "toRetractStream(table, Row.class)" or "toAppendStream(table, Row.class)" to convert you table into a stream. Make sure to use the correct StreamTableEnvironment for your API: org.apache.flink.table.api.java.StreamTableEnvironment

Regards,
Timo

Am 10/29/17 um 5:53 AM schrieb PaulWu:
Please help how to "translate" table to DataStream in the fellowing code.

StreamTableEnvironment ste =
StreamTableEnvironment.getTableEnvironment(EXE_ENV);
         ste.registerDataStreamInternal("abc", stream);
         Table ts = ste.sql("select * from abc");
         ts = ts.as("count,word");
         System.out.println("ts=" + ts.getSchema());
         ts.printSchema();
         String[] names = new String[]{"count", "word"};
         TypeInformation[] types = new TypeInformation[]{Types.STRING,
Types.STRING};

         RowTypeInfo tpe = Types.ROW(types);
         DataStream<Row> ds = ste.translate(ts, ste.queryConfig(), true,
true, tpe);
         ds.print();

It throws an exception:
Exception in thread "main" scala.MatchError: Row(f0: String, f1: String) (of
class org.apache.flink.api.java.typeutils.RowTypeInfo)
        at
org.apache.flink.table.api.StreamTableEnvironment.getConversionMapperWithChanges(StreamTableEnvironment.scala:293)
        at
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:679)
        at
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:645)
        at
com.att.ariso.ReadFromKafkaGASFPBTable.main(ReadFromKafkaGASFPBTable.java:127)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Reply via email to