Hi folks,
I have a question about Flink SQL. What I want to do is this:
* Join a simple lookup table (a few rows) to a stream of data to enrich the
stream by adding a column from the lookup table.
For example, a simple lookup table:
CREATE TABLE LookupTable (
  `computeClass` STRING,
  `multiplier` FLOAT
) WITH (
  'connector' = 'filesystem',
  'path' = 'fpu-multipliers.csv',
  'format' = 'csv'
)
And I’ve got a Kafka connector table with rowtime semantics that has a
`computeClass` field. I simply want to join it (in a streaming fashion) against
the lookup table above to pull in the `multiplier` field.
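For reference, the Kafka table is defined roughly like this (I've trimmed it
down, and the topic, format, and non-key column names here are just
illustrative):

CREATE TABLE KafkaStream (
  `timestamp` TIMESTAMP(3),
  `computeClass` STRING,
  -- ... other payload columns ...
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'compute-events',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
)

The enrichment query I'm trying is: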
SELECT
  `timestamp`,
  -- ...
  ks.computeClass,
  lt.`multiplier`
FROM KafkaStream ks
JOIN LookupTable lt ON ks.computeClass = lt.computeClass
Doing a simple join like that gives me this error:
“org.apache.flink.table.api.TableException: Rowtime attributes must not be in
the input rows of a regular join. As a workaround you can cast the time
attributes of input tables to TIMESTAMP before.”
That leads me to believe I should use an interval join instead, but that
doesn't seem appropriate, since my lookup table is static and has no concept
of time. Basically, I want to hold the entire lookup table in memory and
simply enrich the Kafka stream (which need not be held in memory).
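I also see the workaround suggested in the error message. If I understand it
correctly, it would look something like the sketch below (just my guess):
casting the rowtime attribute so it's no longer a time attribute, at the cost
of losing the rowtime semantics downstream:

SELECT
  CAST(ks.`timestamp` AS TIMESTAMP(3)) AS `timestamp`,
  -- ...
  ks.computeClass,
  lt.`multiplier`
FROM KafkaStream ks
JOIN LookupTable lt ON ks.computeClass = lt.computeClass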
Any ideas on how to accomplish what I’m trying to do?
Thanks!
Kelly