code:
// Convert the behavior table into an append stream of BehaviorInfo and print each record.
val inpurtDS = streamTableEnv.toAppendStream[BehaviorInfo](behaviorTable)
inpurtDS.print()

// Single-step pattern named "start": accepts any event whose clickCount is greater than 7.
val pattern = Pattern
  .begin[BehaviorInfo]("start")
  .where(_.clickCount > 7)

// NOTE(review): if inpurtDS.print() shows data but processMatch below is never invoked,
// check the time characteristic. On Flink 1.12+ CEP presumably runs in event time by
// default and holds events back until a watermark passes them; a stream without
// watermarks then never emits a match. `CEP.pattern(inpurtDS, pattern).inProcessingTime()`
// is the usual remedy — confirm against the Flink version and watermark setup in use.
val patternStream = CEP.pattern(inpurtDS, pattern)

val result: DataStream[BehaviorInfo] = patternStream.process(
  new PatternProcessFunction[BehaviorInfo, BehaviorInfo]() {

    /** Logs the match and forwards the first event bound to the "start" step.
      *
      * @param matchPattern events of the match, keyed by pattern step name
      * @param ctx          context supplied by the CEP operator (unused here)
      * @param out          collector that receives the forwarded event
      */
    override def processMatch(
        matchPattern: util.Map[String, util.List[BehaviorInfo]],
        ctx: PatternProcessFunction.Context,
        out: Collector[BehaviorInfo]): Unit = {
      try {
        val startEvents = matchPattern.get("start")
        println(
          s"""
             |matchPattern: $matchPattern
             |util.List[BehaviorInfo]: $startEvents
             |""".stripMargin)
        out.collect(startEvents.get(0))
      } catch {
        // NonFatal lets InterruptedException and JVM errors propagate, so task
        // cancellation is not hidden; other failures are still only printed, as before.
        case scala.util.control.NonFatal(exception) =>
          println(exception)
      }
    }
  })
result.print()
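[The original sentence was lost to a character-encoding error; reconstructed from the surviving identifiers, it appears to say:] Currently inpurtDS.print() prints the incoming data, but after applying the pattern, result.print() prints nothing, and the processMatch method of PatternProcessFunction is never called. What could be the cause?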
Thanks a lot!