I define a `Transaction` class:

```scala
case class Transaction(accountId: Long, amount: Long, timestamp: Long)
```

The `TransactionSource` (from Flink's `flink-walkthrough-common` module) simply emits a `Transaction` at some fixed time interval.
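If you don't have that dependency handy, a minimal stand-in (my own sketch of its behavior using the case class above, not the actual walkthrough implementation) would look roughly like this:

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction

// Hypothetical replacement for the walkthrough's TransactionSource:
// cycles through five accounts, emitting one Transaction at a time,
// each one 6 simulated minutes after the previous.
class SimpleTransactionSource extends SourceFunction[Transaction] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Transaction]): Unit = {
    var ts = 1546272000000L            // 2019-01-01 00:00:00 UTC
    while (running) {
      for (accountId <- 1L to 5L if running) {
        ctx.collect(Transaction(accountId, amount = 100L, timestamp = ts))
        ts += 360000L                  // next transaction 6 minutes later
        Thread.sleep(100)              // pacing between emissions
      }
    }
  }

  override def cancel(): Unit = running = false
}
```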
Now I want to compute the last two transaction timestamps for each account id; see the code below:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.walkthrough.common.entity.Transaction
import org.apache.flink.walkthrough.common.source.TransactionSource

object LastNJob {

  // Top-N subquery: keep the 2 most recent timestamps per account;
  // the outer LISTAGG concatenates them into one comma-separated string.
  final val QUERY =
    """
      |WITH last_n AS (
      |    SELECT accountId, `timestamp`
      |    FROM (
      |        SELECT *,
      |            ROW_NUMBER() OVER (PARTITION BY accountId ORDER BY `timestamp` DESC) AS row_num
      |        FROM transactions
      |    )
      |    WHERE row_num <= 2
      |)
      |SELECT accountId, LISTAGG(CAST(`timestamp` AS STRING)) last2_timestamp
      |FROM last_n
      |GROUP BY accountId
      |""".stripMargin

  def main(args: Array[String]): Unit = {
    val settings: EnvironmentSettings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(streamEnv, settings)

    val txnStream: DataStream[Transaction] = streamEnv
      .addSource(new TransactionSource)
      .name("transactions")

    // Register the stream as a table and run the continuous query.
    tableEnv.createTemporaryView("transactions", txnStream)
    tableEnv.executeSql(QUERY).print()
  }
}
```

When I run the program, I get:

```
+----+----------------------+--------------------------------+
| op |            accountId |                last2_timestamp |
+----+----------------------+--------------------------------+
| +I |                    1 |                  1546272000000 |
| +I |                    2 |                  1546272360000 |
| +I |                    3 |                  1546272720000 |
| +I |                    4 |                  1546273080000 |
| +I |                    5 |                  1546273440000 |
| -U |                    1 |                  1546272000000 |
| +U |                    1 |    1546272000000,1546273800000 |
| -U |                    2 |                  1546272360000 |
| +U |                    2 |    1546272360000,1546274160000 |
| -U |                    3 |                  1546272720000 |
| +U |                    3 |    1546272720000,1546274520000 |
| -U |                    4 |                  1546273080000 |
| +U |                    4 |    1546273080000,1546274880000 |
| -U |                    5 |                  1546273440000 |
| +U |                    5 |    1546273440000,1546275240000 |
| -U |                    1 |    1546272000000,1546273800000 |
| +U |                    1 |                  1546273800000 |
| -U |                    1 |                  1546273800000 |
| +U |                    1 |    1546273800000,1546275600000 |
(to continue)
```

Let's focus on the latest transaction of accountId=1 in the output above. When a new transaction from account 1 arrives at timestamp=1546275600000, four operations are emitted in total:

```
+----+----------------------+--------------------------------+
| op |            accountId |                last2_timestamp |
+----+----------------------+--------------------------------+
| -U |                    1 |    1546272000000,1546273800000 |
| +U |                    1 |                  1546273800000 |
| -U |                    1 |                  1546273800000 |
| +U |                    1 |    1546273800000,1546275600000 |
```

However, I only want to emit the "new status" below to my downstream (say, another Kafka topic) via some sort of merging:

```
+----------------------+--------------------------------+
|            accountId |                last2_timestamp |
+----------------------+--------------------------------+
|                    1 |    1546273800000,1546275600000 |
```

so that my downstream is able to consume literally "the last two transaction timestamps of each account":

```
+----------------------+--------------------------------+
|            accountId |                last2_timestamp |
+----------------------+--------------------------------+
|                    1 |                  1546272000000 |
|                    1 |    1546272000000,1546273800000 |
|                    1 |    1546273800000,1546275600000 |
(to continue)
```

What is the right way to do this?
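For reference, the closest I can get on the DataStream side is converting the result table to a retract stream and filtering out the retraction messages. Below is a minimal sketch assuming the Flink 1.13 Scala bridge API (`toRetractStream` is the real method; the wiring around it, replacing the `executeSql(QUERY).print()` line above, is mine). It drops the `-U` rows, but the intermediate single-timestamp `+U` row for account 1 still comes through, so it is not the merging described above:

```scala
import org.apache.flink.types.Row

// Convert the updating Top-N result into a (Boolean, Row) retract stream,
// where true = accumulate (+I/+U) and false = retract (-U/-D).
val resultTable = tableEnv.sqlQuery(QUERY)
val retractStream: DataStream[(Boolean, Row)] =
  tableEnv.toRetractStream[Row](resultTable)

retractStream
  .filter(_._1)   // keep only the accumulate messages
  .map(_._2)      // unwrap the Row: (accountId, last2_timestamp)
  .print()

streamEnv.execute("LastNJob")
```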