Hi,
the translate() method is an internal method and is not meant to be
called from user code. Use "toRetractStream(table, Row.class)" or
"toAppendStream(table, Row.class)" to convert your table into a stream.
Make sure to use the StreamTableEnvironment that matches your API. For
the Java API that is:
org.apache.flink.table.api.java.StreamTableEnvironment
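
A minimal sketch of both conversions, assuming Flink 1.3.x (the release line current for this thread) with the flink-table dependency on the classpath. The class name, the sample elements, and the field names "cnt" and "word" are made up for illustration. In later releases sql() was replaced by sqlQuery(), so adjust that call if you are on a newer version.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
// Java variant of the table environment, not org.apache.flink.table.api.StreamTableEnvironment
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// Hypothetical example class, not taken from the original job.
public class TableToStreamSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // For a Java StreamExecutionEnvironment this returns the Java StreamTableEnvironment.
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Illustrative input; the original job reads from Kafka instead.
        DataStream<Tuple2<String, String>> stream = env.fromElements(
                Tuple2.of("1", "hello"),
                Tuple2.of("2", "world"));

        // Register the stream through the public API and name its fields.
        tableEnv.registerDataStream("abc", stream, "cnt, word");
        Table table = tableEnv.sql("SELECT cnt, word FROM abc");

        // Append mode: valid when the query only ever adds rows.
        DataStream<Row> appendStream = tableEnv.toAppendStream(table, Row.class);
        appendStream.print();

        // Retract mode: each element carries a flag, true = add, false = retract.
        DataStream<Tuple2<Boolean, Row>> retractStream =
                tableEnv.toRetractStream(table, Row.class);
        retractStream.print();

        env.execute("table-to-stream sketch");
    }
}
```

toAppendStream is enough for a plain projection like the one above. Queries that update earlier results, for example a non-windowed GROUP BY aggregation, need toRetractStream.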
Regards,
Timo
On 10/29/17 at 5:53 AM, PaulWu wrote:
Please help me "translate" a table to a DataStream in the following code.
StreamTableEnvironment ste = StreamTableEnvironment.getTableEnvironment(EXE_ENV);
ste.registerDataStreamInternal("abc", stream);
Table ts = ste.sql("select * from abc");
ts = ts.as("count,word");
System.out.println("ts=" + ts.getSchema());
ts.printSchema();
String[] names = new String[]{"count", "word"};
TypeInformation[] types = new TypeInformation[]{Types.STRING, Types.STRING};
RowTypeInfo tpe = Types.ROW(types);
DataStream<Row> ds = ste.translate(ts, ste.queryConfig(), true, true, tpe);
ds.print();
It throws an exception:
Exception in thread "main" scala.MatchError: Row(f0: String, f1: String) (of class org.apache.flink.api.java.typeutils.RowTypeInfo)
    at org.apache.flink.table.api.StreamTableEnvironment.getConversionMapperWithChanges(StreamTableEnvironment.scala:293)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:679)
    at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:645)
    at com.att.ariso.ReadFromKafkaGASFPBTable.main(ReadFromKafkaGASFPBTable.java:127)