[Subject — reconstructed from garbled text: Question about the BlinkPlanner — behavior differs from the old planner; Flink version 1.10]
package test.table.sql
import java.util.Properties
import com.souhu.msns.huyou.PublicParams
import com.souhu.msns.huyou.utils.KafkaPbSchema
import org.apache.flink.api.common.time.Time
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.windowing.time.{Time => WindowTime}
import org.apache.flink.types.Row
/**
 * Minimal reproduction case (from a user-zh mailing-list post) comparing Blink
 * planner vs. old planner behavior for idle-state retention on a streaming SQL join.
 *
 * Reads a Kafka topic (construction elided in the original message), registers it
 * as table `source`, derives two filtered views, joins them on `sessionId`, and
 * prints the retract stream of the join count.
 */
object test {
  def main(args: Array[String]): Unit = {
    // ---------------------------- Stream execution environment ----------------------------
    val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // NOTE(review): the original code set parallelism twice (1 then 3) and execution
    // retries twice (1 then 1); only the last call takes effect, so the redundant
    // earlier calls were removed. Effective values: parallelism = 3, retries = 1.
    bsEnv.setNumberOfExecutionRetries(1)
    bsEnv.setParallelism(3)
    //bsEnv.getConfig.setAutoWatermarkInterval(10000)
    bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Checkpoints go to HDFS; `true` enables asynchronous snapshots.
    bsEnv.setStateBackend(new
      FsStateBackend("hdfs://dc1:8020/user/msns/streaming/checkpoint/flink/Circ",
      true))
    bsEnv.getCheckpointConfig.setCheckpointInterval(300000)
    bsEnv.getCheckpointConfig.setMinPauseBetweenCheckpoints(60000)
    // ---------------------------- Table environment (Blink planner) ----------------------------
    val setting =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val bstEnv = StreamTableEnvironment.create(bsEnv, setting)
    val tConfig = bstEnv.getConfig
    // Idle state older than 10 minutes becomes eligible for cleanup; cleanup is
    // guaranteed by 20 minutes. (The mailing-list question is whether the Blink
    // planner honors this for join state the way the old planner does.)
    tConfig.setIdleStateRetentionTime(Time.minutes(10), Time.minutes(20))
    val config = bstEnv.getConfig.getConfiguration()
    // local-global aggregation depends on mini-batch being enabled
    config.setString("table.exec.mini-batch.enabled", "true")
    config.setString("table.exec.mini-batch.allow-latency", "5 s")
    config.setString("table.exec.mini-batch.size", "5000")
    // enable two-phase, i.e. local-global aggregation
    config.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE")
    config.setString("table.optimizer.distinct-agg.split.enabled", "true")
    //bstEnv.getConfig.setLocalTimeZone(ZoneId.of("Etc/GMT+8"))
    // ---------------------------- Kafka source ----------------------------
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", PublicParams.brokers)
    // NOTE(review): the actual source construction was elided ("....") in the
    // original message — presumably a FlinkKafkaConsumer010 with KafkaPbSchema
    // and a BoundedOutOfOrdernessTimestampExtractor (see imports); not reconstructed
    // here to avoid guessing.
    val source = ....
      .toTable(bstEnv, 'userId, 'createTime.rowtime, 'action, 'circleName, 'flowName, 'ts, 'content, 'feedid, 'postfeedid, 'sessionId)
    bstEnv.createTemporaryView("source", source)
    // View 1: sessions that produced a timeline (action = P_TIMELINE).
    val q1 = bstEnv.sqlQuery(
      """select sessionId from source
        |where sessionId is not null
        |and action='P_TIMELINE'""".stripMargin)
    q1.toAppendStream[Row].print("source")
    bstEnv.createTemporaryView("sourcefeed", q1)
    // View 2: sessions that viewed a timeline feed (action = V_TIMELINE_FEED).
    val q2 = bstEnv.sqlQuery(
      """select sessionId from source
        |where sessionId is not null
        |and action='V_TIMELINE_FEED'""".stripMargin)
    bstEnv.createTemporaryView("postfeed", q2)
    // Unbounded stream-stream join: this is the state the idle-state retention
    // setting above is expected to clean up.
    bstEnv.sqlQuery(
      """
        |select count(b.sessionId) from
        |sourcefeed a
        |join postfeed b
        |on a.sessionId=b.sessionId
      """.stripMargin).toRetractStream[Row].print("")
    bstEnv.execute("")
  }
}
------------------ Original Message ------------------
From: "Leonard Xu" <[email protected]>
Date: Thursday, June 11, 2020, 2:40 PM
To: "user-zh" <[email protected]>
Subject: Re: BlinkPlanner SQL join question
Hi,

[Reply reconstructed from garbled text — gist: thanks for the report; this looks like a case worth investigating/following up on in Flink.]

Best,
Leonard Xu
> On June 11, 2020, at 14:30, op <[email protected]> wrote:
>
> [Question reconstructed from garbled text:]
>
With the old planner, after setting IdleStateRetentionTime the join state is cleaned up as expected, but with the Blink planner the join state does not appear to be cleaned up at all. Is this a bug?