import java.util.Collections
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.types.DataTypes
import org.apache.flink.table.sources.csv.CsvTableSource
import org.apache.flink.types.Row
import scala.collection.mutable
object LateralJoinList {
def procLateralJoinPrint(sql: String) = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
// rates_data:
// currency,rate
// US Dollar,102
// Euro,114
// Yen,1
// Euro,116
val tableRateSource = CsvTableSource
.builder
.path("/Users/xxx/Desktop/csv")
.field("currency", DataTypes.STRING)
.field("rate", DataTypes.DOUBLE)
.uniqueKeys(Collections.singleton(Collections.singleton("currency")))
.fieldDelimiter(",")
.ignoreFirstLine
.ignoreParseErrors
.build
// ????????????
val ordersData = new mutable.MutableList[(Int, String)]
ordersData.+=((2, "Euro"))
ordersData.+=((1, "US Dollar"))
ordersData.+=((50, "Yen"))
ordersData.+=((3, "Euro"))
ordersData.+=((3, "Euroxxx")) // not emit
val order_data = env
.fromCollection(ordersData)
.toTable(tEnv, 'amount, 'currency, 'proctime.proctime)
tEnv.registerTableSource("LatestRates", tableRateSource)
tEnv.registerTable("Orders", order_data)
tEnv.sqlQuery(sql).toRetractStream[Row].print()
env.execute()
}
def main(args: Array[String]): Unit = {
val sql1 =
"""
|SELECT
| o.amount, o.currency, r.rate, o.amount * r.rate
|FROM
| Orders AS o
|LEFT OUTER JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
|ON o.currency = r.currency
""".stripMargin
procLateralJoinPrint(sql1)
}
}