Hi, community!
While dealing with retractable streams, I ran into a problem converting a Table
to a DataSet / DataStream in batch mode in Flink 1.13.5.
Scenario and process:
- 1. Database CDC to Kafka
- 2. Sync the data into Hive with HoodieTableFormat (Apache Hudi)
- 3. Incrementally process the hoodie table in streaming mode, or fully process
it in batch mode.
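For reference, a minimal sketch of step 2 in Flink SQL (all table names, paths, topics, and connector option values below are placeholders, not our real job):

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// Hedged sketch: sync the Kafka CDC topic into a Hudi table with Flink SQL.
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tenv = TableEnvironment.create(settings)

// Kafka source carrying the database changelog (Debezium format).
tenv.executeSql(
  """CREATE TABLE cdc_source (
    |  id BIGINT,
    |  name STRING,
    |  ts TIMESTAMP(3),
    |  PRIMARY KEY (id) NOT ENFORCED
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'db_cdc',
    |  'properties.bootstrap.servers' = 'kafka:9092',
    |  'format' = 'debezium-json'
    |)""".stripMargin)

// Hudi sink table (MERGE_ON_READ so it can be read incrementally later).
tenv.executeSql(
  """CREATE TABLE hudi_sink (
    |  id BIGINT,
    |  name STRING,
    |  ts TIMESTAMP(3),
    |  PRIMARY KEY (id) NOT ENFORCED
    |) WITH (
    |  'connector' = 'hudi',
    |  'path' = 'hdfs:///warehouse/hudi_sink',
    |  'table.type' = 'MERGE_ON_READ'
    |)""".stripMargin)

tenv.executeSql("INSERT INTO hudi_sink SELECT id, name, ts FROM cdc_source")
```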
Because it is more difficult to implement a UDAF / UDTF on a retractable table
than on a retractable stream, we chose to convert the Table to a DataStream for
processing. But we found that:
- 1. Only BatchTableEnvironment can convert a Table to a DataSet; other
implementations of TableEnvironment throw an exception (from
org.apache.flink.table.api.bridge.scala.TableConversions).
- 2. BatchTableEnvironment will be dropped in 1.14 because it only supports the
old planner.
- 3. The TableEnvironment#create API returns a TableEnvironmentImpl instance,
which works exclusively with the Table API and cannot convert a Table to a
DataSet.
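To illustrate point 1, this is the only batch-mode conversion we found, and it relies on the deprecated old-planner BatchTableEnvironment (a sketch; the query is a placeholder):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment
import org.apache.flink.types.Row

// Old-planner path: works in 1.13 but is deprecated and dropped in 1.14.
val benv = ExecutionEnvironment.getExecutionEnvironment
val btenv = BatchTableEnvironment.create(benv)

val table = btenv.sqlQuery("select ...") // placeholder query

// toDataSet is only available on BatchTableEnvironment (old planner);
// calling the equivalent on other TableEnvironment implementations throws.
val ds: DataSet[Row] = btenv.toDataSet[Row](table)
```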
So, how can we convert a Table to a DataStream in batch mode? Thanks for any
replies or suggestions.
In streaming mode:
```
val tenv: StreamTableEnvironment = StreamTableEnvironment.create(senv, setting)
val table = tenv.sqlQuery("select ...")
val dStream: DataStream[Row] = tenv.toChangelogStream(table)
dStream.map(..)...
```
In batch mode:
```
// actually returns a TableEnvironmentImpl
val tenv: TableEnvironment = TableEnvironment.create(setting)
val table = tenv.sqlQuery("select ...")
// how to convert the Table to a DataSet / DataStream?
val dStream: DataSet[Row] = ???
```
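For completeness, the closest thing we found in batch mode is pulling rows through TableResult#collect, which yields a local client-side iterator rather than a DataSet / DataStream (a sketch; the query is a placeholder, and this presumably does not scale like a distributed job):

```scala
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.types.Row

val settings = EnvironmentSettings.newInstance().inBatchMode().build()
val tenv = TableEnvironment.create(settings)
val table = tenv.sqlQuery("select ...") // placeholder query

// TableResult#collect returns a CloseableIterator[Row] on the client,
// not a distributed DataStream, so all rows flow through one process.
val it = table.execute().collect()
try {
  while (it.hasNext) {
    val row: Row = it.next()
    // process row locally
  }
} finally {
  it.close()
}
```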
Best Regards!
