大家好,
先说本人的理解,keyed(..).flatmap(mapFunc())
其中,每个具体mapFunc处理的数据,应该是相同的key数据。不知理解是否正确。
我的具体情况是
我对数据对校验处理。首先根据设备id (uuid) 分组,然后针对不同分组进行数据校验。
部分代码如下:
rowData.filter(legalData _)
.map(data => BehaviorComVO(getText(data, "id"), getText(data, "uuid"),
getText(data, "session_id"), getText(data, "source"), getText(data,
"product_version")))
* .keyBy(_.uuid)*
* .flatMap(new RepeatIdCheckDispatch())*
.addSink(....)
*RepeatIdCheckDispatch* 细节:
* override def flatMap(in: BehaviorComVO, out: Collector[String]): Unit =
{*
* in match {*
* case BehaviorComVO(_, _, _, "visit", _) =>*
* if (!repeatIdChecker.isOK) out.collect(repeatIdChecker.result)*
* repeatIdChecker = RepeatIdChecker(in)*
* case _: BehaviorComVO => repeatIdChecker.doCheck(in)*
* }*
* }*
"visit" 是一个周期数据的开始。。但是运行之后,我发现,有其他uuid的数据,进入到同一个 *RepeatIdChecker 中*,