Hi Yik San,

By converting the rows to a Tuple3, you effectively lose the information
about the column names. You could instead call `toRetractStream[Row]`,
which gives you a `DataStream[Row]` that keeps the column names.
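
A rough sketch of what that could look like (assuming a Flink 1.12-era
setup; `table` and `tableEnv` are the ones from your snippet below, and
resolving the column index via the table schema is just one way to keep
the map logic reusable across sources with different column names):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// Resolve the position of the feature column from the table's schema once,
// so the same map function works for "last_5_clicks", "last_10min_page_view", etc.
val fieldNames: Array[String] = table.getSchema.getFieldNames
val featureIdx: Int = fieldNames.indexOf("last_5_clicks")

tableEnv
  .toRetractStream[Row](table)
  .filter(_._1) // keep insert records, drop retractions
  .map { case (_, row) =>
    // Position-based access, but the index was looked up by column name above.
    val feature = row.getField(featureIdx).asInstanceOf[String]
    // ... downstream logic on `feature` ...
    feature
  }
```

You could also pass the column name in as a job parameter instead of
hard-coding it, since that is the only source-specific piece.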

Cheers,
Till

On Tue, Mar 30, 2021 at 3:52 PM Yik San Chan <evan.chanyik...@gmail.com>
wrote:

> The question is cross-posted on Stack Overflow
> https://stackoverflow.com/questions/66872184/flink-table-to-datastream-how-to-access-column-name
> .
>
> I want to consume a Kafka topic into a table using Flink SQL, then convert
> it back to a DataStream.
>
> Here is the `SOURCE_DDL`:
>
> ```
> CREATE TABLE kafka_source (
>     user_id BIGINT,
>     datetime TIMESTAMP(3),
>     last_5_clicks STRING
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'aiinfra.fct.userfeature.0',
>     'properties.bootstrap.servers' = 'localhost:9092',
>     'properties.group.id' = 'test-group',
>     'format' = 'json'
> )
> ```
>
> With Flink, I execute the DDL.
>
> ```scala
> val settings = EnvironmentSettings.newInstance.build
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
> tableEnv.executeSql(SOURCE_DDL)
> val table = tableEnv.from("kafka_source")
> ```
>
> Then, I convert it into DataStream, and do downstream logic in the `map(e
> => ...)` part.
>
> ```scala
> tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e
> => ...)
> ```
>
> Inside the `map(e => ...)` part, I would like to access the column name,
> in this case, `last_5_clicks`. Why? Because I may have different sources
> with different columns names (such as `last_10min_page_view`), but I would
> like to reuse the code in `map(e => ...)`.
>
> Is there a way to do this? Thanks.
>
> Best,
> Yik San
>