code:
val inpurtDS = streamTableEnv.toAppendStream[BehaviorInfo](behaviorTable)inpurtDS.print()val pattern = Pattern.begin[BehaviorInfo]("start") .where(_.clickCount > 7)val patternStream = CEP.pattern(inpurtDS, pattern) val result: DataStream[BehaviorInfo] = patternStream.process( new PatternProcessFunction[BehaviorInfo, BehaviorInfo]() { override def processMatch( matchPattern: util.Map[String, util.List[BehaviorInfo]], ctx: PatternProcessFunction.Context, out: Collector[BehaviorInfo]): Unit = { try { println( s""" |matchPattern: $matchPattern |util.List[BehaviorInfo]: ${matchPattern.get("start")} |""".stripMargin) out.collect(matchPattern.get("start").get(0)) } catch { case exception: Exception => println(exception) } } }) result.print() ??????inpurtDS.print()????????????????????????????pattern??????result.print()????????PatternProcessFunction????processMatch?????????????????????????????????????????????????????????? Thanks a lot!