Hi Yik San,

by converting the rows to a Tuple3 you effectively lose the information about the column names. You could instead call `toRetractStream[Row]`, which will give you a `DataStream[(Boolean, Row)]` whose rows keep the schema's field order, so you can resolve columns by name.
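For example, you could look the feature column's position up once from the table schema and keep the `map` logic generic across sources. An untested sketch against the Flink 1.12 Scala API — `table` and `tableEnv` are from your snippet, and `featureColumn` is a hypothetical parameter I made up:

```scala
import org.apache.flink.types.Row

// Resolve the feature column's position once from the schema, so the
// map function does not hard-code "last_5_clicks".
def featureStream(featureColumn: String) = {
  val idx = table.getSchema.getFieldNames.indexOf(featureColumn)
  tableEnv
    .toRetractStream[Row](table)
    .map { case (isAdd, row) =>
      // Rows keep the schema's field order; access the column by its
      // resolved index instead of a fixed tuple position.
      val feature = row.getField(idx).asInstanceOf[String]
      // ... shared downstream logic on `feature` ...
      feature
    }
}
```

You would then call it with `featureStream("last_5_clicks")` for this source and `featureStream("last_10min_page_view")` for the other one, reusing the same body.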
Cheers,
Till

On Tue, Mar 30, 2021 at 3:52 PM Yik San Chan <evan.chanyik...@gmail.com> wrote:

> The question is cross-posted on Stack Overflow
> https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name
> .
>
> I want to consume a Kafka topic into a table using Flink SQL, then convert
> it back to a DataStream.
>
> Here is the `SOURCE_DDL`:
>
> ```
> CREATE TABLE kafka_source (
>   user_id BIGINT,
>   datetime TIMESTAMP(3),
>   last_5_clicks STRING
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'aiinfra.fct.userfeature.0',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'test-group',
>   'format' = 'json'
> )
> ```
>
> With Flink, I execute the DDL.
>
> ```scala
> val settings = EnvironmentSettings.newInstance.build
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
> tableEnv.executeSql(SOURCE_DDL)
> val table = tableEnv.from("kafka_source")
> ```
>
> Then, I convert it into a DataStream, and do the downstream logic in the
> `map(e => ...)` part.
>
> ```scala
> tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e => ...)
> ```
>
> Inside the `map(e => ...)` part, I would like to access the column name,
> in this case, `last_5_clicks`. Why? Because I may have different sources
> with different column names (such as `last_10min_page_view`), but I would
> like to reuse the code in `map(e => ...)`.
>
> Is there a way to do this? Thanks.
>
> Best,
> Yik San